Skip to content

Commit

Permalink
Added a generic speed in the base data container.
Browse files Browse the repository at this point in the history
Added speed trend functionality.
  • Loading branch information
ATATC committed Feb 15, 2024
1 parent 4660b6e commit 4cf2035
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 30 deletions.
17 changes: 12 additions & 5 deletions leads/context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from collections import deque as _deque
from time import time as _time
from copy import copy as _copy
from time import time as _time
from typing import TypeVar as _TypeVar, Generic as _Generic

from numpy import diff as _diff, gradient as _gradient, average as _average, array as _array

from leads.constant import SYSTEM_DTCS, SYSTEM_ABS, SYSTEM_EBI, SYSTEM_ATBS
from leads.data import DataContainer, SRWDataContainer, DRWDataContainer

Expand All @@ -26,15 +28,16 @@ def __init__(self,
:param data_seq_size: buffer size of previous data
"""
self._srw_mode: bool = srw_mode
superclass = SRWDataContainer if srw_mode else DRWDataContainer
dct = SRWDataContainer if srw_mode else DRWDataContainer
if initial_data:
_check_data_type(initial_data, superclass)
_check_data_type(initial_data, dct)
else:
initial_data = superclass()
initial_data = dct()
self.__initial_data_type: type = type(initial_data)
if data_seq_size < 1:
raise ValueError("`data_seq_size` must be greater or equal to 1")
self._data_seq: _deque[superclass] = _deque((initial_data,), maxlen=data_seq_size)
self._data_seq: _deque[dct] = _deque((initial_data,), maxlen=data_seq_size)
self._speed_seq: _deque[int | float] = _deque(maxlen=data_seq_size)
self._lap_time_seq: _deque[int] = _deque((int(_time() * 1000),), maxlen=num_laps_recorded + 1)
self._dtcs: bool = True
self._abs: bool = True
Expand All @@ -54,6 +57,7 @@ def push(self, data: T) -> None:
"""
_check_data_type(data, self.__initial_data_type)
self._data_seq.append(data)
self._speed_seq.append(data.speed)

def set_subsystem(self, system: str, enabled: bool) -> None:
"""
Expand Down Expand Up @@ -103,6 +107,9 @@ def record_lap(self):
def get_lap_time_list(self) -> list[int]:
return [self._lap_time_seq[i] - self._lap_time_seq[i - 1] for i in range(1, len(self._lap_time_seq))]

def get_speed_trend(self) -> float:
return float(_average(_gradient(_diff(_array(self._speed_seq)))))

def brake(self, force: float) -> int:
# todo
return 0
27 changes: 5 additions & 22 deletions leads/data.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
from abc import abstractmethod as _abstractmethod, ABCMeta as _ABCMeta
from abc import ABCMeta as _ABCMeta
from json import dumps as _dumps
from time import time as _time
from typing import Self as _Self


class DataContainer(object, metaclass=_ABCMeta):
def __init__(self) -> None:
def __init__(self, *speeds: int | float) -> None:
self._time_stamp: int = int(_time() * 1000)

@_abstractmethod
def __sub__(self, other: _Self) -> _Self:
raise NotImplementedError
self.speed: int | float = min(speeds)

def __str__(self) -> str:
return _dumps(self.to_dict())
Expand Down Expand Up @@ -56,31 +52,18 @@ def __init__(self,
front_wheel_speed: int | float = 0,
rear_wheel_speed: int | float = 0,
) -> None:
super().__init__()
super().__init__(front_wheel_speed, rear_wheel_speed)
self.front_wheel_speed: int | float = front_wheel_speed
self.rear_wheel_speed: int | float = rear_wheel_speed

def __sub__(self, other: _Self) -> _Self:
return SRWDataContainer(
self.front_wheel_speed - other.front_wheel_speed,
self.rear_wheel_speed - other.rear_wheel_speed
)


class DRWDataContainer(DataContainer):
def __init__(self,
front_wheel_speed: int | float = 0,
left_rear_wheel_speed: int | float = 0,
right_rear_wheel_speed: int | float = 0,
) -> None:
super().__init__()
super().__init__(front_wheel_speed, left_rear_wheel_speed, left_rear_wheel_speed)
self.front_wheel_speed: int | float = front_wheel_speed
self.left_rear_wheel_speed: int | float = left_rear_wheel_speed
self.right_rear_wheel_speed: int | float = right_rear_wheel_speed

def __sub__(self, other: _Self) -> _Self:
return DRWDataContainer(
self.front_wheel_speed - other.front_wheel_speed,
self.left_rear_wheel_speed - other.left_rear_wheel_speed,
self.right_rear_wheel_speed - other.right_rear_wheel_speed
)
6 changes: 3 additions & 3 deletions leads_vec/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class CustomRuntimeData(RuntimeData):

def main() -> int:
cfg = get_config(Config)
ctx = LEADS(srw_mode=cfg.srw_mode)
ctx = LEADS[SRWDataContainer if cfg.srw_mode else DRWDataContainer](srw_mode=cfg.srw_mode)
window = Window(cfg.width,
cfg.height,
cfg.refresh_rate,
Expand Down Expand Up @@ -117,13 +117,13 @@ def on_update(self, e: UpdateEvent) -> None:
f"{(duration := int(time()) - uim.rd().start_time) // 60} MIN {duration % 60} SEC\n\n"
f"{'SRW MODE' if cfg.srw_mode else 'DRW MODE'}\n"
f"REFRESH RATE: {cfg.refresh_rate} FPS")
m2.set(int(d.front_wheel_speed))
m2.set(int(d.speed))
if uim.rd().m3_mode == 0:
m3.set("0.0V")
elif uim.rd().m3_mode == 1:
m3.set("G Force")
else:
m3.set("Speed Trend")
m3.set("Speed Trend\n" + str(ctx.get_speed_trend()))
if uim.rd().comm.num_connections() < 1:
uim["comm_status"].configure(text="COMM OFFLINE", text_color="gray")
if uim.rd().control_system_switch_changed:
Expand Down

0 comments on commit 4cf2035

Please sign in to comment.