From 1443abac789f104a034983361dd1913138157213 Mon Sep 17 00:00:00 2001 From: Maaike Date: Mon, 30 Sep 2024 22:23:22 +0200 Subject: [PATCH] process message in batches --- .../mqtt_metrics_collector.py | 95 +++++++++++++------ 1 file changed, 64 insertions(+), 31 deletions(-) diff --git a/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py b/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py index e57c2b5ca..b0e2afa2f 100644 --- a/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py +++ b/wis2box-mqtt-metrics-collector/mqtt_metrics_collector.py @@ -31,11 +31,17 @@ import json import time +from threading import Lock +from threading import Thread + from prometheus_client import start_http_server, Counter, Gauge # de-register default-collectors from prometheus_client import REGISTRY, PROCESS_COLLECTOR, PLATFORM_COLLECTOR +message_buffer = [] +buffer_lock = Lock() + REGISTRY.unregister(PROCESS_COLLECTOR) REGISTRY.unregister(PLATFORM_COLLECTOR) @@ -160,39 +166,61 @@ def sub_mqtt_metrics(client, userdata, msg): if str(msg.topic).startswith('$SYS'): return - m = json.loads(msg.payload.decode('utf-8')) - if str(msg.topic).startswith('wis2box/stations'): - update_stations_gauge(m['station_list']) - elif str(msg.topic).startswith('wis2box/notifications'): - wsi = 'none' - if 'wigos_station_identifier' in m['properties']: - wsi = m['properties']['wigos_station_identifier'] - # if label wsi is not in notify_wsi_total, set to 0 and sleep 5s - if wsi not in notify_wsi_total._metrics: - logger.info(f"new station: {wsi}, sleep 5s before incrementing") - notify_wsi_total.labels(wsi).inc(0) + with buffer_lock: + message_buffer.append((msg.topic, msg)) + # Process buffered messages if buffer size reaches a threshold + if len(message_buffer) >= 10: # Adjust this threshold as needed + process_buffered_messages() + + +def process_buffered_messages(): + global message_buffer + + with buffer_lock: + messages_to_process = message_buffer + message_buffer = [] + + for topic, msg in messages_to_process: + m = json.loads(msg.payload.decode('utf-8')) + if str(topic).startswith('wis2box/stations'): + update_stations_gauge(m['station_list']) + elif str(topic).startswith('wis2box/notifications'): + wsi = 'none' + if 'wigos_station_identifier' in m['properties']: + wsi = m['properties']['wigos_station_identifier'] + # if label wsi is not in notify_wsi_total, set to 0 and sleep 5s + if wsi not in notify_wsi_total._metrics: + logger.info(f"new station={wsi}, sleep 5s before incrementing") + notify_wsi_total.labels(wsi).inc(0) + failure_wsi_total.labels(wsi).inc(0) + station_wsi.labels(wsi).set(1) + time.sleep(5) + notify_wsi_total.labels(wsi).inc(1) failure_wsi_total.labels(wsi).inc(0) station_wsi.labels(wsi).set(1) - time.sleep(5) - notify_wsi_total.labels(wsi).inc(1) - failure_wsi_total.labels(wsi).inc(0) - station_wsi.labels(wsi).set(1) - notify_total.inc(1) - elif str(msg.topic).startswith('wis2box/failure'): - descr = m['description'] if 'description' in m else 'none' - wsi = 'none' - if 'wigos_station_identifier' in m: - wsi = m['wigos_station_identifier'] - failure_descr_wsi_total.labels(descr, wsi).inc(1) - notify_wsi_total.labels(wsi).inc(0) - failure_wsi_total.labels(wsi).inc(1) - station_wsi.labels(wsi).set(1) - failure_total.inc(1) - elif str(msg.topic).startswith('wis2box/storage'): - if str(m["Key"]).startswith('wis2box-incoming'): - storage_incoming_total.inc(1) - if str(m["Key"]).startswith('wis2box-public'): - storage_public_total.inc(1) + notify_total.inc(1) + elif str(topic).startswith('wis2box/failure'): + descr = m['description'] if 'description' in m else 'none' + wsi = 'none' + if 'wigos_station_identifier' in m: + wsi = m['wigos_station_identifier'] + failure_descr_wsi_total.labels(descr, wsi).inc(1) + notify_wsi_total.labels(wsi).inc(0) + failure_wsi_total.labels(wsi).inc(1) + station_wsi.labels(wsi).set(1) + failure_total.inc(1) + elif str(topic).startswith('wis2box/storage'): + if str(m["Key"]).startswith('wis2box-incoming'): + storage_incoming_total.inc(1) + if str(m["Key"]).startswith('wis2box-public'): + storage_public_total.inc(1) + + +# Call this function periodically, e.g., in a separate thread +def periodic_buffer_processing(): + while True: + process_buffered_messages() + time.sleep(1) # Adjust sleep time as needed def gather_mqtt_metrics(): @@ -226,7 +254,12 @@ def gather_mqtt_metrics(): def main(): start_http_server(8001) + init_stations_gauge() + + # Start periodic buffer processing + Thread(target=periodic_buffer_processing, daemon=True).start() + gather_mqtt_metrics()