Skip to content

Commit

Permalink
Merge pull request #12 from bwestley/dev
Browse files Browse the repository at this point in the history
Add packet helpers that use manifest
  • Loading branch information
bwestley authored Oct 1, 2024
2 parents 18ac030 + 4281c56 commit d4b5e9f
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions rovecomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import json
from pathlib import Path
from functools import lru_cache

ROVECOMM_UDP_PORT = 11000
ROVECOMM_TCP_PORT = 12000
Expand Down Expand Up @@ -57,6 +58,18 @@
"c": 1,
}

types_manifest_to_byte = {
"INT8_T": "b",
"UINT8_T": "B",
"INT16_T": "h",
"UINT16_T": "H",
"INT32_T": "l",
"UINT32_T": "L",
"FLOAT_T": "f",
"DOUBLE_T": "d",
"CHAR": "c",
}


class RoveCommPacket:
"""
Expand Down Expand Up @@ -580,6 +593,7 @@ def read(self):
return packets


@lru_cache(maxsize=None)
def get_manifest(path=""):
"""
Grabs the json manifest file and returns it in dictionary form
Expand All @@ -599,3 +613,86 @@ def get_manifest(path=""):
manifest = json.loads(manifest)
manifest = manifest["RovecommManifest"]
return manifest

@lru_cache(maxsize=None)
def data_ids(manifest_path=""):
"""
Compiles a list of data_ids
Parameters:
-----------
manifest_path - path to manifest file, uses the same default as get_manifest
Returns:
{data_id: (board_name, packet_type, packet_name), ...}
"""
data_ids_ = {}
manifest = get_manifest(manifest_path)
for board_name, board_manifest in manifest.items():
for packet_type_name in ("Commands", "Telemetry", "Error"):
if packet_type_name not in board_manifest:
continue
for packet_name, packet_manifest in board_manifest[packet_type_name].items():
data_ids_[packet_manifest["dataId"]] = (board_name, packet_type_name, packet_name)
return data_ids_

def packet(board_name, packet_type, packet_name, data=(), ip=None, port=ROVECOMM_UDP_PORT, manifest_path=""):
"""
Sends a packet with information found in get_manifest(manifest_path)
Parameters:
-----------
board_name - board name
packet_type - "Commands", "Telemetry", or "Error"
packet_name - packet name
ip - ip to send the packet to, defaults to board ip
port - port to send the packet to, defaults to ROVECOMM_UDP_PORT
manifest_path - path to manifest file, uses the same default as get_manifest
Returns:
--------
RoveCommPacket
"""
manifest = get_manifest(manifest_path)
board_manifest = manifest[board_name]
command_manifest = board_manifest[packet_type][packet_name]
if ip == None:
ip = board_manifest["Ip"]
return RoveCommPacket(command_manifest["dataId"], types_manifest_to_byte[command_manifest["dataType"]], data, ip, port)

def decode_print(packet, manifest_path=""):
"""
Decode a packet using information found in get_manifest(manifest_path) before printing
Parameters:
-----------
packet - RoveCommPacket to decode
manifest_path - path to manifest file, uses the same default as get_manifest
Format:
----------
ID: _
Board: _
Packet Type: _
Name: _
Data Type: _
Count: _
IP: _
Data: _
----------
"""
data_ids_ = data_ids(manifest_path)
if packet.data_id in data_ids_:
board_name, packet_type_name, packet_name = data_ids_[packet.data_id]
else:
board_name = "Unknown"
packet_type_name = "Unknown"
packet_name = "Unknown"
print("----------")
print(f"ID: {packet.data_id}")
print(f"Board: {board_name}")
print(f"Packet Type: {packet_type_name}")
print(f"Name: {packet_name}")
print(f"Data Type: {packet.data_type}")
print(f"Count: {packet.data_count}")
print(f"IP: {packet.ip_address}")
print(f"Data: {packet.data}")
print("----------")

0 comments on commit d4b5e9f

Please sign in to comment.