Skip to content

Commit

Permalink
Add TTS (The Things Stack) / TTN (The Things Network) decoder adapter
Browse files Browse the repository at this point in the history
This patch adds a basic decoder for TTS/TTN v3 Uplink Messages [1],
submitted using Webhooks [2]. It can be improved by adding further
commits.

[1] https://www.thethingsindustries.com/docs/reference/data-formats/#uplink-messages
[2] https://www.thethingsindustries.com/docs/integrations/webhooks/
  • Loading branch information
amotl committed Jan 19, 2022
1 parent 50b346d commit 1fbabd8
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ in progress
``name`` and ``title`` fields.
- CI: Use Grafana 8.2.5
- Improve decoding fractional epoch timestamps
- Add TTS (The Things Stack) / TTN (The Things Network) decoder adapter


.. _kotori-0.26.12:
Expand Down
9 changes: 8 additions & 1 deletion kotori/daq/decoder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from kotori.daq.decoder.airrohr import AirrohrDecoder
from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder
from kotori.daq.decoder.schema import MessageType
from kotori.daq.decoder.tts_ttn import TheThingsStackDecoder


class DecoderInfo:
Expand All @@ -18,7 +19,7 @@ def __init__(self, topology):
self.topology = topology
self.info = DecoderInfo()

def probe(self):
def probe(self, payload: str = None):

if 'slot' not in self.topology:
return False
Expand All @@ -41,4 +42,10 @@ def probe(self):
self.info.decoder = TasmotaStateDecoder
return True

# TTS/TTN: The Things Stack / The Things Network
if self.topology.slot.endswith('data.json') and "received_at" in payload and "uplink_message" in payload:
self.info.message_type = MessageType.DATA_CONTAINER
self.info.decoder = TheThingsStackDecoder
return True

return False
39 changes: 39 additions & 0 deletions kotori/daq/decoder/tts_ttn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# (c) 2022 Andreas Motl <[email protected]>
import json
from collections import OrderedDict


class TheThingsStackDecoder:
"""
Decode JSON payloads in TTS-/TTN-webhook JSON format.
TTS/TTN means "The Things Stack" / "The Things Network".
Documentation
=============
- https://getkotori.org/docs/handbook/decoders/tts_ttn.html
- https://community.hiveeyes.org/t/more-data-acquisition-payload-formats-for-kotori/1421/2
Development
===========
- https://community.hiveeyes.org/t/more-data-acquisition-payload-formats-for-kotori/1421
- https://community.hiveeyes.org/t/ttn-daten-an-kotori-weiterleiten/1422/34
"""

@staticmethod
def decode(payload: str):

# Decode from JSON.
message = json.loads(payload)

# Use timestamp and decoded payload.
data = OrderedDict()
data["timestamp"] = message["received_at"]
data.update(message["uplink_message"]["decoded_payload"])

# TODO: Add more data / metadata.
# This is an implementation from scratch. It can be improved by
# cherry-picking more specific decoding routines from `ttnlogger`.
# https://github.com/daq-tools/ttnlogger

return data
2 changes: 1 addition & 1 deletion kotori/daq/services/mig.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def decode_message(self, topic, payload):

# Message can be handled by one of the device-specific decoders.
decoder_manager = DecoderManager(topology)
if decoder_manager.probe():
if decoder_manager.probe(payload):
message.type = decoder_manager.info.message_type
message.data = decoder_manager.info.decoder.decode(payload)
return message
Expand Down
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ markers =
homie: Tests specific to the Homie firmware framework.
airrohr: Tests specific to Airrohr devices.
weewx: Tests for WeeWX integration.
tts: Tests for TTS/TTN adapter.
ttn: Tests for TTS/TTN adapter.
hiveeyes: Tests for vendor hiveeyes.
legacy: Tests for legacy endpoints and such.
99 changes: 99 additions & 0 deletions test/test_tts_ttn.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"end_device_ids": {
"device_id": "testdrive-foo-bar-baz",
"application_ids": {
"application_id": "acme"
},
"dev_eui": "030E202BD5504B52",
"join_eui": "FB81CCAAE6D7B2F7",
"dev_addr": "4406826C"
},
"correlation_ids": [
"as:up:01FSSVG88HVN4WVXT5X75KG545",
"gs:conn:01FSRW4G4EK6JFFREN8CF04YAW",
"gs:up:host:01FSRW4G4NF8PN3HZP2MT482ED",
"gs:uplink:01FSSVG822Q4KP03T951ZFD9HZ",
"ns:uplink:01FSSVG8230BA2CGX4B5M6BQDJ",
"rpc:/ttn.lorawan.v3.GsNs/HandleUplink:01FSSVG823BPTEQDJR7EQWA63E",
"rpc:/ttn.lorawan.v3.NsAs/HandleUplink:01FSSVG88GDGNTYP2X7MV3DXC9"
],
"received_at": "2022-01-19T19:02:34.007345025Z",
"uplink_message": {
"session_key_id": "QR5vq9sPi2tZPtH8rDSjpA==",
"f_port": 1,
"f_cnt": 2289,
"frm_payload": "AQIXEAICFu0CZwAqAmidA2cAIgMCAV0=",
"decoded_payload": {
"analog_in_1": 59.04,
"analog_in_2": 58.69,
"analog_in_3": 3.49,
"relative_humidity_2": 78.5,
"temperature_2": 4.2,
"temperature_3": 3.4
},
"rx_metadata": [
{
"gateway_ids": {
"gateway_id": "somewhere-ffp",
"eui": "FEA9BE611914F97D"
},
"timestamp": 208623723,
"rssi": -107,
"channel_rssi": -107,
"snr": -6.5,
"location": {
"latitude": 52.21,
"longitude": 12.96,
"source": "SOURCE_REGISTRY"
},
"uplink_token": "amLzqHY2UL1HQfYlwc+vByXUwkZ6FiwUwKh/5T29axuAfl+XOXnpMKzFBVPy9bAmyqvv6qxOsotx8qw=",
"channel_index": 2
},
{
"gateway_ids": {
"gateway_id": "elsewhere-ffp",
"eui": "3398CD3B764F4A14"
},
"time": "2022-01-19T19:02:33.764357Z",
"timestamp": 26499475,
"rssi": -90,
"channel_rssi": -90,
"snr": 7,
"location": {
"latitude": 52.20,
"longitude": 12.99,
"altitude": 35,
"source": "SOURCE_REGISTRY"
},
"uplink_token": "AaVQj9emrquiRtdWhDB9+PyZLJeCzJ6zB2ZDGahlF47vhW0vYdFuqUpVj4eqW1OlFwKZUYdH1ArOW6w=",
"channel_index": 2
}
],
"settings": {
"data_rate": {
"lora": {
"bandwidth": 125000,
"spreading_factor": 7
}
},
"coding_rate": "4/5",
"frequency": "868500000",
"timestamp": 208623723
},
"received_at": "2022-01-19T19:02:33.795669934Z",
"consumed_airtime": "0.077056s",
"locations": {
"user": {
"latitude": 52.2134,
"longitude": 12.9812,
"altitude": 35,
"source": "SOURCE_REGISTRY"
}
},
"network_ids": {
"net_id": "000013",
"tenant_id": "ttn",
"cluster_id": "ttn-eu1"
}
}
}
69 changes: 69 additions & 0 deletions test/test_tts_ttn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
# (c) 2022 Andreas Motl <[email protected]>
"""
Test the TTS-/TTN-webhook receiver implementation.
TTS/TTN means "The Things Stack" / "The Things Network".
Development
===========
- https://community.hiveeyes.org/t/more-data-acquisition-payload-formats-for-kotori/1421
- https://community.hiveeyes.org/t/ttn-daten-an-kotori-weiterleiten/1422/34
Usage
=====
::
source .venv/bin/activate
pytest -vvv -m ttn --capture=no
"""
import json
import logging
import os.path

import pytest
import pytest_twisted
from twisted.internet import threads

from test.settings.basic import settings, influx_sensors, create_influxdb, reset_influxdb, reset_grafana, PROCESS_DELAY_MQTT
from test.util import http_json_sensor, sleep

logger = logging.getLogger(__name__)


# Define fixtures.
payload_file = os.path.join(os.path.dirname(__file__), "test_tts_ttn.json")
with open(payload_file, mode="r") as f:
data_in = json.load(f)

data_out = {
"time": "2022-01-19T19:02:34.007345Z",
"analog_in_1": 59.04,
"analog_in_2": 58.69,
"analog_in_3": 3.49,
"relative_humidity_2": 78.5,
"temperature_2": 4.2,
"temperature_3": 3.4,
}


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.tts
@pytest.mark.ttn
def test_tts_ttn_http_json(machinery_basic, create_influxdb, reset_influxdb):
"""
Submit single reading in TTS/TTN webhook JSON format to HTTP API
and proof it is stored in the InfluxDB database.
"""

# Submit a single measurement, without timestamp.
yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data_in)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)
yield sleep(PROCESS_DELAY_MQTT)

# Proof that data arrived in InfluxDB properly.
record = influx_sensors.get_first_record()
assert record == data_out
yield record

0 comments on commit 1fbabd8

Please sign in to comment.