Skip to content

Commit

Permalink
Add a Dedicated Streaming Server (#291)
Browse files Browse the repository at this point in the history
* Supported None values for image variables. (#275)

* Skipped Base64 decoding. (#275)

* Added config entries for streaming. (#275)

* Reduced the default quality to speed up Base64 encoding. (#267) (#275)

* Bug fixed: unmatched dimensions. (#275) (#284)

* Using PIL interface. (#275) (#284)

* Added comm stream server in `RuntimeData`. (#275)

* Now comm stream can be enabled through config. (#275)

* Using custom separator. (#275) (#288)
  • Loading branch information
ATATC authored Jul 15, 2024
1 parent b3d65ec commit 838bd42
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 17 deletions.
2 changes: 2 additions & 0 deletions leads_gui/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def __init__(self, base: dict[str, _Any]) -> None:
self.font_size_x_large: int = 56
self.comm_addr: str = "127.0.0.1"
self.comm_port: int = 16900
self.comm_stream: bool = False
self.comm_stream_port: int = 16901
self.save_data: bool = False
super().__init__(base)

Expand Down
8 changes: 4 additions & 4 deletions leads_gui/photo.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@


class ImageVariable(_Variable):
def __init__(self, master: _Misc, image: _Image, name: str | None = None) -> None:
def __init__(self, master: _Misc, image: _Image | None, name: str | None = None) -> None:
super().__init__(master, False, name)
self._image: _Image = image
self._image: _Image | None = image

@_override
def set(self, value: _Image) -> None:
def set(self, value: _Image | None) -> None:
super().set(not super().get())
self._image = value

@_override
def get(self) -> _Image:
def get(self) -> _Image | None:
return self._image


Expand Down
9 changes: 9 additions & 0 deletions leads_gui/prototype.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from abc import ABCMeta as _ABCMeta, abstractmethod as _abstractmethod
from io import BytesIO as _BytesIO
from json import dumps as _dumps
from time import time as _time
from tkinter import Misc as _Misc, Event as _Event, PhotoImage as _PhotoImage
from typing import Callable as _Callable, Self as _Self, TypeVar as _TypeVar, Generic as _Generic, Any as _Any, \
Literal as _Literal

from PIL.Image import Image as _Image
from customtkinter import CTk as _CTk, CTkCanvas as _CTkCanvas, get_appearance_mode as _get_appearance_mode, \
ThemeManager as _ThemeManager, Variable as _Variable, ScalingTracker as _ScalingTracker, \
set_appearance_mode as _set_appearance_mode
Expand Down Expand Up @@ -214,11 +216,18 @@ class RuntimeData(object):
def __init__(self) -> None:
self.start_time: int = int(_time())
self.comm: _Server | None = None
self.comm_stream: _Server | None = None

def comm_notify(self, d: _DataContainer | dict[str, _Any]) -> None:
if self.comm:
self.comm.broadcast(d.encode() if isinstance(d, _DataContainer) else _dumps(d).encode())

def comm_stream_notify(self, tag: _Literal["frvc", "lfvc", "rtvc", "revc"], frame: _Image,
quality: int = 90) -> None:
if self.comm_stream:
frame.save(buffer := _BytesIO(), "JPEG", quality=quality)
self.comm_stream.broadcast(tag.encode() + b":" + buffer.getvalue())


T = _TypeVar("T", bound=RuntimeData)

Expand Down
62 changes: 52 additions & 10 deletions leads_vec/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime as _datetime
from time import time as _time
from threading import Thread as _Thread
from time import time as _time, sleep as _sleep
from typing import Callable as _Callable, override as _override

from customtkinter import CTkButton as _Button, CTkLabel as _Label, DoubleVar as _DoubleVar, StringVar as _StringVar, \
Expand All @@ -9,13 +10,15 @@
from leads import LEADS, SystemLiteral, require_config, register_context, DTCS, ABS, EBI, ATBS, GPSSpeedCorrection, \
ESCMode, get_controller, MAIN_CONTROLLER, L, EventListener, DataPushedEvent, UpdateEvent, has_device, \
GPS_RECEIVER, get_device, InterventionEvent, SuspensionEvent, Event, LEFT_INDICATOR, RIGHT_INDICATOR, SFT, \
initialize_main, format_duration, BRAKE_INDICATOR, VisualDataContainer, REAR_VIEW_CAMERA
initialize_main, format_duration, BRAKE_INDICATOR, REAR_VIEW_CAMERA, FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, \
RIGHT_VIEW_CAMERA
from leads.comm import Callback, Service, start_server, create_server, my_ip_addresses
from leads_audio import DIRECTION_INDICATOR_ON, DIRECTION_INDICATOR_OFF, WARNING, CONFIRM
from leads_gui import RuntimeData, Window, GForceVar, FrequencyGenerator, Left, Color, Right, ContextManager, \
Typography, Speedometer, ProxyCanvas, SpeedTrendMeter, GForceMeter, Stopwatch, Hazard, initialize, Battery, Brake, \
ESC, Satellite, Motor, Speed, Photo, Light
ESC, Satellite, Motor, Speed, Photo, Light, ImageVariable
from leads_vec.__version__ import __version__
from leads_video import Camera


class CustomRuntimeData(RuntimeData):
Expand All @@ -24,19 +27,56 @@ class CustomRuntimeData(RuntimeData):


def make_system_switch(ctx: LEADS, system: SystemLiteral, runtime_data: RuntimeData) -> _Callable[[], None]:
def switch() -> None:
def _() -> None:
ctx.plugin(system).enabled(not ctx.plugin(system).enabled())
runtime_data.control_system_switch_changed = True

return switch
return _


def get_proxy_canvas(context_manager: ContextManager, key: str) -> ProxyCanvas:
r = context_manager[key]
assert isinstance(r, ProxyCanvas)
if not isinstance(r, ProxyCanvas):
raise TypeError(f"Widget \"{key}\" is supposed to be a proxy canvas")
return r


def get_camera(tag: str) -> Camera | None:
if has_device(tag):
cam = get_device(tag)
if not isinstance(cam, Camera):
raise TypeError(f"Device \"{tag}\" is supposed to be a camera")
return cam
return None


class StreamCallback(Callback):
@_override
def on_initialize(self, service: Service) -> None:
self.super(service=service)
L.debug(f"Comm stream server started listening on {service.port()}")

@_override
def on_fail(self, service: Service, error: Exception) -> None:
self.super(service=service, error=error)
L.error(f"Comm stream server error: {repr(error)}")


def enable_comm_stream(context_manager: ContextManager, port: int) -> None:
rd = context_manager.window().runtime_data()
rd.comm_stream = start_server(create_server(port, StreamCallback(), b"end;"), True)

def _() -> None:
while True:
if rd.comm_stream.num_connections() < 1:
_sleep(.01)
for tag in FRONT_VIEW_CAMERA, LEFT_VIEW_CAMERA, RIGHT_VIEW_CAMERA, REAR_VIEW_CAMERA:
if (cam := get_camera(tag)) and (frame := cam.read_pil()):
rd.comm_stream_notify(tag, frame)

_Thread(name="comm streamer", target=_, daemon=True).start()


def main() -> int:
cfg = require_config()
ctx = LEADS(data_seq_size=cfg.data_seq_size, num_laps_timed=cfg.num_laps_timed)
Expand All @@ -52,7 +92,7 @@ def main() -> int:
root.configure(cursor="dot")
var_lap_times = _StringVar(root, "")
var_gps = _StringVar(root, "")
var_rear_view_base64 = _StringVar(root, "")
var_rear_view = ImageVariable(root, None)
var_info = _StringVar(root, "")
var_speed = _DoubleVar(root, 0)
var_voltage = _StringVar(root, "")
Expand Down Expand Up @@ -88,7 +128,7 @@ def render(manager: ContextManager) -> None:
font=("Arial", cfg.font_size_small - 4))
)
if has_device(REAR_VIEW_CAMERA):
m1_widgets += (Photo(root, theme_key="CTkButton", variable=var_rear_view_base64),)
m1_widgets += (Photo(root, theme_key="CTkButton", variable=var_rear_view),)
manager["m1"] = ProxyCanvas(root, "CTkButton", *m1_widgets).lock_ratio(cfg.m_ratio)
manager["m2"] = Speedometer(root, variable=var_speed).lock_ratio(cfg.m_ratio)
manager["m3"] = ProxyCanvas(root, "CTkButton",
Expand Down Expand Up @@ -159,6 +199,8 @@ def on_receive(self, service: Service, msg: bytes) -> None:
get_proxy_canvas(uim, "m3").next_mode()

w.runtime_data().comm = start_server(create_server(cfg.comm_port, CommCallback()), True)
if cfg.comm_stream:
enable_comm_stream(uim, cfg.comm_stream_port)

class CustomListener(EventListener):
@_override
Expand Down Expand Up @@ -188,8 +230,8 @@ def on_update(self, e: UpdateEvent) -> None:
var_gps.set(f"GPS {"VALID" if d.gps_valid else "NO FIX"} - !NF!\n\n"
f"{d.gps_ground_speed:.1f} KM / H\n"
f"LAT {d.latitude:.5f}\nLON {d.longitude:.5f}")
if isinstance(d, VisualDataContainer) and d.rear_view_base64:
var_rear_view_base64.set(d.rear_view_base64)
if cam := get_camera(REAR_VIEW_CAMERA):
var_rear_view.set(cam.read_pil())
var_info.set(f"VeC {__version__.upper()}\n\n"
f"{_datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n"
f"{format_duration(duration := _time() - w.runtime_data().start_time)}\n"
Expand Down
2 changes: 1 addition & 1 deletion leads_video/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def read_numpy(self) -> _ndarray | None:
return self.read()

def read_pil(self) -> _Image | None:
return None if (frame := self.read_numpy()) is None else _fromarray(frame)
return None if (frame := self.read_numpy()) is None else _fromarray(frame.transpose(1, 2, 0))

@_override
def close(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions leads_video/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ def encode_image(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) -
return None if x is None else _fromarray(x.transpose(1, 2, 0), mode)


def base64_encode(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None) -> str:
def base64_encode(x: _ndarray | None, mode: _Literal["L", "RGB"] | None = None, quality: int = 25) -> str:
if not (img := encode_image(x, mode)):
return ""
img.save(buffer := _BytesIO(), "JPEG")
img.save(buffer := _BytesIO(), "JPEG", quality=quality)
return _b64encode(buffer.getvalue()).decode()


Expand Down

0 comments on commit 838bd42

Please sign in to comment.