In this workshop you’ll implement a data pipeline: NiFi ingests data from an IoT device into Kafka, and then consumes the data from Kafka and writes it to Kudu tables.
-
Lab 1 - On Apache NiFi, run a simulator to send IoT sensor data to the MQTT broker.
-
Lab 2 - On Schema Registry, register the schema describing the data generated by the IoT sensors.
-
Lab 3 - On the NiFi cluster, prepare the data and send it to the Kafka cluster.
-
Lab 4 - On the Streams Messaging Manager (SMM) Web UI, monitor the Kafka cluster and confirm data is being ingested correctly.
-
Lab 5 - Use NiFi to consume each record from Kafka and save results to Kudu.
-
Lab 6 - Check the data on Kudu.
-
Lab 7 - Apache Flink: your friend for streaming use cases
In this lab you will run a simple Python script that simulates IoT sensor data from some hypothetical machines and sends the data to an MQTT broker (Mosquitto). The gateway host is connected to many different types of sensors, but they generally all share the same transport protocol, MQTT.
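To give an idea of what such a simulator does, here is a minimal, hypothetical sketch of a publisher along the lines of /opt/demo/simulate.py. The broker host and port match the ConsumeMQTT settings used later in Lab 3, while the topic name, field names and value ranges are illustrative assumptions; the real payload is defined by the sensor.avsc schema registered in Lab 2.

import json
import random
import time

import paho.mqtt.client as mqtt  # pip install paho-mqtt

# Connect to the Mosquitto broker running on the workshop host.
client = mqtt.Client()  # on paho-mqtt 2.x use mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client.connect("edge2ai-1.dim.local", 1883)

# Build one fake reading: an id, a timestamp and twelve measurements.
reading = {"sensor_id": random.randint(0, 9), "sensor_ts": int(time.time() * 1000)}
for i in range(12):
    reading["sensor_{}".format(i)] = round(random.gauss(50, 10), 2)

# Publish under a topic that matches the iot/# filter used by ConsumeMQTT in Lab 3.
client.publish("iot/sensors", json.dumps(reading))
client.disconnect()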
-
Go to Apache NiFi and add a Processor (ExecuteProcess) to the canvas.
-
Right-click the processor, select Configure (or, alternatively, just double-click the processor). On the PROPERTIES tab, set the properties shown below to run our Python simulate script.
Command: python3
Command Arguments: /opt/demo/simulate.py
-
In the SCHEDULING tab, set Run Schedule to: 1 sec
Alternatively, you could set it to other time intervals: 30 sec, 1 min, etc.
-
In the SETTINGS tab, check the "success" relationship in the AUTOMATICALLY TERMINATED RELATIONSHIPS section. Click Apply.
-
You can then right-click the processor and select Start to run the simulator.
-
After a few seconds, right-click the processor and select Stop, then look at the data provenance. You’ll see that it has run a number of times and produced results.
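Optionally, while the simulator runner is started, you can also watch the messages arriving at the broker from a shell on the cluster host, assuming the Mosquitto client tools are installed; the topic filter below is the same one the ConsumeMQTT processor will use in Lab 3:

mosquitto_sub -h edge2ai-1.dim.local -t 'iot/#' -v

Each printed line shows the topic followed by the JSON payload sent by the simulator.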
The data produced by the temperature sensors is described by the schema in the file sensor.avsc. In this lab we will register this schema in Schema Registry so that our NiFi flows can refer to the schema through a unified service. This will also allow us to evolve the schema in the future, if needed, keeping older versions under version control so that existing flows and FlowFiles will continue to work.
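For orientation, a record schema for readings like these could look roughly like the sketch below. Treat it only as an illustration; the authoritative definition is the sensor.avsc content you will copy from the workshop URL in the next step, and the field names there may differ.

{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "sensor_id", "type": "int"},
    {"name": "sensor_ts", "type": "long"},
    {"name": "sensor_0", "type": "double"},
    {"name": "sensor_1", "type": "double"},
    {"name": "sensor_2", "type": "double"},
    {"name": "sensor_3", "type": "double"},
    {"name": "sensor_4", "type": "double"},
    {"name": "sensor_5", "type": "double"},
    {"name": "sensor_6", "type": "double"},
    {"name": "sensor_7", "type": "double"},
    {"name": "sensor_8", "type": "double"},
    {"name": "sensor_9", "type": "double"},
    {"name": "sensor_10", "type": "double"},
    {"name": "sensor_11", "type": "double"}
  ]
}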
-
Go to the following URL, which contains the schema definition we’ll use for this lab. Select all the contents of the page and copy them.
-
In the Schema Registry Web UI, click the + sign to register a new schema.
-
Click on a blank area in the Schema Text field and paste the contents you copied.
-
Complete the schema creation by filling the following properties:
Name: SensorReading
Description: Schema for the data generated by the IoT sensors
Type: Avro schema provider
Schema Group: Kafka
Compatibility: Backward
Evolve: checked
-
Save the schema
In this lab, you will create a NiFi flow to receive the data from the MQTT gateways and push it to Kafka.
Before we start building our flow, let’s create a Process Group to help organize the flows in the NiFi canvas and also to enable flow version control.
-
Open the NiFi Web UI, create a new Process Group and name it something like Process Sensor Data.
-
We want to be able to version control the flows we will add to the Process Group. In order to do that, we first need to connect NiFi to the NiFi Registry. On the NiFi global menu, click on "Controller Settings", navigate to the "Registry Clients" tab and add a Registry client with the following URL:
Name: NiFi Registry
URL: http://edge2ai-1.dim.local:18080
-
On the NiFi Registry Web UI, add another bucket for storing the Sensor flow we’re about to build. Call it SensorFlows:
-
Back on the NiFi Web UI, to enable version control for the Process Group, right-click on it and select Version > Start version control and enter the details below. Once you complete, a green checkmark icon will appear on the Process Group, indicating that version control is now enabled for it.
Registry: NiFi Registry
Bucket: SensorFlows
Flow Name: SensorProcessGroup
-
Let’s also enable processors in this Process Group to use schemas stored in Schema Registry. Right-click on the Process Group, select Configure and navigate to the Controller Services tab. Click the + icon and add a HortonworksSchemaRegistry service. After the service is added, click on the service’s cog icon, go to the Properties tab, configure it with the following Schema Registry URL and click Apply:
URL: http://edge2ai-1.dim.local:7788/api/v1
-
Click on the lightning bolt icon to enable the HortonworksSchemaRegistry Controller Service.
-
Still on the Controller Services screen, let’s add two additional services to handle the reading and writing of JSON records. Click on the + button and add the following two services:
-
JsonTreeReader, with the following properties:
Schema Access Strategy: Use 'Schema Name' Property
Schema Registry: HortonworksSchemaRegistry
Schema Name: ${schema.name} -> already set by default!
-
JsonRecordSetWriter, with the following properties:
Schema Write Strategy: HWX Schema Reference Attributes
Schema Access Strategy: Inherit Record Schema
Schema Registry: HortonworksSchemaRegistry
-
Enable the JsonTreeReader and the JsonRecordSetWriter Controller Services you just created by clicking on their respective lightning bolt icons.
-
Double-click on the newly created process group to expand it.
-
Inside the process group, add a new ConsumeMQTT processor.
PROPERTIES tab:
Broker URI: tcp://edge2ai-1.dim.local:1883
Client ID: sensor-iot
Topic Filter: iot/#
Max Queue Size: 60
-
We need to tell NiFi which schema should be used to read and write the Sensor data. For this we’ll use an UpdateAttribute processor to add an attribute to the FlowFile indicating the schema name.
Add an UpdateAttribute processor by dragging the processor icon to the canvas:
-
Double-click the UpdateAttribute processor and configure it as follows:
On the SETTINGS tab, set the processor name to: Set Schema Name
On the PROPERTIES tab, click the + button and add the following property:
Property Name: schema.name
Property Value: SensorReading
-
Connect the ConsumeMQTT processor to the Set Schema Name processor.
-
Add a PublishKafkaRecord_2.0 processor and configure it as follows:
SETTINGS tab:
Name: Publish to Kafka topic: iot
PROPERTIES tab:
Kafka Brokers: edge2ai-1.dim.local:9092
Topic Name: iot
Record Reader: JsonTreeReader
Record Writer: JsonRecordSetWriter
Use Transactions: false
Attributes to Send as Headers (Regex): schema.*
Note: Make sure you use the PublishKafkaRecord_2.0 processor and not the PublishKafka_2.0 one.
-
While still in the PROPERTIES tab of the PublishKafkaRecord_2.0 processor, click on the + button and add the following property:
Property Name: client.id
Property Value: nifi-sensor-data
Later, this will help us clearly identify who is producing data into the Kafka topic.
-
Connect the Set Schema Name processor to the Publish to Kafka topic: iot processor.
-
Add a new Funnel to the canvas and connect the PublishKafkaRecord processor to it. When the "Create connection" dialog appears, select "failure" and click Add.
-
Double-click on the Publish to Kafka topic: iot processor, go to the SETTINGS tab, check the "success" relationship in the AUTOMATICALLY TERMINATED RELATIONSHIPS section. Click Apply.
-
Start all three processors. Your canvas should now look like the one below:
-
Refresh the screen (Ctrl+R on Linux/Windows; Cmd+R on Mac) and you should see that the records were processed by the PublishKafkaRecord processor and there should be no records queued on the "failure" output queue.
At this point, the messages are already in the Kafka topic. You can add more processors as needed to process, split, duplicate or re-route your FlowFiles to all other destinations and processors.
-
To complete this Lab, let’s commit and version the work we’ve just done. Go back to the NiFi root canvas by clicking on the "NiFi Flow" breadcrumb. Right-click on the Process Sensor Data Process Group and select Version > Commit local changes. Enter a descriptive comment and save.
Now that our NiFi flow is pushing data to Kafka, it would be good to have a confirmation that everything is running as expected. In this lab you will use Streams Messaging Manager (SMM) to check and monitor Kafka.
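Besides SMM, a quick command-line check is also possible if you have shell access to the cluster host. Below is a sketch using the standard Kafka console consumer (the exact script name and path can vary with your Kafka distribution):

kafka-console-consumer --bootstrap-server edge2ai-1.dim.local:9092 --topic iot --from-beginning --max-messages 10

You should see JSON records similar to the ones produced by the simulator. The steps below perform the same verification through the SMM Web UI.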
-
Start the NiFi ExecuteProcess simulator again and confirm you can see the messages queued in NiFi. Leave it running.
-
Go to the Streams Messaging Manager (SMM) Web UI and familiarize yourself with the options there. Notice the filters (blue boxes) at the top of the screen.
-
Click on the Producers filter and select only the nifi-sensor-data producer. This will hide all the irrelevant topics and show only the ones that producer is writing to.
-
If you filter by Topic instead and select the iot topic, you’ll be able to see all the producers and consumers that are writing to and reading from it, respectively. Since we haven’t implemented any consumers yet, the consumer list should be empty.
-
Click on the topic to explore its details. You can see more details, metrics and the breakdown per partition. Click on one of the partitions and you’ll see additional information and which producers and consumers interact with that partition.
-
Click on the EXPLORE link to visualize the data in a particular partition. Confirm that there’s data in the Kafka topic and it looks like the JSON produced by the sensor simulator.
-
Check the data from the partition. You’ll notice something odd. These are readings from temperature sensors and we don’t expect any of the sensors to measure temperatures greater than 150 degrees in the conditions where they are used. It seems, though, that sensor_0 and sensor_1 are intermittently producing noise, and some of their measurements have very high values.
-
Stop the NiFi ExecuteProcess simulator again.
-
In the next Lab we’ll eliminate these problematic measurements to avoid problems later in our data flow.
In this lab, you will use NiFi to consume the Kafka messages containing the IoT data we ingested in the previous lab.
When the sensor data was sent to Kafka using the PublishKafkaRecord processor, we chose to attach the schema information in the header of Kafka messages. Now, instead of hard-coding which schema we should use to read the message, we can leverage that metadata to dynamically load the correct schema for each message.
To do this, though, we need to configure a different JsonTreeReader that will use the schema properties in the header, instead of the ${schema.name} attribute, as we did before.
-
If you’re not in the Process Sensor Data process group, double-click on it to expand it. On the Operate panel (left-hand side), click on the cog icon to access the Process Sensor Data process group’s configuration page.
-
Click on the plus button, add a new JsonTreeReader, configure it as shown below and click Apply when you’re done:
On the SETTINGS tab:
Name: JsonTreeReader - With schema identifier
On the PROPERTIES tab:
Schema Access Strategy: HWX Schema Reference Attributes
Schema Registry: HortonworksSchemaRegistry
-
Click on the lightning bolt icon to enable the JsonTreeReader - With schema identifier controller service.
-
Close the Process Sensor Data Configuration page.
We’ll now create the flow to read the sensor data from Kafka and write the results to Kudu. At the end of this section your flow should look like the one below:
-
We’ll add a new flow to the same canvas we were using before (inside the Process Sensor Data Process Group). Click on an empty area of the canvas and drag it to the side to give you more space to add new processors.
-
Add a ConsumeKafkaRecord_2_0 processor to the canvas and configure it as shown below:
SETTINGS tab:
Name: Consume Kafka iot messages
PROPERTIES tab:
Kafka Brokers: edge2ai-1.dim.local:9092
Topic Name(s): iot
Topic Name Format: names
Record Reader: JsonTreeReader - With schema identifier
Record Writer: JsonRecordSetWriter
Honor Transactions: false
Group ID: iot-sensor-consumer
Offset Reset: latest
Headers to Add as Attributes (Regex): schema.*
-
Reuse the existing Funnel on the canvas and connect the Consume Kafka iot messages processor to it. When prompted, check the parse.failure relationship for this connection:
-
Add a PutKudu processor to the canvas and configure it as shown below:
SETTINGS tab:
Name: Write to Kudu
PROPERTIES tab:
Kudu Masters: edge2ai-1.dim.local:7051
Table Name: impala::default.sensors
Record Reader: JsonTreeReader - With schema identifier
-
Connect the Consume Kafka iot messages processor to the Write to Kudu one. When prompted, check the success relationship for this connection.
-
Connect the Write to Kudu processor to the same Funnel you created above. When prompted, check the failure relationship for this connection.
-
Double-click on the Write to Kudu processor, go to the SETTINGS tab, check the "success" relationship in the AUTOMATICALLY TERMINATED RELATIONSHIPS section. Click Apply.
Note: If you already created this table in a previous workshop, please skip the table creation here.
-
Go to the Hue Web UI and log in. The first user to log in to a Hue installation is automatically created and granted admin privileges in Hue.
-
The Hue UI should open with the Impala Query Editor by default. If it doesn’t, you can always find it by clicking on the Query button > Editor > Impala:
-
First, create the Kudu table. Log in to Hue and, in the Impala Query Editor, run this statement:
CREATE TABLE sensors
(
    sensor_id INT,
    sensor_ts TIMESTAMP,
    sensor_0 DOUBLE,
    sensor_1 DOUBLE,
    sensor_2 DOUBLE,
    sensor_3 DOUBLE,
    sensor_4 DOUBLE,
    sensor_5 DOUBLE,
    sensor_6 DOUBLE,
    sensor_7 DOUBLE,
    sensor_8 DOUBLE,
    sensor_9 DOUBLE,
    sensor_10 DOUBLE,
    sensor_11 DOUBLE,
    is_healthy INT,
    city STRING,
    lat DOUBLE,
    lon DOUBLE,
    PRIMARY KEY (sensor_id, sensor_ts)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
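If you want to confirm the table was created as expected, you can run a quick check in the same Impala editor; the column list returned by DESCRIBE should match the statement above:

SHOW TABLES;
DESCRIBE sensors;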
In this lab, you will run some SQL queries using the Impala engine and verify that the Kudu table is being updated as expected.
-
Log in to Hue and run the following queries in the Impala Query Editor:
SELECT count(*) FROM sensors;
SELECT * FROM sensors ORDER by sensor_ts DESC LIMIT 100;
-
Run the queries a few times and verify that the number of sensor readings is increasing as the data is ingested into the Kudu table. This allows you to build real-time reports for fast action.
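As an extra, optional check tied to what we saw in Lab 4 (and assuming the noisy readings are still being produced), you can count how many measurements exceed the expected 150-degree ceiling:

SELECT count(*) FROM sensors WHERE sensor_0 > 150 OR sensor_1 > 150;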
-
Click here for the next Lab: Streams Processing with Flink