-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt-listener.py
36 lines (30 loc) · 1005 Bytes
/
mqtt-listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import datetime
import logging
import pprint
# Configure the root logger so INFO-level records (and above) are emitted.
logging.basicConfig(level=logging.INFO)
def persists(msg):
    """Write one MQTT message to InfluxDB as a single point.

    The message topic is used as the measurement name and the payload is
    stored in the ``value`` field, stamped with the current UTC time.
    Payloads that read ``nan`` are logged and dropped without writing.

    Args:
        msg: Message object exposing ``topic`` and ``payload`` attributes.
    """
    payload = msg.payload
    # The "nan" check is a text comparison, so a bytes payload (what
    # paho-mqtt hands to on_message under Python 3) has to be decoded first;
    # otherwise the comparison can never match.  errors="replace" keeps a
    # non-UTF-8 payload from raising here.
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode("utf-8", errors="replace")

    if isinstance(payload, str) and payload.strip().lower() == "nan":
        logging.info("Skipping invalid measurement")
        # Return (not just ``pass``) so the invalid value is never written.
        return

    current_time = datetime.datetime.utcnow().isoformat()
    json_body = [
        {
            "measurement": msg.topic,
            "tags": {},
            "time": current_time,
            "fields": {
                "value": payload,
            },
        },
    ]
    logging.info(json_body)
    influx_client.write_points(json_body)
# Shared InfluxDB connection used by persists(); points go to the "iot" database.
influx_client = InfluxDBClient('influxdb', 8086, database='iot')


def _on_connect(mqtt_client, userdata, flags, rc, *extra):
    """Subscribe to every topic each time a broker connection is established.

    Subscribing from the connect callback means the subscription is renewed
    after every reconnect.  ``*extra`` absorbs any additional positional
    arguments the library may pass (e.g. MQTT v5 properties).
    """
    mqtt_client.subscribe("#")


def _on_disconnect(mqtt_client, userdata, rc, *extra):
    """Log a disconnect and leave reconnection to ``loop_forever()``.

    paho-mqtt 1.x invokes this callback with ``(client, userdata, rc)``; the
    previous four-parameter lambda raised TypeError on the first disconnect.
    No explicit ``connect()`` is issued here because ``loop_forever()``
    already retries the connection on its own.
    """
    logging.warning(
        "Disconnected from MQTT broker (rc=%s); waiting for reconnect", rc
    )


def _on_message(mqtt_client, userdata, msg):
    """Forward every received message to persists()."""
    persists(msg)


client = mqtt.Client()
client.on_connect = _on_connect
client.on_disconnect = _on_disconnect
client.on_message = _on_message
client.connect('mosquitto', 1883, keepalive=60)
# Blocks for the life of the process, dispatching callbacks and reconnecting.
client.loop_forever()