Skip to content

Latest commit

 

History

History
203 lines (146 loc) · 7.91 KB

live-demo-kafka-connect-iot-mqtt-connector.adoc

File metadata and controls

203 lines (146 loc) · 7.91 KB

Apache Kafka / Kafka Connect / MQTT / Mosquitto Live Demo

Kai Waehner <[email protected]> 22 Jul 2019

This script assumes that all components (Zookeeper, Kafka, Connect, MQTT Broker) use default values.

Please note that Confluent CLI changed with Confluent Platform 5.3+: 'confluent local start' Confluent Platform 5.2 and earlier: 'confluent start'. This guide is tested with Confluent Platform 5.4.0.

Starting backend services

Start the MQTT Broker and test publish / subscribe with 'dummy' topic:

brew services start mosquitto
mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"

Make sure to have Confluent folder on PATH. Otherwise, go to $CONFLUENT_INSTALL/bin to execute commands these commands.

Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):

confluent local start connect

Create and deploy MQTT Connector Instance

We use Kafka Connect in distributed mode via REST call (only possible when Connect is running). This is highly recommend as you can do this for one single node (like in this example, but also deploy a distributed Connect cluster).

Important: If you copy&paste the below curl command, make sure that you copy it "plaintext only". Otherwise, it might not work. On my Mac, I copy the command to a text editor first and from there to the command line to execute it.

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
    "name" : "mqtt-source",
"config" : {
    "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max" : "1",
    "mqtt.server.uri" : "tcp://127.0.0.1:1883",
    "mqtt.topics" : "temperature",
    "kafka.topic" : "mqtt.temperature",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license":""
    }
}'

Please note that "distributed mode" can also be used for one single Connect instance like when using Confluent CLI. You could also use standalone mode (see more details at the end of the file).

Check Status of Connector

Check if connector is loaded and status is 'RUNNING':

// REST
curl -s "http://localhost:8083/connectors"
curl -s "http://localhost:8083/connectors/mqtt-source/status"
curl -s -X DELETE localhost:8083/connectors/mqtt-source

// Confluent CLI
confluent local status connectors
confluent local status mqtt-source

Next, create a Kafka Topic for consuming the MQTT messages via the MQTT Connector (needs to be the same as in the Connector Config):

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mqtt.temperature

Starting Client services (Kafka Consumer and MQTT Publisher)

Use a CLI tool such as mosquitto_sub from the shell prompt to consume from MQTT topic (to ensure the MQTT infrastructure works):

$ mosquitto_sub -h 127.0.0.1 -t temperature
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]

Use a CLI tool such as kafka-console-consumer from the shell prompt to consume from Kafka topic:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --property print.key=true --from-beginning
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]

Send a MQTT message to the MQTT Broker, using mosquitto_pub:

mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"

// Send fraud messages. These are routed to a different Kafka Topic if you used the Connector config with the SMT above.
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "fraud"

mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"

Now run a script to generate and publish a continuous stream of MQTT messages:

./sensor_generator.sh

Stop services and destroy test data

Stop the sensor_generator script with Control-C.

Stop the Kafka and MQTT consumers with Control-C.

Finally, stop the backend services:

brew services stop mosquitto
confluent local destroy

Errors? Problems?

Here are some helpful commands if you have problems starting mosquitto or finding out if it is running and on which port:

Find the system process and port of MQTT Broker

// If brew does not work (or you are not on Mac):
// Start Mosquitto
/usr/local/sbin/mosquitto

// Start Moquitto (being in PATH) with a specific config file
mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf

// Mosquitto Broker running? => Find process:
ps -ef | grep mosquitto

// Which port?
lsof -n -P -i4|grep LISTEN

Change log level of Kafka Connect

Logging is a worker-level configuration, so there’s no way to set it per connector instance. To adjust the log levels you modify the connect-log4j.properties file and restart your worker.

Default log level is ERROR. You can change to INFO or DEBUG.

vi /Users/kai.waehner/confluent-5.1.0/etc/kafka/connect-log4j.properties
// => Change log level

// Restart Kafka Connect worker
confluent local stop connect
confluent local start connect

// Check the Connect log:
confluent local log kafka

Configuration of Standalone Mode via Properties File

As alternative to a HTTP call, you could also configure a property file for standalone mode. However, as REST can also be used for one single instance (in distributed mode), I always use this option to configure Kafka Connect. But here is some more information about using a property file instead of HTTP to configure Connect (in standalone mode).

Configure MQTT Connector properties file (for Kafka Connect standalone mode) /Users/kai.waehner/confluent-5.0.0/share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties with your values for

  • MQTT Broker URL

  • MQTT Topic(s) to consume from (comma-separated list)

  • Kafka Topic Mapping (prefix)

For example, if you want to consume the MQTT topics temperature and humidity, and you want the Kafka topics to be mqtt.temperature and mqtt.humidity, then do

    "mqtt.topics" : "temperature,humidity",
    "kafka.topics" : "mqtt.temperature"

This needs to be done before you start Kafka Connect. Example:

name=mqtt-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://127.0.0.1:32790
mqtt.topics=temperature
kafka.topics=mqtt.temperature