This repository has been archived by the owner on Sep 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
influx_interact.py
148 lines (124 loc) · 4.51 KB
/
influx_interact.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""query/write from InfluxDB function"""
import datetime
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
# import pytz
class influx_class:
    """
    Class for querying/writing to InfluxDB. Includes creating client and holding tokens.
    """

    def __init__(self, org, url, bucket, token):
        """
        Initialize class with credentials and query location.

        Keyword arguments:
        org -- InfluxDB org, string
        url -- URL to query, string (example: "http://localhost:8086//")
        bucket -- InfluxDB bucket name to query from, string
        token -- token to use for query, string

        Returns:
        None
        """
        self.org = org
        self.url = url
        self.bucket = bucket
        self.token = token
        # Single client instance is reused by both query and write paths.
        self.client = InfluxDBClient(url=url, token=token, org=org)

    def _build_query(self, location, id, measurement, field, start, end, window, fill):
        """
        Build the Flux query string and its bound parameters (no I/O).

        Pure helper extracted from make_query so query construction can be
        tested without a live InfluxDB connection. Arguments mirror
        make_query; see its docstring for details.

        Returns:
        (query, params) -- Flux query string and dict of bound parameters
        """
        # Convert start, end, windowing input for the query.
        # TODO: potentially build out date parsing/user input checks
        use_start = start if start else 0
        # NOTE(review): naive local time; InfluxDB generally expects UTC —
        # confirm against the deployment before changing.
        use_end = end if end else datetime.datetime.now()
        use_window = datetime.timedelta(minutes=window) if window else None
        # Parameters are bound server-side rather than interpolated into the
        # query text, which avoids Flux-injection issues.
        p = {
            "_bucket": self.bucket,
            "_measurement": measurement,
            "_start": use_start,
            "_end": use_end,
            "_location": location,
            "_field": field,
            "_every": use_window,
            "_fill": fill,
        }
        query = """from(bucket: _bucket)
            |> range(start: _start, stop: _end)
            |> filter(fn: (r) => r["_measurement"] == _measurement)
            |> filter(fn: (r) => r["siteRef"] == _location)
            |> filter(fn: (r) => r["_field"] == _field)"""
        # Optionally restrict to one or more sensor IDs (OR-ed together).
        if id is not None:
            for i, sensor_id in enumerate(id):
                p["_id" + str(i)] = sensor_id
            query += """|> filter (fn: (r) => r["uniqueID"] == _id0"""
            for i in range(1, len(id)):
                query += """ or r["uniqueID"] == _id""" + str(i)
            query += """)"""
        # Optional mean-aggregation over fixed windows.
        if window:
            query += (
                """|> aggregateWindow(every: _every, fn: mean, createEmpty: _fill)"""
            )
        # Drop bookkeeping columns so the resulting DataFrame stays tidy.
        query += (
            """|> drop(columns: ["_start", "_stop", "_measurement","""
            """ "_field", "typeRef", "equipRef", "siteRef", "groupRef"])"""
        )
        return query, p

    def make_query(
        self,
        location,
        id=None,
        measurement="READINGS",
        field="val_num",
        start=None,
        end=None,
        window=None,
        fill=False,
    ):
        """
        Make a query to InfluxDB.

        Keyword arguments:
        location -- building to query, string (example: "Campus Energy Centre")
        id -- ID of sensor to query, list of strings
              (example: ["p:ubcv:r:205b0392-31f31280"])
        measurement -- measurement type to query, string (default = "READINGS")
        field -- field to query, string (default = "val_num")
        start -- flux query language start, string (default = None)
                 if default is used, the start of available data is queried
        end -- flux query language end, string (default = None)
               if default is used, the end of available data is queried
        window -- window to use for averaging data in minutes, int (default = None)
                  if default is used, no windows are applied and all available
                  data are returned
        fill -- if window is used, add na to windows w/out data, boolean
                (default = False)

        Returns:
        result -- result of query, Pandas DataFrame
        """
        query, p = self._build_query(
            location, id, measurement, field, start, end, window, fill
        )
        # Instantiate the query client. Specify org and query.
        result = self.client.query_api().query_data_frame(
            org=self.org, query=query, params=p
        )
        return result

    def write_data(self, df, measurement, tags):
        """
        Write a DataFrame to InfluxDB.

        Keyword arguments:
        df -- data to write to InfluxDB, Pandas DataFrame
        measurement -- group name, string (ie. CHECK_ANOMALY, TRAINING)
        tags -- DataFrame column names to store as InfluxDB tags,
                list of strings

        Returns:
        None
        """
        # SYNCHRONOUS write: blocks until the server acknowledges the batch.
        self.client.write_api(write_options=SYNCHRONOUS).write(
            self.bucket,
            self.org,
            record=df,
            data_frame_measurement_name=measurement,
            data_frame_tag_columns=tags,
        )