Realtime Streaming Data Integration #61
Hi @cjkini, here's a minimal example of streaming data to a server with rrcf.

Server code:

```python
from collections import deque

import numpy as np
import rrcf
from sanic import Sanic, response

# Recent Sanic versions require an application name
app = Sanic("rrcf_stream")
NULL = b''

# Set tree parameters
num_trees = 40
tree_size = 256

# Create a forest of empty trees
forest = []
for _ in range(num_trees):
    tree = rrcf.RCTree()
    forest.append(tree)

# Create deque to keep track of indices
indices = deque([], maxlen=tree_size)

@app.route("/", methods=['POST'])
async def feed_rrcf(request):
    # Get request data (request.json is None if the body is not JSON)
    data = request.json or {}
    index = data.get('index')
    point = data.get('point')
    is_valid = (index is not None) and (point is not None)
    # If point is valid...
    if is_valid:
        # Convert the JSON list to a NumPy array
        point = np.asarray(point, dtype=float)
        # Check if tree size is maxed out
        if len(indices) == tree_size:
            oldest_index = indices.popleft()
        else:
            oldest_index = None
        # Add new index to queue
        indices.append(index)
        # Initialize anomaly score
        avg_codisp = 0
        # For each tree...
        for tree in forest:
            # If the tree is full, drop the oldest point (FIFO)
            if oldest_index is not None:
                tree.forget_point(oldest_index)
            # Insert the new point into the tree
            tree.insert_point(point, index=index)
            # Compute codisp on the new point and take the average among all trees
            avg_codisp += tree.codisp(index) / num_trees
        print('CoDisp for point ({index}) is {avg_codisp}'.format(index=index,
                                                                  avg_codisp=avg_codisp))
    return response.raw(NULL)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
```

Code to send data:

```python
import time

import numpy as np
import requests

# Set parameters
num_points = 200
ndim = 3
endpoint = 'http://localhost:8000/'

# Set random seed
np.random.seed(0)

# Send random data points to server
for index in range(num_points):
    point = np.random.randn(ndim).tolist()
    payload = {'index': index, 'point': point}
    requests.post(endpoint, json=payload)
    time.sleep(0.01)
```
Your example above calculates a CoDisp for each point in the stream, but it doesn't determine whether a point is an anomaly.
Thanks!
Thanks for pointing this out. The code I posted is just intended as a minimal example for showing the mechanics of running rrcf on a server with streaming data. It should definitely be modified to suit the user's needs.
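One possible way to turn the averaged CoDisp into a yes/no decision is to compare it against a threshold derived from recently seen scores. The sketch below assumes a "mean plus k standard deviations" rule over a history of past CoDisp values, with a warm-up period before any point is flagged. `flag_anomaly`, `k`, and `warmup` are hypothetical names and defaults, not part of rrcf.

```python
import statistics
from typing import Sequence


def flag_anomaly(codisp: float, history: Sequence[float],
                 k: float = 3.0, warmup: int = 30) -> bool:
    """Return True if `codisp` exceeds mean(history) + k * stdev(history).

    Hypothetical helper: the rule and the defaults are illustrative assumptions.
    No point is flagged until `history` holds at least `warmup` scores.
    """
    if len(history) < max(warmup, 2):
        return False  # not enough data to estimate a baseline yet
    mean = statistics.mean(history)
    std = statistics.stdev(history)  # sample standard deviation
    return codisp > mean + k * std


if __name__ == "__main__":
    baseline = [10.0, 12.0, 11.0, 9.0, 10.5] * 10  # 50 made-up scores
    print(flag_anomaly(11.0, baseline))  # False: ordinary score
    print(flag_anomaly(40.0, baseline))  # True: far above the baseline
```

In the server example, `history` could be a `deque(maxlen=...)` of past `avg_codisp` values, appended to only after the check so that a new point does not influence its own threshold.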
Thanks for the quick reply! Building on the idea of maintaining a CoDisp dictionary for threshold calculation: should a point's CoDisp be removed from this dictionary when the point itself is removed from the tree? For example, suppose a tree with `tree_size == 256` is about to receive its 257th point. The first point is removed from the tree using `forget_point()`, and then the 257th point is inserted. Should the first point's CoDisp also be removed from the CoDisp dictionary?
That sounds reasonable. You can also base your threshold on a longer record if needed.
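A minimal sketch of a CoDisp record that evicts scores in the same FIFO order as the trees evict points, assuming one score per index. With `history_size == tree_size` the record matches the points currently in the trees; a larger `history_size` keeps the longer record suggested in the reply. The class name `CodispRecord`, its methods, and the percentile-based threshold are hypothetical choices, not part of rrcf.

```python
import statistics
from collections import OrderedDict
from typing import Optional


class CodispRecord:
    """Keep CoDisp scores keyed by point index, evicting the oldest first.

    Hypothetical helper, independent of rrcf itself.
    """

    def __init__(self, history_size: int):
        self.history_size = history_size
        self.scores = OrderedDict()  # index -> CoDisp, oldest first

    def add(self, index: int, codisp: float) -> Optional[int]:
        """Store a score; return the index whose score was evicted, if any."""
        evicted = None
        if len(self.scores) >= self.history_size:
            evicted, _ = self.scores.popitem(last=False)  # drop oldest entry
        self.scores[index] = codisp
        return evicted

    def threshold(self, pct: int = 99) -> float:
        """Return the pct-th percentile (1..99) of the stored scores."""
        values = list(self.scores.values())
        if len(values) < 2:
            raise ValueError("need at least two scores")
        # 'inclusive' keeps the cut points within the observed range
        cut_points = statistics.quantiles(values, n=100, method="inclusive")
        return cut_points[pct - 1]
```

In the server loop, one would compare `avg_codisp` against `record.threshold()` first and then call `record.add(index, avg_codisp)`; when `history_size == tree_size`, the returned evicted index is the same one passed to `tree.forget_point()`.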
I have 2 questions regarding reservoir sampling with RRCF:
Thanks.
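For context on reservoir sampling: instead of always forgetting the oldest point (FIFO, as in the server code), a reservoir keeps a uniform random sample of all points seen so far. Below is a minimal sketch of the classic Algorithm R deciding which index, if any, to evict when a new point arrives. The `Reservoir` class and its names are hypothetical; an evicted index would then be passed to `tree.forget_point(...)` before the new point is inserted.

```python
import random
from typing import List, Optional, Tuple


class Reservoir:
    """Algorithm R: keep a uniform sample of size `capacity` from a stream.

    Hypothetical helper that tracks point indices only.
    """

    def __init__(self, capacity: int, seed: Optional[int] = None):
        self.capacity = capacity
        self.slots: List[int] = []
        self.seen = 0
        self.rng = random.Random(seed)

    def offer(self, index: int) -> Tuple[bool, Optional[int]]:
        """Offer a new index. Return (kept, evicted_index)."""
        self.seen += 1
        if len(self.slots) < self.capacity:
            self.slots.append(index)  # reservoir not full yet: always keep
            return True, None
        j = self.rng.randrange(self.seen)  # uniform in [0, seen - 1]
        if j < self.capacity:
            evicted = self.slots[j]  # replace a uniformly chosen slot
            self.slots[j] = index
            return True, evicted
        return False, None  # new point is not kept in the sample
```

Unlike FIFO, a point that is not kept never enters the sample, so to score it one could insert it, compute its CoDisp, and forget it again. Whether all trees share one reservoir or each tree draws its own is a design choice left open here.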
Hi guys,
Thanks for the rrcf library. It is very useful.
I have a question.
How do we integrate this with real-time streaming data, such as data from an MQTT server (e.g., the MQTT server sends data every second)?
In the rrcf streaming example, the sine-wave data is generated completely first and only then fed into rrcf for anomaly detection.
My question is how to implement this when the data is generated continuously.
I am thinking of using a queue or buffer that collects the data points first, before they are submitted to rrcf for anomaly scoring.
Regards,
cj
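The queue/buffer idea can be sketched with the standard library: a producer (standing in for an MQTT client that receives a message every second) puts each point on a `queue.Queue`, and a consumer thread takes points off one at a time and scores them, so rrcf never needs the whole series up front. The MQTT side is only simulated here; `producer`, `consumer`, `score_point`, and the sentinel-based shutdown are hypothetical names and choices. In practice `score_point` would perform the per-tree `forget_point` / `insert_point` / `codisp` steps from the server example.

```python
import queue
import threading
import time

_STOP = object()  # sentinel telling the consumer to shut down


def producer(q, points, interval):
    """Simulated data source: puts one point on the queue every `interval` seconds.

    In a real deployment this would be replaced by, e.g., an MQTT message
    callback that calls q.put(point) whenever a message arrives.
    """
    for point in points:
        q.put(point)
        time.sleep(interval)
    q.put(_STOP)


def consumer(q, score_point, scores):
    """Score points one at a time, as soon as each arrives on the queue."""
    index = 0
    while True:
        point = q.get()  # blocks until a point (or the sentinel) is available
        if point is _STOP:
            break
        scores.append(score_point(index, point))
        index += 1


if __name__ == "__main__":
    # Stand-in scorer; replace with the rrcf forget/insert/codisp logic
    def dummy_score(index, point):
        return sum(abs(x) for x in point)

    q = queue.Queue()
    scores = []
    worker = threading.Thread(target=consumer, args=(q, dummy_score, scores))
    worker.start()
    producer(q, [[0.0, 1.0], [2.0, -3.0], [0.5, 0.5]], interval=0.01)
    worker.join()
    print(scores)  # [1.0, 5.0, 1.0]
```

Because `q.get()` blocks, the consumer simply waits between messages; the queue also absorbs short bursts if scoring is momentarily slower than the arrival rate.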