diff --git a/rerun_py/depthai_viewer/_backend/device.py b/rerun_py/depthai_viewer/_backend/device.py index 32cf84b8c144..f20859c85472 100644 --- a/rerun_py/depthai_viewer/_backend/device.py +++ b/rerun_py/depthai_viewer/_backend/device.py @@ -3,7 +3,7 @@ import time from queue import Empty as QueueEmpty from queue import Queue -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Any import depthai as dai from depthai_sdk import OakCamera @@ -376,7 +376,7 @@ def update_pipeline(self, runtime_only: bool) -> Message: if isinstance(message, ErrorMessage): return message - self._queues = [] + self._component_outputs = dict() self._dai_queues = [] if config.auto: @@ -479,7 +479,11 @@ def update_pipeline(self, runtime_only: bool) -> Message: (smallest_supported_resolution.width, smallest_supported_resolution.height), res_x ) ) - self._queues.append((sdk_cam, self._oak.queue(sdk_cam.out.main))) + cam_name = str(cam.board_socket) + self._component_outputs[cam_name] = { + "component": sdk_cam, + "out": sdk_cam.out.main.set_name(cam_name), + } else: print("Skipped creating camera:", cam.board_socket, "because no valid sensor resolution was found.") continue @@ -492,17 +496,17 @@ def update_pipeline(self, runtime_only: bool) -> Message: tof_align = list( filter( lambda comp_and_q: ( # type: ignore[arg-type] - comp_and_q[0].node.getBoardSocket() == cam_cfg.tof_align - if isinstance(comp_and_q[0], CameraComponent) + comp_and_q[1]["component"].node.getBoardSocket() == cam_cfg.tof_align + if isinstance(comp_and_q[1]["component"], CameraComponent) else False ), - self._queues, + self._component_outputs.items(), ) ) tof_align = tof_align[0][0] if tof_align else None sdk_cam = self._oak.create_tof(create_tof_on, tof_align) self._tof_component = sdk_cam - self._queues.append((sdk_cam, self._oak.queue(sdk_cam.out.main))) + self._component_outputs["tof"] = {"component": sdk_cam, "out": sdk_cam.out.main.set_name("tof")} else: # Should never happen print("Couldn't find camera config for ToF, can't create ToF.") @@ -544,7 +548,10 @@ def update_pipeline(self, runtime_only: bool) -> Message: if not aligned_camera: return ErrorMessage(f"{config.stereo.align} is not configured. Couldn't create stereo pair.") aligned_camera.is_used_as_stereo_align = True - self._queues.append((self._stereo, self._oak.queue(self._stereo.out.main))) + self._component_outputs["stereo"] = { + "component": self._stereo, + "out": self._stereo.out.main.set_name("stereo"), + } if self._oak.device.getConnectedIMU() != "NONE" and self._oak.device.getConnectedIMU() != "": print("Creating IMU, connected IMU: ", self._oak.device.getConnectedIMU()) @@ -612,7 +619,7 @@ def update_pipeline(self, runtime_only: bool) -> Message: else: self._nnet = self._oak.create_nn(model_path, cam_component) if self._nnet: - self._queues.append((self._nnet, self._oak.queue(self._nnet.out.main))) + self._component_outputs["nn"] = {"component": self._nnet, "out": self._nnet.out.main.set_name("nn")} sys_logger_xlink = self._oak.pipeline.createXLinkOut() logger = self._oak.pipeline.createSystemLogger() @@ -620,6 +627,10 @@ def update_pipeline(self, runtime_only: bool) -> Message: sys_logger_xlink.setStreamName("sys_logger") logger.out.link(sys_logger_xlink.input) + outputs = [comp[1]["out"] for comp in self._component_outputs.items()] + fps = config.cameras[0].fps # All camera fps are the same + self._queue = self._oak.queue(outputs).configure_syncing(True, threshold_ms=1000 // fps).get_queue() + try: print("Starting pipeline") self._oak.start() @@ -647,12 +658,16 @@ def update(self) -> None: return if not self._oak.running(): return - for component, queue in self._queues: - try: - packet = queue.get_queue().get_nowait() - self._packet_handler.log_packet(component, packet) - except QueueEmpty: - continue + + try: + # WIP: This is a temporary solution, update() shouldn't be called if update_pipeline() hasn't finished + if not hasattr(self, "_queue"): + return + packets: Dict[str, Any] = self._queue.get_nowait() + for name, packet in packets.items(): + self._packet_handler.log_packet(self._component_outputs[name]["component"], packet) + except QueueEmpty: + pass for dai_node, queue, context in self._dai_queues: packet = queue.tryGet()