This folder contains various test files (non-project code) for running a Python program with InfluxDB. The current plan is to move forward with an RxPy implementation that queries data from InfluxDB on a rapid schedule, runs anomaly detection, and writes the results back to InfluxDB. This implementation can likely be modified in the future, but it should provide an initial framework for the near real-time anomaly detection system.
The files/methods currently include:
Both of these scripts are taken from examples provided with the influxdb-client-python
package. They have been modified slightly as needed.
An instance of InfluxDB/telegraf needs to be running to use these scripts.
`RxPy` is a package allowing reactive programming with Python. The files tested include:
- `iot_sensors.py` reads CPU data (using `psutil`) and writes it to InfluxDB every 10 seconds, scheduled by `RxPy` (a rough sketch of this pattern is shown after this list).
  - This is a similar implementation to what telegraf does and could be a potential replacement for telegraf: reading JSON files and processing them into InfluxDB (however, it may be slower than telegraf). There are also likely methods where RxPy listens for files instead of running on a timed basis, but this would require additional research.
  - Alternatively, a setup could be implemented where RxPy is used to query data from InfluxDB, process it for anomalies, and write back to InfluxDB.
  - Note that the script is currently terminated by user input; a better termination method is likely required.
- `realtime-stream.ipynb` uses `RxPy` to query the latest 10 seconds of data from InfluxDB and uses the `streamz` package to create a sink that can be read and plotted in real time using `hvplot`.
  - I do not believe that `streamz` is required for our purposes; its purpose in this file was to allow `hvplot` to plot the streaming data.
  - It is also not entirely clear how to clear a sink created by `streamz`.
  - The main point of this script is showing how `RxPy` could be used with querying.
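For reference, a rough sketch of the `iot_sensors.py`-style pattern (a timed read scheduled by RxPy, written to InfluxDB) might look like the following. This is only a sketch assuming RxPY 3 (`rx`), `influxdb-client`, and `psutil`; the URL, token, org, bucket, and measurement names are placeholders, not values from the test files.

```python
import psutil
import rx
from rx import operators as ops
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Placeholder connection details.
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

def cpu_point(_):
    # Build a point from the current CPU usage.
    return Point("cpu_test").field("usage_percent", psutil.cpu_percent())

# Emit a tick every 10 seconds, map it to a point, and write it to InfluxDB.
rx.interval(period=10.0).pipe(
    ops.map(cpu_point),
).subscribe(
    on_next=lambda p: write_api.write(bucket="my-bucket", record=p),
    on_error=lambda e: print(f"write error: {e}"),
)

# As noted above, the example script ends on user input; a cleaner shutdown is needed.
input("Press Enter to stop...\n")
client.close()
```

The `rx.interval` stream runs on a background scheduler thread, so the `input()` call is only there to keep the main thread alive until the user stops the script.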
In general, using `RxPy` to query InfluxDB, run an anomaly detection script, and write back to InfluxDB seems like a doable process. Items that would need to be figured out include:
- How to query multiple sensors.
- How to run different anomaly detection models on different sensors.
- How/where to save anomaly detection model parameters for future use.
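To make the query/detect/write-back loop concrete, a minimal sketch under the same assumptions as above (RxPY 3 and `influxdb-client`) is shown below. The bucket, measurement, and z-score threshold are placeholders, the anomaly check is a stand-in for whatever model we actually use, and `query_data_frame` is assumed to return a single DataFrame here (it can return a list of frames for multi-table results).

```python
import rx
from rx import operators as ops
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
query_api = client.query_api()
write_api = client.write_api(write_options=SYNCHRONOUS)

# Flux query for the latest 10 seconds of a single placeholder measurement.
FLUX = '''
from(bucket: "sensors")
  |> range(start: -10s)
  |> filter(fn: (r) => r._measurement == "cpu_test")
'''

def detect_anomalies(_):
    # Query the last 10 seconds and flag values far from the window mean (simple z-score).
    df = query_api.query_data_frame(FLUX)
    if df.empty:
        return []
    z = (df["_value"] - df["_value"].mean()) / (df["_value"].std() or 1.0)
    return df.loc[z.abs() > 3.0].to_dict("records")

def write_anomalies(rows):
    # Write any flagged points back to InfluxDB under a separate measurement.
    for row in rows:
        point = Point("anomalies").field("value", row["_value"]).time(row["_time"])
        write_api.write(bucket="sensors", record=point)

rx.interval(period=10.0).pipe(
    ops.map(detect_anomalies),
).subscribe(on_next=write_anomalies, on_error=print)

input("Press Enter to stop...\n")
client.close()
```

Extending this to the open items above would mostly mean changing the Flux query (multiple sensors), swapping the z-score stand-in for per-sensor models, and deciding where those model parameters get persisted.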
Using `RxPy` in place of telegraf to read the initial files prior to InfluxDB seems possible, but time would need to be spent figuring out reactive programming in Python. This should only be explored if the project timeline permits. A couple of resources identified include:
- Udemy course: Reactive Programming in Python
- Book: Hands-On Reactive Programming with Python
Using InfluxDB scheduled tasks would allow InfluxDB to push data out via HTTP POST. This could then be picked up by a program that runs anomaly detection and writes the results back to InfluxDB.
This was tested using the two-telegraf docker setup.
- One telegraf container reads cpu/mem data and sends it to InfluxDB.
- A task was then written into InfluxDB (by logging into the visual interface). The query for the task is in `tasks-qury`. The `tasks-test.ipynb` file is just a test of the query.
- The second telegraf container is set up to listen for the task; however, I was not able to get this to work.
If this method is to be used, the following needs to be figured out:
- How to get telegraf to pick up the HTTP POST from the InfluxDB task and then have telegraf initiate a Python program.
- Or, create an API that receives the InfluxDB task output and runs a Python program (a minimal sketch is shown below).
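For the API option, a hypothetical sketch using only the Python standard library is shown below. The port, endpoint, and `run_anomaly_detection.py` script name are made up; the InfluxDB task would need to be written to POST to this address (for example with Flux's `http.post`).

```python
import subprocess
from http.server import BaseHTTPRequestHandler, HTTPServer

class TaskHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read whatever payload the InfluxDB task POSTs.
        length = int(self.headers.get("Content-Length", 0))
        payload = self.rfile.read(length).decode("utf-8", errors="replace")
        print("Received task payload:", payload)
        # Kick off the (placeholder) anomaly detection program without blocking the server.
        subprocess.Popen(["python", "run_anomaly_detection.py"])
        self.send_response(204)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8000), TaskHandler).serve_forever()
```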
This method seems like a higher-effort route than using `RxPy`. We have had trouble with telegraf so far, and relying on telegraf to receive the HTTP POST from InfluxDB may be difficult. It would also require telegraf to run a Python program. The alternative method of setting up an API is interesting, but since the group does not have experience with this, it is also a higher-effort route.
Similar to the discussion above, we are not sure about the level of effort required to get a Python program to successfully run from Telegraf. Going this route could result in a high-effort unsuccessful method (however, it should be noted that this may ultimately be the fastest method).
There may be other methods (using PySpark, etc.) that could work. However, given the timeline, additional methods are not being pursued unless an obvious option is identified.