diff --git a/ingest_development_scripts/testing_script.py b/ingest_development_scripts/testing_script.py new file mode 100644 index 00000000..902ce0c6 --- /dev/null +++ b/ingest_development_scripts/testing_script.py @@ -0,0 +1,43 @@ +from nucleus.sensorfusion import * +import numpy as np +import glob + +def format_pointcloud(lidar_np): + mask = lidar_np[:, 4] == 0.0 + pc_1 = lidar_np[mask, :] + pc_1 = np.delete(pc_1, (4), 1) + return pc_1 + +npz_files = sorted(glob.glob("sdk-sample-data/*")) +npz_files.remove("sdk-sample-data/extrinsics_seq000-003_11042021.npz") + +updated_extrinsics = np.load("sdk-sample-data/extrinsics_seq000-003_11042021.npz", allow_pickle = True) +wnsl_extrinsics = updated_extrinsics['camera_WindshieldNarrowStereoLeft_lidar_extrinsic'] +print(f"Camera Extrinsics: \n{wnsl_extrinsics}") + +cam_1_calib = CameraCalibration(extrinsic_matrix=wnsl_extrinsics) +print(f"Original Camera Pose:{cam_1_calib.pose}") + +RawScene = RawScene() +for idx,npz_fp in enumerate(npz_files): + print(f"Frame Index: {idx}") + + frame_npz = np.load(npz_fp, allow_pickle=True) + + pointcloud_np= format_pointcloud(frame_npz['points']) + print(f"PointCloud Shape: {pointcloud_np.shape}") + + pointcloud = RawPointCloud(points=pointcloud_np) + + print(f"World Coordinate Transformation:\n{frame_npz['vehicle_local_tf']}") + frame_pose = Transform(frame_npz['vehicle_local_tf']) + print(f"Frame Pose: {frame_pose}") + + raw_frame = RawFrame(lidar=pointcloud, cam1=cam_1_calib, device_pose=frame_pose) + RawScene.add_frame(raw_frame) + +print(f"Frame 5, Point1 PreTransform: {RawScene.frames[4].items['lidar'].points[0]}") +print(f"Frame 5, Camera Extrinsics PreTransform: {RawScene.frames[4].items['cam1'].pose}") +RawScene.apply_transforms(relative=True) +print(f"Frame 5, Point1 in World: {RawScene.frames[4].items['lidar'].points[0]}") +print(f"Frame 5, Camera Extrinsics PostTransform: {RawScene.frames[4].items['cam1'].pose}") \ No newline at end of file diff --git a/nucleus/sensorfusion.py b/nucleus/sensorfusion.py new file mode 100644 index 00000000..05708480 --- /dev/null +++ b/nucleus/sensorfusion.py @@ -0,0 +1,162 @@ +import numpy as np +from nucleus.transform import Transform +import copy +from typing import List +from dataclasses import dataclass + +from sensorfusion_utils import read_pcd + +@dataclass +class RawPointCloud: + ''' + RawPointClouds are containers for raw point cloud data. This structure contains Point Clouds as (N,3) or (N,4) numpy arrays + Point cloud data is assumed to be in ego coordinates. + + If the point cloud is in world coordinates, one can provide the inverse pose as the "transform" argument. If this argument + is present, the point cloud will be transformed back into ego coordinates by: + + transform.apply(points) + + or in the extended implementation: + + points_4d = np.hstack([points[:, :3], np.ones((points.shape[0], 1))]) + transformed_4d = points_4d.dot(self.matrix.T) + return np.hstack([transformed_4d[:, :3], points[:, 3:]]) + + + Args: + points (np.array): Point cloud data represented as (N,3) or (N,4) numpy arrays + transform (:class:`Transform`): If in World Coordinate, the transformation used to transform lidar points into ego coordinates + ''' + points: np.array + + def __init__(self, points: np.array = None, transform: Transform = None): + self.points = points + + if points is not None and (len(self.points.shape) != 2 or self.points.shape[1] not in [3, 4]): + raise Exception(f'numpy array has unexpected shape{self.points.shape}. Please convert to (N,3) or (N,4) numpy array where each row is [x,y,z] or [x,y,z,i]') + + if transform is not None: + self.points = transform.apply(points) + + def load_pcd(self, filepath: str, transform: Transform = None): + ''' + Takes raw pcd file and reads in as numpy array. + Args: + filepath (str): Local filepath to pcd file + transform (:class:`Transform`): If in world, the transformation used to transform lidar points into ego coordinates + ''' + points = read_pcd(filepath) + self.points = points + if transform is not None: + self.points = transform.apply(points) + + +class CameraCalibration: + ''' + CameraCalibration solely holds the pose of the camera + This CameraCalibration will inevitably be transformed by the device_pose + Args: + extrinsic_matrix (np.array): (4,4) extrinsic transformation matrix representing device_to_lidar + ''' + + def __init__(self, extrinsic_matrix=None): + self.extrinsic_matrix = extrinsic_matrix + + @property + def extrinsic_matrix(self): + """Camera extrinsic + :getter: Return camera's extrinsic matrix (pose.inverse[:3, :4]) + :setter: pose = Transform(matrix).inverse + :type: 3x4 matrix + """ + return self.pose.inverse[:3, :4] + + @extrinsic_matrix.setter + def extrinsic_matrix(self, matrix): + ''' + Sets pose as inverse of extrinsic matrix + ''' + self.pose = Transform(matrix).inverse + + +class RawFrame: + ''' + RawFrames are containers for point clouds, image extrinsics, and device pose. + These objects most notably are leveraged to transform point clouds and image extrinsic matrices to the world coordinate frame. + Args: + device_pose (:class:`Transform`): World Coordinate transformation for frame + **kwargs (Dict[str, :class:`RawPointCloud, :class: CameraCalibration`]): Mappings from sensor name + to pointcloud and camera calibrations. Each frame of a lidar scene must contain exactly one + pointcloud and any number of camera calibrations + ''' + def __init__(self, device_pose: Transform = None, **kwargs): + self.device_pose = device_pose + self.items = {} + for key, value in kwargs.items(): + if isinstance(value, CameraCalibration): + self.items[key] = copy.copy(value) + else: + self.items[key] = value + + def get_world_points(self): + """Return the list of points with the frame transformation applied + :returns: List of points in world coordinates + :rtype: np.array + """ + for item in self.items: + if isinstance(self.items[item], RawPointCloud): + return np.hstack( + [ + self.device_pose @ self.items[item][:, :3], + self.items[item][:, 3:4], + self.items[item][:, 4:5] + ] + ) + + +class RawScene: + ''' + RawsScenes are containers for frames + Args: + frames (:class:`RawFrame`): Indexed sequential frame objects composing the scene + ''' + + def __init__(self, frames: List[RawFrame] = []): + if frames is None: + self.frames = [] + else: + self.frames = frames + + self.transformed = False + + def add_frame(self, frame: RawFrame): + self.frames.append(frame) + + def make_transforms_relative(self): + """Make all the frame transform relative to the first transform/frame. This will set the first transform to position (0,0,0) and heading (1,0,0,0)""" + offset = self.frames[0].device_pose.inverse + for frame in self.frames: + frame.device_pose = offset @ frame.device_pose + + def apply_transforms(self, relative: bool = False): + if self.transformed: + raise Exception("apply_transforms was called more than once. Raw Scene has already been transformed!") + + if relative: + self.make_transforms_relative() + + for frame in self.frames: + for item in frame.items: + if isinstance(frame.items[item], RawPointCloud): + frame.items[item].points = np.hstack( + [ + frame.device_pose @ frame.items[item].points[:, :3], + frame.items[item].points[:, 3:4], + frame.items[item].points[:, 4:5], + ] + ) + else: + wct = frame.device_pose @ frame.items[item].pose + frame.items[item].pose = wct + self.transformed = True \ No newline at end of file diff --git a/nucleus/sensorfusion_utils.py b/nucleus/sensorfusion_utils.py new file mode 100644 index 00000000..25e40e76 --- /dev/null +++ b/nucleus/sensorfusion_utils.py @@ -0,0 +1,19 @@ +import open3d as o3d +import numpy as np + +def read_pcd(pcd_filepath): + ''' + Loads in pcd file and returns (N,3) or (N,4) numpy array + + Args: + param pcd_filepath : filepath to local .pcd file + type pcd_filepath: str + + Returns: + point_numpy: (N,4) or (N,3) numpy array of points + type point_numpy: np.array + ''' + point_cloud = o3d.io.read_point_cloud(pcd_filepath) + + point_numpy = np.asarray(point_cloud.points)[:, :4] + return point_numpy \ No newline at end of file diff --git a/nucleus/transform.py b/nucleus/transform.py new file mode 100644 index 00000000..a14e4675 --- /dev/null +++ b/nucleus/transform.py @@ -0,0 +1,275 @@ +import numpy as np +import transforms3d as t3d +from pyquaternion import Quaternion + + +class Transform: + """Transform object represent a rigid transformation matrix (rotation and translation). + + Transform is a 4x4 matrix, although it could be instance using (16,1), (3,4), (3,3) or (3,1) matrices. + **Note**: not all the methods from Transform will work using scaled/small matrixes + + .. highlight:: python + .. code-block:: python + + [ + [ r00, r01, r02, t0], + [ r10, r11, r12, t1], + [ r20, r21, r22, t2], + [ 0, 0, 0, 1] + ] + + """ + + def __init__(self, value=None): + self.matrix = np.eye(4) + + if isinstance(value, Transform): + self.matrix = np.array(value) + + elif isinstance(value, Quaternion): + self.rotation = value.rotation_matrix + + elif value is not None: + value = np.array(value) + if value.shape == (4, 4): + self.matrix = value + + if value.shape == (16,): + self.matrix = np.array(value).reshape((4, 4)) + + elif value.shape == (3, 4): + self.matrix[:3, :4] = value + + elif value.shape == (3, 3): + self.rotation = value + + elif value.shape == (3,): + self.translation = value + + @staticmethod + def from_Rt(R, t): + """Create a transform based on a rotation and a translation components. + + :param R: Rotation matrix or quaternion. + :type R: Quaternion, list + :param t: Translation component + :type t: list + :returns: Transform created based on the components + :rtype: Transform + + """ + if isinstance(R, Quaternion): + R = R.rotation_matrix + return Transform(np.block([[R, np.mat(t).T], [np.zeros(3), 1]])) + + @staticmethod + def from_euler(angles, axes="sxyz", degrees=False): + """Create a transform from euler angles + + :param angles: Values of the rotation per axis + :type angles: list + :param axes: Order of the axis (default ``sxyz``) + :type axes: str + :param degrees: Use degrees or radians values (default ``False`` = radians) + :type degrees: boolean + :returns: Transform created from euler angles + :rtype: Transform + + """ + if degrees: + angles = np.deg2rad(angles) + return Transform(t3d.euler.euler2mat(*angles, axes=axes)) + + @staticmethod + def from_transformed_points(A, B): + """Create a transform from two points + + :param A: Point A (x,y,z) + :type A: list + :param B: Point B (x,y,z) + :type B: list + :returns: Transform created from the angles + :rtype: Transform + + """ + assert A.shape == B.shape + + mean_A = np.mean(A, axis=0) + mean_B = np.mean(B, axis=0) + centroid_A = A - mean_A + centroid_B = B - mean_B + + C = centroid_A.T @ centroid_B + V, S, W = np.linalg.svd(C) + + if (np.linalg.det(V) * np.linalg.det(W)) < 0.0: + S[-1] = -S[-1] + V[:, -1] = -V[:, -1] + + R = V @ W + t = mean_B - mean_A @ R + + return Transform.from_Rt(R.T, t) + + @staticmethod + def random(): + """Create a transform from random rotation and translation + + :returns: Transform created based on the angles + :rtype: Transform + + """ + return Transform.from_Rt(Quaternion.random(), np.random.rand(3)) + + @property + def rotation(self): + """Transform rotation + + :getter: Return transform's rotation + :setter: Set transform rotation, could use a 3x3 matrix or a Quaternion + :type: 3x3 matrix + """ + return self.matrix[:3, :3] + + @property + def quaternion(self): + """Transform rotation as quaternion + + :getter: Return transform's rotation as quaternion + :type: Quaternion + """ + return Quaternion(t3d.quaternions.mat2quat(self.matrix[:3, :3])) + + @rotation.setter + def rotation(self, rotation): + if isinstance(rotation, Quaternion): + rotation = rotation.rotation_matrix + self.matrix[:3, :3] = rotation + + @property + def position(self): + """Transform position/translation + + :getter: Return transform's position + :setter: Set transform's position list(3x1) + :type: list + """ + return self.matrix[:3, 3].flatten() + + @position.setter + def position(self, position): + self.matrix[:3, 3] = np.array(position).reshape(3) + + @property + def translation(self): + """Transform position/translation + + :getter: Return transform's position + :setter: Set transform's position list(3x1) + :type: list + """ + return self.matrix[:3, 3].flatten() + + @translation.setter + def translation(self, translation): + self.matrix[:3, 3] = np.array(translation).reshape(3) + + @property + def euler_angles(self, axes="sxyz"): + """Transform rotation in euler angles + + :getter: Return transform's rotaiton in euler angles + :type: list + """ + return t3d.euler.mat2euler(self.matrix, axes=axes) + + @property + def euler_degrees(self, axes="sxyz"): + """Transform rotation in euler degrees + + :getter: Return transform's rotaiton in euler degrees + :type: list + """ + return np.rad2deg(t3d.euler.mat2euler(self.matrix, axes=axes)) + + @property + def T(self): + """Transpose of the transform + + :returns: Transpose of the transform + :rtype: Transform + + """ + try: + return Transform(self.matrix.T) + except ValueError: + print("Can not transpose the Transform matrix") + + @property + def inverse(self): + """Inverse of the transform + + :returns: Inverse of the transform + :rtype: Transform + + """ + try: + return Transform.from_Rt( + self.rotation.T, np.dot(-self.rotation.T, self.translation) + ) + except ValueError: + print("Can not inverse the Transform matrix") + + def apply(self, points): + """Apply transform to a list of points + + :param points: List of points (N,3) or (N,4) + :returns: List of points witht the transform applied + :rtype: list + + """ + points_4d = np.hstack([points[:, :3], np.ones((points.shape[0], 1))]) + transformed_4d = points_4d.dot(self.matrix.T) + return np.hstack([transformed_4d[:, :3], points[:, 3:]]) + + def interpolate(self, other, factor): + """Interpotation of the transform + + :param other: Transform to interpolate with + :type other: Transform + :param factor: Factor of interpolation + :type factor: float between 0 and 1 + :returns: Transform resulted from the interpolation + :rtype: Transform + + """ + assert 0 <= factor <= 1.0 + other = Transform(other) + return self.from_Rt( + Quaternion.slerp(self.quaternion, other.quaternion, factor), + self.position + factor * (other.position - self.position), + ) + + def __array__(self): + return self.matrix + + def __getitem__(self, values): + return self.matrix.__getitem__(values) + + def __add__(self, other): + return Transform(other) @ self + + def __matmul__(self, other): + if isinstance(other, np.ndarray): + return self.apply(other) + return Transform(self.matrix @ Transform(other).matrix) + + def __eq__(self, other): + return np.allclose(self.matrix, other.matrix) + + def __repr__(self): + return "R=%s t=%s" % ( + np.array_str(self.euler_degrees, precision=3, suppress_small=True), + np.array_str(self.position, precision=3, suppress_small=True), + )