From 4cf2035acc1523c5fd22b880d8205e4026a72ef4 Mon Sep 17 00:00:00 2001 From: ATATC Date: Thu, 15 Feb 2024 13:46:25 -0500 Subject: [PATCH] Added a generic speed in the base data container. Added speed trend functionality. --- leads/context.py | 17 ++++++++++++----- leads/data.py | 27 +++++---------------------- leads_vec/cli.py | 6 +++--- 3 files changed, 20 insertions(+), 30 deletions(-) diff --git a/leads/context.py b/leads/context.py index af380843..afad5e66 100644 --- a/leads/context.py +++ b/leads/context.py @@ -1,8 +1,10 @@ from collections import deque as _deque -from time import time as _time from copy import copy as _copy +from time import time as _time from typing import TypeVar as _TypeVar, Generic as _Generic +from numpy import diff as _diff, gradient as _gradient, average as _average, array as _array + from leads.constant import SYSTEM_DTCS, SYSTEM_ABS, SYSTEM_EBI, SYSTEM_ATBS from leads.data import DataContainer, SRWDataContainer, DRWDataContainer @@ -26,15 +28,16 @@ def __init__(self, :param data_seq_size: buffer size of previous data """ self._srw_mode: bool = srw_mode - superclass = SRWDataContainer if srw_mode else DRWDataContainer + dct = SRWDataContainer if srw_mode else DRWDataContainer if initial_data: - _check_data_type(initial_data, superclass) + _check_data_type(initial_data, dct) else: - initial_data = superclass() + initial_data = dct() self.__initial_data_type: type = type(initial_data) if data_seq_size < 1: raise ValueError("`data_seq_size` must be greater or equal to 1") - self._data_seq: _deque[superclass] = _deque((initial_data,), maxlen=data_seq_size) + self._data_seq: _deque[dct] = _deque((initial_data,), maxlen=data_seq_size) + self._speed_seq: _deque[int | float] = _deque(maxlen=data_seq_size) self._lap_time_seq: _deque[int] = _deque((int(_time() * 1000),), maxlen=num_laps_recorded + 1) self._dtcs: bool = True self._abs: bool = True @@ -54,6 +57,7 @@ def push(self, data: T) -> None: """ _check_data_type(data, self.__initial_data_type) self._data_seq.append(data) + self._speed_seq.append(data.speed) def set_subsystem(self, system: str, enabled: bool) -> None: """ @@ -103,6 +107,9 @@ def record_lap(self): def get_lap_time_list(self) -> list[int]: return [self._lap_time_seq[i] - self._lap_time_seq[i - 1] for i in range(1, len(self._lap_time_seq))] + def get_speed_trend(self) -> float: + return float(_average(_gradient(_diff(_array(self._speed_seq))))) + def brake(self, force: float) -> int: # todo return 0 diff --git a/leads/data.py b/leads/data.py index 78f796df..2f56d927 100644 --- a/leads/data.py +++ b/leads/data.py @@ -1,16 +1,12 @@ -from abc import abstractmethod as _abstractmethod, ABCMeta as _ABCMeta +from abc import ABCMeta as _ABCMeta from json import dumps as _dumps from time import time as _time -from typing import Self as _Self class DataContainer(object, metaclass=_ABCMeta): - def __init__(self) -> None: + def __init__(self, *speeds: int | float) -> None: self._time_stamp: int = int(_time() * 1000) - - @_abstractmethod - def __sub__(self, other: _Self) -> _Self: - raise NotImplementedError + self.speed: int | float = min(speeds) def __str__(self) -> str: return _dumps(self.to_dict()) @@ -56,16 +52,10 @@ def __init__(self, front_wheel_speed: int | float = 0, rear_wheel_speed: int | float = 0, ) -> None: - super().__init__() + super().__init__(front_wheel_speed, rear_wheel_speed) self.front_wheel_speed: int | float = front_wheel_speed self.rear_wheel_speed: int | float = rear_wheel_speed - def __sub__(self, other: _Self) -> _Self: - return SRWDataContainer( - self.front_wheel_speed - other.front_wheel_speed, - self.rear_wheel_speed - other.rear_wheel_speed - ) - class DRWDataContainer(DataContainer): def __init__(self, @@ -73,14 +63,7 @@ def __init__(self, left_rear_wheel_speed: int | float = 0, right_rear_wheel_speed: int | float = 0, ) -> None: - super().__init__() + super().__init__(front_wheel_speed, left_rear_wheel_speed, left_rear_wheel_speed) self.front_wheel_speed: int | float = front_wheel_speed self.left_rear_wheel_speed: int | float = left_rear_wheel_speed self.right_rear_wheel_speed: int | float = right_rear_wheel_speed - - def __sub__(self, other: _Self) -> _Self: - return DRWDataContainer( - self.front_wheel_speed - other.front_wheel_speed, - self.left_rear_wheel_speed - other.left_rear_wheel_speed, - self.right_rear_wheel_speed - other.right_rear_wheel_speed - ) diff --git a/leads_vec/cli.py b/leads_vec/cli.py index 06a2d057..b004b69d 100644 --- a/leads_vec/cli.py +++ b/leads_vec/cli.py @@ -19,7 +19,7 @@ class CustomRuntimeData(RuntimeData): def main() -> int: cfg = get_config(Config) - ctx = LEADS(srw_mode=cfg.srw_mode) + ctx = LEADS[SRWDataContainer if cfg.srw_mode else DRWDataContainer](srw_mode=cfg.srw_mode) window = Window(cfg.width, cfg.height, cfg.refresh_rate, @@ -117,13 +117,13 @@ def on_update(self, e: UpdateEvent) -> None: f"{(duration := int(time()) - uim.rd().start_time) // 60} MIN {duration % 60} SEC\n\n" f"{'SRW MODE' if cfg.srw_mode else 'DRW MODE'}\n" f"REFRESH RATE: {cfg.refresh_rate} FPS") - m2.set(int(d.front_wheel_speed)) + m2.set(int(d.speed)) if uim.rd().m3_mode == 0: m3.set("0.0V") elif uim.rd().m3_mode == 1: m3.set("G Force") else: - m3.set("Speed Trend") + m3.set("Speed Trend\n" + str(ctx.get_speed_trend())) if uim.rd().comm.num_connections() < 1: uim["comm_status"].configure(text="COMM OFFLINE", text_color="gray") if uim.rd().control_system_switch_changed: