Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add gt frame interpolation function #120

Merged
merged 10 commits into from
Feb 6, 2024
177 changes: 177 additions & 0 deletions perception_eval/perception_eval/common/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from copy import deepcopy
import logging
from typing import Any
from typing import Dict
Expand All @@ -28,9 +29,13 @@
from perception_eval.common.dataset_utils import _sample_to_frame
from perception_eval.common.dataset_utils import _sample_to_frame_2d
from perception_eval.common.evaluation_task import EvaluationTask
from perception_eval.common.geometry import interpolate_hopmogeneous_matrix

Check warning on line 32 in perception_eval/perception_eval/common/dataset.py

View workflow job for this annotation

GitHub Actions / spellcheck

Unknown word (hopmogeneous)
from perception_eval.common.geometry import interpolate_object_list
from perception_eval.common.label import LabelConverter
from perception_eval.common.object import DynamicObject
from perception_eval.common.schema import FrameID
from perception_eval.util.math import get_pose_transform_matrix
from pyquaternion import Quaternion
from tqdm import tqdm


Expand Down Expand Up @@ -288,3 +293,175 @@
return None
else:
return ground_truth_now_frame


def get_interpolated_now_frame(
    ground_truth_frames: List[FrameGroundTruth],
    unix_time: int,
    threshold_min_time: int,
) -> Optional[FrameGroundTruth]:
    """Get interpolated ground truth frame in specified unix time.

    It searches the closest frames before and after ``unix_time`` that satisfy the
    time difference condition and, if both are found, interpolates between them.

    NOTE(review): the search assumes ``ground_truth_frames`` is sorted in ascending
    unix time — it stops at the first before/after pair straddling ``unix_time``.

    Args:
        ground_truth_frames (List[FrameGroundTruth]): FrameGroundTruth instance list.
        unix_time (int): Unix time [us].
        threshold_min_time (int): Min time for unix time difference [us].

    Returns:
        Optional[FrameGroundTruth]:
            The ground truth frame whose unix time is closest to the ``unix_time``
            argument. If neither neighboring frame is within ``threshold_min_time``,
            None is returned; if only one neighbor qualifies, that frame is returned
            without interpolation.

    Examples:
        >>> ground_truth_frames = load_all_datasets(...)
        >>> get_interpolated_now_frame(ground_truth_frames, 1624157578750212, 7500)
        <perception_eval.common.dataset.FrameGroundTruth object at 0x7f66040c36a0>
    """
    # extract the closest frames before and after the target time
    before_frame = None
    after_frame = None
    dt_before = 0.0
    dt_after = 0.0
    for ground_truth_frame in ground_truth_frames:
        diff_time = unix_time - ground_truth_frame.unix_time
        if diff_time >= 0:
            before_frame = ground_truth_frame
            dt_before = diff_time
        else:
            after_frame = ground_truth_frame
            dt_after = -diff_time
        if before_frame is not None and after_frame is not None:
            break

    # disable a neighbor if its time difference is too large
    if dt_before > threshold_min_time:
        before_frame = None
    if dt_after > threshold_min_time:
        after_frame = None

    # check frame availability
    if before_frame is None and after_frame is None:
        logging.info("No frame is available for interpolation")
        return None
    elif before_frame is None:
        logging.info("Only after frame is available for interpolation")
        return after_frame
    elif after_frame is None:
        logging.info("Only before frame is available for interpolation")
        return before_frame
    else:
        # both neighbors qualify: do interpolation
        return interpolate_ground_truth_frames(before_frame, after_frame, unix_time)


def interpolate_ground_truth_frames(
    before_frame: FrameGroundTruth,
    after_frame: FrameGroundTruth,
    unix_time: int,
) -> FrameGroundTruth:
    """Interpolate ground truth frame with linear interpolation.

    Args:
        before_frame (FrameGroundTruth): Frame just before ``unix_time``.
        after_frame (FrameGroundTruth): Frame just after ``unix_time``.
        unix_time (int): Target unix time [us].

    Returns:
        FrameGroundTruth: A copy of ``before_frame`` with ``ego2map``, ``objects``
            and ``unix_time`` replaced by the interpolated values.
    """
    # interpolate ego2map (translation linearly, rotation via slerp)
    ego2map = interpolate_hopmogeneous_matrix(
        before_frame.ego2map, after_frame.ego2map, before_frame.unix_time, after_frame.unix_time, unix_time
    )

    # TODO: Need refactor for simplicity
    # if frame is base_link, need to interpolate with global coordinate
    # 1. convert object list to global
    before_frame_objects = convert_objects_to_global(before_frame.objects, before_frame.ego2map)
    after_frame_objects = convert_objects_to_global(after_frame.objects, after_frame.ego2map)

    # 2. interpolate objects
    object_list = interpolate_object_list(
        before_frame_objects, after_frame_objects, before_frame.unix_time, after_frame.unix_time, unix_time
    )
    # 3. convert object list to base_link
    # object_list = convert_objects_to_base_link(object_list, ego2map)

    # assemble the output frame from a copy of the before-frame
    output_frame = deepcopy(before_frame)
    output_frame.ego2map = ego2map
    output_frame.objects = object_list
    output_frame.unix_time = unix_time
    return output_frame


def convert_objects_to_global(
    object_list: List[ObjectType],
    ego2map: np.ndarray,
) -> List[ObjectType]:
    """Convert object list to global (map) coordinate.

    Args:
        object_list (List[ObjectType]): Objects in ``map`` or ``base_link`` frame.
        ego2map (np.ndarray): 4x4 homogeneous transform from base_link to map.

    Returns:
        List[ObjectType]: Object list in global coordinate.

    Raises:
        NotImplementedError: If an object's frame_id is neither ``map`` nor ``base_link``.
    """
    output_object_list = []
    for obj in object_list:
        if obj.frame_id == "map":
            # already global: keep as-is
            output_object_list.append(deepcopy(obj))
            continue
        elif obj.frame_id == "base_link":
            src: np.ndarray = get_pose_transform_matrix(
                position=obj.state.position,
                rotation=obj.state.orientation.rotation_matrix,
            )
            dst: np.ndarray = ego2map.dot(src)
            updated_position: tuple = tuple(dst[:3, 3].flatten())
            # BUGFIX: pass the 3x3 rotation via the ``matrix`` keyword — a positional
            # argument is interpreted as quaternion elements by pyquaternion
            # (matches convert_objects_to_base_link).
            updated_rotation: Quaternion = Quaternion(matrix=dst[:3, :3])
            output_object = deepcopy(obj)
            output_object.state.position = updated_position
            output_object.state.orientation = updated_rotation
            output_object.frame_id = "map"
            output_object_list.append(output_object)
        else:
            raise NotImplementedError(f"Unexpected frame_id: {obj.frame_id}")
    return output_object_list


def convert_objects_to_base_link(
    object_list: List[ObjectType],
    ego2map: np.ndarray,
) -> List[ObjectType]:
    """Convert object list to base_link coordinate.

    Args:
        object_list (List[ObjectType]): Objects in ``map`` or ``base_link`` frame.
        ego2map (np.ndarray): 4x4 homogeneous transform from base_link to map.

    Returns:
        List[ObjectType]: Object list in base_link coordinate.

    Raises:
        NotImplementedError: If an object's frame_id is neither ``map`` nor ``base_link``.
    """
    output_object_list = []
    for obj in object_list:
        if obj.frame_id == "base_link":
            # already in ego frame: keep as-is
            output_object_list.append(deepcopy(obj))
            continue
        elif obj.frame_id == "map":
            src: np.ndarray = get_pose_transform_matrix(
                position=obj.state.position,
                rotation=obj.state.orientation.rotation_matrix,
            )
            # map -> base_link is the inverse of ego2map
            dst: np.ndarray = np.linalg.inv(ego2map).dot(src)
            updated_position: tuple = tuple(dst[:3, 3].flatten())
            updated_rotation: Quaternion = Quaternion(matrix=dst[:3, :3])
            output_object = deepcopy(obj)
            output_object.state.position = updated_position
            output_object.state.orientation = updated_rotation
            output_object.frame_id = "base_link"
            output_object_list.append(output_object)
        else:
            raise NotImplementedError(f"Unexpected frame_id: {obj.frame_id}")
    return output_object_list
197 changes: 197 additions & 0 deletions perception_eval/perception_eval/common/geometry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# Copyright 2022 TIER IV, Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from copy import deepcopy
from typing import List
from typing import Tuple
from typing import Union

import numpy as np
from perception_eval.common.object2d import DynamicObject2D
from perception_eval.common.object2d import Roi
from perception_eval.common.object import DynamicObject
from perception_eval.common.object import ObjectState
from perception_eval.common.point import distance_points
from perception_eval.common.point import distance_points_bev
from pyquaternion import Quaternion

# Type aliases
# ObjectType: any evaluable object — 3D (DynamicObject) or 2D (DynamicObject2D).
ObjectType = Union[DynamicObject, DynamicObject2D]


def interpolate_hopmogeneous_matrix(
    matrix_1: np.ndarray, matrix_2: np.ndarray, t1: float, t2: float, t: float
) -> np.ndarray:
    """Interpolate between two 4x4 homogeneous transform matrices at time ``t``.

    Translation is interpolated linearly; rotation via quaternion slerp.

    NOTE: the function name keeps its existing (misspelled) public spelling for
    backward compatibility with callers importing it.

    Args:
        matrix_1 (np.ndarray): 4x4 homogeneous matrix at time ``t1``.
        matrix_2 (np.ndarray): 4x4 homogeneous matrix at time ``t2``.
        t1 (float): Time of ``matrix_1``.
        t2 (float): Time of ``matrix_2``.
        t (float): Target time, with ``t1 <= t <= t2``.

    Returns:
        np.ndarray: The interpolated 4x4 homogeneous matrix.
    """
    assert t1 <= t <= t2
    assert matrix_1.shape == matrix_2.shape
    assert matrix_1.shape == (4, 4)
    if t1 == t2:
        # coincident timestamps: avoid zero division in the interpolation ratio
        return matrix_1.copy()
    alpha = (t - t1) / (t2 - t1)
    R1 = matrix_1[:3, :3]
    R2 = matrix_2[:3, :3]
    T1 = matrix_1[:3, 3]
    T2 = matrix_2[:3, 3]
    # linear interpolation of translation
    T = T1 + (T2 - T1) * alpha
    # slerp of rotation
    q1 = Quaternion(matrix=R1)
    q2 = Quaternion(matrix=R2)
    R = Quaternion.slerp(q1, q2, alpha).rotation_matrix
    # reassemble the homogeneous matrix
    matrix = np.eye(4)
    matrix[:3, :3] = R
    matrix[:3, 3] = T
    return matrix


def interpolate_list(list_1: List[float], list_2: List[float], t1: float, t2: float, t: float) -> List[float]:
    """Linearly interpolate element-wise between two equal-length float lists.

    Args:
        list_1 (List[float]): Values at time ``t1``.
        list_2 (List[float]): Values at time ``t2``.
        t1 (float): Time of ``list_1``.
        t2 (float): Time of ``list_2``.
        t (float): Target time, with ``t1 <= t <= t2``.

    Returns:
        List[float]: The element-wise interpolated values.
    """
    assert t1 <= t <= t2
    assert len(list_1) == len(list_2)
    if t1 == t2:
        # coincident timestamps (t is pinned to t1 by the assertion above):
        # avoid zero division and return the values unchanged
        return list(list_1)
    alpha = (t - t1) / (t2 - t1)
    return [a + (b - a) * alpha for a, b in zip(list_1, list_2)]


def interpolate_quaternion(quat_1: Quaternion, quat_2: Quaternion, t1: float, t2: float, t: float) -> Quaternion:
    """Slerp-interpolate between two quaternions to a specific time.

    Args:
        quat_1 (Quaternion): Rotation at time ``t1``.
        quat_2 (Quaternion): Rotation at time ``t2``.
        t1 (float): Time of ``quat_1``.
        t2 (float): Time of ``quat_2``.
        t (float): Target time, with ``t1 <= t <= t2``.

    Returns:
        Quaternion: The interpolated rotation.
    """
    assert t1 <= t <= t2
    if t1 == t2:
        # coincident timestamps: nothing to interpolate, avoid zero division
        return quat_1
    alpha = (t - t1) / (t2 - t1)
    # Quaternion.slerp is a classmethod; call it on the class instead of
    # through an instance with a redundant first argument
    return Quaternion.slerp(quat_1, quat_2, alpha)


def interpolate_state(state_1: ObjectState, state_2: ObjectState, t1: float, t2: float, t: float) -> ObjectState:
    """Linearly interpolate an object state between two timestamps.

    Position and velocity are interpolated linearly, orientation via slerp,
    and shape is taken from ``state_1`` (assumed constant over the interval).

    Args:
        state_1 (ObjectState): State at time ``t1``.
        state_2 (ObjectState): State at time ``t2``.
        t1 (float): Time of ``state_1``.
        t2 (float): Time of ``state_2``.
        t (float): Target time, with ``t1 <= t <= t2``.

    Returns:
        ObjectState: The interpolated state.
    """
    assert t1 <= t <= t2
    # state has position, orientation, shape, velocity
    interp_position = tuple(interpolate_list(state_1.position, state_2.position, t1, t2, t))
    interp_orientation = interpolate_quaternion(state_1.orientation, state_2.orientation, t1, t2, t)
    interp_shape = state_1.shape  # shape will not change
    interp_velocity = tuple(interpolate_list(state_1.velocity, state_2.velocity, t1, t2, t))
    return ObjectState(
        position=interp_position, orientation=interp_orientation, shape=interp_shape, velocity=interp_velocity
    )


def interpolate_object_list(
    object_list1: List[ObjectType], object_list2: List[ObjectType], t1: float, t2: float, t: float
) -> List[ObjectType]:
    """Interpolate two object lists (at times ``t1`` and ``t2``) to time ``t``.

    Objects are matched by uuid. A matched pair is interpolated; an object present
    in only one of the lists is copied as-is.

    Args:
        object_list1 (List[ObjectType]): Objects at time ``t1``.
        object_list2 (List[ObjectType]): Objects at time ``t2``.
        t1 (float): Time of ``object_list1``.
        t2 (float): Time of ``object_list2``.
        t (float): Target time, with ``t1 <= t <= t2``.

    Returns:
        List[ObjectType]: The interpolated object list.
    """
    assert t1 <= t <= t2
    # index object_list2 by uuid (first occurrence wins, matching the original
    # first-match linear search) to avoid an O(n*m) nested scan
    objects2_by_uuid = {}
    for object2 in object_list2:
        objects2_by_uuid.setdefault(object2.uuid, object2)

    output_object_list = []
    seen_ids = set()
    for object1 in object_list1:
        object2 = objects2_by_uuid.get(object1.uuid)
        if object2 is not None:
            output_object_list.append(interpolate_object(object1, object2, t1, t2, t))
        else:
            # not found in object_list2: keep the earlier state as-is
            output_object_list.append(deepcopy(object1))
        seen_ids.add(object1.uuid)

    # append objects that appear only in object_list2
    for object2 in object_list2:
        if object2.uuid not in seen_ids:
            output_object_list.append(deepcopy(object2))
            seen_ids.add(object2.uuid)

    return output_object_list


def interpolate_object(object_1: ObjectType, object_2: ObjectType, t1: float, t2: float, t: float) -> ObjectType:
    """Interpolate between two objects of the same concrete type to time ``t``.

    Args:
        object_1 (ObjectType): Object at time ``t1``.
        object_2 (ObjectType): Object at time ``t2``.
        t1 (float): Time of ``object_1``.
        t2 (float): Time of ``object_2``.
        t (float): Target time.

    Returns:
        ObjectType: The interpolated object.

    Raises:
        TypeError: If the two objects differ in type, or are neither
            DynamicObject nor DynamicObject2D.
    """
    if type(object_1) != type(object_2):
        # BUGFIX: the original message was `{type(object_1) and {type(object_2)}}`,
        # which evaluated an `and` with a set literal instead of showing both types
        raise TypeError(f"objects' type must be same, but got {type(object_1)} and {type(object_2)}")

    if isinstance(object_1, DynamicObject):
        return interpolate_dynamicobject(object_1, object_2, t1, t2, t)
    elif isinstance(object_1, DynamicObject2D):
        return interpolate_dynamicobject2d(object_1, object_2, t1, t2, t)
    else:
        raise TypeError(f"object type must be DynamicObject or DynamicObject2D, but got {type(object_1)}")


def interpolate_dynamicobject(
    object_1: DynamicObject, object_2: DynamicObject, t1: float, t2: float, t: float
) -> DynamicObject:
    """Interpolate between two DynamicObject instances with the same uuid to time ``t``.

    Only ``state`` and ``unix_time`` are interpolated; every other attribute is
    copied from ``object_1``.

    Args:
        object_1 (DynamicObject): Object at time ``t1``.
        object_2 (DynamicObject): Object at time ``t2``.
        t1 (float): Time of ``object_1``.
        t2 (float): Time of ``object_2``.
        t (float): Target time, with ``t1 <= t <= t2``.

    Returns:
        DynamicObject: The interpolated object.
    """
    assert t1 <= t <= t2
    assert object_1.uuid == object_2.uuid
    # Basically just copy everything from object_1 for simplicity
    # TODO: interpolate the other attributes as well
    output_object = deepcopy(object_1)
    interp_state = interpolate_state(object_1.state, object_2.state, t1, t2, t)
    output_object.state = interp_state
    output_object.unix_time = int(t)
    return output_object


def interpolate_dynamicobject2d(
    object_1: DynamicObject2D, object_2: DynamicObject2D, t1: float, t2: float, t: float
) -> DynamicObject2D:
    """Interpolate between two DynamicObject2D instances to time ``t``.

    Args:
        object_1 (DynamicObject2D): Object at time ``t1``.
        object_2 (DynamicObject2D): Object at time ``t2``.
        t1 (float): Time of ``object_1``.
        t2 (float): Time of ``object_2``.
        t (float): Target time.

    Raises:
        NotImplementedError: Always — 2D interpolation is not implemented yet.
    """
    # TODO: implement 2D interpolation.
    # Raise explicitly instead of silently returning None (the original stub's
    # `pass` contradicted the declared DynamicObject2D return type), so callers
    # fail fast rather than propagate a None object.
    raise NotImplementedError("interpolation for DynamicObject2D is not implemented yet")
ktro2828 marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading