From 4281c5643bf7b81dacf3bc6de631110130aa28d8 Mon Sep 17 00:00:00 2001 From: Brendan Westley Date: Mon, 30 Sep 2024 01:39:24 -0500 Subject: [PATCH] Add packet helpers that use manifest --- rovecomm.py | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/rovecomm.py b/rovecomm.py index 0bd471d..965e10e 100644 --- a/rovecomm.py +++ b/rovecomm.py @@ -7,6 +7,7 @@ import logging import json from pathlib import Path +from functools import lru_cache ROVECOMM_UDP_PORT = 11000 ROVECOMM_TCP_PORT = 12000 @@ -57,6 +58,18 @@ "c": 1, } +types_manifest_to_byte = { + "INT8_T": "b", + "UINT8_T": "B", + "INT16_T": "h", + "UINT16_T": "H", + "INT32_T": "l", + "UINT32_T": "L", + "FLOAT_T": "f", + "DOUBLE_T": "d", + "CHAR": "c", +} + class RoveCommPacket: """ @@ -580,6 +593,7 @@ def read(self): return packets +@lru_cache(maxsize=None) def get_manifest(path=""): """ Grabs the json manifest file and returns it in dictionary form @@ -599,3 +613,86 @@ def get_manifest(path=""): manifest = json.loads(manifest) manifest = manifest["RovecommManifest"] return manifest + +@lru_cache(maxsize=None) +def data_ids(manifest_path=""): + """ + Compiles a list of data_ids + + Parameters: + ----------- + manifest_path - path to manifest file, uses the same default as get_manifest + + Returns: + {data_id: (board_name, packet_type, packet_name), ...} + """ + data_ids_ = {} + manifest = get_manifest(manifest_path) + for board_name, board_manifest in manifest.items(): + for packet_type_name in ("Commands", "Telemetry", "Error"): + if packet_type_name not in board_manifest: + continue + for packet_name, packet_manifest in board_manifest[packet_type_name].items(): + data_ids_[packet_manifest["dataId"]] = (board_name, packet_type_name, packet_name) + return data_ids_ + +def packet(board_name, packet_type, packet_name, data=(), ip=None, port=ROVECOMM_UDP_PORT, manifest_path=""): + """ + Sends a packet with information found in get_manifest(manifest_path) + + Parameters: + ----------- + board_name - board name + packet_type - "Commands", "Telemetry", or "Error" + packet_name - packet name + ip - ip to send the packet to, defaults to board ip + port - port to send the packet to, defaults to ROVECOMM_UDP_PORT + manifest_path - path to manifest file, uses the same default as get_manifest + Returns: + -------- + RoveCommPacket + """ + manifest = get_manifest(manifest_path) + board_manifest = manifest[board_name] + command_manifest = board_manifest[packet_type][packet_name] + if ip == None: + ip = board_manifest["Ip"] + return RoveCommPacket(command_manifest["dataId"], types_manifest_to_byte[command_manifest["dataType"]], data, ip, port) + +def decode_print(packet, manifest_path=""): + """ + Decode a packet using information found in get_manifest(manifest_path) before printing + + Parameters: + ----------- + packet - RoveCommPacket to decode + manifest_path - path to manifest file, uses the same default as get_manifest + Format: + ---------- + ID: _ + Board: _ + Packet Type: _ + Name: _ + Data Type: _ + Count: _ + IP: _ + Data: _ + ---------- + """ + data_ids_ = data_ids(manifest_path) + if packet.data_id in data_ids_: + board_name, packet_type_name, packet_name = data_ids_[packet.data_id] + else: + board_name = "Unknown" + packet_type_name = "Unknown" + packet_name = "Unknown" + print("----------") + print(f"ID: {packet.data_id}") + print(f"Board: {board_name}") + print(f"Packet Type: {packet_type_name}") + print(f"Name: {packet_name}") + print(f"Data Type: {packet.data_type}") + print(f"Count: {packet.data_count}") + print(f"IP: {packet.ip_address}") + print(f"Data: {packet.data}") + print("----------")