Skip to content

Commit

Permalink
fix formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomáš Kubíček committed Dec 8, 2023
1 parent 1b044e8 commit a3e35b0
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 147 deletions.
122 changes: 41 additions & 81 deletions rerun_py/depthai_viewer/_backend/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from depthai_sdk import OakCamera
from depthai_sdk.classes.packet_handlers import ComponentOutput
from depthai_sdk.components import CameraComponent, NNComponent, StereoComponent
from depthai_sdk.components.tof_component import Component
from depthai_sdk.components.camera_helper import (
getClosestIspScale,
)
Expand All @@ -32,7 +31,6 @@
compare_dai_camera_configs,
get_size_from_resolution,
size_to_resolution,
ALL_NEURAL_NETWORKS,
)
from depthai_viewer._backend.messages import (
ErrorMessage,
Expand Down Expand Up @@ -70,8 +68,7 @@ def update(self) -> None:

class Device:
id: str
intrinsic_matrix: Dict[Tuple[dai.CameraBoardSocket,
int, int], NDArray[np.float32]] = {}
intrinsic_matrix: Dict[Tuple[dai.CameraBoardSocket, int, int], NDArray[np.float32]] = {}
calibration_data: Optional[dai.CalibrationHandler] = None
use_encoding: bool = False
store: Store
Expand All @@ -92,8 +89,7 @@ def __init__(self, device_id: str, store: Store):
self.id = device_id
self.set_oak(OakCamera(device_id))
self.store = store
self._packet_handler = PacketHandler(
self.store, self.get_intrinsic_matrix)
self._packet_handler = PacketHandler(self.store, self.get_intrinsic_matrix)
print("Oak cam: ", self._oak)
# self.start = time.time()
# self._profiler.enable()
Expand All @@ -109,15 +105,13 @@ def is_closed(self) -> bool:

def get_intrinsic_matrix(self, board_socket: dai.CameraBoardSocket, width: int, height: int) -> NDArray[np.float32]:
if self.intrinsic_matrix.get((board_socket, width, height)) is not None:
# type: ignore[return-value]
return self.intrinsic_matrix.get((board_socket, width, height))
return self.intrinsic_matrix.get((board_socket, width, height)) # type: ignore[return-value]
if self.calibration_data is None:
raise Exception("Missing calibration data!")
M_right = self.calibration_data.getCameraIntrinsics( # type: ignore[union-attr]
board_socket, dai.Size2f(width, height)
)
self.intrinsic_matrix[(board_socket, width, height)] = np.array(
M_right).reshape(3, 3)
self.intrinsic_matrix[(board_socket, width, height)] = np.array(M_right).reshape(3, 3)
return self.intrinsic_matrix[(board_socket, width, height)]

def _get_possible_stereo_pairs_for_cam(
Expand All @@ -139,13 +133,11 @@ def _get_possible_stereo_pairs_for_cam(
possible_stereo_pairs = []
if cam.name == "right":
possible_stereo_pairs.extend(
[features.socket for features in filter(
lambda c: c.name == "left", connected_camera_features)]
[features.socket for features in filter(lambda c: c.name == "left", connected_camera_features)]
)
elif cam.name == "left":
possible_stereo_pairs.extend(
[features.socket for features in filter(
lambda c: c.name == "right", connected_camera_features)]
[features.socket for features in filter(lambda c: c.name == "right", connected_camera_features)]
)
else:
possible_stereo_pairs.extend(
Expand All @@ -155,8 +147,7 @@ def _get_possible_stereo_pairs_for_cam(
if camera != cam
and all(
map(
lambda confs: compare_dai_camera_configs(
confs[0], confs[1]),
lambda confs: compare_dai_camera_configs(confs[0], confs[1]),
zip(camera.configs, cam.configs),
)
)
Expand Down Expand Up @@ -185,8 +176,7 @@ def get_device_properties(self) -> DeviceProperties:
else XLinkConnection.USB,
mxid=device_info.mxid,
)
device_properties = DeviceProperties(
id=self.id, imu=imu, info=device_info)
device_properties = DeviceProperties(id=self.id, imu=imu, info=device_info)
try:
calib = self._oak.device.readCalibration2()
left_cam = calib.getStereoLeftCameraId()
Expand All @@ -200,8 +190,7 @@ def get_device_properties(self) -> DeviceProperties:
except RuntimeError:
print("No calibration found while trying to fetch the default stereo pair.")

ordered_resolutions = list(
sorted(size_to_resolution.keys(), key=lambda res: res[0] * res[1]))
ordered_resolutions = list(sorted(size_to_resolution.keys(), key=lambda res: res[0] * res[1]))
for cam in connected_cam_features:
prioritized_type = cam.supportedTypes[0]
biggest_width_height = [
Expand All @@ -214,8 +203,7 @@ def get_device_properties(self) -> DeviceProperties:
biggest_width, biggest_height = biggest_width_height[0]

if cam.supportedTypes[0] == dai.CameraSensorType.TOF:
all_supported_resolutions = [
size_to_resolution[(biggest_width, biggest_height)]]
all_supported_resolutions = [size_to_resolution[(biggest_width, biggest_height)]]
else:
all_supported_resolutions = list(
filter(
Expand All @@ -235,8 +223,7 @@ def get_device_properties(self) -> DeviceProperties:
max_fps=60,
resolutions=all_supported_resolutions,
supported_types=cam.supportedTypes,
stereo_pairs=self._get_possible_stereo_pairs_for_cam(
cam, connected_cam_features),
stereo_pairs=self._get_possible_stereo_pairs_for_cam(cam, connected_cam_features),
name=cam.name.capitalize(),
)
)
Expand Down Expand Up @@ -268,7 +255,8 @@ def reconnect_to_oak(self) -> Message:
while time.time() - timeout_start < 10:
available_devices = [
# type: ignore[call-arg]
device.getMxId() for device in dai.Device.getAllAvailableDevices()
device.getMxId()
for device in dai.Device.getAllAvailableDevices()
]
if self.id in available_devices:
break
Expand All @@ -282,8 +270,7 @@ def reconnect_to_oak(self) -> Message:
return ErrorMessage("Failed to create oak camera")

def _get_component_by_socket(self, socket: dai.CameraBoardSocket) -> Optional[CameraComponent]:
component = list(
filter(lambda c: c.node.getBoardSocket() == socket, self._cameras))
component = list(filter(lambda c: c.node.getBoardSocket() == socket, self._cameras))
if not component:
return None
return component[0]
Expand All @@ -292,8 +279,7 @@ def _get_camera_config_by_socket(
self, config: PipelineConfiguration, socket: dai.CameraBoardSocket
) -> Optional[CameraConfiguration]:
print("Getting cam by socket: ", socket, " Cameras: ", config.cameras)
camera = list(
filter(lambda c: c.board_socket == socket, config.cameras))
camera = list(filter(lambda c: c.board_socket == socket, config.cameras))
if not camera:
return None
return camera[0]
Expand All @@ -319,16 +305,14 @@ def _create_auto_pipeline_config(self, config: PipelineConfiguration) -> Message
if cam.name == "rgb": # By convention
rgb_cam_socket = cam.socket
resolution = (
CameraSensorResolution.THE_1080_P if cam.width >= 1920 else size_to_resolution[(
cam.width, cam.height)]
CameraSensorResolution.THE_1080_P if cam.width >= 1920 else size_to_resolution[(cam.width, cam.height)]
)
resolution = CameraSensorResolution.THE_1200_P if cam.height == 1200 else resolution
preferred_type = cam.supportedTypes[0]
if preferred_type == dai.CameraSensorType.TOF:
has_tof = True
config.cameras.append(
CameraConfiguration(
resolution=resolution, kind=preferred_type, board_socket=cam.socket, name=cam.name)
CameraConfiguration(resolution=resolution, kind=preferred_type, board_socket=cam.socket, name=cam.name)
)
# 2. Create stereo depth
if not has_tof:
Expand All @@ -351,8 +335,7 @@ def _create_auto_pipeline_config(self, config: PipelineConfiguration) -> Message
# Try to find a color camera config
nnet_cam_sock = next(
filter(
# type: ignore[arg-type,union-attr]
lambda cam: cam.kind == dai.CameraSensorType.COLOR,
lambda cam: cam.kind == dai.CameraSensorType.COLOR, # type: ignore[arg-type,union-attr]
config.cameras,
),
None,
Expand All @@ -377,8 +360,7 @@ def update_pipeline(self, runtime_only: bool) -> Message:
if self._oak.device.isPipelineRunning():
if runtime_only:
if config.depth is not None:
self._stereo.control.send_controls(
config.depth.to_runtime_controls())
self._stereo.control.send_controls(config.depth.to_runtime_controls())
return InfoMessage("")
return ErrorMessage("Depth is disabled, can't send runtime controls!")
print("Cam running, closing...")
Expand All @@ -401,25 +383,21 @@ def update_pipeline(self, runtime_only: bool) -> Message:
is_usb2 = self._oak.device.getUsbSpeed() == dai.UsbSpeed.HIGH
if is_poe:
self.store.send_message_to_frontend(
WarningMessage(
"Device is connected via PoE. This may cause performance issues.")
WarningMessage("Device is connected via PoE. This may cause performance issues.")
)
print("Connected to a PoE device, camera streams will be JPEG encoded...")
elif is_usb2:
self.store.send_message_to_frontend(
WarningMessage(
"Device is connected in USB2 mode. This may cause performance issues.")
WarningMessage("Device is connected in USB2 mode. This may cause performance issues.")
)
print(
"Device is connected in USB2 mode, camera streams will be JPEG encoded...")
print("Device is connected in USB2 mode, camera streams will be JPEG encoded...")
self.use_encoding = is_poe or is_usb2

connected_camera_features = self._oak.device.getConnectedCameraFeatures()
for cam in config.cameras:
print("Creating camera: ", cam)

camera_features = next(
filter(lambda feat: feat.socket == cam.board_socket, connected_camera_features))
camera_features = next(filter(lambda feat: feat.socket == cam.board_socket, connected_camera_features))

# When the resolution is too small, the ISP needs to scale it down
res_x, res_y = get_size_from_resolution(cam.resolution)
Expand All @@ -444,8 +422,7 @@ def update_pipeline(self, runtime_only: bool) -> Message:
config for config in camera_features.configs if config.type == camera_features.supportedTypes[0]
][0]
sensor_resolution = size_to_resolution.get(
(smallest_supported_resolution.width,
smallest_supported_resolution.height), None
(smallest_supported_resolution.width, smallest_supported_resolution.height), None
)
is_used_by_depth = config.depth is not None and (
cam.board_socket == config.depth.align or cam.board_socket in config.depth.stereo_pair
Expand All @@ -468,17 +445,14 @@ def update_pipeline(self, runtime_only: bool) -> Message:
if not does_sensor_support_resolution:
sdk_cam.config_color_camera(
isp_scale=getClosestIspScale(
(smallest_supported_resolution.width,
smallest_supported_resolution.height), res_x
(smallest_supported_resolution.width, smallest_supported_resolution.height), res_x
)
)
self._cameras.append(sdk_cam)
else:
print("Skipped creating camera:", cam.board_socket,
"because no valid sensor resolution was found.")
print("Skipped creating camera:", cam.board_socket, "because no valid sensor resolution was found.")
continue
self._queues.append(
(sdk_cam, self._oak.queue(sdk_cam.out.main)))
self._queues.append((sdk_cam, self._oak.queue(sdk_cam.out.main)))

if config.depth:
print("Creating depth")
Expand All @@ -490,14 +464,11 @@ def update_pipeline(self, runtime_only: bool) -> Message:

if left_cam.node.getResolutionWidth() > 1280:
print("Left cam width > 1280, setting isp scale to get 800")
left_cam.config_color_camera(isp_scale=calculate_isp_scale(
left_cam.node.getResolutionWidth()))
left_cam.config_color_camera(isp_scale=calculate_isp_scale(left_cam.node.getResolutionWidth()))
if right_cam.node.getResolutionWidth() > 1280:
print("Right cam width > 1280, setting isp scale to get 800")
right_cam.config_color_camera(isp_scale=calculate_isp_scale(
right_cam.node.getResolutionWidth()))
self._stereo = self._oak.create_stereo(
left=left_cam, right=right_cam, name="depth")
right_cam.config_color_camera(isp_scale=calculate_isp_scale(right_cam.node.getResolutionWidth()))
self._stereo = self._oak.create_stereo(left=left_cam, right=right_cam, name="depth")

align_component = self._get_component_by_socket(config.depth.align)
if not align_component:
Expand All @@ -511,16 +482,13 @@ def update_pipeline(self, runtime_only: bool) -> Message:
median=config.depth.median,
)

aligned_camera = self._get_camera_config_by_socket(
config, config.depth.align)
aligned_camera = self._get_camera_config_by_socket(config, config.depth.align)
if not aligned_camera:
return ErrorMessage(f"{config.depth.align} is not configured. Couldn't create stereo pair.")
self._queues.append(
(self._stereo, self._oak.queue(self._stereo.out.main)))
self._queues.append((self._stereo, self._oak.queue(self._stereo.out.main)))

if self._oak.device.getConnectedIMU() != "NONE" and self._oak.device.getConnectedIMU() != "":
print("Creating IMU, connected IMU: ",
self._oak.device.getConnectedIMU())
print("Creating IMU, connected IMU: ", self._oak.device.getConnectedIMU())
# TODO(someone): Handle IMU updates
imu = self._oak.create_imu()
sensors = [
Expand All @@ -537,26 +505,20 @@ def update_pipeline(self, runtime_only: bool) -> Message:
print("Connected cam doesn't have IMU, skipping IMU creation...")

if config.ai_model and config.ai_model.path:
cam_component = self._get_component_by_socket(
config.ai_model.camera)
cam_component = self._get_component_by_socket(config.ai_model.camera)
if not cam_component:
return ErrorMessage(f"{config.ai_model.camera} is not configured. Couldn't create NN.")
if config.ai_model.path == "age-gender-recognition-retail-0013":
face_detection = self._oak.create_nn(
"face-detection-retail-0004", cam_component)
self._nnet = self._oak.create_nn(
"age-gender-recognition-retail-0013", input=face_detection)
face_detection = self._oak.create_nn("face-detection-retail-0004", cam_component)
self._nnet = self._oak.create_nn("age-gender-recognition-retail-0013", input=face_detection)
else:
self._nnet = self._oak.create_nn(
config.ai_model.path, cam_component)
self._nnet = self._oak.create_nn(config.ai_model.path, cam_component)

camera = self._get_camera_config_by_socket(
config, config.ai_model.camera)
camera = self._get_camera_config_by_socket(config, config.ai_model.camera)
if not camera:
return ErrorMessage(f"{config.ai_model.camera} is not configured. Couldn't create NN.")

self._queues.append(
(self._nnet, self._oak.queue(self._nnet.out.main)))
self._queues.append((self._nnet, self._oak.queue(self._nnet.out.main)))

sys_logger_xlink = self._oak.pipeline.createXLinkOut()
logger = self._oak.pipeline.createSystemLogger()
Expand All @@ -573,8 +535,7 @@ def update_pipeline(self, runtime_only: bool) -> Message:
running = self._oak.running()
if running:
self._pipeline_start_t = time.time()
self._sys_info_q = self._oak.device.getOutputQueue(
"sys_logger", 1, False)
self._sys_info_q = self._oak.device.getOutputQueue("sys_logger", 1, False)
# We might have modified the config, so store it
self.store.set_pipeline_config(config)
try:
Expand Down Expand Up @@ -607,8 +568,7 @@ def update(self) -> None:
sys_info = self._sys_info_q.tryGet() # type: ignore[attr-defined]
if sys_info is not None and self._pipeline_start_t is not None:
print("----------------------------------------")
print(
f"[{int(time.time() - self._pipeline_start_t)}s] System information")
print(f"[{int(time.time() - self._pipeline_start_t)}s] System information")
print("----------------------------------------")
print_system_information(sys_info)
# if time.time() - self.start > 10:
Expand Down
Loading

0 comments on commit a3e35b0

Please sign in to comment.