From 838bd42777f1d4990f16f804612283d16b6e689e Mon Sep 17 00:00:00 2001 From: Terry Fu Date: Mon, 15 Jul 2024 21:41:23 +0800 Subject: [PATCH] Add a Dedicated Streaming Server (#291) * Supported None values for image variables. (#275) * Skipped Base64 decoding. (#275) * Added config entries for streaming. (#275) * Reduced the default quality to speed up Base64 encoding. (#267) (#275) * Bug fixed: unmatched dimensions. (#275) (#284) * Using PIL interface. (#275) (#284) * Added comm stream server in `RuntimeData`. (#275) * Now comm stream can be enabled through config. (#275) * Using custom separator. (#275) (#288) --- leads_gui/config.py | 2 ++ leads_gui/photo.py | 8 +++--- leads_gui/prototype.py | 9 ++++++ leads_vec/cli.py | 62 +++++++++++++++++++++++++++++++++++------- leads_video/camera.py | 2 +- leads_video/utils.py | 4 +-- 6 files changed, 70 insertions(+), 17 deletions(-) diff --git a/leads_gui/config.py b/leads_gui/config.py index b2b7c566..72947ab3 100644 --- a/leads_gui/config.py +++ b/leads_gui/config.py @@ -19,6 +19,8 @@ def __init__(self, base: dict[str, _Any]) -> None: self.font_size_x_large: int = 56 self.comm_addr: str = "127.0.0.1" self.comm_port: int = 16900 + self.comm_stream: bool = False + self.comm_stream_port: int = 16901 self.save_data: bool = False super().__init__(base) diff --git a/leads_gui/photo.py b/leads_gui/photo.py index 354be716..314fffc6 100644 --- a/leads_gui/photo.py +++ b/leads_gui/photo.py @@ -11,17 +11,17 @@ class ImageVariable(_Variable): - def __init__(self, master: _Misc, image: _Image, name: str | None = None) -> None: + def __init__(self, master: _Misc, image: _Image | None, name: str | None = None) -> None: super().__init__(master, False, name) - self._image: _Image = image + self._image: _Image | None = image @_override - def set(self, value: _Image) -> None: + def set(self, value: _Image | None) -> None: super().set(not super().get()) self._image = value @_override - def get(self) -> _Image: + def get(self) -> _Image | None: return self._image diff --git a/leads_gui/prototype.py b/leads_gui/prototype.py index 4c1a5e96..5656d5a1 100644 --- a/leads_gui/prototype.py +++ b/leads_gui/prototype.py @@ -1,10 +1,12 @@ from abc import ABCMeta as _ABCMeta, abstractmethod as _abstractmethod +from io import BytesIO as _BytesIO from json import dumps as _dumps from time import time as _time from tkinter import Misc as _Misc, Event as _Event, PhotoImage as _PhotoImage from typing import Callable as _Callable, Self as _Self, TypeVar as _TypeVar, Generic as _Generic, Any as _Any, \ Literal as _Literal +from PIL.Image import Image as _Image from customtkinter import CTk as _CTk, CTkCanvas as _CTkCanvas, get_appearance_mode as _get_appearance_mode, \ ThemeManager as _ThemeManager, Variable as _Variable, ScalingTracker as _ScalingTracker, \ set_appearance_mode as _set_appearance_mode @@ -214,11 +216,18 @@ class RuntimeData(object): def __init__(self) -> None: self.start_time: int = int(_time()) self.comm: _Server | None = None + self.comm_stream: _Server | None = None def comm_notify(self, d: _DataContainer | dict[str, _Any]) -> None: if self.comm: self.comm.broadcast(d.encode() if isinstance(d, _DataContainer) else _dumps(d).encode()) + def comm_stream_notify(self, tag: _Literal["frvc", "lfvc", "rtvc", "revc"], frame: _Image, + quality: int = 90) -> None: + if self.comm_stream: + frame.save(buffer := _BytesIO(), "JPEG", quality=quality) + self.comm_stream.broadcast(tag.encode() + b":" + buffer.getvalue()) + T = _TypeVar("T", bound=RuntimeData) diff --git a/leads_vec/cli.py b/leads_vec/cli.py index 32919e60..900c481f 100644 --- a/leads_vec/cli.py +++ b/leads_vec/cli.py @@ -1,5 +1,6 @@ from datetime import datetime as _datetime -from time import time as _time +from threading import Thread as _Thread +from time import time as _time, sleep as _sleep from typing import Callable as _Callable, override as _override from customtkinter import CTkButton as _Button, CTkLabel as _Label, DoubleVar as _DoubleVar, StringVar as _StringVar, \ @@ -9,13 +10,15 @@ from leads import LEADS, SystemLiteral, require_config, register_context, DTCS, ABS, EBI, ATBS, GPSSpeedCorrection, \ ESCMode, get_controller, MAIN_CONTROLLER, L, EventListener, DataPushedEvent, UpdateEvent, has_device, \ GPS_RECEIVER, get_device, InterventionEvent, SuspensionEvent, Event, LEFT_INDICATOR, RIGHT_INDICATOR, SFT, \ - initialize_main, format_duration, BRAKE_INDICATOR, VisualDataContainer, REAR_VIEW_CAMERA + initialize_main, format_duration, BRAKE_INDICATOR, REAR_VIEW_CAMERA, FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, \ + RIGHT_VIEW_CAMERA from leads.comm import Callback, Service, start_server, create_server, my_ip_addresses from leads_audio import DIRECTION_INDICATOR_ON, DIRECTION_INDICATOR_OFF, WARNING, CONFIRM from leads_gui import RuntimeData, Window, GForceVar, FrequencyGenerator, Left, Color, Right, ContextManager, \ Typography, Speedometer, ProxyCanvas, SpeedTrendMeter, GForceMeter, Stopwatch, Hazard, initialize, Battery, Brake, \ - ESC, Satellite, Motor, Speed, Photo, Light + ESC, Satellite, Motor, Speed, Photo, Light, ImageVariable from leads_vec.__version__ import __version__ +from leads_video import Camera class CustomRuntimeData(RuntimeData): @@ -24,19 +27,56 @@ class CustomRuntimeData(RuntimeData): def make_system_switch(ctx: LEADS, system: SystemLiteral, runtime_data: RuntimeData) -> _Callable[[], None]: - def switch() -> None: + def _() -> None: ctx.plugin(system).enabled(not ctx.plugin(system).enabled()) runtime_data.control_system_switch_changed = True - return switch + return _ def get_proxy_canvas(context_manager: ContextManager, key: str) -> ProxyCanvas: r = context_manager[key] - assert isinstance(r, ProxyCanvas) + if not isinstance(r, ProxyCanvas): + raise TypeError(f"Widget \"{key}\" is supposed to be a proxy canvas") return r +def get_camera(tag: str) -> Camera | None: + if has_device(tag): + cam = get_device(tag) + if not isinstance(cam, Camera): + raise TypeError(f"Device \"{tag}\" is supposed to be a camera") + return cam + return None + + +class StreamCallback(Callback): + @_override + def on_initialize(self, service: Service) -> None: + self.super(service=service) + L.debug(f"Comm stream server started listening on {service.port()}") + + @_override + def on_fail(self, service: Service, error: Exception) -> None: + self.super(service=service, error=error) + L.error(f"Comm stream server error: {repr(error)}") + + +def enable_comm_stream(context_manager: ContextManager, port: int) -> None: + rd = context_manager.window().runtime_data() + rd.comm_stream = start_server(create_server(port, StreamCallback(), b"end;"), True) + + def _() -> None: + while True: + if rd.comm_stream.num_connections() < 1: + _sleep(.01) + for tag in FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, RIGHT_VIEW_CAMERA, REAR_VIEW_CAMERA: + if (cam := get_camera(tag)) and (frame := cam.read_pil()): + rd.comm_stream_notify(tag, frame) + + _Thread(name="comm streamer", target=_, daemon=True).start() + + def main() -> int: cfg = require_config() ctx = LEADS(data_seq_size=cfg.data_seq_size, num_laps_timed=cfg.num_laps_timed) @@ -52,7 +92,7 @@ def main() -> int: root.configure(cursor="dot") var_lap_times = _StringVar(root, "") var_gps = _StringVar(root, "") - var_rear_view_base64 = _StringVar(root, "") + var_rear_view = ImageVariable(root, None) var_info = _StringVar(root, "") var_speed = _DoubleVar(root, 0) var_voltage = _StringVar(root, "") @@ -88,7 +128,7 @@ def render(manager: ContextManager) -> None: font=("Arial", cfg.font_size_small - 4)) ) if has_device(REAR_VIEW_CAMERA): - m1_widgets += (Photo(root, theme_key="CTkButton", variable=var_rear_view_base64),) + m1_widgets += (Photo(root, theme_key="CTkButton", variable=var_rear_view),) manager["m1"] = ProxyCanvas(root, "CTkButton", *m1_widgets).lock_ratio(cfg.m_ratio) manager["m2"] = Speedometer(root, variable=var_speed).lock_ratio(cfg.m_ratio) manager["m3"] = ProxyCanvas(root, "CTkButton", @@ -159,6 +199,8 @@ def on_receive(self, service: Service, msg: bytes) -> None: get_proxy_canvas(uim, "m3").next_mode() w.runtime_data().comm = start_server(create_server(cfg.comm_port, CommCallback()), True) + if cfg.comm_stream: + enable_comm_stream(uim, cfg.comm_stream_port) class CustomListener(EventListener): @_override @@ -188,8 +230,8 @@ def on_update(self, e: UpdateEvent) -> None: var_gps.set(f"GPS {"VALID" if d.gps_valid else "NO FIX"} - !NF!\n\n" f"{d.gps_ground_speed:.1f} KM / H\n" f"LAT {d.latitude:.5f}\nLON {d.longitude:.5f}") - if isinstance(d, VisualDataContainer) and d.rear_view_base64: - var_rear_view_base64.set(d.rear_view_base64) + if cam := get_camera(REAR_VIEW_CAMERA): + var_rear_view.set(cam.read_pil()) var_info.set(f"VeC {__version__.upper()}\n\n" f"{_datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n" f"{format_duration(duration := _time() - w.runtime_data().start_time)}\n" diff --git a/leads_video/camera.py b/leads_video/camera.py index 13ca3a21..938b2ce9 100644 --- a/leads_video/camera.py +++ b/leads_video/camera.py @@ -52,7 +52,7 @@ def read_numpy(self) -> _ndarray | None: return self.read() def read_pil(self) -> _Image | None: - return None if (frame := self.read_numpy()) is None else _fromarray(frame) + return None if (frame := self.read_numpy()) is None else _fromarray(frame.transpose(1, 2, 0)) @_override def close(self) -> None: diff --git a/leads_video/utils.py b/leads_video/utils.py index 47886df1..8ee6e58b 100644 --- a/leads_video/utils.py +++ b/leads_video/utils.py @@ -11,10 +11,10 @@ def encode_image(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) - return None if x is None else _fromarray(x.transpose(1, 2, 0), mode) -def base64_encode(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) -> str: +def base64_encode(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None, quality: int = 25) -> str: if not (img := encode_image(x, mode)): return "" - img.save(buffer := _BytesIO(), "JPEG") + img.save(buffer := _BytesIO(), "JPEG", quality=quality) return _b64encode(buffer.getvalue()).decode()