forked from KalicoCrew/kalico
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'upstream/master'
- Loading branch information
Showing
12 changed files
with
137 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
# Protocol definitions for firmware communication | ||
# | ||
# Copyright (C) 2016-2021 Kevin O'Connor <[email protected]> | ||
# Copyright (C) 2016-2024 Kevin O'Connor <[email protected]> | ||
# | ||
# This file may be distributed under the terms of the GNU GPLv3 license. | ||
import json, zlib, logging | ||
|
@@ -199,8 +199,8 @@ def convert_msg_format(msgformat): | |
|
||
|
||
class MessageFormat: | ||
def __init__(self, msgid, msgformat, enumerations={}): | ||
self.msgid = msgid | ||
def __init__(self, msgid_bytes, msgformat, enumerations={}): | ||
self.msgid_bytes = msgid_bytes | ||
self.msgformat = msgformat | ||
self.debugformat = convert_msg_format(msgformat) | ||
self.name = msgformat.split()[0] | ||
|
@@ -209,21 +209,19 @@ def __init__(self, msgid, msgformat, enumerations={}): | |
self.name_to_type = dict(self.param_names) | ||
|
||
def encode(self, params): | ||
out = [] | ||
out.append(self.msgid) | ||
out = list(self.msgid_bytes) | ||
for i, t in enumerate(self.param_types): | ||
t.encode(out, params[i]) | ||
return out | ||
|
||
def encode_by_name(self, **params): | ||
out = [] | ||
out.append(self.msgid) | ||
out = list(self.msgid_bytes) | ||
for name, t in self.param_names: | ||
t.encode(out, params[name]) | ||
return out | ||
|
||
def parse(self, s, pos): | ||
pos += 1 | ||
pos += len(self.msgid_bytes) | ||
out = {} | ||
for name, t in self.param_names: | ||
v, pos = t.parse(s, pos) | ||
|
@@ -243,14 +241,14 @@ def format_params(self, params): | |
class OutputFormat: | ||
name = "#output" | ||
|
||
def __init__(self, msgid, msgformat): | ||
self.msgid = msgid | ||
def __init__(self, msgid_bytes, msgformat): | ||
self.msgid_bytes = msgid_bytes | ||
self.msgformat = msgformat | ||
self.debugformat = convert_msg_format(msgformat) | ||
self.param_types = lookup_output_params(msgformat) | ||
|
||
def parse(self, s, pos): | ||
pos += 1 | ||
pos += len(self.msgid_bytes) | ||
out = [] | ||
for t in self.param_types: | ||
v, pos = t.parse(s, pos) | ||
|
@@ -268,7 +266,7 @@ class UnknownFormat: | |
name = "#unknown" | ||
|
||
def parse(self, s, pos): | ||
msgid = s[pos] | ||
msgid, param_pos = PT_int32().parse(s, pos) | ||
msg = bytes(bytearray(s)) | ||
return {"#msgid": msgid, "#msg": msg}, len(s) - MESSAGE_TRAILER_SIZE | ||
|
||
|
@@ -286,7 +284,8 @@ def __init__(self, warn_prefix=""): | |
self.messages = [] | ||
self.messages_by_id = {} | ||
self.messages_by_name = {} | ||
self.msgtag_by_format = {} | ||
self.msgid_by_format = {} | ||
self.msgid_parser = PT_int32() | ||
self.config = {} | ||
self.version = self.build_versions = "" | ||
self.raw_identify_data = "" | ||
|
@@ -321,7 +320,7 @@ def dump(self, s): | |
out = ["seq: %02x" % (msgseq,)] | ||
pos = MESSAGE_HEADER_SIZE | ||
while 1: | ||
msgid = s[pos] | ||
msgid, param_pos = self.msgid_parser.parse(s, pos) | ||
mid = self.messages_by_id.get(msgid, self.unknown) | ||
params, pos = mid.parse(s, pos) | ||
out.append(mid.format_params(params)) | ||
|
@@ -340,15 +339,15 @@ def format_params(self, params): | |
return str(params) | ||
|
||
def parse(self, s): | ||
msgid = s[MESSAGE_HEADER_SIZE] | ||
msgid, param_pos = self.msgid_parser.parse(s, MESSAGE_HEADER_SIZE) | ||
mid = self.messages_by_id.get(msgid, self.unknown) | ||
params, pos = mid.parse(s, MESSAGE_HEADER_SIZE) | ||
if pos != len(s) - MESSAGE_TRAILER_SIZE: | ||
self._error("Extra data at end of message") | ||
params["#name"] = mid.name | ||
return params | ||
|
||
def encode(self, seq, cmd): | ||
def encode_msgblock(self, seq, cmd): | ||
msglen = MESSAGE_MIN + len(cmd) | ||
seq = (seq & MESSAGE_SEQ_MASK) | MESSAGE_DEST | ||
out = [msglen, seq] + cmd | ||
|
@@ -377,11 +376,11 @@ def lookup_command(self, msgformat): | |
self._error("Command format mismatch: %s vs %s", msgformat, mp.msgformat) | ||
return mp | ||
|
||
def lookup_msgtag(self, msgformat): | ||
msgtag = self.msgtag_by_format.get(msgformat) | ||
if msgtag is None: | ||
def lookup_msgid(self, msgformat): | ||
msgid = self.msgid_by_format.get(msgformat) | ||
if msgid is None: | ||
self._error("Unknown command: %s", msgformat) | ||
return msgtag | ||
return msgid | ||
|
||
def create_command(self, msg): | ||
parts = msg.strip().split() | ||
|
@@ -435,22 +434,23 @@ def fill_enumerations(self, enumerations): | |
for i in range(count): | ||
enums[enum_root + str(start_enum + i)] = start_value + i | ||
|
||
def _init_messages(self, messages, command_tags=[], output_tags=[]): | ||
for msgformat, msgtag in messages.items(): | ||
def _init_messages(self, messages, command_ids=[], output_ids=[]): | ||
for msgformat, msgid in messages.items(): | ||
msgtype = "response" | ||
if msgtag in command_tags: | ||
if msgid in command_ids: | ||
msgtype = "command" | ||
elif msgtag in output_tags: | ||
elif msgid in output_ids: | ||
msgtype = "output" | ||
self.messages.append((msgtag, msgtype, msgformat)) | ||
if msgtag < -32 or msgtag > 95: | ||
self._error("Multi-byte msgtag not supported") | ||
self.msgtag_by_format[msgformat] = msgtag | ||
msgid = msgtag & 0x7F | ||
self.messages.append((msgid, msgtype, msgformat)) | ||
self.msgid_by_format[msgformat] = msgid | ||
msgid_bytes = [] | ||
self.msgid_parser.encode(msgid_bytes, msgid) | ||
if msgtype == "output": | ||
self.messages_by_id[msgid] = OutputFormat(msgid, msgformat) | ||
self.messages_by_id[msgid] = OutputFormat( | ||
msgid_bytes, msgformat | ||
) | ||
else: | ||
msg = MessageFormat(msgid, msgformat, self.enumerations) | ||
msg = MessageFormat(msgid_bytes, msgformat, self.enumerations) | ||
self.messages_by_id[msgid] = msg | ||
self.messages_by_name[msg.name] = msg | ||
|
||
|
Oops, something went wrong.