diff --git a/gateway/gateway.py b/gateway/gateway.py index 7e93f74..031ae2e 100644 --- a/gateway/gateway.py +++ b/gateway/gateway.py @@ -12,6 +12,7 @@ import flask import paho.mqtt.client as mqtt import werkzeug.security as wsecurity +import structlog from urllib.parse import urlparse import os @@ -19,16 +20,62 @@ import time import queue import functools - import logging -logging.basicConfig() -log_level = os.environ.get('LOGLEVEL', 'info') -level = getattr(logging, log_level.upper()) +import uuid + +def make_id(): + import datetime + import uuid + + t = datetime.datetime.now().strftime('%Y%m%d-%H%M') + u = str(uuid.uuid4())[0:4] + id = f'{t}-{u}' + return id + +def configure_logging(log_path, filename=None): + + if not os.path.exists(log_path): + os.makedirs(log_path) + + if filename is None: + filename = f'{make_id()}.gateway.log' + + key_order = ['door', 'method', 'path', 'user', 'request_id', 'topic', 'payload'] + + # FIXME: add timestamping + structlog.configure( + processors=[ + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, + ], + logger_factory=structlog.stdlib.LoggerFactory(), + ) + + formatter = structlog.stdlib.ProcessorFormatter( + processor=structlog.dev.ConsoleRenderer(), + ) + + file_formatter = structlog.stdlib.ProcessorFormatter( + structlog.processors.KeyValueRenderer(key_order=key_order, drop_missing=True) + ) + + file = logging.FileHandler(filename) + stdout = logging.StreamHandler() + + stdout.setFormatter(formatter) + file.setFormatter(file_formatter) -log_mqtt = logging.getLogger('mqtt') -log_mqtt.setLevel(level) -log = logging.getLogger('gateway') -log.setLevel(level) + logging.basicConfig( + format="%(message)s", + level=logging.DEBUG, + handlers=[file, stdout], + ) + + +log_path = os.environ.get('DLOCK_LOG_PATH', 'logs') + +configure_logging(log_path) + +log = structlog.get_logger() class DoorBoltStatus: @@ -58,13 +105,13 @@ def check_match(self, topic, data): self.queue.put((topic, data)) return match except Exception as e: - log_mqtt.warning('MesssageWaiter: Exception match check: {}'.format(e)) + log.warning('MesssageWaiter: Exception match check: {}'.format(e)) def mqtt_message_received(client, u, message): try: mqtt_handle_message(client, u, message) except Exception as e: - log_mqtt.exception('Failed to handle message %: %s '.format(message.topic, message.payload)) + log.exception('Failed to handle message %: %s '.format(message.topic, message.payload)) def door_id_from_mqtt(prefix): door_id = None @@ -73,8 +120,15 @@ def door_id_from_mqtt(prefix): door_id = d return door_id + +def elide(data, length=30, suffix=b'...'): + return (data[:length] + suffix) if len(data) > length else data + def mqtt_handle_message(client, u, message): - log_mqtt.debug('received {}: {}'.format(message.topic, message.payload)) + + start_time = time.time() + matches = [] + door = door_from_topic(message.topic) if message.topic == 'fbp': # Heartbeat messages @@ -82,8 +136,8 @@ def mqtt_handle_message(client, u, message): m = json.loads(d) device = m['payload']['role'] + door = door_id_from_mqtt(device) t = time.time() - log_mqtt.debug('saw device {} at {}'.format(device, t)) m['time_received'] = t # Persist so we can query for device status @@ -105,24 +159,35 @@ def mqtt_handle_message(client, u, message): else: # Check responses - matches = [] for waiter in mqtt_message_waiters: m = waiter.check_match(message.topic, message.payload.decode('utf8')) if m: matches.append(m) - if len(matches) == 0: - log_mqtt.debug('No matches for message on: {}. {} waiters'.format(message.topic, len(mqtt_message_waiters))) + + end_time = time.time() + + duration = 1000.0*(end_time-start_time) + payload = elide(message.payload) + + log.info('mqtt-receive', + door=door, + topic=message.topic, + payload=payload, + duration_ms=duration, + matches=len(matches), + waiters=len(mqtt_message_waiters), + ) def mqtt_subscribed(client, u, mid, granted_qos): - log_mqtt.info('subscribed') + log.info('mqtt-subscribed', mid=mid) def mqtt_disconnected(client, u, rc): - log_mqtt.info('disconnected: {}'.format(rc)) + log.info('mqtt-disconnected', rc=rc) # client automatically handles reconnect def mqtt_connected(client, u, f, rc): - log_mqtt.info('connected') + log.info('mqtt-connected', rc=rc) subscriptions = [ ('fbp', 0), ] @@ -135,7 +200,6 @@ def mqtt_connected(client, u, f, rc): subscriptions.append((topic, 0)) client.subscribe(subscriptions) - log_mqtt.info('subscribe()') def setup_mqtt_client(): broker_url = os.environ.get('MSGFLO_BROKER', 'mqtt://localhost') @@ -147,7 +211,6 @@ def setup_mqtt_client(): client.on_subscribe = mqtt_subscribed client.connect(host, port, 60) - log_mqtt.info('connect() done') return client def create_mqtt_client(broker_url): @@ -178,7 +241,10 @@ def mqtt_send(topic, payload): client = mqtt_client client.publish(topic, payload) - log_mqtt.debug('sent {}: {}'.format(topic, payload)) + + door = door_from_topic(topic) + # FIXME: also provide request_id + log.info('mqtt-send', door=door, topic=topic, payload=payload) def seen_since(messages, time : float): devices = {} @@ -269,11 +335,81 @@ def __init__(self, mqtt_prefix, bolt_sensor=False): } api_users = {} + +def door_from_path(path): + # /doors/virtual-1/unlock?... + if not path.startswith('/doors'): + return None + + tok = path.split('/') + door = tok[2] + return door + +def door_from_topic(topic): + # doors/device-100/unlock + #m = re.match(r"doors\/(.*)\/.*") + tok = topic.split('/') + if len(tok) <= 2: + return None + prefix = '/'.join(tok[0:2]) + door = door_id_from_mqtt(prefix) + return door + +def request_log_params(): + r, g = flask.request, flask.g + + user = None if not r.authorization else r.authorization.username + query = f'{r.path}?{r.query_string.decode("utf-8")}' + door = door_from_path(r.path) + + log_params = dict( + request_id=g.request_id, + method=r.method, + path=query, + user=user, + door=door, + ) + return log_params + + +## Logging helpers +def log_request(sender, **extra): + r, g = flask.request, flask.g + + # add info + g.request_id = r.headers.get('X-Request-Id', str(uuid.uuid4())) + g.request_start_time = time.time() + + log_params = request_log_params() + log_params.update(dict( + )) + + log.info('http-request-start', **log_params) + +def log_response(sender, response, **extra): + r, g = flask.request, flask.g + + g.request_end_time = time.time() + duration = 1000.0 * (flask.g.request_end_time - flask.g.request_start_time) + + log_params = request_log_params() + log_params.update(dict( + status_code=response.status_code, + duration_ms=duration, + )) + + log.info('http-request-end', **log_params) + + +flask.request_started.connect(log_request, app) +flask.request_finished.connect(log_response, app) + + ## System functionality # No auth @app.route('/') def index(): - return 'UnlockOslo device gateway' + return 'UnlockOslo device gateway' @app.route('/status') @@ -457,8 +593,8 @@ def main(): port = os.environ.get('PORT', 5000) ip = os.environ.get('INTERFACE', '127.0.0.1') - server = gevent.pywsgi.WSGIServer((ip, port), app) - log.info('Gateway running on {}:{}'.format(ip, port)) + server = gevent.pywsgi.WSGIServer((ip, port), app, log=None) + log.info('started', ip=ip, port=port) server.serve_forever() if __name__ == "__main__": diff --git a/gateway/requirements.txt b/gateway/requirements.txt index 73083f3..8284a1e 100644 --- a/gateway/requirements.txt +++ b/gateway/requirements.txt @@ -1,3 +1,4 @@ flask>=0.12 gevent>=1.2 paho-mqtt>=1.3 +structlog>=19.2.0