Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write Cache #36

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ The beautiful network analyzer, mapper, and monitor.
1. `sudo bash install.sh` - this will walk you through the setup needed for Auth0 information.
2. If you are running docker as non-root, then remove the top section from `install.sh` and re-run.

## Redis notes
Redis is also going to be used as a write cache for incoming metrics. This way, the load on the metric database server will be greatly reduced. We can also tune how often the cached metrics are flushed to the database.

## Database Notes

### Attempt 2
Expand Down
6 changes: 5 additions & 1 deletion backend/alive.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ def check_all_hosts():

if not result:
watcher.send_alert(
"Check Alive", alive_type, host_name, summary=summary
"Check Alive",
alive_type,
host_name,
summary=summary,
severity="warning"
)


Expand Down
111 changes: 91 additions & 20 deletions backend/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from werkzeug.utils import secure_filename
from flask_cors import CORS
from PIL import Image
from pid import PidFile

import ansible_helper

Expand Down Expand Up @@ -1743,39 +1744,106 @@ def insert_metric(inp=""):
return "Invalid data", 421

for item in data["metrics"]:

if "tags" in item and "name" in item:

a = redis.Redis(host=os.environ.get("REDIS_HOST") or "redis")

invalid_names = ("agent_name")
parsed_tags = {x : item["tags"][x] for x in item["tags"] if x not in invalid_names}

name = json.dumps({"name" : item["name"], "tags" : parsed_tags}, default=str)
a.set(f"METRIC-{name}", json.dumps(item, default=str))

return "Success", 200

@app.route("/bulk_insert/", methods=["GET"])
@requires_auth_write
def bulk_insert():
    """
    Flush the Redis metric write cache into MongoDB in bulk.

    Reads every cached entry (keys matching ``METRIC-*``), upserts each
    into the ``metrics-latest`` collection, and appends it to the
    historical ``metrics`` collection at most once per 120 seconds per
    source IP. Can be called manually or periodically (see the "updater"
    cron job).

    Returns:
        Tuple of (number of cached metrics processed, 200).
    """
    metrics_latest_updates = []
    metrics_updates = []

    cache = redis.Redis(host=os.environ.get("REDIS_HOST") or "redis")

    # Tags that must not participate in the metrics-latest match filter.
    # NOTE: must be a tuple — a bare ("agent_name") is a string, and the
    # `in` test below would then do substring matching on tag names.
    excluded_tags = ("agent_name",)

    metrics = cache.keys(pattern="METRIC-*")
    for key in metrics:
        item = json.loads(cache.get(key))

        # Validate before touching item["tags"] so one malformed cache
        # entry cannot raise KeyError and abort the whole flush.
        if "tags" not in item or "name" not in item:
            continue

        if "timestamp" in item:
            try:
                # Stamp with the flush time rather than the agent-supplied
                # epoch — presumably because agent clocks may be skewed.
                # TODO(review): confirm this is intentional vs.
                # datetime.datetime.fromtimestamp(item["timestamp"]).
                item["timestamp"] = datetime.datetime.now()
            except Exception:
                print("Problem with timestamp - ", sys.exc_info())

        if isinstance(item["tags"], dict):
            item["tags"]["labyrinth_name"] = item["name"]
            item["tags"]["agent_name"] = socket.gethostname()

        try:
            # metrics-latest always reflects the newest sample: upsert
            # keyed on the metric name plus every tag except agent_name.
            replacements = {"name": item["name"]}
            for tag in item["tags"]:
                if tag not in excluded_tags:
                    replacements[f"tags.{tag}"] = item["tags"][tag]

            metrics_latest_updates.append(
                pymongo.ReplaceOne(replacements, item, upsert=True)
            )
        except Exception:
            # Surface the offending item for easier debugging.
            raise Exception(item)

        # Rate-limit the historical collection: at most one insert per
        # source IP every 120 seconds.
        rate_key = "last_metric_{}".format(item["tags"].get("ip"))
        last_time = cache.get(rate_key)
        if not (last_time and (time.time() - float(last_time)) <= 120):
            metrics_updates.append(pymongo.InsertOne(item))
            # Only advance the timestamp when we actually queue an insert;
            # advancing it unconditionally would starve the history.
            cache.set(rate_key, time.time())

    # One round trip per collection instead of one write per metric.
    if metrics_latest_updates:
        mongo_client["labyrinth"]["metrics-latest"].bulk_write(metrics_latest_updates)

    if metrics_updates:
        mongo_client["labyrinth"]["metrics"].bulk_write(metrics_updates)

    return len(metrics), 200


if __name__ == "__main__": # pragma: no cover
Expand All @@ -1784,6 +1852,9 @@ def insert_metric(inp=""):

if len(sys.argv) > 1 and sys.argv[1] == "watcher":
unwrap(dashboard)(report=True)
elif len(sys.argv) > 1 and sys.argv[1] == "updater":
with PidFile("labyrinth-bulk-insert") as p:
unwrap(bulk_insert)()
else:
app.debug = True
app.config["ENV"] = "development"
Expand Down
74 changes: 73 additions & 1 deletion backend/test/test_03_serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import pytest
import serve

import redis


from common.test import unwrap, delete_keys_recursive

Expand All @@ -22,6 +24,10 @@ def tearDown():
serve.mongo_client["labyrinth"]["settings"].delete_many({})
serve.mongo_client["labyrinth"]["metrics"].delete_many({})

a = redis.Redis(host=os.environ.get("REDIS_HOST"))
for key in a.keys(pattern="METRIC-*"):
a.delete(key)


@pytest.fixture
def setup():
Expand Down Expand Up @@ -618,7 +624,7 @@ def test_insert_metric(setup):
"diskio": 884284,
},
"name": "check_hd",
"tags": {"host": "00-00-00-00-01", "ip": "172.19.0.6"},
"tags": {"host": "00-00-00-00-01", "ip": "172.19.0.6", "random-tag" : 5},
"timestamp": 1625683390,
},
]
Expand All @@ -632,6 +638,9 @@ def test_insert_metric(setup):
a = unwrap(serve.insert_metric)(sample_data)
assert a[1] == 200

# NOTE: We are changing to redis write cache layer

"""
b = serve.mongo_client["labyrinth"]["metrics-latest"].find({})
c = [x for x in b]
assert len(c) == 1
Expand All @@ -645,6 +654,69 @@ def test_insert_metric(setup):
][0][item].replace(microsecond=0, second=0)
else:
assert c[0][item] == sample_data["metrics"][0][item]
"""
a = redis.Redis(host=os.environ.get("REDIS_HOST"))

parsed_tags = {x : sample_data["metrics"][0]["tags"][x] for x in sample_data["metrics"][0]["tags"] if x != "random-tag"}

b = json.dumps({
"name" : sample_data["metrics"][0]["name"],
"tags" : parsed_tags
}, default=str)
print("test key:", b)
c = json.loads(a.get(f"METRIC-{b}"))
print(c)
del c["timestamp"]
del sample_data["metrics"][0]["timestamp"]
assert c == sample_data["metrics"][0]
return sample_data

def test_redis_bulk_insert(setup):
    """
    Test the Redis write-cache -> Mongo bulk insert path.

    Inserts the same metric three times with differing tag sets (so three
    distinct cache keys are created), flushes the cache via bulk_insert,
    and verifies that exactly three documents for the sample host were
    upserted into metrics-latest.

    NOTE: the dashboard deliberately does not read from Redis directly,
    since remote agents would not have access to this Redis instance.
    """
    sample_data = {
        "metrics": [
            {
                "fields": {
                    "boot_time": 1625587759,
                    "context_switches": 4143261228,
                    "entropy_avail": 3760,
                    "interrupts": 1578002983,
                    "diskio": 884284,
                },
                "name": "check_hd",
                "tags": {"host": "00-00-00-00-01", "ip": "172.19.0.6"},
                "timestamp": 1625683390,
            },
        ]
    }

    def count_host_docs():
        # Docs in metrics-latest belonging to the sample host; .get guards
        # against documents whose tags lack a "host" key.
        docs = serve.mongo_client["labyrinth"]["metrics-latest"].find({})
        return len(
            [x for x in docs if "tags" in x and x["tags"].get("host") == "00-00-00-00-01"]
        )

    # Three inserts with distinct tag sets -> three distinct cache entries.
    a = unwrap(serve.insert_metric)(sample_data)
    assert a[1] == 200

    sample_data["metrics"][0]["tags"]["new_tag"] = 7
    a = unwrap(serve.insert_metric)(sample_data)
    assert a[1] == 200

    sample_data["metrics"][0]["tags"]["new_tag"] = 234
    a = unwrap(serve.insert_metric)(sample_data)
    assert a[1] == 200

    # Baseline before the flush, so the final check is a delta and does not
    # assume the collection starts empty.
    before = count_host_docs()

    a = unwrap(serve.bulk_insert)()
    assert a[1] == 200

    assert count_host_docs() - before == 3


def test_list_dashboard(setup):
Expand Down
10 changes: 10 additions & 0 deletions cron/bulk_write.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/sh
# Flush the Redis metric write cache into MongoDB by running serve.py in
# "updater" mode (invoked every minute from cron/cron.d/crontab).
cd /src || exit 1

# Load environment overrides (MONGO_HOST, REDIS_HOST, credentials, ...)
# if a .env file is present; set -a exports everything defined there.
if [ -f .env ]; then
    set -a
    # POSIX sh has no `source` builtin (a bashism that fails under dash);
    # use the portable dot command instead.
    . ./.env
    set +a
fi

python3 serve.py updater 2>&1
1 change: 1 addition & 0 deletions cron/cron.d/crontab
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
* * * * * /bin/bash /cron/run.sh > /var/log/finder
* * * * * /bin/bash /cron/bulk_write.sh > /var/log/bulk_write
*/10 * * * * /bin/bash /cron/watcher.sh > /var/log/watcher
*/2 * * * * /bin/bash /cron/alive.sh > /var/log/alive

Expand Down
Loading