Skip to content

Commit

Permalink
Improve speed of TRCReader
Browse files Browse the repository at this point in the history
Rewrote some lines in the TRCReader to optimize for speed, mainly for TRC files v2.x. According to cProfile it's double as fast now.
  • Loading branch information
lebuni authored and Adrian Immer committed Nov 26, 2024
1 parent 9a766ce commit cea7409
Showing 1 changed file with 37 additions and 46 deletions.
83 changes: 37 additions & 46 deletions can/io/trc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import os
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Callable, Dict, Generator, List, Optional, TextIO, Union
from typing import Any, Callable, Dict, Generator, Optional, TextIO, Tuple, Union

from ..message import Message
from ..typechecking import StringPathLike
from ..util import channel2int, dlc2len, len2dlc
from .generic import (
TextIOMessageReader,
TextIOMessageWriter,
)
from ..util import channel2int, len2dlc
from .generic import TextIOMessageReader, TextIOMessageWriter

logger = logging.getLogger("can.io.trc")

Expand Down Expand Up @@ -58,13 +55,22 @@ def __init__(
"""
super().__init__(file, mode="r")
self.file_version = TRCFileVersion.UNKNOWN
self.start_time: Optional[datetime] = None
self._start_time: float = 0
self.columns: Dict[str, int] = {}
self._num_columns = -1

if not self.file:
raise ValueError("The given file cannot be None")

self._parse_cols: Callable[[List[str]], Optional[Message]] = lambda x: None
self._parse_cols: Callable[[Tuple[str, ...]], Optional[Message]] = (
lambda x: None
)

@property
def start_time(self) -> Optional[datetime]:
if self._start_time:
return datetime.fromtimestamp(self._start_time, timezone.utc)
return None

def _extract_header(self):
line = ""
Expand All @@ -89,16 +95,18 @@ def _extract_header(self):
elif line.startswith(";$STARTTIME"):
logger.debug("TRCReader: Found start time '%s'", line)
try:
self.start_time = datetime(
1899, 12, 30, tzinfo=timezone.utc
) + timedelta(days=float(line.split("=")[1]))
self._start_time = (
datetime(1899, 12, 30, tzinfo=timezone.utc)
+ timedelta(days=float(line.split("=")[1]))
).timestamp()
except IndexError:
logger.debug("TRCReader: Failed to parse start time")
elif line.startswith(";$COLUMNS"):
logger.debug("TRCReader: Found columns '%s'", line)
try:
columns = line.split("=")[1].split(",")
self.columns = {column: columns.index(column) for column in columns}
self._num_columns = len(columns) - 1
except IndexError:
logger.debug("TRCReader: Failed to parse columns")
elif line.startswith(";"):
Expand All @@ -107,7 +115,7 @@ def _extract_header(self):
break

if self.file_version >= TRCFileVersion.V1_1:
if self.start_time is None:
if self._start_time is None:
raise ValueError("File has no start time information")

if self.file_version >= TRCFileVersion.V2_0:
Expand All @@ -132,7 +140,7 @@ def _extract_header(self):

return line

def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_0(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[2]
if arbit_id == "FFFFFFFF":
logger.info("TRCReader: Dropping bus info line")
Expand All @@ -147,16 +155,11 @@ def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
msg.data = bytearray([int(cols[i + 4], 16) for i in range(msg.dlc)])
return msg

def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[3]

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[1]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[1]) / 1000 + self._start_time
msg.arbitration_id = int(arbit_id, 16)
msg.is_extended_id = len(arbit_id) > 4
msg.channel = 1
Expand All @@ -165,16 +168,11 @@ def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
msg.is_rx = cols[2] == "Rx"
return msg

def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]:
arbit_id = cols[4]

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[1]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[1]) / 1000 + self._start_time
msg.arbitration_id = int(arbit_id, 16)
msg.is_extended_id = len(arbit_id) > 4
msg.channel = int(cols[2])
Expand All @@ -183,7 +181,7 @@ def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
msg.is_rx = cols[3] == "Rx"
return msg

def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_msg_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]:
type_ = cols[self.columns["T"]]
bus = self.columns.get("B", None)

Expand All @@ -192,50 +190,43 @@ def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
dlc = len2dlc(length)
elif "L" in self.columns:
dlc = int(cols[self.columns["L"]])
length = dlc2len(dlc)
else:
raise ValueError("No length/dlc columns present.")

msg = Message()
if isinstance(self.start_time, datetime):
msg.timestamp = (
self.start_time + timedelta(milliseconds=float(cols[self.columns["O"]]))
).timestamp()
else:
msg.timestamp = float(cols[1]) / 1000
msg.timestamp = float(cols[self.columns["O"]]) / 1000 + self._start_time
msg.arbitration_id = int(cols[self.columns["I"]], 16)
msg.is_extended_id = len(cols[self.columns["I"]]) > 4
msg.channel = int(cols[bus]) if bus is not None else 1
msg.dlc = dlc
msg.data = bytearray(
[int(cols[i + self.columns["D"]], 16) for i in range(length)]
)
if dlc:
msg.data = bytearray.fromhex(cols[self.columns["D"]])
msg.is_rx = cols[self.columns["d"]] == "Rx"
msg.is_fd = type_ in ["FD", "FB", "FE", "BI"]
msg.bitrate_switch = type_ in ["FB", " FE"]
msg.error_state_indicator = type_ in ["FE", "BI"]
msg.is_fd = type_ in {"FD", "FB", "FE", "BI"}
msg.bitrate_switch = type_ in {"FB", "FE"}
msg.error_state_indicator = type_ in {"FE", "BI"}

return msg

def _parse_cols_v1_1(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[2]
if dtype in ("Tx", "Rx"):
return self._parse_msg_v1_1(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
return None

def _parse_cols_v1_3(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[3]
if dtype in ("Tx", "Rx"):
return self._parse_msg_v1_3(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
return None

def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_cols_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]:
dtype = cols[self.columns["T"]]
if dtype in ["DT", "FD", "FB"]:
if dtype in {"DT", "FD", "FB", "FE", "BI"}:
return self._parse_msg_v2_x(cols)
else:
logger.info("TRCReader: Unsupported type '%s'", dtype)
Expand All @@ -244,7 +235,7 @@ def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
def _parse_line(self, line: str) -> Optional[Message]:
logger.debug("TRCReader: Parse '%s'", line)
try:
cols = line.split()
cols = tuple(line.split(maxsplit=self._num_columns))
return self._parse_cols(cols)
except IndexError:
logger.warning("TRCReader: Failed to parse message '%s'", line)
Expand Down

0 comments on commit cea7409

Please sign in to comment.