This repository has been archived by the owner on Sep 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
iot_sensor.py
84 lines (62 loc) · 2.07 KB
/
iot_sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""
Efficiently write data from an IoT sensor - write the changed temperature every minute
"""
import atexit
import platform
from datetime import timedelta
import psutil as psutil
import rx
from rx import operators as ops
from influxdb_client import WriteApi, WriteOptions
from influxdb_client.client.influxdb_client import InfluxDBClient
def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
    """Close the InfluxDB clients when the script terminates.

    The write API is closed first and the client afterwards. The client is
    closed inside a ``finally`` clause so that it is still released when
    closing the write API raises; the exception then propagates to the caller.

    :param db_client: InfluxDB client
    :param write_api: WriteApi
    :return: nothing
    """
    try:
        write_api.close()
    finally:
        # Must run even if the write API failed to close, otherwise the
        # client would never be closed.
        db_client.close()
def sensor_temperature():
    """Read a CPU temperature.

    On macOS the [psutil] library does not provide temperature sensors, so the
    CPU thermal level reported by [sysctl] is used instead. On other systems
    the value is taken from ``psutil.sensors_temperatures()``.

    :return: actual CPU temperature. On macOS this is the first run of digits
        from the ``sysctl`` output, as a string. Elsewhere it is the
        ``current`` value of the first available temperature reading. If the
        host exposes no temperature sensor at all, the logical CPU count is
        returned as a placeholder (the value this function produced before it
        read real sensors), so the caller still receives a value.
    """
    if platform.system() == "Darwin":
        from subprocess import check_output
        import re

        output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
        # The bytes output is stringified as-is; only the first run of digits
        # in it is used.
        return re.findall(r"\d+", str(output))[0]

    # psutil does not define sensors_temperatures() on every platform, so look
    # it up defensively instead of failing with AttributeError.
    read_sensors = getattr(psutil, "sensors_temperatures", None)
    sensors = read_sensors() if read_sensors is not None else {}

    # Try the "coretemp" entry first (presumably the CPU core sensor on many
    # Linux hosts - verify for the target hardware), then any other sensor.
    candidates = [sensors.get("coretemp"), *sensors.values()]
    for readings in candidates:
        if readings:
            return readings[0].current

    # No temperature sensor available: fall back to the placeholder value.
    return psutil.cpu_count()
def line_protocol(temperature):
    """Build the InfluxDB line protocol record for one temperature reading.

    The record has the structure:
    iot_sensor,hostname=mine_sensor_12,type=temperature value=68

    The hostname tag is the name of the machine running the script.

    :param temperature: the sensor temperature
    :return: Line protocol to write into InfluxDB
    """
    from socket import gethostname

    template = "iot_sensor,hostname={},type=temperature value={}"
    return template.format(gethostname(), temperature)
"""
Read temperature every minute; distinct_until_changed - produce only if temperature change
"""
# Observable that samples the sensor on a fixed timer and turns every sample
# into a line protocol record.
# NOTE(review): the period below is 10 seconds, not the one minute mentioned in
# the text above - confirm which one is intended.
data = rx.interval(period=timedelta(seconds=10)).pipe(
# The value emitted by the interval is ignored; each tick only triggers a
# fresh sensor read.
ops.map(lambda t: sensor_temperature()),
# NOTE(review): with distinct_until_changed disabled, every sample is passed
# on, including unchanged ones - confirm this is intended.
# ops.distinct_until_changed(),
ops.map(lambda temperature: line_protocol(temperature)),
)
# Connection settings (URL, token, organisation) are hard-coded here, pointing
# at an InfluxDB instance on localhost:8086. debug=True is passed to the
# client - presumably this enables verbose client logging; confirm.
_db_client = InfluxDBClient(
url="http://localhost:8086", token="mytoken", org="myorg", debug=True
)
"""
Create client that writes data into InfluxDB
"""
# batch_size=1 - presumably each record is sent on its own rather than being
# accumulated into larger batches; confirm against WriteOptions.
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
# The observable itself is handed over as the record to write into "mybucket".
_write_api.write(bucket="mybucket", record=data)
"""
Call after terminate a script
"""
# Register on_exit with both clients so they are closed at interpreter exit.
atexit.register(on_exit, _db_client, _write_api)
# Wait for a line on stdin - presumably this keeps the script alive while the
# readings are written in the background; confirm.
input()