diff --git a/.gitignore b/.gitignore index e67a40e..698830d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .DS_Store .idea/ __pycache__/ +.vscode/ +edge2ai.code-workspace \ No newline at end of file diff --git a/images/FLINK_running_jobs_lite.png b/images/FLINK_running_jobs_lite.png new file mode 100644 index 0000000..ba1664e Binary files /dev/null and b/images/FLINK_running_jobs_lite.png differ diff --git a/images/add_ConsumeMQTT_lite.png b/images/add_ConsumeMQTT_lite.png new file mode 100644 index 0000000..267b6a6 Binary files /dev/null and b/images/add_ConsumeMQTT_lite.png differ diff --git a/images/add_updateattribute_lite.png b/images/add_updateattribute_lite.png new file mode 100644 index 0000000..ea3957f Binary files /dev/null and b/images/add_updateattribute_lite.png differ diff --git a/images/additional_controller_services_lite.png b/images/additional_controller_services_lite.png new file mode 100644 index 0000000..522cf3c Binary files /dev/null and b/images/additional_controller_services_lite.png differ diff --git a/images/create_pgroup_lite.png b/images/create_pgroup_lite.png new file mode 100644 index 0000000..43e42a2 Binary files /dev/null and b/images/create_pgroup_lite.png differ diff --git a/images/flink_ssl_lite.png b/images/flink_ssl_lite.png new file mode 100644 index 0000000..fae1d0c Binary files /dev/null and b/images/flink_ssl_lite.png differ diff --git a/images/from_kafka_to_kudu_flow_lite.png b/images/from_kafka_to_kudu_flow_lite.png new file mode 100644 index 0000000..01a1e43 Binary files /dev/null and b/images/from_kafka_to_kudu_flow_lite.png differ diff --git a/images/hue_timestamp_fixed.png b/images/hue_timestamp_fixed.png new file mode 100644 index 0000000..0f374a8 Binary files /dev/null and b/images/hue_timestamp_fixed.png differ diff --git a/images/hue_timestamp_issue.png b/images/hue_timestamp_issue.png new file mode 100644 index 0000000..a79b58d Binary files /dev/null and b/images/hue_timestamp_issue.png differ diff --git a/images/kafka_success_lite.png b/images/kafka_success_lite.png new file mode 100644 index 0000000..89a3a21 Binary files /dev/null and b/images/kafka_success_lite.png differ diff --git a/images/kudu_success_lite.png b/images/kudu_success_lite.png new file mode 100644 index 0000000..d5bbfbd Binary files /dev/null and b/images/kudu_success_lite.png differ diff --git a/images/publishKafka_flow_lite.png b/images/publishKafka_flow_lite.png new file mode 100644 index 0000000..0c76ccf Binary files /dev/null and b/images/publishKafka_flow_lite.png differ diff --git a/images/ssb-sql-results-2.png b/images/ssb-sql-results-2.png new file mode 100644 index 0000000..d32f1d4 Binary files /dev/null and b/images/ssb-sql-results-2.png differ diff --git a/images/table_select_lite.png b/images/table_select_lite.png new file mode 100644 index 0000000..b8d5ca6 Binary files /dev/null and b/images/table_select_lite.png differ diff --git a/images/viz_add_expression.png b/images/viz_add_expression.png new file mode 100644 index 0000000..3b40c4e Binary files /dev/null and b/images/viz_add_expression.png differ diff --git a/images/viz_clone_field.png b/images/viz_clone_field.png new file mode 100644 index 0000000..043a6ed Binary files /dev/null and b/images/viz_clone_field.png differ diff --git a/images/viz_connection_explorer.png b/images/viz_connection_explorer.png new file mode 100644 index 0000000..e4dbb61 Binary files /dev/null and b/images/viz_connection_explorer.png differ diff --git a/images/viz_connection_explorer_table.png 
b/images/viz_connection_explorer_table.png new file mode 100644 index 0000000..8c6e92f Binary files /dev/null and b/images/viz_connection_explorer_table.png differ diff --git a/images/viz_create_dashboard.png b/images/viz_create_dashboard.png new file mode 100644 index 0000000..db971c1 Binary files /dev/null and b/images/viz_create_dashboard.png differ diff --git a/images/viz_dashboard_add_dimensions.png b/images/viz_dashboard_add_dimensions.png new file mode 100644 index 0000000..1ecd76f Binary files /dev/null and b/images/viz_dashboard_add_dimensions.png differ diff --git a/images/viz_dashboard_add_measures.png b/images/viz_dashboard_add_measures.png new file mode 100644 index 0000000..3726ec1 Binary files /dev/null and b/images/viz_dashboard_add_measures.png differ diff --git a/images/viz_dashboard_auto_refresh.png b/images/viz_dashboard_auto_refresh.png new file mode 100644 index 0000000..cdd986f Binary files /dev/null and b/images/viz_dashboard_auto_refresh.png differ diff --git a/images/viz_dashboard_view.png b/images/viz_dashboard_view.png new file mode 100644 index 0000000..5132395 Binary files /dev/null and b/images/viz_dashboard_view.png differ diff --git a/images/viz_new_connection_advanced.png b/images/viz_new_connection_advanced.png new file mode 100644 index 0000000..5eb7b0d Binary files /dev/null and b/images/viz_new_connection_advanced.png differ diff --git a/images/viz_new_connection_basic.png b/images/viz_new_connection_basic.png new file mode 100644 index 0000000..40b2fda Binary files /dev/null and b/images/viz_new_connection_basic.png differ diff --git a/images/viz_new_dashboard.png b/images/viz_new_dashboard.png new file mode 100644 index 0000000..1ba4251 Binary files /dev/null and b/images/viz_new_dashboard.png differ diff --git a/images/viz_new_dataset.png b/images/viz_new_dataset.png new file mode 100644 index 0000000..17aeb20 Binary files /dev/null and b/images/viz_new_dataset.png differ diff --git a/images/viz_save_dataset.png b/images/viz_save_dataset.png new file mode 100644 index 0000000..37e1d06 Binary files /dev/null and b/images/viz_save_dataset.png differ diff --git a/sensor_enhanced.avsc b/sensor_enhanced.avsc new file mode 100644 index 0000000..290abf4 --- /dev/null +++ b/sensor_enhanced.avsc @@ -0,0 +1,115 @@ +{ + "type": "record", + "name": "SensorReading", + "namespace": "com.cloudera.example", + "doc": "This is a sample sensor reading", + "fields": [ + { + "name": "sensor_id", + "doc": "Sensor identification number.", + "type": "int" + }, + { + "name": "sensor_ts", + "doc": "Timestamp of the collected readings.", + "type": "long" + }, + { + "name": "is_healthy", + "doc": "Flag indicating health (healthy == 1)", + "type": "int" + }, + { + "name": "response", + "doc": "response record", + "type": { + "type": "record", + "name": "CDSWResponse", + "namespace": "com.cloudera.example", + "doc": "This is a CDSW Model response", + "fields": [ + { + "name": "result", + "doc": "Result", + "type": "int" + } + ] + } + }, + { + "name": "sensor_0", + "doc": "Reading #0.", + "type": "int" + }, + { + "name": "sensor_1", + "doc": "Reading #1.", + "type": "int" + }, + { + "name": "sensor_2", + "doc": "Reading #2.", + "type": "int" + }, + { + "name": "sensor_3", + "doc": "Reading #3.", + "type": "int" + }, + { + "name": "sensor_4", + "doc": "Reading #4.", + "type": "int" + }, + { + "name": "sensor_5", + "doc": "Reading #5.", + "type": "int" + }, + { + "name": "sensor_6", + "doc": "Reading #6.", + "type": "int" + }, + { + "name": "sensor_7", + "doc": "Reading #7.", + 
"type": "int" + }, + { + "name": "sensor_8", + "doc": "Reading #8.", + "type": "int" + }, + { + "name": "sensor_9", + "doc": "Reading #9.", + "type": "int" + }, + { + "name": "sensor_10", + "doc": "Reading #10.", + "type": "int" + }, + { + "name": "sensor_11", + "doc": "Reading #11.", + "type": "int" + }, + { + "name": "city", + "doc": "closest city", + "type": "string" + }, + { + "name": "lat", + "doc": "geo location", + "type": "double" + }, + { + "name": "lon", + "doc": "geo location", + "type": "double" + } + ] +} diff --git a/setup/README.adoc b/setup/README.adoc index 968ef41..0cf00a4 100644 --- a/setup/README.adoc +++ b/setup/README.adoc @@ -180,7 +180,7 @@ docker pull asdaraujo/edge2ai-workshop:latest + [source,shell] ---- -# Install YUM, skip if you already have it +# Install Git, skip if you already have it sudo yum install -y git # Clone the repo diff --git a/setup/terraform/check-services.sh b/setup/terraform/check-services.sh index 37f2016..870e522 100755 --- a/setup/terraform/check-services.sh +++ b/setup/terraform/check-services.sh @@ -9,6 +9,20 @@ function cleanup() { rm -f .curl.*.$$ } +function get_model_status() { + local ip=$1 + CDSW_API="cdsw.$ip.nip.io/api/v1" + CDSW_ALTUS_API="cdsw.$ip.nip.io/api/altus-ds-1" + status="" + for scheme in http https; do + token=$("${CURL[@]}" -X POST --cookie-jar .curl.cj.$$ --cookie .curl.cj.$$ -H "Content-Type: application/json" --data '{"_local":false,"login":"admin","password":"'"${THE_PWD}"'"}' "$scheme://$CDSW_API/authenticate" 2>/dev/null | jq -r '.auth_token' 2> /dev/null) + [[ ! -n $token ]] && continue + status=$("${CURL[@]}" -X POST --cookie-jar .curl.cj.$$ --cookie .curl.cj.$$ -H "Content-Type: application/json" -H "Authorization: Bearer $token" --data '{"projectOwnerName":"admin","latestModelDeployment":true,"latestModelBuild":true}' "$scheme://$CDSW_ALTUS_API/models/list-models" 2>/dev/null | jq -r '.[].latestModelDeployment | select(.model.name == "IoT Prediction Model").status' 2>/dev/null) + [[ -n $status ]] && break + done + echo -n $status +} + if [ $# != 1 ]; then echo "Syntax: $0 " show_namespaces @@ -17,26 +31,27 @@ fi NAMESPACE=$1 load_env $NAMESPACE -printf "%-30s %-30s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %s\n" "instance" "ip address" "WEB" "CM" "CEM" "NIFI" "NREG" "SREG" "SMM" "HUE" "CDSW" "Model Status" +printf "%-30s %-20s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-14s %s\n" "instance" "ip address" "WEB" "CM" "CEM" "NIFI" "NREG" "SREG" "SMM" "HUE" "CDSW" "Model Status" "Viz Status" ensure_tf_json_file if [ -s $TF_JSON_FILE ]; then cat $TF_JSON_FILE | \ - jq -r '.values.root_module.resources[] | select(.type == "aws_instance" and .name == "web" or .type == "aws_instance" and .name == "cluster") | "\(.type).\(.name)[\(.index)] \(.values.public_ip) \(.values.public_dns)"' | \ - while read instance ip host; do + jq -r '.values.root_module.resources[] | select(.type == "aws_instance" and .name == "web" or .type == "aws_instance" and .name == "cluster") | "\(.type).\(.name)[\(.index)] \(.values.public_ip)"' | \ + while read instance ip; do + host="cdp.$ip.nip.io" CDSW_API="http://cdsw.$ip.nip.io/api/v1" CDSW_ALTUS_API="http://cdsw.$ip.nip.io/api/altus-ds-1" ("${CURL[@]}" http://$host/api/ping 2>/dev/null | grep 'Pong!' 
> /dev/null 2>&1 && echo Ok) > .curl.web.$$ & ("${CURL[@]}" http://$host:7180/cmf/login 2>/dev/null | grep "Cloudera Manager" > /dev/null 2>&1 && echo Ok) > .curl.cm.$$ & - (("${CURL[@]}" http://$host:10080/efm/ui/ || "${CURL[@]}" https://$host:10080/efm/ui/) 2>/dev/null | grep "CEM" > /dev/null 2>&1 && echo Ok) > .curl.cem.$$ & + (("${CURL[@]}" http://$host:10088/efm/ui/ || "${CURL[@]}" https://$host:10088/efm/ui/) 2>/dev/null | grep "CEM" > /dev/null 2>&1 && echo Ok) > .curl.cem.$$ & (("${CURL[@]}" http://$host:8080/nifi/ || "${CURL[@]}" https://$host:8443/nifi/) 2>/dev/null | grep "NiFi" > /dev/null 2>&1 && echo Ok) > .curl.nifi.$$ & (("${CURL[@]}" http://$host:18080/nifi-registry/ || "${CURL[@]}" https://$host:18433/nifi-registry/) 2>/dev/null | grep "NiFi Registry" > /dev/null 2>&1 && echo Ok) > .curl.nifireg.$$ & (("${CURL[@]}" http://$host:7788/ || "${CURL[@]}" https://$host:7790/) 2>/dev/null | egrep "Schema Registry|Error 401 Authentication required" > /dev/null 2>&1 && echo Ok) > .curl.schreg.$$ & (("${CURL[@]}" http://$host:9991/ || "${CURL[@]}" https://$host:9991/) 2>/dev/null | grep "STREAMS MESSAGING MANAGER" > /dev/null 2>&1 && echo Ok) > .curl.smm.$$ & (("${CURL[@]}" http://$host:8888/ ; "${CURL[@]}" https://$host:8889/) 2>/dev/null | grep "Hue" > /dev/null 2>&1 && echo Ok) > .curl.hue.$$ & (("${CURL[@]}" http://cdsw.$ip.nip.io/ || "${CURL[@]}" https://cdsw.$ip.nip.io/) 2>/dev/null | egrep "(Cloudera Machine Learning|Cloudera Data Science Workbench)" > /dev/null 2>&1 && echo Ok) > .curl.cml.$$ & - (token=$("${CURL[@]}" -X POST --cookie-jar .curl.cj.$$ --cookie .curl.cj.$$ -H "Content-Type: application/json" --data '{"_local":false,"login":"admin","password":"'"${THE_PWD}"'"}' "$CDSW_API/authenticate" 2>/dev/null | jq -r '.auth_token' 2> /dev/null) && \ - "${CURL[@]}" -X POST --cookie-jar .curl.cj.$$ --cookie .curl.cj.$$ -H "Content-Type: application/json" -H "Authorization: Bearer $token" --data '{"projectOwnerName":"admin","latestModelDeployment":true,"latestModelBuild":true}' "$CDSW_ALTUS_API/models/list-models" 2>/dev/null | jq -r '.[].latestModelDeployment | select(.model.name == "IoT Prediction Model").status' 2>/dev/null) > .curl.model.$$ & + (get_model_status $ip) > .curl.model.$$ & + ("${CURL[@]}" http://viz.cdsw.$ip.nip.io/arc/adminapi/users 2>/dev/null | grep 'user' > /dev/null 2>&1 && echo running) > .curl.viz.$$ & wait - printf "%-30s %-30s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %s\n" "$instance" "$ip" "$(cat .curl.web.$$)" "$(cat .curl.cm.$$)" "$(cat .curl.cem.$$)" "$(cat .curl.nifi.$$)" "$(cat .curl.nifireg.$$)" "$(cat .curl.schreg.$$)" "$(cat .curl.smm.$$)" "$(cat .curl.hue.$$)" "$(cat .curl.cml.$$)" "$(cat .curl.model.$$)" + printf "%-30s %-20s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-5s %-14s %s\n" "$instance" "$ip" "$(cat .curl.web.$$)" "$(cat .curl.cm.$$)" "$(cat .curl.cem.$$)" "$(cat .curl.nifi.$$)" "$(cat .curl.nifireg.$$)" "$(cat .curl.schreg.$$)" "$(cat .curl.smm.$$)" "$(cat .curl.hue.$$)" "$(cat .curl.cml.$$)" "$(cat .curl.model.$$)" "$(cat .curl.viz.$$)" done | sort -t\[ -k1,1r -k2,2n fi \ No newline at end of file diff --git a/setup/terraform/resources/cdsw_setup.py b/setup/terraform/resources/cdsw_setup.py index 9ed0239..ae245c5 100644 --- a/setup/terraform/resources/cdsw_setup.py +++ b/setup/terraform/resources/cdsw_setup.py @@ -22,172 +22,252 @@ print('# Prepare CDSW for workshop') r = None -try: - s = requests.Session() - - print('# Create user') - while True: - try: - r = s.post(CDSW_API + '/users', json={ - 'email': EMAIL, - 'name': 
FULL_NAME, - 'username': USERNAME, - 'password': PASSWORD, - 'type': 'user' - }, timeout=10) - if r.status_code == 201: - break - elif r.status_code == 422: - print('WARNING: User admin already exists') - break - except requests.exceptions.ConnectTimeout: - pass - except requests.exceptions.ConnectionError: - pass - if r: - print('Waiting for CDSW to be ready... (error code: %s)' % (r.status_code,)) - else: - print('Waiting for CDSW to be ready... (connection timed out)') - time.sleep(10) - - print('# Authenticate') - r = s.post(CDSW_API + '/authenticate', json={'login': USERNAME, 'password': PASSWORD}) - s.headers.update({'Authorization': 'Bearer ' + r.json()['auth_token']}) - - print('# Check if model is already running') - model_name = 'IoT Prediction Model' - r = s.post(CDSW_ALTUS_API + '/models/list-models', json={'projectOwnerName': 'admin', 'latestModelDeployment': True, 'latestModelBuild': True}) - models = [m for m in r.json() if m['name'] == model_name] - if models and models[0]['latestModelDeployment']['status'] == 'deployed': - print('Model is already deployed!! Skipping.') - exit(0) - - print('# Add engine') - r = s.post(CDSW_API + '/site/engine-profiles', json={'cpu': 1, 'memory': 4}) - engine_id = r.json()['id'] - print('Engine ID: %s'% (engine_id,)) - - print('# Add environment variable') - r = s.patch(CDSW_API + '/site/config', json={'environment': '{"HADOOP_CONF_DIR":"/etc/hadoop/conf/"}'}) - - print('# Add project') - project_name = 'Edge2AI Workshop' - project_id = None - r = s.get(CDSW_API + '/users/admin/projects') - for project in r.json(): - if project['name'] == project_name: - project_id = project['id'] + +s = requests.Session() + +print('# Create user') +while True: + try: + r = s.post(CDSW_API + '/users', json={ + 'email': EMAIL, + 'name': FULL_NAME, + 'username': USERNAME, + 'password': PASSWORD, + 'type': 'user' + }, timeout=10) + if r.status_code == 201: + break + elif r.status_code == 422: + print('WARNING: User admin already exists') break - if not project_id: - r = s.post(CDSW_API + '/users/admin/projects', json={'template': 'git', - 'project_visibility': 'private', - 'name': project_name, - 'gitUrl': 'https://github.com/cloudera-labs/edge2ai-workshop'}) - project_id = r.json()['id'] - print('Project ID: %s'% (project_id,)) - - print('# Upload setup script') - setup_script = """!pip3 install --upgrade pip scikit-learn + except requests.exceptions.ConnectTimeout: + pass + except requests.exceptions.ConnectionError: + pass + if r: + print('Waiting for CDSW to be ready... (error code: %s)' % (r.status_code,)) + else: + print('Waiting for CDSW to be ready... (connection timed out)') + time.sleep(10) + +print('# Authenticate') +r = s.post(CDSW_API + '/authenticate', json={'login': USERNAME, 'password': PASSWORD}) +s.headers.update({'Authorization': 'Bearer ' + r.json()['auth_token']}) + +print('# Check if model is already running') +model_name = 'IoT Prediction Model' +r = s.post(CDSW_ALTUS_API + '/models/list-models', json={'projectOwnerName': 'admin', 'latestModelDeployment': True, 'latestModelBuild': True}) +models = [m for m in r.json() if m['name'] == model_name] +if models and models[0]['latestModelDeployment']['status'] == 'deployed': + print('Model is already deployed!! 
Skipping.') + #exit(0) + +print('# Add engine') +r = s.post(CDSW_API + '/site/engine-profiles', json={'cpu': 1, 'memory': 4}) +engine_id = r.json()['id'] +print('Engine ID: %s'% (engine_id,)) + +print('# Add environment variable') +r = s.patch(CDSW_API + '/site/config', json={'environment': '{"HADOOP_CONF_DIR":"/etc/hadoop/conf/"}'}) + +print('# Add project') +project_name = 'Edge2AI Workshop' +project_id = None +r = s.get(CDSW_API + '/users/admin/projects') +for project in r.json(): + if project['name'] == project_name: + project_id = project['id'] + break +if not project_id: + r = s.post(CDSW_API + '/users/admin/projects', json={'template': 'git', + 'project_visibility': 'private', + 'name': project_name, + 'gitUrl': 'https://github.com/cloudera-labs/edge2ai-workshop'}) + project_id = r.json()['id'] +print('Project ID: %s'% (project_id,)) + +print('# Upload setup script') +setup_script = """!pip3 install --upgrade pip scikit-learn !HADOOP_USER_NAME=hdfs hdfs dfs -mkdir /user/$HADOOP_USER_NAME !HADOOP_USER_NAME=hdfs hdfs dfs -chown $HADOOP_USER_NAME:$HADOOP_USER_NAME /user/$HADOOP_USER_NAME !hdfs dfs -put data/historical_iot.txt /user/$HADOOP_USER_NAME !hdfs dfs -ls -R /user/$HADOOP_USER_NAME """ - r = s.put(CDSW_API + '/projects/admin/edge2ai-workshop/files/setup_workshop.py', files={'name': setup_script}) - - print('# Upload model') - model_pkl = open(MODEL_PKL_FILE, 'r') - r = s.put(CDSW_API + '/projects/admin/edge2ai-workshop/files/iot_model.pkl', files={'name': model_pkl}) - - print('# Create job to run the setup script') - r = s.post(CDSW_API + '/projects/admin/edge2ai-workshop/jobs', json={ - 'name': 'Setup workshop', - 'type': 'manual', - 'script': 'setup_workshop.py', - 'timezone': 'America/Los_Angeles', - 'environment':{}, - 'kernel': 'python3', - 'cpu': 1, - 'memory': 4, - 'nvidia_gpu': 0, - 'notifications': [{ - 'user_id': 1, - 'success': False, - 'failure': False, - 'timeout': False, - 'stopped': False - }], - 'recipients': {}, - 'attachments': [], - }) - job_id = r.json()['id'] - print('Job ID: %s' % (job_id,)) - - print('# Start job') - job_url = '%s/projects/admin/edge2ai-workshop/jobs/%s' % (CDSW_API, job_id) - start_url = '%s/start' % (job_url,) - - r = s.post(start_url, json={}) - - while True: - r = s.get(job_url) - status = r.json()['latest']['status'] - print('Job %s status: %s' % (job_id, status)) - if status == 'succeeded': +r = s.put(CDSW_API + '/projects/admin/edge2ai-workshop/files/setup_workshop.py', files={'name': setup_script}) + +print('# Upload model') +model_pkl = open(MODEL_PKL_FILE, 'r') +r = s.put(CDSW_API + '/projects/admin/edge2ai-workshop/files/iot_model.pkl', files={'name': model_pkl}) + +print('# Create job to run the setup script') +r = s.post(CDSW_API + '/projects/admin/edge2ai-workshop/jobs', json={ + 'name': 'Setup workshop', + 'type': 'manual', + 'script': 'setup_workshop.py', + 'timezone': 'America/Los_Angeles', + 'environment':{}, + 'kernel': 'python3', + 'cpu': 1, + 'memory': 4, + 'nvidia_gpu': 0, + 'notifications': [{ + 'user_id': 1, + 'success': False, + 'failure': False, + 'timeout': False, + 'stopped': False + }], + 'recipients': {}, + 'attachments': [], + }) +job_id = r.json()['id'] +print('Job ID: %s' % (job_id,)) + +print('# Start job') +job_url = '%s/projects/admin/edge2ai-workshop/jobs/%s' % (CDSW_API, job_id) +start_url = '%s/start' % (job_url,) + +r = s.post(start_url, json={}) + +while True: + r = s.get(job_url) + status = r.json()['latest']['status'] + print('Job %s status: %s' % (job_id, status)) + if status == 'succeeded': + 
break + elif status == 'failed': + raise RuntimeError('Job failed') + time.sleep(10) + +print('# Get engine image to use for model') +r = s.get(CDSW_API + '/projects/admin/edge2ai-workshop/engine-images') +engine_image_id = r.json()['id'] +print('Engine image ID: %s' % (engine_image_id,)) + +print('# Deploy model') +r = s.post(CDSW_ALTUS_API + '/models/create-model', json={ + 'projectId': project_id, + 'name': model_name, + 'description': model_name, + 'visibility': 'private', + 'targetFilePath': 'cdsw.iot_model.py', + 'targetFunctionName': 'predict', + 'engineImageId': engine_image_id, + 'kernel': 'python3', + 'examples': [{'request': {'feature': '0, 65, 0, 137, 21.95, 83, 19.42, 111, 9.4, 6, 3.43, 4'}}], + 'cpuMillicores': 1000, + 'memoryMb': 4096, + 'replicationPolicy': {'type': 'fixed', 'numReplicas': 1}, + 'environment': {}, + }) +model_id = r.json()['id'] +print('Model ID: %s' % (model_id,)) + +# ================================================================================ + +# See https://docs.cloudera.com/cdsw/latest/analytical-apps/topics/cdsw-application-limitations.html + +print('# Allow applications to be configured with unauthenticated access') +r = s.patch(CDSW_API + '/site/config', json={"allow_unauthenticated_access_to_app": True}) +print('Set unauthenticated access flag to: %s'% (r.json()["allow_unauthenticated_access_to_app"],)) + +print('# Add project for Data Visualization server') +project_name = 'viz' +viz_project_id = None +r = s.get(CDSW_API + '/users/admin/projects') +for project in r.json(): + if project['name'] == project_name: + viz_project_id = project['id'] + break +if not viz_project_id: + r = s.post(CDSW_API + '/users/admin/projects', json={'template': 'blank', + 'project_visibility': 'private', + 'name': project_name}) + viz_project_id = r.json()['id'] +print('Viz project ID: %s'% (viz_project_id,)) + +print('# Add custom engine for Data Visualization server') +params = { + "engineImage": { + "description": "dataviz-623", + "repository": "docker.repository.cloudera.com/cloudera/cdv/cdswdataviz", + "tag": "6.2.3-b18"} +} +r = s.post(CDSW_API + '/engine-images', json=params) +engine_image_id = r.json()['id'] +print('Engine Image ID: %s'% (engine_image_id,)) + +print('# Set new engine image as default for the viz project') +r = s.patch(CDSW_API + '/projects/admin/viz/engine-images', + json={'engineImageId': engine_image_id}) +r = s.get(CDSW_API + '/projects/admin/viz/engine-images') +project_engine_image_id = r.json()['id'] +print('Project image default engine Image ID set to: %s'% (project_engine_image_id)) + +print('# Create application with Data Visualization server') +application_name = 'viz' + +params = { + 'bypass_authentication': True, + 'cpu': 1, + 'environment': {}, + 'description': 'viz server app', + 'kernel': 'python3', + 'memory': 2, + 'name': 'viz-server', + 'nvidia_gpu': 0, + 'script': '/opt/vizapps/tools/arcviz/startup_app.py', + 'subdomain': 'viz', + 'type': 'manual' +} + +r = s.post(CDSW_API + '/projects/admin/viz/applications', json=params) +r = s.get(CDSW_API + '/projects/admin/viz/applications') +r.json() + + +# ================================================================================ + +while True: + r = s.post(CDSW_ALTUS_API + '/models/list-models', json={ + 'project': str(project_id), + 'latestModelDeployment': True, + 'latestModelBuild': True, + }) + models = [m for m in r.json() if m['id'] == str(model_id)] + if models: + build_status = models[0]['latestModelBuild']['status'] + build_id = models[0]['latestModelBuild']['id'] 
+ deployment_status = models[0]['latestModelDeployment']['status'] + print('Model %s: build status: %s, deployment status: %s' % (model_id, build_status, deployment_status)) + if build_status == 'built' and deployment_status == 'deployed': break - elif status == 'failed': - raise RuntimeError('Job failed') - time.sleep(10) - - print('# Get engine image to use for model') - r = s.get(CDSW_API + '/projects/admin/edge2ai-workshop/engine-images') - engine_image_id = r.json()['id'] - print('Engine image ID: %s' % (engine_image_id,)) - - print('# Deploy model') - r = s.post(CDSW_ALTUS_API + '/models/create-model', json={ - 'projectId': project_id, - 'name': model_name, - 'description': model_name, - 'visibility': 'private', - 'targetFilePath': 'cdsw.iot_model.py', - 'targetFunctionName': 'predict', - 'engineImageId': engine_image_id, - 'kernel': 'python3', - 'examples': [{'request': {'feature': '0, 65, 0, 137, 21.95, 83, 19.42, 111, 9.4, 6, 3.43, 4'}}], - 'cpuMillicores': 1000, - 'memoryMb': 4096, - 'replicationPolicy': {'type': 'fixed', 'numReplicas': 1}, - 'environment': {}, - }) - model_id = r.json()['id'] - print('Model ID: %s' % (model_id,)) - - while True: - r = s.post(CDSW_ALTUS_API + '/models/list-models', json={ - 'project': str(project_id), - 'latestModelDeployment': True, - 'latestModelBuild': True, - }) - models = [m for m in r.json() if m['id'] == str(model_id)] - if models: - build_status = models[0]['latestModelBuild']['status'] - build_id = models[0]['latestModelBuild']['id'] - deployment_status = models[0]['latestModelDeployment']['status'] - print('Model %s: build status: %s, deployment status: %s' % (model_id, build_status, deployment_status)) - if build_status == 'built' and deployment_status == 'deployed': - break - elif build_status == 'built' and deployment_status == 'stopped': - # If the deployment stops for any reason, try to give it a little push - r = s.post(CDSW_ALTUS_API + '/models/deploy-model', json={ - 'modelBuildId': build_id, - 'cpuMillicores': 1000, - 'memoryMb': 4096, - }) - elif build_status == 'failed' or deployment_status == 'failed': - raise RuntimeError('Model deployment failed') - time.sleep(10) -except Exception as e: - if r: - print(r.text) - raise + elif build_status == 'built' and deployment_status == 'stopped': + # If the deployment stops for any reason, try to give it a little push + r = s.post(CDSW_ALTUS_API + '/models/deploy-model', json={ + 'modelBuildId': build_id, + 'cpuMillicores': 1000, + 'memoryMb': 4096, + }) + elif build_status == 'failed' or deployment_status == 'failed': + raise RuntimeError('Model deployment failed') + time.sleep(10) + +while True: + r = s.get(CDSW_API + '/projects/admin/viz/applications') + app_status = r.json()[0]['status'] + print('Data Visualization app status: %s' % (app_status)) + if app_status == 'running': + print('# Viz server app is running. CDSW setup complete!') + break + elif build_status == 'stopped': + # Additional error handling - if the app exists and is stopped, start it? 
+ break + elif build_status == 'failed': + raise RuntimeError('Application deployment failed') + time.sleep(10) + +# Connect to Viz App API, validate status + + + diff --git a/setup/terraform/resources/common.sh b/setup/terraform/resources/common.sh index a4c0a29..d63e190 100644 --- a/setup/terraform/resources/common.sh +++ b/setup/terraform/resources/common.sh @@ -783,6 +783,7 @@ function get_service_urls() { fi if [[ ${HAS_CDSW:-0} == 1 ]]; then echo "CDSW=${protocol}://cdsw.{ip_address}.nip.io/" + echo "CDP Data Visualization=${protocol}://viz.cdsw.{ip_address}.nip.io/" fi ) | sort ) | tr "\n" "," | sed 's/,$//' diff --git a/setup/terraform/web/app/forms.py b/setup/terraform/web/app/forms.py index d06f45f..0dc7491 100644 --- a/setup/terraform/web/app/forms.py +++ b/setup/terraform/web/app/forms.py @@ -10,8 +10,8 @@ class LoginForm(FlaskForm): """Login form """ email = StringField('Email', validators=[DataRequired(), Email()]) - password = PasswordField('Password', validators=[DataRequired()]) - login_submit = SubmitField('Sign In') + password = PasswordField('Registration code / Password', validators=[DataRequired()]) + login_submit = SubmitField('Submit') class RegistrationForm(FlaskForm): """Registration form for new users diff --git a/setup/terraform/web/app/templates/login.html b/setup/terraform/web/app/templates/login.html index 136cdf9..aa4f2e0 100644 --- a/setup/terraform/web/app/templates/login.html +++ b/setup/terraform/web/app/templates/login.html @@ -2,10 +2,11 @@ {% import 'bootstrap/wtf.html' as wtf %} {% block app_content %} -<h1>Sign In</h1> +<h1>Register new user / Log in</h1> <div class="row"> <div class="col-md-4"> {{ wtf.quick_form(form, button_map={'login_submit': 'primary'}) }} </div> </div> + {% endblock %} \ No newline at end of file diff --git a/setup/terraform/web/app/templates/register.html b/setup/terraform/web/app/templates/register.html index 0d45b97..653396e 100644 --- a/setup/terraform/web/app/templates/register.html +++ b/setup/terraform/web/app/templates/register.html @@ -2,7 +2,7 @@ {% import 'bootstrap/wtf.html' as wtf %} {% block app_content %} - <h1>Register</h1> + <h1>Register new user</h1> <div class="row"> <div class="col-md-4"> {{ wtf.quick_form(form, button_map={'register_submit': 'primary'}) }} diff --git a/sql_stream_builder_lite.adoc b/sql_stream_builder_lite.adoc new file mode 100644 index 0000000..9dcfc91 --- /dev/null +++ b/sql_stream_builder_lite.adoc @@ -0,0 +1,262 @@ += Querying streams with SQL + +NOTE: This lab assumes that the link:streaming.adoc[From Edge to Streams Processing] lab has been completed. If you haven't done so, please ask your instructor to set your cluster state for you so that you can perform the steps in this lab (or you can do this yourself by SSH'ing to your cluster host and running the script `/tmp/resources/reset-to-lab.sh`) + +In this workshop you will use SQL Stream Builder to query and manipulate data streams using SQL language. SQL Stream Builder is a powerful service that enables you to create Flink jobs without having to write Java/Scala code. + +== Labs summary + +* *Lab 1* - Create a Data Source +* *Lab 2* - Create a Source Virtual Table +* *Lab 3* - Run a simple query +* *Lab 4* - Computing and storing agregation results + +== Introduction + +In this lab, and the subsequent ones, we will use the `iot` topic created and populated in previous labs and contains a datastream of computer performance data points. 
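+
+If you are curious what the raw records look like before SSB gets involved, you can peek at the topic with a few lines of Python. This is optional and not required for the labs; it is only a sketch that assumes the `kafka-python` package is available on the cluster host and uses the broker address used throughout this workshop.
+
+[source,python]
+----
+import json
+from kafka import KafkaConsumer  # assumption: kafka-python is installed
+
+consumer = KafkaConsumer(
+    'iot',
+    bootstrap_servers='edge2ai-1.dim.local:9092',  # workshop broker
+    auto_offset_reset='latest',
+    consumer_timeout_ms=10000,  # give up if nothing arrives for 10 seconds
+)
+
+# Print a handful of raw JSON records to see what SSB will be querying
+for i, msg in enumerate(consumer):
+    print(json.dumps(json.loads(msg.value), indent=2))
+    if i >= 4:
+        break
+consumer.close()
+----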
+ +So let's start with a straightforward goal: to query the contents of the `iot` topic using SQL to examine the data that is being streamed. + +Albeit simple, this task will show the ease of use and power of SQL Stream Builder (SSB). + +[[lab_1, Lab 1]] +== Lab 1 - Create a Data Source + +Before we can start querying data from Kafka topics we need to register the Kafka clusters as _data sources_ in SSB. + +. On the Cloudera Manager console, click on the Cloudera logo at the top-left corner to ensure you are at the home page and then click on the *SQL Stream Builder* service. + +. Click on the *SQLStreamBuilder Console* link to open the SSB UI. +.. On the first access you will be prompted to accept the use of cookies on this page. Click *Accept* to dismiss that message. ++ +image::images/ssb-console.png[width=800] + +. To register our Kafka cluster, click *Data Sources > Register Kafka Provider*: ++ +image::images/ssb-register-kafka-provider.png[width=800] + +. In the *Add Kafka Provider* window, enter the details of our Kafka cluster and click *Save changes*. ++ +[source,yaml] +---- +Name: edge2ai-kafka +Brokers: edge2ai-1.dim.local:9092 +Connection protocol: PLAINTEXT +Use Schema Registry: No +---- ++ +image::images/ssb-add-kafka-provider.png[width=400] + +[[lab_2, Lab 2]] +== Lab 2 - Create a Source Virtual Table + +Now we can _map_ the `iot` topic to a _virtual table_ that we can reference in our query. _Virtual Tables_ on SSB are a way to associate a Kafka topic with a schema so that we can use that as a table in our queries. There are two types of virtual tables in SSB: _Source_ and _Sink_. + +We will use a Source Virtual Table now to read from the topic. Later we will look into Sink Virtual Tables to write data to Kafka. + +. To create our first Source Virtual Table, click on *Console* (on the left bar) *> Virtual Tables > Source Virtual Table > Add Source > Apache Kafka*. ++ +image::images/ssb-add-source-virtual-table.png[width=800] + +. On the *Kafka Source* window, enter the following information: ++ +[source,yaml] +---- +Virtual table name: iot_source +Kafka Cluster: edge2ai-kafka +Topic Name: iot +Data Format: JSON +---- ++ +image::images/ssb-kafka-source.png[width=400] + +. Ensure the *Schema* tab is selected. Scroll to the bottom of the tab and click *Detect Schema*. SSB will take a sample of the data flowing through the topic and will infer the schema used to parse the content. Alternatively you could also specify the schema in this tab. ++ +image::images/ssb-detect-schema.png[width=400] + +. If we need to manipulate the source data to fix, cleanse or convert some values, we can define transformations for the data source to perform those changes. These transformations are defined in Javascript. ++ +The serialized record read from Kafka is provided to the Javascript code in the `record` variable. The last command of the transformation must return the serialized content of the modified record. ++ +The data in the `iot` topic has a timestamp expressed in microseconds. Let's say we need the value in seconds. Let's write a transformation to perform that conversion for us at the source. 
++ +Click on the *Transformations* tab and enter the following code in the *Code* field: ++ +[source,javascript] +---- +// parse the JSON record +var parsedVal = JSON.parse(record); +// Convert sensor_ts from micro to milliseconds +parsedVal['sensor_ts'] = Math.round(parsedVal['sensor_ts']/1000000); +// serialize output as JSON +JSON.stringify(parsedVal); +---- ++ +image::images/ssb-source-transformations.png[width=400] + +. Click on the *Properties* tab, enter the following value for the *Consumer Group* property and click *Save changes*. ++ +[source,yaml] +---- +Consumer Group: ssb-iot-1 +---- ++ +image::images/ssb-source-properties.png[width=400] ++ +NOTE: Setting the *Consumer Group* properties for a virtual table will ensure that if you stop a query and restart it later, the second query execute will continue to read the data from the point where the first query stopped, without skipping data. *However*, if _multiple queries_ use the same virtual table, setting this property will effectively distribute the data across the queries so that each record is only read by a single query. If you want to share a virtual table with multiple distinct queries, ensure that the Consumer Group property is unset. + +[[lab_3, Lab 3]] +== Lab 3 - Run a simple query + +We have now all that we need to run our first query in SSB. We want to simply query the raw contents of topic to ensure that the everything is working correctly before we proceed to do more complex things. + +If your environment is healthy and all the steps from previous labs were completed correctly you should be able to visualize the data with the steps below. + +. On the SSB UI, click on *Console* (on the left bar) *> Compose > SQL* and type the following query: ++ +[source,sql] +---- +select * +from iot_source +---- ++ +image::images/ssb-compose-sql.png[width=800] + +. Set a *SQL Job Name* for your job or use the random name provided. + +. Do *not* add a Sink Virtual Table. + +. Click *Execute* + +. Scroll to the bottom of the page and you will see the log messages generated by your query execution. ++ +image::images/ssb-sql-execution.png[width=800] + +. After a few seconds the SQL Console will start showing the results of the query coming from the `iot` topic. ++ +The data displayed on the screen is only a sample of the data returned by the query, not the full data. ++ +image::images/ssb-sql-results.png[width=800] ++ +Note that the values of the column `sensor_ts` now show in seconds instead of microseconds, thanks to the transformation we created for the `iot_source` virtual table. + +[[lab_4, Lab 4]] +== Lab 4 - Computing and storing agregation results + +We want to start computing window aggregates for our incoming data stream and make the aggregation results available for downstream applications. SQL Stream Builder's Sink Virtual Tables give us the ability to publish/store streaming data to several different services (Kafka, AWS S3, Google GCS, Elastic Search and generic webhooks). In this lab we'll use a Kafka sink to publish the results of our aggregation to another Kafka topic. + +. Let's first create a topic (`sensor6_stats`) where to publish our aggregation results: +.. Navigate to the SMM UI (*Cloudera Manager > SMM* service *> +Streams Messaging Manager Web UI*). +.. On the SMM UI, click the *Topics* tab (image:images/topics_icon.png[width=25]). +.. Click the *Add New* button. +.. Enter the following details for the topic and click *Save* when ready: +... Topic name: `sensor6_stats` +... Partitions: `10` +... 
Availability: `Low`
+... Cleanup Policy: `delete`
+
+. To create the Sink Virtual Table, click on *Console* (on the left bar) *> Virtual Tables > Sink Virtual Table > Add Source > Apache Kafka*.
++
+image::images/ssb-add-sink-virtual-table.png[width=800]
+
+. On the *Kafka Sink* window, enter the following information and click *Save changes*:
++
+[source,yaml]
+----
+Virtual table name: sensor6_stats_sink
+Kafka Cluster: edge2ai-kafka
+Topic Name: sensor6_stats
+----
++
+image::images/ssb-kafka-sink.png[width=400]
+
+. On the SSB UI, click on *Console* (on the left bar) *> Compose > SQL* and type the query shown below.
++
+This query will compute aggregates over 30-second windows that slide forward every second. For a specific sensor value in the record (`sensor_6`) it computes the following aggregations for each window:
++
+--
+* Number of events received
+* Sum of the `sensor_6` value for all the events
+* Average of the `sensor_6` value across all the events
+* Min and max values of the `sensor_6` field
+* Number of events for which the `sensor_6` value exceeds `70`
+--
++
+[source,sql]
+----
+SELECT
+  HOP_END(eventTimestamp, INTERVAL '1' SECOND, INTERVAL '30' SECOND) as windowEnd,
+  count(*) as sensorCount,
+  sum(sensor_6) as sensorSum,
+  avg(cast(sensor_6 as float)) as sensorAverage,
+  min(sensor_6) as sensorMin,
+  max(sensor_6) as sensorMax,
+  sum(case when sensor_6 > 70 then 1 else 0 end) as sensorGreaterThan70
+FROM iot_source
+GROUP BY
+  HOP(eventTimestamp, INTERVAL '1' SECOND, INTERVAL '30' SECOND)
+----
++
+image::images/ssb-sql-aggregation.png[width=800]
+
+. Enter `Sensor 6 stats` for the *SQL Job Name* field.
+
+. On the *Sink Virtual Table* field, click on the *None* drop-down and select the Sink Virtual Table that you created previously (`sensor6_stats_sink`)
++
+image::images/ssb-select-sink.png[width=800]
+
+. Click *Execute*. After a few seconds you should see the sampled output of the query in the *Results* tab.
+
+. Scroll to the bottom of the page and you will see the log messages generated by your query execution.
++
+image::images/ssb-sql-execution.png[width=800]
+
+. After a few seconds the SQL Console will start showing the results of the query coming from the `iot` topic.
++
+The data displayed on the screen is only a sample of the data returned by the query, not the full data.
++
+image::images/ssb-sql-results-2.png[width=800]
+
+. Check the job execution details and logs by clicking on *Console* (on the left bar) *> SQL Jobs* tab. Explore the options on this screen:
++
+--
+.. Click on the `Sensor 6 stats` job
+.. Click on the *Details* tab to see job details.
+.. Click on the *Log* tab to see log messages generated by the job execution.
+--
++
+image::images/ssb-job-details.png[width=800]
+
+. Let's query the `sensor6_stats` table to examine the data that is being written to it. First we need to define a Source Virtual Table associated with the `sensor6_stats` topic.
++
+--
+.. Click on *Console* (on the left bar) *> Virtual Tables > Source Virtual Table > Add Source > Apache Kafka*.
+.. On the *Kafka Source* window, enter the following information and click *Save changes*:
++
+[source,yaml]
+----
+Virtual table name: sensor6_stats_source
+Kafka Cluster: edge2ai-kafka
+Topic Name: sensor6_stats
+Data Format: JSON
+----
+--
+.. Click on *Detect Schema* and wait for the schema to be updated.
+.. Click *Save changes*.
+
+. Click on *Console* (on the left bar) to refresh the screen and clear the SQL Compose field, which may still show the running aggregation job.
++
+Note that the job will continue to run in the background and you can continue to monitor it through the *Job Logs* page.
+
+. Enter the following query in the SQL field and execute it:
++
+[source,sql]
+----
+SELECT *
+FROM sensor6_stats_source
+----
+
+. After a few seconds you should see the contents of the `sensor6_stats` topic displayed on the screen:
++
+image::images/ssb-stats-results.png[width=800]
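+
+If it helps to see exactly what each column in `sensor6_stats` represents, the short Python sketch below computes the same aggregations over a made-up 30-second window of `sensor_6` values. It is purely illustrative (plain Python, no cluster access needed) and is not part of the lab steps.
+
+[source,python]
+----
+# Hypothetical sample of sensor_6 values collected during one 30-second window
+window = [58, 63, 71, 69, 75, 80, 66, 72]
+
+stats = {
+    'sensorCount': len(window),                               # count(*)
+    'sensorSum': sum(window),                                 # sum(sensor_6)
+    'sensorAverage': sum(window) / len(window),               # avg(sensor_6)
+    'sensorMin': min(window),                                 # min(sensor_6)
+    'sensorMax': max(window),                                 # max(sensor_6)
+    'sensorGreaterThan70': sum(1 for v in window if v > 70),  # case when sensor_6 > 70 ...
+}
+print(stats)
+----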
diff --git a/streaming_flink.adoc b/streaming_flink.adoc
new file mode 100644
index 0000000..0f6f725
--- /dev/null
+++ b/streaming_flink.adoc
@@ -0,0 +1,135 @@
+= Streams Processing with Apache Flink
+
+This workshop builds on top of the "edge2AI Workshop" (a prerequisite) and adds enhanced stream processing features.
+
+== Labs summary
+
+* *Lab 1* - Count by sensor_id
+* *Lab 2* - Filtering on sensor_0 value
+
+
+[[lab_1, Lab 1]]
+== Lab 1 - Apache Flink - Count by sensor_id
+
+. Let's use the #iot stream from the sensors from the previous lab
++
+.. *Dataflow:*
++
+image::images/iot_streamingFlinkDataflowCount.png[width=800]
++
+.. Open two SSH connections to your environment
++
+image::images/flink_ssl_lite.png[width=800]
++
+Let's have a look at the code.
++
+The Kafka source:
++
+[source,java]
+----
+// get iot stream from kafka - topic "iot"
+    DataStream<String> iotStream = env.addSource(
+            new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties));
+----
++
+The windowed aggregation:
++
+[source,java]
+----
+// split and sum on 'sensor_id'
+    DataStream<Tuple5<Long, Integer, Integer, Integer, Integer>> aggStream = iotStream
+            .flatMap(new trxJSONDeserializer())
+            .keyBy(1) // = sensor_id
+            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
+            .sum(4);
+----
++
+The Kafka sink:
++
+[source,java]
+----
+// write the aggregated data stream to a Kafka sink
+    FlinkKafkaProducer<Tuple5<Long, Integer, Integer, Integer, Integer>> myProducer = new FlinkKafkaProducer<Tuple5<Long, Integer, Integer, Integer, Integer>>(
+            topic, new serializeSum2String(), propertiesProducer);
+
+    aggStream.addSink(myProducer);
+----
++
+. Let's run the application
++
+Use the first SSH connection to run the Flink application:
++
+[source,shell]
+----
+cd /opt/cloudera/parcels/FLINK
+sudo wget https://github.com/zBrainiac/streaming-flink/releases/download/0.3.1/streaming-flink-0.3.1.0.jar -P /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming
+./bin/flink run -m yarn-cluster -c consumer.IoTUC1CountEventsPerSensorId -ynm IoTUC1CountEventsPerSensorId lib/flink/examples/streaming/streaming-flink-0.3.1.0.jar edge2ai-1.dim.local:9092
+----
++
+. Let's see how the application works
++
+.. Use the second SSH connection to see the result:
++
+[source,shell]
+----
+$ cd /opt/cloudera/parcels/CDH
+$ ./bin/kafka-console-consumer --bootstrap-server edge2ai-1.dim.local:9092 --topic result_iot_uc1_Count_EventsPerSensorId
+----
++
+Output in the SSH connection:
++
+image::images/Kafka_topic_simulation_sum.png[width=800]
++
+.. SMM view:
++
+image::images/SMM_topic_simulation_sum.png[width=800]
++
+.. YARN & Flink UI view:
++
+The Flink UI provides more details and monitoring of the jobs:
++
+image::images/FLINK_running_jobs_lite.png[width=800]
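+
+If you prefer to check the running jobs from a script instead of the Flink UI, Flink's monitoring REST API lists them. The sketch below is optional and makes some assumptions: the `requests` package is available, and the URL is a placeholder that you would replace with the JobManager address shown in the YARN application's tracking URL.
+
+[source,python]
+----
+import requests
+
+# Placeholder: replace with the JobManager / tracking URL of your YARN application
+FLINK_REST = 'http://edge2ai-1.dim.local:8082'
+
+# /jobs/overview returns every job known to the JobManager with its current state
+resp = requests.get(FLINK_REST + '/jobs/overview', timeout=10)
+resp.raise_for_status()
+for job in resp.json().get('jobs', []):
+    print('%s: %s' % (job['name'], job['state']))
+----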
+
+[[lab_2, Lab 2]]
+== Lab 2 - Filtering on sensor_0 value
+
+. Let's use the #iot stream from the sensors from the previous lab
++
+.. *Dataflow:*
++
+image::images/iot_streamingFlinkDataflowFilter.png[width=800]
++
+The aggregation with a filter on the sensor_0 value:
++
+[source,java]
+----
+// split on 'sensor_id' & filter on sensor_0
+DataStream<Tuple5<Long, Integer, Integer, Integer, Integer>> aggStream = iotStream
+        .flatMap(new trxJSONDeserializer())
+        .keyBy(1) // sensor_id
+        .sum(4)
+        .filter(new FilterFunction<Tuple5<Long, Integer, Integer, Integer, Integer>>() {
+            @Override
+            public boolean filter(Tuple5<Long, Integer, Integer, Integer, Integer> value) throws Exception {
+                return value.f2 >= 50;
+            }
+        });
+----
++
+. Let's run the application
++
+Use one of the SSH connections to run the Flink application:
++
+[source,shell]
+----
+cd /opt/cloudera/parcels/FLINK
+./bin/flink run -m yarn-cluster -c consumer.IoTUC2CountEventsPerSensorIdFilter -ynm IoTUC2CountEventsPerSensorIdFilter lib/flink/examples/streaming/streaming-flink-0.3.1.0.jar edge2ai-1.dim.local:9092
+----
++
+. Let's see how the application works
++
+.. Use the second SSH connection to see the result:
++
+[source,shell]
+----
+$ cd /opt/cloudera/parcels/CDH
+$ ./bin/kafka-console-consumer --bootstrap-server edge2ai-1.dim.local:9092 --topic result_iot_Consumer_Filter
+----
\ No newline at end of file
diff --git a/streaming_lite.adoc b/streaming_lite.adoc
new file mode 100644
index 0000000..9f4c02b
--- /dev/null
+++ b/streaming_lite.adoc
@@ -0,0 +1,566 @@
+= From Edge to Streams Processing
+
+In this workshop you'll implement a data pipeline, using NiFi to ingest data from an IoT device into Kafka and then consume data from Kafka and write it to Kudu tables.
+
+== Labs summary
+
+* *Lab 1* - On Apache NiFi, run a simulator to send IoT sensor data to the MQTT broker.
+* *Lab 2* - On Schema Registry, register the schema describing the data generated by the IoT sensors.
+* *Lab 3* - On the NiFi cluster, prepare the data and send it to the *Kafka* cluster.
+* *Lab 4* - On the *Streams Messaging Manager (SMM)* Web UI, monitor the Kafka cluster and confirm data is being ingested correctly.
+* *Lab 5* - Use NiFi to consume each record from *Kafka* and save results to *Kudu*.
+* *Lab 6* - Visualize Kudu data in real-time
+* *Lab 7* - Apache Flink - your friend for streaming use cases
+
+[[lab_1, Lab 1]]
+== Lab 1 - Apache NiFi: setup machine sensors simulator
+
+In this lab you will run a simple Python script that simulates IoT sensor data from some hypothetical machines and sends the data to an MQTT broker (link:https://mosquitto.org/[mosquitto]). The gateway host is connected to many different types of sensors, but they generally all share the same transport protocol, MQTT.
+
+. Go to Apache NiFi and add a Processor (ExecuteProcess) to the canvas.
++
+image::images/simulate1.png[width=800]
+
+. Right-click the processor, select *Configure* (or, alternatively, just double-click the processor). On the *PROPERTIES* tab, set the properties shown below to run our Python simulator script.
++
+[source]
+----
+Command: python3
+Command Arguments: /opt/demo/simulate.py
+----
++
+image::images/simulate2.png[width=500]
+
+. In the *SCHEDULING* tab, set *Run Schedule* to *1 sec*.
++
+Alternatively, you could set that to other time intervals: 1 sec, 30 sec, 1 min, etc.
++
+image::images/runSimulator1or30.png[width=500]
+
+. In the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*.
++
+image::images/nifiTerminateRelationships.png[width=600]
+
+. You can then right-click to *Start* this simulator runner.
++ +image::images/nifiDemoStart.png[width=400] + +. Right-click and select *Stop* after a few seconds and look at the *provenance*. You'll see that it has run a number of times and produced results. ++ +image::images/NiFiViewDataProvenance.png[width=400] ++ +image::images/NiFiDataProvenance.png[width=800] + + + +[[lab_2, Lab 2]] +== Lab 2 - Registering our schema in Schema Registry + +The data produced by the temperature sensors is described by the schema in file `link:https://raw.githubusercontent.com/asdaraujo/edge2ai-workshop/master/sensor.avsc[sensor.avsc]`. In this lab we will register this schema in Schema Registry so that our flows in NiFi can refer to schema using an unified service. This will also allow us to evolve the schema in the future, if needed, keeping older versions under version control, so that existing flows and flowfiles will continue to work. + +. Go the following URL, which contains the schema definition we'll use for this lab. Select all contents of the page and copy it. ++ +`link:https://raw.githubusercontent.com/asdaraujo/edge2ai-workshop/master/sensor.avsc[https://raw.githubusercontent.com/asdaraujo/edge2ai-workshop/master/sensor.avsc, window="_blank"]` + +. In the Schema Registry Web UI, click the `+` sign to register a new schema. + +. Click on a blank area in the *Schema Text* field and paste the contents you copied. + +. Complete the schema creation by filling the following properties: ++ +[source] +---- +Name: SensorReading +Description: Schema for the data generated by the IoT sensors +Type: Avro schema provider +Schema Group: Kafka +Compatibility: Backward +Evolve: checked +---- ++ +image::images/register_schema.png[width=800] + +. Save the schema + +[[lab_3, Lab 3]] +== Lab 3 - Configuring the NiFi flow and pushing data to Kafka + +In this lab, you will create a NiFi flow to receive the data from mqtt gateways and push it to **Kafka**. + +=== Creating a Process Group + +Before we start building our flow, let's create a Process Group to help organizing the flows in the NiFi canvas and also to enable flow version control. + +. Open the NiFi Web UI, create a new Process Group and name it something like *Process Sensor Data*. ++ +image::images/create_pgroup_lite.png[width=800] + +. We want to be able to version control the flows we will add to the Process Group. In order to do that, we first need to connect NiFi to the *NiFi Registry*. On the NiFi global menu, click on "Controller Settings", navigate to the "Registry Clients" tab and add a Registry client with the following URL: ++ +---- +Name: NiFi Registry +URL: http://edge2ai-1.dim.local:18080 +---- ++ +image::images/global_controller_settings.png[width=800] ++ +image::images/add_registry_client.png[width=800] + +. On the *NiFi Registry* Web UI, add another bucket for storing the Sensor flow we're about to build'. Call it `SensorFlows`: ++ +image::images/sensor_flows_bucket.png[width=800] + +. Back on the *NiFi* Web UI, to enable version control for the Process Group, right-click on it and select *Version > Start version control* and enter the details below. Once you complete, a image:images/version_control_tick.png[width=20] will appear on the Process Group, indicating that version control is now enabled for it. ++ +[source] +---- +Registry: NiFi Registry +Bucket: SensorFlows +Flow Name: SensorProcessGroup +---- + +. Let's also enable processors in this Process Group to use schemas stored in Schema Registry. 
Right-click on the Process Group, select *Configure* and navigate to the *Controller Services* tab. Click the *`+`* icon and add a *HortonworksSchemaRegistry* service. After the service is added, click on the service's _cog_ icon (image:images/cog_icon.png[width=20]), go to the *Properties* tab and configure it with the following *Schema Registry URL* and click *Apply*. ++ +[source] +---- +URL: http://edge2ai-1.dim.local:7788/api/v1 +---- ++ +image::images/added_hwx_sr_service.png[width=800] + +. Click on the _lightning bolt_ icon (image:images/enable_icon.png[width=20]) to *enable* the *HortonworksSchemaRegistry* Controller Service. + +. Still on the *Controller Services* screen, let's add two additional services to handle the reading and writing of JSON records. Click on the image:images/plus_button.png[width=25] button and add the following two services: +** *`JsonTreeReader`*, with the following properties: ++ +[source] +---- +Schema Access Strategy: Use 'Schema Name' Property +Schema Registry: HortonworksSchemaRegistry +Schema Name: ${schema.name} -> already set by default! +---- + +** *`JsonRecordSetWriter`*, with the following properties: ++ +[source] +---- +Schema Write Strategy: HWX Schema Reference Attributes +Schema Access Strategy: Inherit Record Schema +Schema Registry: HortonworksSchemaRegistry +---- + +. Enable the *JsonTreeReader* and the *JsonRecordSetWriter* Controller Services you just created, by clicking on their respective _lightning bolt_ icons (image:images/enable_icon.png[width=20]). ++ +image::images/controller_services.png[width=500] + +=== Creating the flow + +. Double-click on the newly created process group to expand it. + +. Inside the process group, add a new _ConsumeMQTT_ processor. ++ +image::images/add_ConsumeMQTT_lite.png[width=500] + ++ +*PROPERTIES* tab: ++ +[source] +---- +Broker URI: tcp://edge2ai-1.dim.local:1883 +Client ID: sensor-iot +Topic Filter: iot/# +Max Queue Size: 60 +---- + +. We need to tell NiFi which schema should be used to read and write the Sensor data. For this we'll use an _UpdateAttribute_ processor to add an attribute to the FlowFile indicating the schema name. ++ +Add an _UpdateAttribute_ processor by dragging the processor icon to the canvas: ++ +image::images/add_updateattribute_lite.png[width=800] + +. Double-click the _UpdateAttribute_ processor and configure it as follows: +.. In the _SETTINGS_ tab: ++ +[source] +---- +Name: Set Schema Name +---- +.. In the _PROPERTIES_ tab: +** Click on the image:images/plus_button.png[width=25] button and add the following property: ++ +[source] +---- +Property Name: schema.name +Property Value: SensorReading +---- +.. Click *Apply* + +. Connect the *Consume mqtt* input port to the *Set Schema Name* processor. + +. Add a _PublishKafkaRecord_2.0_ processor and configure it as follows: ++ +*SETTINGS* tab: ++ +[source] +---- +Name: Publish to Kafka topic: iot +---- ++ +*PROPERTIES* tab: ++ +[source] +---- +Kafka Brokers: edge2ai-1.dim.local:9092 +Topic Name: iot +Record Reader: JsonTreeReader +Record Writer: JsonRecordSetWriter +Use Transactions: false +Attributes to Send as Headers (Regex): schema.* +---- ++ +NOTE: Make sure you use the PublishKafkaRecord_2.0 processor and *not* the PublishKafka_2.0 one + +. 
While still in the _PROPERTIES_ tab of the _PublishKafkaRecord_2.0_ processor, click on the image:images/plus_button.png[width=25] button and add the following property: ++ +[source] +---- +Property Name: client.id +Property Value: nifi-sensor-data +---- ++ +Later, this will help us clearly identify who is producing data into the Kafka topic. + +. Connect the *Set Schema Name* processor to the *Publish to Kafka topic: iot* processor. + +. Add a new _Funnel_ to the canvas and connect the PublishKafkaRecord processor to it. When the "Create connection" dialog appears, select "*failure*" and click *Add*. ++ +image::images/add_kafka_failure_connection.png[width=600] + +. Double-click on the *Publish to Kafka topic: iot* processor, go to the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*. ++ +image::images/terminate_publishkafka_relationship.png[width=600] + +. Start all three processors. Your canvas should now look like the one below: ++ +image::images/publishKafka_flow_lite.png[width=800] + + +. Refresh the screen (`Ctrl+R` on Linux/Windows; `Cmd+R` on Mac) and you should see that the records were processed by the _PublishKafkaRecord_ processor and there should be no records queued on the "failure" output queue. ++ +image::images/kafka_success_lite.png[width=800] ++ +At this point, the messages are already in the Kafka topic. You can add more processors as needed to process, split, duplicate or re-route your FlowFiles to all other destinations and processors. + +. To complete this Lab, let's commit and version the work we've just done. Go back to the NiFi root canvas, clicking on the "Nifi Flow" breadcrumb. Right-click on the *Process Sensor Data* Process Group and select *Version > Commit local changes*. Enter a descriptive comment and save. + +[[lab_4, Lab 4]] +== Lab 4 - Use SMM to confirm that the data is flowing correctly + +Now that our NiFi flow is pushing data to Kafka, it would be good to have a confirmation that everything is running as expected. In this lab you will use Streams Messaging Manager (SMM) to check and monitor Kafka. + +. Start the *NiFi ExecuteProcess* simulator again and confirm you can see the messages queued in NiFi. Leave it running. + +. Go to the Stream Messaging Manager (SMM) Web UI and familiarize yourself with the options there. Notice the filters (blue boxes) at the top of the screen. ++ +image::images/smm.png[width=800] + +. Click on the *Producers* filter and select only the *`nifi-sensor-data`* producer. This will hide all the irrelevant topics and show only the ones that producer is writing to. + +. If you filter by *Topic* instead and select the `iot` topic, you'll be able to see all the *producers* and *consumers* that are writing to and reading from it, respectively. Since we haven't implemented any consumers yet, the consumer list should be empty. + +. Click on the topic to explore its details. You can see more details, metrics and the break down per partition. Click on one of the partitions and you'll see additional information and which producers and consumers interact with that partition. ++ +image::images/producers.png[width=800] + +. Click on the *EXPLORE* link to visualize the data in a particular partition. Confirm that there's data in the Kafka topic and it looks like the JSON produced by the sensor simulator. ++ +image::images/explore_partition.png[width=800] + +. Check the data from the partition. You'll notice something odd. 
These are readings from temperature sensors and we don't expect any of the sensors to measure temperatures greater than 150 degrees in the conditions in which they are used. It seems, though, that `sensor_0` and `sensor_1` are intermittently producing noise and some of their readings have abnormally high values. ++ +image::images/troubled_sensors.png[width=800] + +. Stop the *NiFi ExecuteProcess* simulator again. + +. In the next Lab we'll eliminate these problematic measurements to avoid problems later in our data flow. + +[[lab_5, Lab 5]] +== Lab 5 - Use NiFi to consume each record from *Kafka* and save results to *Kudu*. + +In this lab, you will use NiFi to consume the Kafka messages containing the IoT data we ingested in the previous lab. + +=== Add new Controller Services + +When the sensor data was sent to Kafka using the _PublishKafkaRecord_ processor, we chose to attach the schema information in the header of Kafka messages. Now, instead of hard-coding which schema we should use to read the message, we can leverage that metadata to dynamically load the correct schema for each message. + +To do this, though, we need to configure a different _JsonTreeReader_ that will use the schema properties in the header, instead of the `${schema.name}` attribute, as we did before. + + +. If you're not in the *Process Sensor Data* process group, double-click on it to expand it. On the *Operate* panel (left-hand side), click on the _cog_ icon (image:images/cog_icon.png[width=25]) to access the *Process Sensor Data* process group's configuration page. ++ +image::images/operate_panel_cog.png[width=300] + +. Click on the _plus_ button (image:images/plus_button.png[width=25]), add a new *JsonTreeReader*, configure it as shown below and click *Apply* when you're done: ++ +On the *SETTINGS* tab: ++ +[source] +---- +Name: JsonTreeReader - With schema identifier +---- ++ +On the *PROPERTIES* tab: ++ +[source] +---- +Schema Access Strategy: HWX Schema Reference Attributes +Schema Registry: HortonworksSchemaRegistry +---- + +. Click on the _lightning bolt_ icon (image:images/enable_icon.png[width=20]) to *enable* the *JsonTreeReader - With schema identifier* controller service. + + ++ +image::images/additional_controller_services_lite.png[width=800] + +. Close the *Process Sensor Data Configuration* page. + +=== Create the flow + +We'll now create the flow to read the sensor data from Kafka and write the results to Kudu. At the end of this section your flow should look like the one below: + +image::images/from_kafka_to_kudu_flow_lite.png[width=800] + +==== ConsumeKafkaRecord_2_0 processor + +. We'll add a new flow to the same canvas we were using before (inside the *Process Sensor Data* Process Group). Click on an empty area of the canvas and drag it to the side to give you more space to add new processors. + +. Add a *ConsumeKafkaRecord_2_0* processor to the canvas and configure it as shown below: ++ +*SETTINGS* tab: ++ +[source] +---- +Name: Consume Kafka iot messages +---- ++ +*PROPERTIES* tab: ++ +[source] +---- +Kafka Brokers: edge2ai-1.dim.local:9092 +Topic Name(s): iot +Topic Name Format: names +Record Reader: JsonTreeReader - With schema identifier +Record Writer: JsonRecordSetWriter +Honor Transactions: false +Group ID: iot-sensor-consumer +Offset Reset: latest +Headers to Add as Attributes (Regex): schema.* +---- + +. Reuse the existing _Funnel_ on the canvas and connect the *Consume Kafka iot messages* processor to it.
When prompted, check the *parse.failure* relationship for this connection: ++ +image:images/parse_failure_relationship.png[width=500] + +==== PutKudu processor + +. Add a *PutKudu* processor to the canvas and configure it as shown below: ++ +*SETTINGS* tab: ++ +[source] +---- +Name: Write to Kudu +---- ++ +*PROPERTIES* tab: ++ +[source] +---- +Kudu Masters: edge2ai-1.dim.local:7051 +Table Name: impala::default.sensors +Record Reader: JsonTreeReader - With schema identifier +---- + +. Connect the *Consume Kafka iot messages* processor to the *Write to Kudu* one. When prompted, check the *success* relationship for this connection. + +. Connect the *Write to Kudu* processor to the same _Funnel_ you created above. When prompted, check the *failure* relationship for this connection. + +. Double-click on the *Write to Kudu* processor, go to the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*. + +==== Create the Kudu table + +NOTE: If you already created this table in a previous workshop, please skip the table creation here. + +. Go to the Hue Web UI and log in. The first user to log in to a Hue installation is automatically created and granted admin privileges in Hue. + +. The Hue UI should open with the Impala Query Editor by default. If it doesn't, you can always find it by clicking on *Query > Editor > Impala*: ++ +image::images/impala_editor.png[width=800] + +. First, create the Kudu table. In the Impala Query Editor, run this statement: ++ +[source,sql] +---- +CREATE TABLE sensors +( + sensor_id INT, + sensor_ts STRING, + sensor_0 DOUBLE, + sensor_1 DOUBLE, + sensor_2 DOUBLE, + sensor_3 DOUBLE, + sensor_4 DOUBLE, + sensor_5 DOUBLE, + sensor_6 DOUBLE, + sensor_7 DOUBLE, + sensor_8 DOUBLE, + sensor_9 DOUBLE, + sensor_10 DOUBLE, + sensor_11 DOUBLE, + is_healthy INT, + PRIMARY KEY (sensor_id, sensor_ts) +) +PARTITION BY HASH PARTITIONS 16 +STORED AS KUDU +TBLPROPERTIES ('kudu.num_tablet_replicas' = '1'); +---- ++ +image::images/create_table.png[width=800] + +==== Running the flow + +We're now ready to run and test our flow. Follow the steps below: + +. Start all the processors in your flow. + +. Refresh your NiFi page and you should see messages passing through your flow. The failure queues should have no records queued up. ++ +image::images/kudu_success_lite.png[width=800] + +[[lab_6, Lab 6]] +== Lab 6 - Visualize Kudu data in real-time + +In this lab, you will run some SQL queries using the Impala engine and verify that the Kudu table is being updated as expected. + +. Log in to Hue and run the following queries in the Impala Query Editor: ++ +[source,sql] +---- +SELECT count(*) +FROM sensors; +---- ++ +[source,sql] +---- +SELECT * +FROM sensors +ORDER BY sensor_ts DESC +LIMIT 100; +---- ++ +. Run the queries a few times and verify that the number of sensor readings is increasing as the data is ingested into the Kudu table. ++ +image::images/hue_timestamp_issue.png[width=400] ++ +You will notice that the data in the sensor_ts field has a potential data quality issue. The values should be timestamps, but they arrive as UNIX epoch values with microsecond precision. In order to show the timestamp properly, we need to run a slightly modified version of the query: ++ +[source,sql] +---- +SELECT microseconds_add(from_unixtime(cast(cast(sensor_ts as bigint)/1000000 as bigint),"yyyy-MM-dd HH:mm:ss"),cast(right(sensor_ts,6) as INT)) +FROM sensors; +---- ++ +image::images/hue_timestamp_fixed.png[width=800] ++ +.
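++
+The expression above simply splits the raw epoch value into whole seconds and the six-digit sub-second remainder. Purely as an illustration (not a workshop step), the same conversion in Python looks roughly like this:
++
+[source,python]
+----
+# Illustration of the timestamp fix above: sensor_ts is a Unix epoch value
+# carried with microsecond precision.
+from datetime import datetime, timedelta, timezone
+
+def to_timestamp(sensor_ts: str) -> datetime:
+    micros = int(sensor_ts)
+    seconds, remainder = divmod(micros, 1_000_000)
+    # from_unixtime(...) equivalent, then microseconds_add(...) equivalent:
+    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(microseconds=remainder)
+
+# Example with a made-up reading:
+print(to_timestamp("1595259271514321"))   # 2020-07-20 15:34:31.514321+00:00
+----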
We will now create a simple interactive real-time dashboard. Open CDP Data Visualization and log in. Select the Data tab and click NEW CONNECTION. ++ +As in the example above, we will use the Impala engine to run the queries behind the dashboard. We will set up a new connection to the Impala engine running on our cluster. ++ +At the top of the form, set: ++ +[source] +---- +Connection type: Impala +Connection name: Local Impala +---- ++ +On the *Basic* tab set the following: +(In the Hostname or IP address field please enter the full public hostname of your cluster - you can find this on your cluster details page) ++ +[source] +---- +Hostname or IP address: ec2-x-x-x-x.eu-central-1.compute.amazonaws.com (replace with your exact public hostname - *not* your cluster IP) +Port #: 21050 +Username: [leave blank] +Password: [leave blank] +---- ++ +image::images/viz_new_connection_basic.png[width=500] ++ +On the *Advanced* tab set the following: ++ +[source] +---- +Connection mode: binary +Socket type: normal +Authentication mode: NoSasl +---- ++ +image::images/viz_new_connection_advanced.png[width=500] ++ +When both tabs have been filled in, test the connection. You should see "Connection Verified". Hit Connect. + +. We can explore the databases and tables of the newly added connection. From the newly created Impala connection, open the default database and select "sensors". A preview with sample data will be loaded. ++ +image::images/viz_connection_explorer_table.png[width=800] ++ +Click on "New dataset" and name the dataset "sensor data". ++ +image::images/viz_new_dataset.png[width=400] + +. Before we create our dashboard, we will need to apply the timestamp fix that we implemented earlier. In order to do that, open the new dataset, select the "Fields" tab and click "Edit fields". In the Dimensions list you can find the field "sensor_ts". Open the drop-down menu and click "Clone". ++ +image::images/viz_clone_field.png[width=400] ++ +A new dimension "Copy of sensor_ts" will appear. Again, open the drop-down in this new row and select "Edit field". This will open a pop-up window. Change the name in Display name to "sensor_timestamp", and click the "Expression" tab. We can define an expression here, which will be evaluated and its return value assigned to our newly created "sensor_timestamp" computed field. ++ +Paste the following in the Expression text field: ++ +[source,sql] +---- +microseconds_add(from_unixtime(cast(cast([sensor_ts] as bigint)/1000000 as bigint),"yyyy-MM-dd HH:mm:ss"),cast(right([sensor_ts],6) as INT)) +---- ++ +image::images/viz_add_expression.png[width=600] ++ +Validate the expression and click "Apply". You can see that the field has been updated. Save your changes by clicking the green "Save" button. ++ +image::images/viz_save_dataset.png[width=600] ++ +Now that our dataset is complete and we have fixed the data quality issue on the fly, we can create our dashboard. Please click "New dashboard" in the top right corner. + +. When the dashboard editor opens, we can see a table visual created by default with all fields displayed. ++ +image::images/viz_create_dashboard.png[width=800] ++ +Let's make some quick changes to this visual to create our real-time dashboard. With the table visual highlighted, open the Build tab on the right of the screen and click in the Measures box. Now you can click on the fields "sensor_0" and "sensor_1"; these will be added as measures.
We can change the default aggregation from max() to avg() by selecting one of the newly added fields, opening "Aggregates" and selecting "Average". Let's do this for both measures. ++ +image::images/viz_dashboard_add_measures.png[width=400] ++ +As for dimensions, let's add the fields "sensor_timestamp" and "sensor_id". Highlight "sensor_timestamp" and from "Order and Top K" select "Descending". This will show the values in the table visual in descending order with the newest sensor readings on top. Click "Refresh visual". ++ +image::images/viz_dashboard_add_dimensions.png[width=400] ++ +Finally, select the "Settings" tab on the right of the screen and change the value for "Auto-refresh period (sec)" to 5. ++ +image::images/viz_dashboard_auto_refresh.png[width=400] ++ +Now our dashboard is complete. Notice that the data in the table is already updated automatically. As a final step, hit Save at the top of the dashboard to save the changes and click View to enter view/publish mode. This is what your dashboard consumers will see: the sensor readings coming in through our streaming pipeline, displayed in a real-time dashboard. ++ +image::images/viz_dashboard_view.png[width=500] + +[[lab_7, Lab 7]] +== Lab 7 - Apache Flink - your friend for streaming use cases +* Click here for the next lab: link:sql_stream_builder_lite.adoc[Querying streams with SQL] diff --git a/streaming_lite_enhanced.adoc b/streaming_lite_enhanced.adoc new file mode 100644 index 0000000..6d16ef7 --- /dev/null +++ b/streaming_lite_enhanced.adoc @@ -0,0 +1,472 @@ += From Edge to Streams Processing + +In this workshop you'll implement a data pipeline, using NiFi to ingest data from an IoT device into Kafka and then consume data from Kafka and write it to Kudu tables. + +== Labs summary + +* *Lab 1* - On Apache NiFi, run a simulator to send IoT sensor data to the MQTT broker. +* *Lab 2* - On Schema Registry, register the schema describing the data generated by the IoT sensors. +* *Lab 3* - On the NiFi cluster, prepare the data and send it to the *Kafka* cluster. +* *Lab 4* - On the *Streams Messaging Manager (SMM)* Web UI, monitor the Kafka cluster and confirm data is being ingested correctly. +* *Lab 5* - Use NiFi to consume each record from *Kafka* and save results to *Kudu*. +* *Lab 6* - Check the data on Kudu. +* *Lab 7* - Apache Flink - your friend for streaming use cases + +[[lab_1, Lab 1]] +== Lab 1 - Apache NiFi: set up the machine sensors simulator + +In this lab you will run a simple Python script that simulates IoT sensor data from some hypothetical machines and sends the data to an MQTT broker (link:https://mosquitto.org/[mosquitto]). The gateway host is connected to many different types of sensors, but they generally all share the same transport protocol, MQTT. + +. Go to Apache NiFi and add a Processor (ExecuteProcess) to the canvas. ++ +image::images/simulate1.png[width=800] + +. Right-click the processor, select *Configure* (or, alternatively, just double-click the processor). On the *PROPERTIES* tab, set the properties shown below to run our Python simulator script. ++ +[source] +---- +Command: python3 +Command Arguments: /opt/demo/simulate.py +---- ++ +image::images/simulate2.png[width=500] + +. In the *SCHEDULING* tab, set *Run Schedule* to *1 sec*. ++ +Alternatively, you could set that to other time intervals: 1 sec, 30 sec, 1 min, etc. A hypothetical sketch of what such a simulator script might look like follows this step. ++ +image::images/runSimulator1or30.png[width=500] + +.
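++
+NOTE: The contents of `/opt/demo/simulate.py` are not shown in this workshop. Purely as an illustration (the real script and its payload layout may differ), a minimal simulator along these lines could look like the sketch below, assuming the `paho-mqtt` package (1.x API). Because ExecuteProcess re-runs the command on every *Run Schedule* tick, a script that publishes a single reading and exits fits this model well.
++
+[source,python]
+----
+# Hypothetical simulator sketch; the real /opt/demo/simulate.py may differ.
+import json
+import random
+import time
+
+import paho.mqtt.publish as publish
+
+# Made-up payload layout for illustration only.
+reading = {
+    "sensor_id": random.randint(1, 100),
+    "sensor_ts": int(time.time() * 1_000_000),   # epoch with microsecond precision
+    "is_healthy": 1,
+}
+for i in range(12):
+    value = random.randint(0, 100)
+    if i in (0, 1) and random.random() < 0.1:
+        value = random.randint(500, 1000)        # occasional noise on sensor_0/sensor_1
+    reading[f"sensor_{i}"] = value
+
+# Publish one reading to the broker that ConsumeMQTT will read from.
+publish.single("iot/sensor-1", json.dumps(reading),
+               hostname="edge2ai-1.dim.local", port=1883)
+----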
In the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*. ++ +image::images/nifiTerminateRelationships.png[width=600] + +. You can then right-click the processor and select *Start* to run the simulator. ++ +image::images/nifiDemoStart.png[width=400] + +. Right-click and select *Stop* after a few seconds, then look at the *provenance*. You'll see that it has run a number of times and produced results. ++ +image::images/NiFiViewDataProvenance.png[width=400] ++ +image::images/NiFiDataProvenance.png[width=800] + + + +[[lab_2, Lab 2]] +== Lab 2 - Registering our schema in Schema Registry + +The data produced by the temperature sensors is described by the schema in file `link:https://raw.githubusercontent.com/zBrainiac/edge2ai-workshop/master/sensor_enhanced.avsc[sensor_enhanced.avsc]`. In this lab we will register this schema in Schema Registry so that our flows in NiFi can refer to the schema using a unified service. This will also allow us to evolve the schema in the future, if needed, keeping older versions under version control, so that existing flows and flowfiles will continue to work. + +. Go to the following URL, which contains the schema definition we'll use for this lab. Select all contents of the page and copy it. ++ +`link:https://raw.githubusercontent.com/zBrainiac/edge2ai-workshop/master/sensor_enhanced.avsc[https://raw.githubusercontent.com/zBrainiac/edge2ai-workshop/master/sensor_enhanced.avsc, window="_blank"]` + +. In the Schema Registry Web UI, click the `+` sign to register a new schema. + +. Click on a blank area in the *Schema Text* field and paste the contents you copied. + +. Complete the schema creation by filling the following properties: ++ +[source] +---- +Name: SensorReading +Description: Schema for the data generated by the IoT sensors +Type: Avro schema provider +Schema Group: Kafka +Compatibility: Backward +Evolve: checked +---- ++ +image::images/register_schema.png[width=800] + +. Save the schema. + +[[lab_3, Lab 3]] +== Lab 3 - Configuring the NiFi flow and pushing data to Kafka + +In this lab, you will create a NiFi flow to receive the data from the MQTT gateways and push it to **Kafka**. + +=== Creating a Process Group + +Before we start building our flow, let's create a Process Group to help organize the flows in the NiFi canvas and also to enable flow version control. + +. Open the NiFi Web UI, create a new Process Group and name it something like *Process Sensor Data*. ++ +image::images/create_pgroup_lite.png[width=800] + +. We want to be able to version control the flows we will add to the Process Group. In order to do that, we first need to connect NiFi to the *NiFi Registry*. On the NiFi global menu, click on "Controller Settings", navigate to the "Registry Clients" tab and add a Registry client with the following URL: ++ +---- +Name: NiFi Registry +URL: http://edge2ai-1.dim.local:18080 +---- ++ +image::images/global_controller_settings.png[width=800] ++ +image::images/add_registry_client.png[width=800] + +. On the *NiFi Registry* Web UI, add another bucket for storing the Sensor flow we're about to build. Call it `SensorFlows`: ++ +image::images/sensor_flows_bucket.png[width=800] + +. Back on the *NiFi* Web UI, to enable version control for the Process Group, right-click on it and select *Version > Start version control* and enter the details below. Once you complete this step, a tick icon (image:images/version_control_tick.png[width=20]) will appear on the Process Group, indicating that version control is now enabled for it.
++ +[source] +---- +Registry: NiFi Registry +Bucket: SensorFlows +Flow Name: SensorProcessGroup +---- + +. Let's also enable processors in this Process Group to use schemas stored in Schema Registry. Right-click on the Process Group, select *Configure* and navigate to the *Controller Services* tab. Click the *`+`* icon and add a *HortonworksSchemaRegistry* service. After the service is added, click on the service's _cog_ icon (image:images/cog_icon.png[width=20]), go to the *Properties* tab and configure it with the following *Schema Registry URL* and click *Apply*. ++ +[source] +---- +URL: http://edge2ai-1.dim.local:7788/api/v1 +---- ++ +image::images/added_hwx_sr_service.png[width=800] + +. Click on the _lightning bolt_ icon (image:images/enable_icon.png[width=20]) to *enable* the *HortonworksSchemaRegistry* Controller Service. + +. Still on the *Controller Services* screen, let's add two additional services to handle the reading and writing of JSON records. Click on the image:images/plus_button.png[width=25] button and add the following two services: +** *`JsonTreeReader`*, with the following properties: ++ +[source] +---- +Schema Access Strategy: Use 'Schema Name' Property +Schema Registry: HortonworksSchemaRegistry +Schema Name: ${schema.name} -> already set by default! +---- + +** *`JsonRecordSetWriter`*, with the following properties: ++ +[source] +---- +Schema Write Strategy: HWX Schema Reference Attributes +Schema Access Strategy: Inherit Record Schema +Schema Registry: HortonworksSchemaRegistry +---- + +. Enable the *JsonTreeReader* and the *JsonRecordSetWriter* Controller Services you just created, by clicking on their respective _lightning bolt_ icons (image:images/enable_icon.png[width=20]). ++ +image::images/controller_services.png[width=800] + +=== Creating the flow + +. Double-click on the newly created process group to expand it. + +. Inside the process group, add a new _ConsumeMQTT_ processor. ++ +image::images/add_ConsumeMQTT_lite.png[width=800] + ++ +*PROPERTIES* tab: ++ +[source] +---- +Broker URI: tcp://edge2ai-1.dim.local:1883 +Client ID: sensor-iot +Topic Filter: iot/# +Max Queue Size: 60 +---- + +. We need to tell NiFi which schema should be used to read and write the Sensor data. For this we'll use an _UpdateAttribute_ processor to add an attribute to the FlowFile indicating the schema name. ++ +Add an _UpdateAttribute_ processor by dragging the processor icon to the canvas: ++ +image::images/add_updateattribute_lite.png[width=800] + +. Double-click the _UpdateAttribute_ processor and configure it as follows: +.. In the _SETTINGS_ tab: ++ +[source] +---- +Name: Set Schema Name +---- +.. In the _PROPERTIES_ tab: +** Click on the image:images/plus_button.png[width=25] button and add the following property: ++ +[source] +---- +Property Name: schema.name +Property Value: SensorReading +---- +.. Click *Apply* + +. Connect the *Consume mqtt* input port to the *Set Schema Name* processor. + +. Add a _PublishKafkaRecord_2.0_ processor and configure it as follows: ++ +*SETTINGS* tab: ++ +[source] +---- +Name: Publish to Kafka topic: iot +---- ++ +*PROPERTIES* tab: ++ +[source] +---- +Kafka Brokers: edge2ai-1.dim.local:9092 +Topic Name: iot +Record Reader: JsonTreeReader +Record Writer: JsonRecordSetWriter +Use Transactions: false +Attributes to Send as Headers (Regex): schema.* +---- ++ +NOTE: Make sure you use the PublishKafkaRecord_2.0 processor and *not* the PublishKafka_2.0 one + +. 
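++
+Before starting the processors, you can optionally confirm that the simulator's readings are actually reaching the MQTT broker that *ConsumeMQTT* points to. The hypothetical sketch below (not a workshop step; it assumes the `paho-mqtt` package, 1.x API, on a host that can reach `edge2ai-1.dim.local:1883`) subscribes to the same `iot/#` filter and prints the raw payloads. MQTT delivers each message to every subscriber, so this will not take messages away from the NiFi flow.
++
+[source,python]
+----
+# Hypothetical MQTT check (not a workshop step).
+import paho.mqtt.client as mqtt
+
+def on_message(client, userdata, message):
+    # Print every payload the simulator publishes under iot/#.
+    print(message.topic, message.payload.decode("utf-8"))
+
+client = mqtt.Client(client_id="mqtt-peek")   # must differ from ConsumeMQTT's Client ID
+client.on_message = on_message
+client.connect("edge2ai-1.dim.local", 1883)
+client.subscribe("iot/#")
+client.loop_forever()                         # Ctrl+C to stop
+----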
While still in the _PROPERTIES_ tab of the _PublishKafkaRecord_2.0_ processor, click on the image:images/plus_button.png[width=25] button and add the following property: ++ +[source] +---- +Property Name: client.id +Property Value: nifi-sensor-data +---- ++ +Later, this will help us clearly identify who is producing data into the Kafka topic. + +. Connect the *Set Schema Name* processor to the *Publish to Kafka topic: iot* processor. + +. Add a new _Funnel_ to the canvas and connect the PublishKafkaRecord processor to it. When the "Create connection" dialog appears, select "*failure*" and click *Add*. ++ +image::images/add_kafka_failure_connection.png[width=600] + +. Double-click on the *Publish to Kafka topic: iot* processor, go to the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*. ++ +image::images/terminate_publishkafka_relationship.png[width=600] + +. Start all three processors. Your canvas should now look like the one below: ++ +image::images/publishKafka_flow_lite.png[width=800] + + +. Refresh the screen (`Ctrl+R` on Linux/Windows; `Cmd+R` on Mac) and you should see that the records were processed by the _PublishKafkaRecord_ processor and there should be no records queued on the "failure" output queue. ++ +image::images/kafka_success_lite.png[width=800] ++ +At this point, the messages are already in the Kafka topic. You can add more processors as needed to process, split, duplicate or re-route your FlowFiles to all other destinations and processors. + +. To complete this Lab, let's commit and version the work we've just done. Go back to the NiFi root canvas, clicking on the "Nifi Flow" breadcrumb. Right-click on the *Process Sensor Data* Process Group and select *Version > Commit local changes*. Enter a descriptive comment and save. + +[[lab_4, Lab 4]] +== Lab 4 - Use SMM to confirm that the data is flowing correctly + +Now that our NiFi flow is pushing data to Kafka, it would be good to have a confirmation that everything is running as expected. In this lab you will use Streams Messaging Manager (SMM) to check and monitor Kafka. + +. Start the *NiFi ExecuteProcess* simulator again and confirm you can see the messages queued in NiFi. Leave it running. + +. Go to the Stream Messaging Manager (SMM) Web UI and familiarize yourself with the options there. Notice the filters (blue boxes) at the top of the screen. ++ +image::images/smm.png[width=800] + +. Click on the *Producers* filter and select only the *`nifi-sensor-data`* producer. This will hide all the irrelevant topics and show only the ones that producer is writing to. + +. If you filter by *Topic* instead and select the `iot` topic, you'll be able to see all the *producers* and *consumers* that are writing to and reading from it, respectively. Since we haven't implemented any consumers yet, the consumer list should be empty. + +. Click on the topic to explore its details. You can see more details, metrics and the break down per partition. Click on one of the partitions and you'll see additional information and which producers and consumers interact with that partition. ++ +image::images/producers.png[width=800] + +. Click on the *EXPLORE* link to visualize the data in a particular partition. Confirm that there's data in the Kafka topic and it looks like the JSON produced by the sensor simulator. ++ +image::images/explore_partition.png[width=800] + +. Check the data from the partition. You'll notice something odd. 
These are readings from temperature sensors and we don't expect any of the sensors to measure temperatures greater than 150 degrees in the conditions in which they are used. It seems, though, that `sensor_0` and `sensor_1` are intermittently producing noise and some of their readings have abnormally high values. ++ +image::images/troubled_sensors.png[width=800] + +. Stop the *NiFi ExecuteProcess* simulator again. + +. In the next Lab we'll eliminate these problematic measurements to avoid problems later in our data flow. + +[[lab_5, Lab 5]] +== Lab 5 - Use NiFi to consume each record from *Kafka* and save results to *Kudu*. + +In this lab, you will use NiFi to consume the Kafka messages containing the IoT data we ingested in the previous lab. + +=== Add new Controller Services + +When the sensor data was sent to Kafka using the _PublishKafkaRecord_ processor, we chose to attach the schema information in the header of Kafka messages. Now, instead of hard-coding which schema we should use to read the message, we can leverage that metadata to dynamically load the correct schema for each message. + +To do this, though, we need to configure a different _JsonTreeReader_ that will use the schema properties in the header, instead of the `${schema.name}` attribute, as we did before. + + +. If you're not in the *Process Sensor Data* process group, double-click on it to expand it. On the *Operate* panel (left-hand side), click on the _cog_ icon (image:images/cog_icon.png[width=25]) to access the *Process Sensor Data* process group's configuration page. ++ +image::images/operate_panel_cog.png[width=300] + +. Click on the _plus_ button (image:images/plus_button.png[width=25]), add a new *JsonTreeReader*, configure it as shown below and click *Apply* when you're done: ++ +On the *SETTINGS* tab: ++ +[source] +---- +Name: JsonTreeReader - With schema identifier +---- ++ +On the *PROPERTIES* tab: ++ +[source] +---- +Schema Access Strategy: HWX Schema Reference Attributes +Schema Registry: HortonworksSchemaRegistry +---- + +. Click on the _lightning bolt_ icon (image:images/enable_icon.png[width=20]) to *enable* the *JsonTreeReader - With schema identifier* controller service. + + ++ +image::images/additional_controller_services_lite.png[width=800] + +. Close the *Process Sensor Data Configuration* page. + +=== Create the flow + +We'll now create the flow to read the sensor data from Kafka and write the results to Kudu. At the end of this section your flow should look like the one below: + +image::images/from_kafka_to_kudu_flow_lite.png[width=800] + +==== ConsumeKafkaRecord_2_0 processor + +. We'll add a new flow to the same canvas we were using before (inside the *Process Sensor Data* Process Group). Click on an empty area of the canvas and drag it to the side to give you more space to add new processors. + +. Add a *ConsumeKafkaRecord_2_0* processor to the canvas and configure it as shown below: ++ +*SETTINGS* tab: ++ +[source] +---- +Name: Consume Kafka iot messages +---- ++ +*PROPERTIES* tab: ++ +[source] +---- +Kafka Brokers: edge2ai-1.dim.local:9092 +Topic Name(s): iot +Topic Name Format: names +Record Reader: JsonTreeReader - With schema identifier +Record Writer: JsonRecordSetWriter +Honor Transactions: false +Group ID: iot-sensor-consumer +Offset Reset: latest +Headers to Add as Attributes (Regex): schema.* +---- + +. Reuse the existing _Funnel_ on the canvas and connect the *Consume Kafka iot messages* processor to it.
When prompted, check the *parse.failure* relationship for this connection: ++ +image:images/parse_failure_relationship.png[width=500] + +==== PutKudu processor + +. Add a *PutKudu* processor to the canvas and configure it as shown below: ++ +*SETTINGS* tab: ++ +[source] +---- +Name: Write to Kudu +---- ++ +*PROPERTIES* tab: ++ +[source] +---- +Kudu Masters: edge2ai-1.dim.local:7051 +Table Name: impala::default.sensors_enhanced +Record Reader: JsonTreeReader - With schema identifier +---- + +. Connect the *Consume Kafka iot messages* processor to the *Write to Kudu* one. When prompted, check the *success* relationship for this connection. + +. Connect the *Write to Kudu* processor to the same _Funnel_ you created above. When prompted, check the *failure* relationship for this connection. + +. Double-click on the *Write to Kudu* processor, go to the *SETTINGS* tab, check the "*success*" relationship in the *AUTOMATICALLY TERMINATED RELATIONSHIPS* section. Click *Apply*. + +==== Create the Kudu table + +NOTE: If you already created this table in a previous workshop, please skip the table creation here. + +. Go to the Hue Web UI and log in. The first user to log in to a Hue installation is automatically created and granted admin privileges in Hue. + +. The Hue UI should open with the Impala Query Editor by default. If it doesn't, you can always find it by clicking on *Query > Editor > Impala*: ++ +image::images/impala_editor.png[width=800] + +. First, create the Kudu table. In the Impala Query Editor, run this statement: ++ +[source,sql] +---- +CREATE TABLE sensors_enhanced +( + sensor_id INT, + sensor_ts TIMESTAMP, + sensor_0 DOUBLE, + sensor_1 DOUBLE, + sensor_2 DOUBLE, + sensor_3 DOUBLE, + sensor_4 DOUBLE, + sensor_5 DOUBLE, + sensor_6 DOUBLE, + sensor_7 DOUBLE, + sensor_8 DOUBLE, + sensor_9 DOUBLE, + sensor_10 DOUBLE, + sensor_11 DOUBLE, + is_healthy INT, + city STRING, + lat DOUBLE, + lon DOUBLE, + PRIMARY KEY (sensor_id, sensor_ts) +) +PARTITION BY HASH PARTITIONS 16 +STORED AS KUDU +TBLPROPERTIES ('kudu.num_tablet_replicas' = '1'); +---- ++ +image::images/create_table.png[width=800] + +==== Running the flow + +We're now ready to run and test our flow. Follow the steps below: + +. Start all the processors in your flow. + +. Refresh your NiFi page and you should see messages passing through your flow. The failure queues should have no records queued up. ++ +image::images/kudu_success_lite.png[width=800] + +[[lab_6, Lab 6]] +== Lab 6 - Check the data on Kudu + +In this lab, you will run some SQL queries using the Impala engine and verify that the Kudu table is being updated as expected. + +. Log in to Hue and run the following queries in the Impala Query Editor: ++ +[source,sql] +---- +SELECT count(*) +FROM sensors_enhanced; +---- ++ +[source,sql] +---- +SELECT * +FROM sensors_enhanced +ORDER BY sensor_ts DESC +LIMIT 100; +---- ++ +. Run the queries a few times and verify that the number of sensor readings is increasing as the data is ingested into the Kudu table. This allows you to build real-time reports for fast action. ++ +image::images/table_select_lite.png[width=800] + +[[lab_7, Lab 7]] +== Lab 7 - Apache Flink - your friend for streaming use cases +* Click here for the next lab: link:streaming_flink.adoc[Streams Processing with Flink]
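+
+(Optional) Before moving on to the Flink lab, you can double-check from a terminal that rows keep arriving in the Kudu-backed table while the flow is running. The hypothetical sketch below assumes the `impyla` package and the default, unauthenticated Impala endpoint on port 21050; it is not a required step.
++
+[source,python]
+----
+# Hypothetical ingestion check via Impala (not a workshop step).
+import time
+
+from impala.dbapi import connect
+
+conn = connect(host="edge2ai-1.dim.local", port=21050, auth_mechanism="NOSASL")
+cursor = conn.cursor()
+
+# Sample the row count twice; the second value should be higher while NiFi is writing.
+for _ in range(2):
+    cursor.execute("SELECT count(*) FROM sensors_enhanced")
+    print("rows:", cursor.fetchone()[0])
+    time.sleep(10)
+
+cursor.close()
+conn.close()
+----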