Realtime Streaming Data Integration #61

Open
cjkini opened this issue Aug 23, 2019 · 6 comments

Comments

@cjkini

cjkini commented Aug 23, 2019

Hi guys,
Thanks for this rrcf library. It is very useful.
I have a question.

How do we integrate this with real-time streaming data, such as data from an MQTT server (e.g. the MQTT server sends data every second)?

In the rrcf streaming example, the sine wave data has to be generated in full first, and only then is it fed into rrcf for anomaly detection.

My question is: how do we implement this if the data is generated continuously?

I am thinking of using a queue or buffer that collects the data points first, before they are submitted to rrcf for anomaly scoring.

Regards,
cj
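
One way the queue idea above could look, as a minimal sketch using only the standard library: a producer puts each incoming point on a `queue.Queue`, and a worker thread scores the points one at a time as they arrive. The names `consume`, `stream_and_score`, and `score_point` are hypothetical; `score_point` stands in for whatever feeds rrcf, and the `for` loop stands in for the message callback of an MQTT client.

```python
import queue
import threading


def consume(points, score_point, results):
    """Pull (index, point) pairs off the queue and score them until None arrives."""
    while True:
        item = points.get()          # blocks until a point is available
        if item is None:             # sentinel: the producer has finished
            break
        index, point = item
        results.append((index, score_point(index, point)))


def stream_and_score(stream, score_point):
    """Feed points from `stream` through a queue to a scoring worker thread."""
    points = queue.Queue()
    results = []
    worker = threading.Thread(target=consume, args=(points, score_point, results))
    worker.start()
    # Stand-in for a message callback: each arriving point is queued immediately.
    for index, point in enumerate(stream):
        points.put((index, point))
    points.put(None)                 # tell the worker to stop
    worker.join()
    return results
```

Because the worker handles one point at a time, the trees are only ever touched from a single thread, and the queue absorbs short bursts when points arrive faster than they can be scored.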

@mdbartos
Member

mdbartos commented Aug 23, 2019

Hi @cjkini

Here's a minimal example of streaming data to a server with rrcf.

Server code:

from collections import deque
import rrcf
from sanic import Sanic, response

# Recent Sanic releases require an application name
app = Sanic("rrcf_server")
NULL = b''

# Set tree parameters
num_trees = 40
tree_size = 256

# Create a forest of empty trees
forest = []
for _ in range(num_trees):
    tree = rrcf.RCTree()
    forest.append(tree)

# Create deque to keep track of indices
indices = deque([], maxlen=tree_size)

@app.route("/", methods=['POST'])
async def feed_rrcf(request):
    # Get request data (a missing or empty JSON body is treated as invalid)
    json = request.json or {}
    index = json.get('index')
    point = json.get('point')
    is_valid = (index is not None) and (point is not None)
    # If point is valid...
    if is_valid:
        # Check if tree size is maxed out
        if len(indices) == tree_size:
            oldest_index = indices.popleft()
        else:
            oldest_index = None
        # Add new index to queue
        indices.append(index)
        # Initialize anomaly score
        avg_codisp = 0
        # For each tree...
        for tree in forest:
            # If tree is above permitted size, drop the oldest point (FIFO)
            if oldest_index is not None:
                tree.forget_point(oldest_index)
            # Insert the new point into the tree
            tree.insert_point(point, index=index)
            # Compute codisp on the new point and take the average among all trees
            avg_codisp += tree.codisp(index) / num_trees
        print('CoDisp for point ({index}) is {avg_codisp}'.format(index=index,
                                                                  avg_codisp=avg_codisp))
    return response.raw(NULL)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

Code to send data:

import requests
import time
import numpy as np

# Set parameters
num_points = 200
ndim = 3
endpoint = 'http://localhost:8000/'

# Set random seed
np.random.seed(0)

# Send random data points to server
for index in range(num_points):
    point = np.random.randn(ndim).tolist()
    json = {'index' : index, 'point' : point}
    requests.post(endpoint, json=json)
    time.sleep(0.01)

Output:

$ python3 server_code.py

[2019-08-23 00:50:15 -0400] [49148] [INFO] Goin' Fast @ http://0.0.0.0:8000
[2019-08-23 00:50:15 -0400] [49148] [INFO] Starting worker [49148]
CoDisp for point (0) is 0.0
[2019-08-23 00:50:34 -0400] - (sanic.access)[INFO][127.0.0.1:49509]: POST http://localhost:8000/  200 0
CoDisp for point (1) is 1.0000000000000004
[2019-08-23 00:50:34 -0400] - (sanic.access)[INFO][127.0.0.1:49511]: POST http://localhost:8000/  200 0
CoDisp for point (2) is 1.2750000000000001
...
CoDisp for point (198) is 3.739963294254606
[2019-08-23 00:51:32 -0400] - (sanic.access)[INFO][127.0.0.1:50110]: POST http://localhost:8000/  200 0
CoDisp for point (199) is 4.481089816800935

@sdlis

sdlis commented Aug 26, 2019

@mdbartos

Your example above is calculating a CoDisp for each point in the stream but it's not determining whether a point is an anomaly.

  1. Would it make sense to create a dictionary that stores the CoDisp for each point seen? This would allow for the calculation of a CoDisp threshold (e.g. median + 3*std). If the CoDisp of the next point is greater than the threshold then it'd be an anomaly. This threshold would be recalculated with each new CoDisp added to the dictionary.
  2. Should the RRCF reach maximum size (i.e. tree_size==256 in your example) prior to calculating the CoDisp of any points? I see that you're calculating a point's CoDisp before you reach the maximum tree size.

Thanks!
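
The dictionary idea in point 1 could be sketched roughly as follows. This is only an illustration under stated assumptions: the class name `CodispRecord`, its methods, and the `min_points` warm-up are hypothetical, and the population standard deviation from the standard library is used for the "std" term.

```python
import statistics


class CodispRecord:
    """Stores the CoDisp of each point seen and flags scores above median + n_std * std."""

    def __init__(self, n_std=3.0, min_points=10):
        self.scores = {}              # index -> CoDisp
        self.n_std = n_std
        self.min_points = min_points  # hypothetical warm-up before anything is flagged

    def threshold(self):
        values = list(self.scores.values())
        return statistics.median(values) + self.n_std * statistics.pstdev(values)

    def check_and_add(self, index, codisp):
        """Compare codisp with the threshold from earlier points, then store it."""
        enough = len(self.scores) >= self.min_points
        is_anomaly = enough and codisp > self.threshold()
        self.scores[index] = codisp
        return is_anomaly
```

In the server example, `check_and_add(index, avg_codisp)` would be called once per request after the loop over trees. The threshold is recomputed from scratch on every call, which is simple but costs time proportional to the number of stored scores.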

@mdbartos
Member

@sdlis

Thanks for pointing this out.

The code I posted is just intended as a minimal example for showing the mechanics of running rrcf on a server with streaming data. It should definitely be modified to suit the user's needs.

  1. Agreed. Also, computing the average CoDisp over a point's entire lifetime in the tree would be a better anomaly score than just the CoDisp on ingress. There are also probably better sampling strategies than just FIFO sampling (e.g. reservoir sampling), but the correct method to choose would be application-dependent.

  2. I could see a case for doing it either way. If your threshold is set as a percentile of observed CoDisps, then it could still function as a useful anomaly score for n < 256.
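
A rough sketch of the lifetime-average idea from point 1, assuming each tree exposes a `leaves` mapping keyed by point index and a `codisp(index)` method, as `rrcf.RCTree` does. The function names and the two bookkeeping dictionaries are hypothetical.

```python
def accumulate_codisp(forest, totals, counts):
    """Add the current CoDisp of every point held in every tree to running totals."""
    for tree in forest:
        for index in tree.leaves:
            totals[index] = totals.get(index, 0.0) + tree.codisp(index)
            counts[index] = counts.get(index, 0) + 1


def lifetime_average(totals, counts, index):
    """Average CoDisp of a point over all trees and all time steps recorded so far."""
    return totals[index] / counts[index]
```

Calling `accumulate_codisp` once per incoming point, after the insert, rescores every point in every tree, so each request costs roughly `num_trees * tree_size` CoDisp evaluations instead of `num_trees`.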

@sdlis

sdlis commented Aug 27, 2019

Thanks for the quick reply!

Building on the idea of maintaining a CoDisp dictionary for threshold calculation, should the CoDisps of points from this dictionary be removed when their corresponding points are removed from the tree?

For example, a tree with tree_size==256 is about to receive its 257th point. The first point is removed from the tree using forget_point() and then the 257th point is inserted. Should the first CoDisp be removed from the CoDisp dictionary?

@mdbartos
Member

mdbartos commented Sep 3, 2019

@sdlis

That sounds reasonable. You can base your threshold on a longer record if needed though.
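
As an illustration of basing the threshold on a longer record, the scores can be kept in their own bounded deque whose length is independent of `tree_size`. This is a sketch only: `record_size`, `percentile`, and `exceeds_threshold` are hypothetical names, the factor of 4 is arbitrary, and the percentile uses the simple nearest-rank definition.

```python
from collections import deque
import math

tree_size = 256
record_size = 4 * tree_size  # hypothetical: keep scores longer than points stay in the tree

# Oldest scores fall off automatically once the record is full
codisp_record = deque(maxlen=record_size)


def percentile(values, q):
    """Nearest-rank percentile of values, for 0 < q <= 100."""
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered) / 100))
    return ordered[rank - 1]


def exceeds_threshold(record, codisp, q=99):
    """True if codisp is above the q-th percentile of the recorded scores."""
    return bool(record) and codisp > percentile(record, q)
```

Setting `record_size = tree_size` reproduces the behaviour where a score is dropped at the same time its point is forgotten by the trees.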

@sdlis

sdlis commented Sep 27, 2019

@mdbartos

I have 2 questions regarding reservoir sampling with RRCF:

  1. Is the codisp (and not avg_codisp) of a point used since there's no guarantee the point will be in multiple trees in the forest let alone at the same index?

  2. My understanding of RRCF is that a point can't be scored without first being inserted into a tree. With reservoir sampling, however, there's a chance of 1 - (tree_size/index) (see https://en.wikipedia.org/wiki/Reservoir_sampling) that an incoming point in a stream isn't selected for insertion into a tree. For my use case I need to score all incoming points. When a point isn't selected for insertion but still needs to be scored, is it correct to simply insert the new point, calculate its codisp, and then remove it from the tree?

Thanks.
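
For reference, the insert, score, remove procedure described in question 2 would look mechanically like the sketch below. It assumes each tree offers `insert_point`, `codisp`, and `forget_point` as used in the server example; the function name `score_without_keeping` is hypothetical. Whether this gives an appropriate score under reservoir sampling is not confirmed anywhere in this thread.

```python
def score_without_keeping(forest, point, index):
    """Average CoDisp of a point that is inserted only for as long as it takes to score it."""
    total = 0.0
    for tree in forest:
        tree.insert_point(point, index=index)  # temporary insert
        total += tree.codisp(index)            # score while the point is in the tree
        tree.forget_point(index)               # remove it again
    return total / len(forest)
```

The `index` passed in must not collide with an index already stored in any tree, so a reserved temporary key is one option.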
