From 3cd7aef3f12360412eabd30e0e97daca23b6f826 Mon Sep 17 00:00:00 2001 From: "Tianhao (Terry) Fu" Date: Thu, 12 Dec 2024 13:29:11 -0500 Subject: [PATCH] Support Dialog Boxes (#457) * Separated `Window` and `Pot`. (#456) * Supported setting debug window status. (#456) * Bug fixed: the callbacks attached to the variable that remain after the widget is destroyed cause errors that it cannot find the widget. (#456) * Supported warning popups. (#456) * Bug fixed. (#456) * Bug fixed: unmatched types. (#456) * Bug fixed: wrong vertical position. (#456) --- leads/data_persistence/analyzer/processor.py | 2 +- leads_gui/__init__.py | 2 +- leads_gui/performance_checker.py | 2 +- leads_gui/prototype.py | 150 ++++++++++--------- leads_gui/speedometer.py | 4 +- leads_vec/benchmark.py | 6 +- leads_vec/cli.py | 35 +++-- 7 files changed, 112 insertions(+), 89 deletions(-) diff --git a/leads/data_persistence/analyzer/processor.py b/leads/data_persistence/analyzer/processor.py index 12afc68d..84526e93 100644 --- a/leads/data_persistence/analyzer/processor.py +++ b/leads/data_persistence/analyzer/processor.py @@ -189,7 +189,7 @@ def shared_pre(row: dict[str, _Any], i: int) -> tuple[int, float, float]: return (dt := t - self._lap_start_time), ( ds := mileage - self._lap_start_mileage), 3600000 * ds / dt if dt else 0 - def shared_post(duration: float, distance: float, avg_speed: float) -> None: + def shared_post(duration: int, distance: float, avg_speed: float) -> None: if self._max_lap_duration is None or duration > self._max_lap_duration: self._max_lap_duration = duration if self._max_lap_distance is None or distance > self._max_lap_distance: diff --git a/leads_gui/__init__.py b/leads_gui/__init__.py index ab1b91ff..7e827bc7 100644 --- a/leads_gui/__init__.py +++ b/leads_gui/__init__.py @@ -39,7 +39,7 @@ def _(cfg: Config) -> None: _set_on_register_config(_on_register_config) -def initialize(window: Window, +def initialize(window: Pot, render: _Callable[[ContextManager], None], leads: _LEADS[_Any]) -> ContextManager: main_controller = _get_controller(_MAIN_CONTROLLER) diff --git a/leads_gui/performance_checker.py b/leads_gui/performance_checker.py index d29321a4..5d741bcb 100644 --- a/leads_gui/performance_checker.py +++ b/leads_gui/performance_checker.py @@ -20,7 +20,7 @@ def frame_rate(self) -> float: return 1 / _average(self._delay_seq) def net_delay(self) -> float: - return _average(self._net_delay_seq) + return float(_average(self._net_delay_seq)) def record_frame(self, last_interval: float) -> None: # add .0000000001 to avoid zero division diff --git a/leads_gui/prototype.py b/leads_gui/prototype.py index 09d7d56a..d4ef18f0 100644 --- a/leads_gui/prototype.py +++ b/leads_gui/prototype.py @@ -1,7 +1,7 @@ from abc import ABCMeta as _ABCMeta, abstractmethod as _abstractmethod from json import dumps as _dumps from time import time as _time -from tkinter import Misc as _Misc, Event as _Event, PhotoImage as _PhotoImage +from tkinter import Misc as _Misc, Event as _Event, PhotoImage as _PhotoImage, TclError as _TclError from typing import Callable as _Callable, Self as _Self, TypeVar as _TypeVar, Generic as _Generic, Any as _Any, \ Literal as _Literal, override as _override @@ -173,7 +173,10 @@ def attach(self, callback: _Callable[[], None]) -> None: def unique(_, __, ___) -> None: if (v := self._variable.get()) != self._last_value: - callback() + try: + callback() + except _TclError: + self.detach() self._last_value = v self._trace_cb_name = self._variable.trace_add("write", unique) @@ -214,7 +217,6 @@ def attempt(self) -> bool: class _RuntimeData(object): def __init__(self) -> None: - self.protected_pot: Window | None = None self.start_time: int = int(_time()) self.comm: _Server | None = None self.comm_stream: _Server | None = None @@ -247,66 +249,35 @@ def __new__(cls, *args, **kwargs) -> _RuntimeData: return super().__new__(cls, *args, **kwargs) -T = _TypeVar("T", bound=RuntimeData) - - -class Window(_Generic[T]): +class Window(object): def __init__(self, - width: int, - height: int, - refresh_rate: int, - runtime_data: T, - on_refresh: _Callable[[_Self], None] = lambda _: None, + master: _Misc | None = None, + width: int = 720, + height: int = 480, title: str = "LEADS", fullscreen: bool = False, no_title_bar: bool = True, - theme_mode: _Literal["system", "light", "dark"] = "system", - display: int = 0) -> None: - self._refresh_rate: int = refresh_rate - self._runtime_data: T = runtime_data - self._on_refresh: _Callable[[Window], None] = on_refresh - self._frequency_generators: dict[str, FrequencyGenerator] = {} - self._display: int = display - - pot = runtime_data.protected_pot - popup = False - - if pot: - self._master: _CTkToplevel = _CTkToplevel(pot._master) - popup = display == pot._display - if not popup: - self._master.bind("", lambda _: pot._master.focus_force()) - self.show() + display: int = 0, + popup: bool = False) -> None: + self._pot_master: _Misc | None = master + if master: + self._master: _CTk | _CTkToplevel = _CTkToplevel(master) + elif self.__class__ is not Pot: + raise TypeError("Use `Pot` for root windows") else: - self._master: _CTk = _CTk() - runtime_data.protected_pot = self + self._master: _CTk | _CTkToplevel = _CTk() + popup = False screen = _get_monitors()[display] - self._master.title(title) - self._master.wm_iconbitmap() - self._master.iconphoto(True, _PhotoImage(master=self._master, file=f"{_ASSETS_PATH}/logo.png")) - self._master.overrideredirect(no_title_bar) - _set_appearance_mode(theme_mode) + self._screen_x: int = screen.x + self._screen_y: int = screen.y self._screen_width: int = screen.width self._screen_height: int = screen.height self._width: int = self._screen_width if fullscreen else width self._height: int = self._screen_height if fullscreen else height - - x, y = int((self._screen_width - self._width) * .5) + screen.x, int((self._screen_height - self._height) * .5) - if popup: - x = int((pot._width - self._width) * .5 + pot._master.winfo_rootx()) - y = int((pot._height - self._height) * .5 + pot._master.winfo_rooty()) - self._master.geometry(f"{self._width}x{self._height}+{x}+{y}") - self._master.resizable(False, False) - - self._active: bool = isinstance(self._master, _CTkToplevel) - self._performance_checker: PerformanceChecker = PerformanceChecker() - self._last_interval: float = 0 - - def root(self) -> _CTk: - return self._master - - def is_pot(self) -> bool: - return isinstance(self._master, _CTk) + self._title: str = title + self._no_title_bar: bool = no_title_bar + self._display: int = display + self._popup: bool = popup def screen_index(self) -> int: return self._display @@ -323,6 +294,55 @@ def width(self) -> int: def height(self) -> int: return self._height + def root(self) -> _CTk: + return self._master + + def show(self) -> None: + self._master.title(self._title) + self._master.wm_iconbitmap() + self._master.iconphoto(True, _PhotoImage(master=self._master, file=f"{_ASSETS_PATH}/logo.png")) + self._master.overrideredirect(self._no_title_bar) + x, y = (int((self._screen_width - self._width) * .5) + self._screen_x, + int((self._screen_height - self._height) * .5)) + if self._popup: + x = int((self._pot_master.winfo_width() - self._width) * .5 + self._pot_master.winfo_rootx()) + y = int((self._pot_master.winfo_height() - self._height) * .5 + self._pot_master.winfo_rooty()) + self._master.transient(self._pot_master) + elif self._pot_master: + y += self._pot_master.winfo_screenheight() - self._screen_height - self._screen_y + self._master.geometry(f"{self._width}x{self._height}+{x}+{y}") + self._master.resizable(False, False) + + def kill(self) -> None: + self._master.destroy() + + +T = _TypeVar("T", bound=RuntimeData) + + +class Pot(Window, _Generic[T]): + def __init__(self, + width: int, + height: int, + refresh_rate: int, + runtime_data: T, + on_refresh: _Callable[[_Self], None] = lambda _: None, + title: str = "LEADS", + fullscreen: bool = False, + no_title_bar: bool = True, + theme_mode: _Literal["system", "light", "dark"] = "system", + display: int = 0) -> None: + Window.__init__(self, None, width, height, title, fullscreen, no_title_bar, display) + self._refresh_rate: int = refresh_rate + self._runtime_data: T = runtime_data + self._on_refresh: _Callable[[Pot], None] = on_refresh + self._frequency_generators: dict[str, FrequencyGenerator] = {} + _set_appearance_mode(theme_mode) + + self._active: bool = isinstance(self._master, _CTkToplevel) + self._performance_checker: PerformanceChecker = PerformanceChecker() + self._last_interval: float = 0 + def frame_rate(self) -> float: return self._performance_checker.frame_rate() @@ -359,16 +379,9 @@ def clear_frequency_generators(self) -> None: def active(self) -> bool: return self._active + @_override def show(self) -> None: - try: - if isinstance(self._master, _CTkToplevel): - pot = self._runtime_data.protected_pot - if self._display == pot._display: - self._master.transient(pot._master) - return - finally: - self._active = True - + super().show() def wrapper(init: bool) -> None: if not init: self._on_refresh(self) @@ -383,26 +396,24 @@ def wrapper(init: bool) -> None: self._master.after(int((ni := self._performance_checker.next_interval()) * 1000), wrapper, init) self._last_interval = ni + self._active = True self._master.after(1, wrapper, True) self._master.mainloop() self._active = False - def kill(self) -> None: - self._master.destroy() - class ContextManager(object): def __init__(self, *windows: Window) -> None: pot = None self._windows: dict[int, Window] = {} for window in windows: - if window.is_pot(): + if isinstance(window, Pot): pot = window else: self.add_window(window) if not pot: raise LookupError("No root window") - self._pot: Window = pot + self._pot: Pot = pot self._widgets: dict[str, _Widget] = {} def num_windows(self) -> int: @@ -418,12 +429,13 @@ def _allocate_window(self) -> int: def add_window(self, window: Window) -> int: self._windows[index := self._allocate_window()] = window + window.show() return index def remove_window(self, index: int) -> None: self._windows.pop(index).kill() - def index_of_window(self, window: Window) -> int: + def index_of_window(self, window: Pot) -> int: for k, v in self._windows.items(): if v == window: return k @@ -466,7 +478,7 @@ def layout(self, layout: list[list[str | _Widget | None]], padding: float = .005 widget.configure(width=screen_width) widget.grid(row=i, column=j * s, sticky="NSEW", columnspan=s, ipadx=p, ipady=p, padx=p, pady=p) - def window(self, index: int = -1) -> Window: + def window(self, index: int = -1) -> Pot: return self._pot if index < 0 else self._windows[index] def show(self) -> None: diff --git a/leads_gui/speedometer.py b/leads_gui/speedometer.py index 2729d478..29e23ba4 100644 --- a/leads_gui/speedometer.py +++ b/leads_gui/speedometer.py @@ -66,9 +66,9 @@ def dynamic_renderer(self, canvas: CanvasBased) -> None: f"#{f"{int(0x4d + 0xb2 * p):02x}" * 3}")) canvas.collect("d0", canvas.create_arc(x - r, y - r, x + r, y + r, start=-30, extent=240, width=4, style=_ARC, outline=color)) - canvas.collect("d1", canvas.create_line(*(x, y) if self._style == 2 else (x - _cos(rad) * (r - 8), + canvas.collect("d1", canvas.create_line((x, y) if self._style == 2 else (x - _cos(rad) * (r - 8), y - _sin(rad) * (r - 8)), - x - _cos(rad) * (r + 8), y - _sin(rad) * (r + 8), width=4, + (x - _cos(rad) * (r + 8), y - _sin(rad) * (r + 8)), width=4, fill=color)) canvas.collect("d2", canvas.create_text(x, y * .95 if self._style == 1 else y + (r - font[1]) * .5, text=str(int(v)), fill=self._text_color, font=font)) diff --git a/leads_vec/benchmark.py b/leads_vec/benchmark.py index 76d631df..7c3f20a5 100644 --- a/leads_vec/benchmark.py +++ b/leads_vec/benchmark.py @@ -8,7 +8,7 @@ from cv2 import VideoCapture, imencode, IMWRITE_JPEG_QUALITY, CAP_PROP_FPS from leads import L, require_config -from leads_gui import RuntimeData, Window, ContextManager, Speedometer +from leads_gui import RuntimeData, Pot, ContextManager, Speedometer def video_tester(container: Callable[[], None]) -> float: @@ -61,7 +61,7 @@ def __init__(self) -> None: self.t: float = time() self.speed: DoubleVar | None = None - def on_refresh(self, window: Window) -> None: + def on_refresh(self, window: Pot) -> None: self.speed.set((d := time() - self.t) * 20) if d > 10: window.kill() @@ -72,7 +72,7 @@ def main() -> int: L.info("GUI test starting, this takes about 10 seconds") rd = RuntimeData() callbacks = Callbacks() - w = Window(800, 256, 30, rd, callbacks.on_refresh, "Benchmark", no_title_bar=False) + w = Pot(800, 256, 30, rd, callbacks.on_refresh, "Benchmark", no_title_bar=False) callbacks.speed = DoubleVar(w.root()) uim = ContextManager(w) uim.layout([[CTkLabel(w.root(), text="Do NOT close the window", height=240), diff --git a/leads_vec/cli.py b/leads_vec/cli.py index b3eb2910..f6921c2a 100644 --- a/leads_vec/cli.py +++ b/leads_vec/cli.py @@ -15,7 +15,7 @@ FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, RIGHT_VIEW_CAMERA, SuspensionExitEvent from leads.comm import Callback, Service, start_server, create_server, my_ip_addresses, ConnectionBase from leads_audio import DIRECTION_INDICATOR_ON, DIRECTION_INDICATOR_OFF, WARNING, CONFIRM -from leads_gui import RuntimeData, Window, GForceVar, FrequencyGenerator, Left, Color, Right, ContextManager, \ +from leads_gui import RuntimeData, Window, Pot, GForceVar, FrequencyGenerator, Left, Color, Right, ContextManager, \ Typography, Speedometer, ProxyCanvas, SpeedTrendMeter, GForceMeter, Stopwatch, Hazard, initialize, Battery, Brake, \ ESC, Satellite, Motor, Speed, Photo, Light, ImageVariable from leads_vec.__version__ import __version__ @@ -118,7 +118,7 @@ def on_receive(self, service: Service, msg: bytes) -> None: def add_secondary_window(context_manager: ContextManager, display: int, var_lap_times: _StringVar, var_speed: _DoubleVar, var_speed_trend: _DoubleVar) -> None: pot = context_manager.window() - w = Window(0, 0, pot.refresh_rate(), pot.runtime_data(), fullscreen=True, display=display) + w = Window(pot.root(), 0, 0, fullscreen=True, display=display) window_index = context_manager.add_window(w) num_widgets = int(w.width() / w.height()) fonts = (("Arial", int(w.width() * .2)), ("Arial", int(w.width() * .1)), ("Arial", int(w.width() * .025))) @@ -135,14 +135,22 @@ def toggle_debug_window(context_manager: ContextManager, var_debug: _StringVar) pot = context_manager.window() rd = pot.runtime_data() if rd.debug_window_index < 0: - w = Window(pot.width(), pot.height(), pot.refresh_rate(), rd) + w = Window(pot.root(), pot.width(), pot.height(), popup=True) rd.debug_window_index = context_manager.add_window(w) - context_manager.layout([[Typography(w.root(), width=pot.width(), height=pot.height(), variable=var_debug, - font=("Arial", int(pot.height() * .022)))]], 0, - rd.debug_window_index) - return - context_manager.remove_window(rd.debug_window_index) - rd.debug_window_index = -1 + context_manager.layout([ + [Typography(w.root(), width=pot.width(), height=pot.height() * .9, variable=var_debug, + font=("Arial", int(pot.height() * .022)))], + [_Button(w.root(), pot.width(), int(pot.height() * .1), text="CLOSE", + command=lambda: toggle_debug_window(context_manager, var_debug))] + ], 0, rd.debug_window_index) + else: + context_manager.remove_window(rd.debug_window_index) + rd.debug_window_index = -1 + + +def set_debug_window(context_manager: ContextManager, var_debug: _StringVar, status: bool) -> None: + if status ^ context_manager.window().runtime_data().debug_window_index < 0: + toggle_debug_window(context_manager, var_debug) def main() -> int: @@ -153,8 +161,8 @@ def main() -> int: ctx.plugin(SystemLiteral.ABS, ABS()) ctx.plugin(SystemLiteral.EBI, EBI()) ctx.plugin(SystemLiteral.ATBS, ATBS()) - w = Window(cfg.width, cfg.height, cfg.refresh_rate, CustomRuntimeData(), fullscreen=cfg.fullscreen, - no_title_bar=cfg.no_title_bar, theme_mode=cfg.theme_mode) + w = Pot(cfg.width, cfg.height, cfg.refresh_rate, CustomRuntimeData(), fullscreen=cfg.fullscreen, + no_title_bar=cfg.no_title_bar, theme_mode=cfg.theme_mode) root = w.root() root.configure(cursor="dot") var_lap_times = _StringVar(root, "") @@ -262,7 +270,10 @@ def switch_esc_mode(mode: str) -> None: class IdleUpdate(FrequencyGenerator): @_override def do(self) -> None: - cpu_temp = get_device("cpu").read()["temp"] if has_device("cpu") else "?" + cpu_temp = get_device("cpu").read()["temp"] if has_device("cpu") else 0 + if cpu_temp > 90: + L.warn("! CPU OVERHEATING, PULL OVER RIGHT NOW !") + set_debug_window(uim, var_debug, True) var_info.set(f"VeC {__version__.upper()}\n\n" f"{_datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n" f"{format_duration(duration := _time() - w.runtime_data().start_time)} {cpu_temp} °C\n"