Skip to content

Latest commit

 

History

History
161 lines (131 loc) · 5.11 KB

File metadata and controls

161 lines (131 loc) · 5.11 KB

Email [email protected] and request Kibana to be enabled on the Compose Elasticsearch cluster.

Use Developer Tools in Kibana to add a mapping:

PUT _template/pos-transactions
{
  "index_patterns": "pos-transactions*",
  "mappings": {
    "logs" : {
        "dynamic": "strict",
        "properties" : {
            "InvoiceDateString": {
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            },
            "Description":   { "type": "text" },
            "InvoiceNo":     { "type": "long" },
            "CustomerID":    { "type": "long" },
            "TransactionID": { "type": "long" },
            "Quantity":      { "type": "long" },
            "UnitPrice":     { "type": "double" },
            "InvoiceTime":   { "type": "text" },
            "StoreID":       { "type": "long" },
            "Country":       { "type": "text" },
            "InvoiceDate":   { "type": "long" },
            "StockCode":     { "type": "text" },
            "LineNo":        { "type": "long" }
            
        }
    }
  }
}

Build the scala class and package to jar and copy it to IAE!

sbt package
scp target/scala-2.11/spark-structured-streaming-on-iae-to-elasticsearch_2.11-1.0.jar clsadmin@yourcluster:./

Create a truststore on IAE for Elasticsearch certificate

keytool --importkeystore -noprompt \
        -srckeystore /etc/pki/java/cacerts \
        -srcstorepass changeit \
        -destkeystore my.jks \
        -deststorepass changeit
        
wget https://letsencrypt.org/certs/lets-encrypt-x3-cross-signed.der  

keytool -trustcacerts \
    -keystore my.jks \
    -storepass changeit \
    -noprompt -importcert \
    -alias letsencryptauthorityx3 \
    -file lets-encrypt-x3-cross-signed.der  

Create a jaas.conf for spark to authenticate to Message Hub (paste this script into your IAE SSH session and edit the file to chnage the CHANGEME values):

cat << EOF > jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="kafka"
    username="CHANGEME"
    password="CHANGEME";
};
EOF

Create a script with some variables needed by spark (paste this script into your IAE SSH session and edit the file to chnage the CHANGEME values):

cat << EOF > config_es_kafka_vars.sh

# ELASTICSEARCH #

ES_USER=CHANGEME
ES_PASS=CHANGEME
ES_NODES=HOST1,HOST2, ... # CHANGEME
ES_PORT=CHANGEME

# KAFKA #
KAFKA_BOOTSTRAP_SERVERS=HOST1:PORT1,HOST2:PORT2,... # E.g. kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093

EOF

Create a script to start spark interactively via yarn (paste this script into your IAE SSH session):

cat << 'EOF' > start_yarn_client_elasticsearch.sh
source config_es_kafka_vars.sh

spark-submit --class main.Main \
       --master yarn \
       --deploy-mode client \
       --files jaas.conf,my.jks \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:6.2.3 \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --conf spark.kafka_bootstrap_servers=$KAFKA_BOOTSTRAP_SERVERS \
       --conf spark.es_user=$ES_USER \
       --conf spark.es_pass=$ES_PASS \
       --conf spark.es_nodes=$ES_NODES \
       --conf spark.es_port=$ES_PORT \
       --num-executors 1 \
       --executor-cores 1 \
       spark-structured-streaming-on-iae-to-elasticsearch_2.11-1.0.jar
EOF
chmod +x start_yarn_client_elasticsearch.sh

Run the interactive script, you should see spark saving to COS and output sent to the terminal:

bash -x start_yarn_client_elasticsearch.sh

Create a script to start spark in the background via yarn (paste this script into your IAE SSH session):

cat << 'EOF' > start_yarn_cluster_elasticsearch.sh
source config_es_kafka_vars.sh

spark-submit --class main.Main \
       --master yarn \
       --deploy-mode cluster \
       --files jaas.conf,my.jks \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:6.2.3 \
       --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
       --conf spark.kafka_bootstrap_servers=$KAFKA_BOOTSTRAP_SERVERS \
       --conf spark.es_user=$ES_USER \
       --conf spark.es_pass=$ES_PASS \
       --conf spark.es_nodes=$ES_NODES \
       --conf spark.es_port=$ES_PORT \
       --conf spark.yarn.submit.waitAppCompletion=false \
       --num-executors 1 \
       --executor-cores 1 \
       spark-structured-streaming-on-iae-to-elasticsearch_2.11-1.0.jar
EOF
chmod +x start_yarn_cluster_elasticsearch.sh

Run the script. It will terminate after submitting the job to yarn.

bash -x start_yarn_cluster_elasticsearch.sh

Verify that the spark job is running on yarn:

yarn application -list