From 1fbabd8d5205e4bf21f31fe9342e5b2da6822b4d Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 20 Jan 2022 00:08:02 +0100 Subject: [PATCH] Add TTS (The Things Stack) / TTN (The Things Network) decoder adapter This patch adds a basic decoder for TTS/TTN v3 Uplink Messages [1], submitted using Webhooks [2]. It can be improved by adding further commits. [1] https://www.thethingsindustries.com/docs/reference/data-formats/#uplink-messages [2] https://www.thethingsindustries.com/docs/integrations/webhooks/ --- CHANGES.rst | 1 + kotori/daq/decoder/__init__.py | 9 +++- kotori/daq/decoder/tts_ttn.py | 39 ++++++++++++++ kotori/daq/services/mig.py | 2 +- pytest.ini | 2 + test/test_tts_ttn.json | 99 ++++++++++++++++++++++++++++++++++ test/test_tts_ttn.py | 69 ++++++++++++++++++++++++ 7 files changed, 219 insertions(+), 2 deletions(-) create mode 100644 kotori/daq/decoder/tts_ttn.py create mode 100644 test/test_tts_ttn.json create mode 100644 test/test_tts_ttn.py diff --git a/CHANGES.rst b/CHANGES.rst index a3393514..ec379d80 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -29,6 +29,7 @@ in progress ``name`` and ``title`` fields. - CI: Use Grafana 8.2.5 - Improve decoding fractional epoch timestamps +- Add TTS (The Things Stack) / TTN (The Things Network) decoder adapter .. _kotori-0.26.12: diff --git a/kotori/daq/decoder/__init__.py b/kotori/daq/decoder/__init__.py index 35b0e3fe..1a88b299 100644 --- a/kotori/daq/decoder/__init__.py +++ b/kotori/daq/decoder/__init__.py @@ -3,6 +3,7 @@ from kotori.daq.decoder.airrohr import AirrohrDecoder from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder from kotori.daq.decoder.schema import MessageType +from kotori.daq.decoder.tts_ttn import TheThingsStackDecoder class DecoderInfo: @@ -18,7 +19,7 @@ def __init__(self, topology): self.topology = topology self.info = DecoderInfo() - def probe(self): + def probe(self, payload: str = None): if 'slot' not in self.topology: return False @@ -41,4 +42,10 @@ def probe(self): self.info.decoder = TasmotaStateDecoder return True + # TTS/TTN: The Things Stack / The Things Network + if self.topology.slot.endswith('data.json') and "received_at" in payload and "uplink_message" in payload: + self.info.message_type = MessageType.DATA_CONTAINER + self.info.decoder = TheThingsStackDecoder + return True + return False diff --git a/kotori/daq/decoder/tts_ttn.py b/kotori/daq/decoder/tts_ttn.py new file mode 100644 index 00000000..ba127728 --- /dev/null +++ b/kotori/daq/decoder/tts_ttn.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# (c) 2022 Andreas Motl +import json +from collections import OrderedDict + + +class TheThingsStackDecoder: + """ + Decode JSON payloads in TTS-/TTN-webhook JSON format. + TTS/TTN means "The Things Stack" / "The Things Network". + + Documentation + ============= + - https://getkotori.org/docs/handbook/decoders/tts_ttn.html + - https://community.hiveeyes.org/t/more-data-acquisition-payload-formats-for-kotori/1421/2 + + Development + =========== + - https://community.hiveeyes.org/t/more-data-acquisition-payload-formats-for-kotori/1421 + - https://community.hiveeyes.org/t/ttn-daten-an-kotori-weiterleiten/1422/34 + """ + + @staticmethod + def decode(payload: str): + + # Decode from JSON. + message = json.loads(payload) + + # Use timestamp and decoded payload. + data = OrderedDict() + data["timestamp"] = message["received_at"] + data.update(message["uplink_message"]["decoded_payload"]) + + # TODO: Add more data / metadata. + # This is an implementation from scratch. It can be improved by + # cherry-picking more specific decoding routines from `ttnlogger`. + # https://github.com/daq-tools/ttnlogger + + return data diff --git a/kotori/daq/services/mig.py b/kotori/daq/services/mig.py index 2dcaf58d..3e6bd438 100644 --- a/kotori/daq/services/mig.py +++ b/kotori/daq/services/mig.py @@ -183,7 +183,7 @@ def decode_message(self, topic, payload): # Message can be handled by one of the device-specific decoders. decoder_manager = DecoderManager(topology) - if decoder_manager.probe(): + if decoder_manager.probe(payload): message.type = decoder_manager.info.message_type message.data = decoder_manager.info.decoder.decode(payload) return message diff --git a/pytest.ini b/pytest.ini index 4cd1dbbe..a88d6227 100644 --- a/pytest.ini +++ b/pytest.ini @@ -27,5 +27,7 @@ markers = homie: Tests specific to the Homie firmware framework. airrohr: Tests specific to Airrohr devices. weewx: Tests for WeeWX integration. + tts: Tests for TTS/TTN adapter. + ttn: Tests for TTS/TTN adapter. hiveeyes: Tests for vendor hiveeyes. legacy: Tests for legacy endpoints and such. diff --git a/test/test_tts_ttn.json b/test/test_tts_ttn.json new file mode 100644 index 00000000..0a17ab64 --- /dev/null +++ b/test/test_tts_ttn.json @@ -0,0 +1,99 @@ +{ + "end_device_ids": { + "device_id": "testdrive-foo-bar-baz", + "application_ids": { + "application_id": "acme" + }, + "dev_eui": "030E202BD5504B52", + "join_eui": "FB81CCAAE6D7B2F7", + "dev_addr": "4406826C" + }, + "correlation_ids": [ + "as:up:01FSSVG88HVN4WVXT5X75KG545", + "gs:conn:01FSRW4G4EK6JFFREN8CF04YAW", + "gs:up:host:01FSRW4G4NF8PN3HZP2MT482ED", + "gs:uplink:01FSSVG822Q4KP03T951ZFD9HZ", + "ns:uplink:01FSSVG8230BA2CGX4B5M6BQDJ", + "rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01FSSVG823BPTEQDJR7EQWA63E", + "rpc:/ttn.lorawan.v3.NsAs/HandleUplink:01FSSVG88GDGNTYP2X7MV3DXC9" + ], + "received_at": "2022-01-19T19:02:34.007345025Z", + "uplink_message": { + "session_key_id": "QR5vq9sPi2tZPtH8rDSjpA==", + "f_port": 1, + "f_cnt": 2289, + "frm_payload": "AQIXEAICFu0CZwAqAmidA2cAIgMCAV0=", + "decoded_payload": { + "analog_in_1": 59.04, + "analog_in_2": 58.69, + "analog_in_3": 3.49, + "relative_humidity_2": 78.5, + "temperature_2": 4.2, + "temperature_3": 3.4 + }, + "rx_metadata": [ + { + "gateway_ids": { + "gateway_id": "somewhere-ffp", + "eui": "FEA9BE611914F97D" + }, + "timestamp": 208623723, + "rssi": -107, + "channel_rssi": -107, + "snr": -6.5, + "location": { + "latitude": 52.21, + "longitude": 12.96, + "source": "SOURCE_REGISTRY" + }, + "uplink_token": "amLzqHY2UL1HQfYlwc+vByXUwkZ6FiwUwKh/5T29axuAfl+XOXnpMKzFBVPy9bAmyqvv6qxOsotx8qw=", + "channel_index": 2 + }, + { + "gateway_ids": { + "gateway_id": "elsewhere-ffp", + "eui": "3398CD3B764F4A14" + }, + "time": "2022-01-19T19:02:33.764357Z", + "timestamp": 26499475, + "rssi": -90, + "channel_rssi": -90, + "snr": 7, + "location": { + "latitude": 52.20, + "longitude": 12.99, + "altitude": 35, + "source": "SOURCE_REGISTRY" + }, + "uplink_token": "AaVQj9emrquiRtdWhDB9+PyZLJeCzJ6zB2ZDGahlF47vhW0vYdFuqUpVj4eqW1OlFwKZUYdH1ArOW6w=", + "channel_index": 2 + } + ], + "settings": { + "data_rate": { + "lora": { + "bandwidth": 125000, + "spreading_factor": 7 + } + }, + "coding_rate": "4/5", + "frequency": "868500000", + "timestamp": 208623723 + }, + "received_at": "2022-01-19T19:02:33.795669934Z", + "consumed_airtime": "0.077056s", + "locations": { + "user": { + "latitude": 52.2134, + "longitude": 12.9812, + "altitude": 35, + "source": "SOURCE_REGISTRY" + } + }, + "network_ids": { + "net_id": "000013", + "tenant_id": "ttn", + "cluster_id": "ttn-eu1" + } + } +} diff --git a/test/test_tts_ttn.py b/test/test_tts_ttn.py new file mode 100644 index 00000000..def5c918 --- /dev/null +++ b/test/test_tts_ttn.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +# (c) 2022 Andreas Motl +""" +Test the TTS-/TTN-webhook receiver implementation. +TTS/TTN means "The Things Stack" / "The Things Network". + +Development +=========== +- https://community.hiveeyes.org/t/more-data-acquisition-payload-formats-for-kotori/1421 +- https://community.hiveeyes.org/t/ttn-daten-an-kotori-weiterleiten/1422/34 + +Usage +===== +:: + + source .venv/bin/activate + pytest -vvv -m ttn --capture=no +""" +import json +import logging +import os.path + +import pytest +import pytest_twisted +from twisted.internet import threads + +from test.settings.basic import settings, influx_sensors, create_influxdb, reset_influxdb, reset_grafana, PROCESS_DELAY_MQTT +from test.util import http_json_sensor, sleep + +logger = logging.getLogger(__name__) + + +# Define fixtures. +payload_file = os.path.join(os.path.dirname(__file__), "test_tts_ttn.json") +with open(payload_file, mode="r") as f: + data_in = json.load(f) + +data_out = { + "time": "2022-01-19T19:02:34.007345Z", + "analog_in_1": 59.04, + "analog_in_2": 58.69, + "analog_in_3": 3.49, + "relative_humidity_2": 78.5, + "temperature_2": 4.2, + "temperature_3": 3.4, +} + + +@pytest_twisted.inlineCallbacks +@pytest.mark.http +@pytest.mark.tts +@pytest.mark.ttn +def test_tts_ttn_http_json(machinery_basic, create_influxdb, reset_influxdb): + """ + Submit single reading in TTS/TTN webhook JSON format to HTTP API + and proof it is stored in the InfluxDB database. + """ + + # Submit a single measurement, without timestamp. + yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data_in) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that data arrived in InfluxDB properly. + record = influx_sensors.get_first_record() + assert record == data_out + yield record