Skip to content

Commit

Permalink
Added syncing between all components (cameras, stereo, tof, ai..) (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
Erol444 authored Nov 13, 2024
1 parent e909165 commit 18b2d90
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions rerun_py/depthai_viewer/_backend/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
from queue import Empty as QueueEmpty
from queue import Queue
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Any

import depthai as dai
from depthai_sdk import OakCamera
Expand Down Expand Up @@ -376,7 +376,7 @@ def update_pipeline(self, runtime_only: bool) -> Message:
if isinstance(message, ErrorMessage):
return message

self._queues = []
self._component_outputs = dict()
self._dai_queues = []

if config.auto:
Expand Down Expand Up @@ -479,7 +479,11 @@ def update_pipeline(self, runtime_only: bool) -> Message:
(smallest_supported_resolution.width, smallest_supported_resolution.height), res_x
)
)
self._queues.append((sdk_cam, self._oak.queue(sdk_cam.out.main)))
cam_name = str(cam.board_socket)
self._component_outputs[cam_name] = {
"component": sdk_cam,
"out": sdk_cam.out.main.set_name(cam_name),
}
else:
print("Skipped creating camera:", cam.board_socket, "because no valid sensor resolution was found.")
continue
Expand All @@ -492,17 +496,17 @@ def update_pipeline(self, runtime_only: bool) -> Message:
tof_align = list(
filter(
lambda comp_and_q: ( # type: ignore[arg-type]
comp_and_q[0].node.getBoardSocket() == cam_cfg.tof_align
if isinstance(comp_and_q[0], CameraComponent)
comp_and_q[1]["component"].node.getBoardSocket() == cam_cfg.tof_align
if isinstance(comp_and_q[1]["component"], CameraComponent)
else False
),
self._queues,
self._component_outputs.items(),
)
)
tof_align = tof_align[0][0] if tof_align else None
sdk_cam = self._oak.create_tof(create_tof_on, tof_align)
self._tof_component = sdk_cam
self._queues.append((sdk_cam, self._oak.queue(sdk_cam.out.main)))
self._component_outputs["tof"] = {"component": sdk_cam, "out": sdk_cam.out.main.set_name("tof")}
else: # Should never happen
print("Couldn't find camera config for ToF, can't create ToF.")

Expand Down Expand Up @@ -544,7 +548,10 @@ def update_pipeline(self, runtime_only: bool) -> Message:
if not aligned_camera:
return ErrorMessage(f"{config.stereo.align} is not configured. Couldn't create stereo pair.")
aligned_camera.is_used_as_stereo_align = True
self._queues.append((self._stereo, self._oak.queue(self._stereo.out.main)))
self._component_outputs["stereo"] = {
"component": self._stereo,
"out": self._stereo.out.main.set_name("stereo"),
}

if self._oak.device.getConnectedIMU() != "NONE" and self._oak.device.getConnectedIMU() != "":
print("Creating IMU, connected IMU: ", self._oak.device.getConnectedIMU())
Expand Down Expand Up @@ -612,14 +619,18 @@ def update_pipeline(self, runtime_only: bool) -> Message:
else:
self._nnet = self._oak.create_nn(model_path, cam_component)
if self._nnet:
self._queues.append((self._nnet, self._oak.queue(self._nnet.out.main)))
self._component_outputs["nn"] = {"component": self._nnet, "out": self._nnet.out.main.set_name("nn")}

sys_logger_xlink = self._oak.pipeline.createXLinkOut()
logger = self._oak.pipeline.createSystemLogger()
logger.setRate(0.1)
sys_logger_xlink.setStreamName("sys_logger")
logger.out.link(sys_logger_xlink.input)

outputs = [comp[1]["out"] for comp in self._component_outputs.items()]
fps = config.cameras[0].fps # All camera fps are the same
self._queue = self._oak.queue(outputs).configure_syncing(True, threshold_ms=1000 // fps).get_queue()

try:
print("Starting pipeline")
self._oak.start()
Expand Down Expand Up @@ -647,12 +658,16 @@ def update(self) -> None:
return
if not self._oak.running():
return
for component, queue in self._queues:
try:
packet = queue.get_queue().get_nowait()
self._packet_handler.log_packet(component, packet)
except QueueEmpty:
continue

try:
# WIP: This is a temporary solution, update() shouldn't be called if update_pipeline() hasn't finished
if not hasattr(self, "_queue"):
return
packets: Dict[str, Any] = self._queue.get_nowait()
for name, packet in packets.items():
self._packet_handler.log_packet(self._component_outputs[name]["component"], packet)
except QueueEmpty:
pass

for dai_node, queue, context in self._dai_queues:
packet = queue.tryGet()
Expand Down

0 comments on commit 18b2d90

Please sign in to comment.