diff --git a/.github/.cspell-addition.json b/.github/.cspell-addition.json index 09767f77..9b75a4eb 100644 --- a/.github/.cspell-addition.json +++ b/.github/.cspell-addition.json @@ -2,6 +2,8 @@ "words": [ "allclose", "APHs", + "argmax", + "argmin", "astype", "AWML", "autoware", diff --git a/.vscode/settings.json b/.vscode/settings.json index 7b0896a8..6396abf7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -24,6 +24,8 @@ "allclose", "APHs", "arange", + "argmax", + "argmin", "argsort", "astype", "AUTOWARE", diff --git a/perception_eval/perception_eval/common/object.py b/perception_eval/perception_eval/common/object.py index 67d269b9..c2a81a41 100644 --- a/perception_eval/perception_eval/common/object.py +++ b/perception_eval/perception_eval/common/object.py @@ -19,12 +19,15 @@ from typing import List from typing import Optional from typing import Tuple -from typing import Union import numpy as np from perception_eval.common.label import AutowareLabel from perception_eval.common.point import distance_points from perception_eval.common.point import distance_points_bev +from perception_eval.common.state import ObjectPath +from perception_eval.common.state import ObjectState +from perception_eval.common.state import set_object_paths +from perception_eval.common.state import set_object_states from perception_eval.common.status import Visibility from pyquaternion import Quaternion from shapely.geometry import Polygon @@ -32,72 +35,6 @@ logger = getLogger(__name__) -class ObjectState: - """[summary] - Object state class - """ - - def __init__( - self, - position: Tuple[float, float, float], - orientation: Quaternion, - size: Tuple[float, float, float], - velocity: Tuple[float, float, float], - ) -> None: - """ - Args: - position (Tuple[float, float, float]) : center_x, center_y, center_z [m] - orientation (Quaternion) : Quaternion class. - See reference http://kieranwynn.github.io/pyquaternion/ - size (Tuple[float, float, float]): bounding box size of (wx, wy, wz) [m] - velocity (Tuple[float, float, float]): velocity of (vx, vy, vz) [m/s] - """ - - self.position: Tuple[float, float, float] = position - self.orientation: Quaternion = orientation - self.size: Tuple[float, float, float] = size - self.velocity: Tuple[float, float, float] = velocity - - -class ObjectPath: - """[summary]""" - - def __init__(self, states: List[ObjectState], confidence: float) -> None: - self.states: List[ObjectState] = states - self.confidence: float = confidence - - def __getitem__(self, idx: int) -> Union[ObjectState, List[ObjectState]]: - """[summary] - Returns Nth state. - - Args: - idx (int) - - Returns: - Union[ObjectState, List[ObjectState]] - """ - return self.states[idx] - - def __iter__(self) -> ObjectPath: - self.__n = 0 - return self - - def __next__(self): - if self.__n < len(self): - self.__n += 1 - return self[self.__n - 1] - raise StopIteration - - def __len__(self) -> int: - """[summary] - Returns length of states. - - Returns: - int: length of states. 
- """ - return len(self.states) - - class DynamicObject: """ Dynamic object class @@ -344,9 +281,9 @@ def get_position_error( """ if other is None: return None - err_x: float = abs(other.state.position[0] - self.state.position[0]) - err_y: float = abs(other.state.position[1] - self.state.position[1]) - err_z: float = abs(other.state.position[2] - self.state.position[2]) + err_x: float = other.state.position[0] - self.state.position[0] + err_y: float = other.state.position[1] - self.state.position[1] + err_z: float = other.state.position[2] - self.state.position[2] return (err_x, err_y, err_z) def get_heading_error( @@ -399,12 +336,50 @@ def get_velocity_error( if self.state.velocity is None or other.state.velocity is None: return None - err_vx: float = abs(other.state.velocity[0] - self.state.velocity[0]) - err_vy: float = abs(other.state.velocity[1] - self.state.velocity[1]) - err_vz: float = abs(other.state.velocity[2] - self.state.velocity[2]) + err_vx: float = other.state.velocity[0] - self.state.velocity[0] + err_vy: float = other.state.velocity[1] - self.state.velocity[1] + err_vz: float = other.state.velocity[2] - self.state.velocity[2] return err_vx, err_vy, err_vz + def get_path_error( + self, + other: Optional[DynamicObject], + num_waypoints: Optional[int] = None, + padding: float = np.nan, + ) -> Optional[np.ndarray]: + """[summary] + Returns errors of path as numpy.ndarray. + + Args: + other (Optional[DynamicObject]): DynamicObject instance. + num_waypoints (optional[int]): Number of waypoints. Defaults to None. + padding (float): Padding value. Defaults to numpy.nan. + + Returns: + numpy.ndarray: in shape (K, T, 3) + """ + if other is None: + return None + + self_paths: List[ObjectPath] = self.predicted_paths.copy() + other_paths: List[ObjectPath] = other.predicted_paths.copy() + + path_errors: List[List[List[float]]] = [] + for self_path, other_path in zip(self_paths, other_paths): + if self_path is None or other_path is None: + continue + num_waypoints_ = ( + num_waypoints if num_waypoints else min(len(self_path), len(other_path)) + ) + self_path_, other_path_ = self_path[:num_waypoints_], other_path[:num_waypoints_] + err: List[Tuple[float, float, float]] = [ + self_state.get_position_error(other_state) + for self_state, other_state in zip(self_path_, other_path_) + ] + path_errors.append(err) + return np.array(path_errors) + def get_area_bev(self) -> float: """[summary] Get area of object BEV. @@ -547,82 +522,3 @@ def distance_objects_bev(object_1: DynamicObject, object_2: DynamicObject) -> fl Returns: float: The 2d center distance from object_1 to object_2. """ return distance_points_bev(object_1.state.position, object_2.state.position) - - -def set_object_states( - positions: Optional[List[Tuple[float, float, float]]] = None, - orientations: Optional[List[Quaternion]] = None, - sizes: Optional[List[Tuple[float, float, float]]] = None, - velocities: Optional[List[Tuple[float, float, float]]] = None, -) -> Optional[ObjectPath]: - """[summary] - Set list of object states. 
- """ - if positions is None or orientations is None: - return None - - assert len(positions) == len(orientations), ( - "length of positions and orientations must be same, " - f"but got {len(positions)} and {len(orientations)}" - ) - states: List[ObjectState] = [] - for i, (pos, orient) in enumerate(zip(positions, orientations)): - states.append( - ObjectState( - position=pos, - orientation=orient, - size=sizes[i] if sizes else None, - velocity=velocities[i] if velocities else None, - ) - ) - return states - - -def set_object_path( - positions: Optional[List[Tuple[float, float, float]]] = None, - orientations: Optional[List[Quaternion]] = None, - sizes: Optional[List[Tuple[float, float, float]]] = None, - velocities: Optional[List[Tuple[float, float, float]]] = None, - confidence: Optional[float] = None, -) -> Optional[ObjectPath]: - """[summary] - Set single path. - """ - states: Optional[List[ObjectState]] = set_object_states( - positions=positions, - orientations=orientations, - velocities=velocities, - sizes=sizes, - ) - return ObjectPath(states, confidence) if states else None - - -def set_object_paths( - positions: Optional[List[List[Tuple[float, float, float]]]] = None, - orientations: Optional[List[List[Quaternion]]] = None, - sizes: Optional[List[List[Tuple[float, float, float]]]] = None, - velocities: Optional[List[List[Tuple[float, float, float]]]] = None, - confidences: Optional[List[float]] = None, -) -> Optional[List[ObjectPath]]: - """[summary] - Set multiple paths. - """ - if positions is None or orientations is None: - return None - - assert len(positions) == len(orientations), ( - "length of positions and orientations must be same, " - f"but got {len(positions)} and {len(orientations)}" - ) - paths: List[ObjectPath] = [] - for i, (poses, orients) in enumerate(zip(positions, orientations)): - paths.append( - set_object_path( - positions=poses, - orientations=orients, - velocities=velocities[i] if velocities else None, - sizes=sizes[i] if sizes else None, - confidence=confidences[i] if confidences else None, - ) - ) - return paths diff --git a/perception_eval/perception_eval/common/state.py b/perception_eval/perception_eval/common/state.py new file mode 100644 index 00000000..a51add32 --- /dev/null +++ b/perception_eval/perception_eval/common/state.py @@ -0,0 +1,226 @@ +# Copyright 2022 TIER IV, Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union + +from pyquaternion import Quaternion + + +class ObjectState: + """[summary] + Object state class + """ + + def __init__( + self, + position: Tuple[float, float, float], + orientation: Quaternion, + size: Optional[Tuple[float, float, float]] = None, + velocity: Optional[Tuple[float, float, float]] = None, + ) -> None: + """ + Args: + position (Tuple[float, float, float]) : center_x, center_y, center_z [m] + orientation (Quaternion) : Quaternion class. 
+ See reference http://kieranwynn.github.io/pyquaternion/ + size (Tuple[float, float, float]): bounding box size of (wx, wy, wz) [m] + velocity (Tuple[float, float, float]): velocity of (vx, vy, vz) [m/s] + """ + + self.position: Tuple[float, float, float] = position + self.orientation: Quaternion = orientation + self.size: Tuple[float, float, float] = size + self.velocity: Tuple[float, float, float] = velocity + + def get_position_error(self, other: ObjectState) -> Tuple[float, float, float]: + """[summary] + Returns the position error between other and itself. + + Args: + other (ObjectState): Other state. + + Returns: + Tuple[float, float, float]: Position error (x, y, z). + """ + return ( + other.position[0] - self.position[0], + other.position[1] - self.position[1], + other.position[2] - self.position[2], + ) + + def get_size_error(self, other: ObjectState) -> Tuple[float, float, float]: + """[summary] + Returns the size error between other and itself. + + Args: + other (ObjectState): Other state. + + Returns: + Tuple[float, float, float]: Size error (w, l, h). + """ + return ( + other.size[0] - self.size[0], + other.size[1] - self.size[1], + other.size[2] - self.size[2], + ) + + def get_velocity_error(self, other: ObjectState) -> Tuple[float, float, float]: + """[summary] + Returns the velocity error between other and itself. + + Args: + other (ObjectState): Other state. + + Returns: + Tuple[float, float, float]: Velocity error (vx, vy, vz). + """ + return ( + other.velocity[0] - self.velocity[0], + other.velocity[1] - self.velocity[1], + other.velocity[2] - self.velocity[2], + ) + + +class ObjectPath: + """[summary]""" + + def __init__(self, states: List[ObjectState], confidence: float) -> None: + self.states: List[ObjectState] = states + self.confidence: float = confidence + + def __getitem__(self, idx: int) -> Union[ObjectState, List[ObjectState]]: + """[summary] + Returns Nth state. + + Args: + idx (int) + + Returns: + Union[ObjectState, List[ObjectState]] + """ + return self.states[idx] + + def __iter__(self) -> ObjectPath: + self.__n = 0 + return self + + def __next__(self): + if self.__n < len(self): + self.__n += 1 + return self[self.__n - 1] + raise StopIteration + + def __len__(self) -> int: + """[summary] + Returns length of states. + + Returns: + int: length of states. + """ + return len(self.states) + + +def set_object_states( + positions: Optional[List[Tuple[float, float, float]]] = None, + orientations: Optional[List[Quaternion]] = None, + sizes: Optional[List[Tuple[float, float, float]]] = None, + velocities: Optional[List[Tuple[float, float, float]]] = None, +) -> Optional[List[ObjectState]]: + """[summary] + Set list of object states. 
+
+    Args:
+        positions (Optional[List[Tuple[float]]])
+        orientations (Optional[List[Tuple[float]]])
+        sizes (Optional[List[Tuple[float]]])
+        velocities (Optional[List[Tuple[float]]])
+
+    Returns:
+        Optional[List[ObjectState]]
+    """
+    if positions is None or orientations is None:
+        return None
+
+    if len(positions) != len(orientations):
+        raise RuntimeError(
+            "length of positions and orientations must be same, "
+            f"but got {len(positions)} and {len(orientations)}"
+        )
+
+    return [
+        ObjectState(
+            position=pos,
+            orientation=orient,
+            size=sizes[i] if sizes else None,
+            velocity=velocities[i] if velocities else None,
+        )
+        for i, (pos, orient) in enumerate(zip(positions, orientations))
+    ]
+
+
+def set_object_path(
+    positions: Optional[List[Tuple[float, float, float]]] = None,
+    orientations: Optional[List[Quaternion]] = None,
+    sizes: Optional[List[Tuple[float, float, float]]] = None,
+    velocities: Optional[List[Tuple[float, float, float]]] = None,
+    confidence: Optional[float] = None,
+) -> Optional[ObjectPath]:
+    """[summary]
+    Set a single path. Returns None when no states can be built.
+    """
+    states: Optional[List[ObjectState]] = set_object_states(
+        positions=positions,
+        orientations=orientations,
+        velocities=velocities,
+        sizes=sizes,
+    )
+    return ObjectPath(states, confidence) if states else None
+
+
+def set_object_paths(
+    positions: Optional[List[List[Tuple[float, float, float]]]] = None,
+    orientations: Optional[List[List[Quaternion]]] = None,
+    sizes: Optional[List[List[Tuple[float, float, float]]]] = None,
+    velocities: Optional[List[List[Tuple[float, float, float]]]] = None,
+    confidences: Optional[List[float]] = None,
+) -> Optional[List[ObjectPath]]:
+    """[summary]
+    Set multiple paths.
+    """
+    if positions is None or orientations is None or confidences is None:
+        return None
+
+    if not (len(positions) == len(orientations) == len(confidences)):
+        raise RuntimeError(
+            "length of positions, orientations and confidences must be same, "
+            f"but got {len(positions)}, {len(orientations)} and {len(confidences)}"
+        )
+
+    paths: List[ObjectPath] = []
+    for i, (poses, orients, confidence) in enumerate(zip(positions, orientations, confidences)):
+        path = set_object_path(
+            positions=poses,
+            orientations=orients,
+            velocities=velocities[i] if velocities else None,
+            sizes=sizes[i] if sizes else None,
+            confidence=confidence,
+        )
+        if path:
+            paths.append(path)
+    return paths
diff --git a/perception_eval/perception_eval/evaluation/metrics/metrics.py b/perception_eval/perception_eval/evaluation/metrics/metrics.py
index 183c6672..411ec889 100644
--- a/perception_eval/perception_eval/evaluation/metrics/metrics.py
+++ b/perception_eval/perception_eval/evaluation/metrics/metrics.py
@@ -19,6 +19,9 @@
 from perception_eval.evaluation.matching.object_matching import MatchingMode
 from perception_eval.evaluation.metrics.detection.map import Map
 from perception_eval.evaluation.metrics.metrics_score_config import MetricsScoreConfig
+from perception_eval.evaluation.metrics.prediction.prediction_metrics_score import (
+    PredictionMetricsScore,
+)
 from perception_eval.evaluation.metrics.tracking.tracking_metrics_score import TrackingMetricsScore
 from perception_eval.evaluation.result.object_result import DynamicObjectWithPerceptionResult
 
@@ -54,7 +57,7 @@ def __init__(
         self.maps: List[Map] = []
         # tracking metrics scores for each matching method
         self.tracking_scores: List[TrackingMetricsScore] = []
-        # TODO: prediction metrics scores for each matching method
+        # prediction metrics scores for each matching method
         self.prediction_scores: List = []
 
         self.__num_frame: int = len(used_frame)
 
@@
-86,7 +89,9 @@ def __str__(self) -> str: for track_score in self.tracking_scores: str_ += str(track_score) - # TODO: prediction + # prediction + for pred_score in self.prediction_scores: + str_ += str(pred_score) return str_ @@ -223,4 +228,41 @@ def evaluate_prediction( Args: object_results (List[DynamicObjectWithPerceptionResult]): The list of object result """ - pass + self.__num_gt += sum(num_ground_truth.values()) + + for distance_threshold_ in self.prediction_config.center_distance_thresholds: + prediction_score_ = PredictionMetricsScore( + object_results_dict=object_results, + num_ground_truth_dict=num_ground_truth, + target_labels=self.prediction_config.target_labels, + matching_mode=MatchingMode.CENTERDISTANCE, + matching_threshold_list=distance_threshold_, + ) + self.prediction_scores.append(prediction_score_) + for iou_threshold_bev_ in self.prediction_config.iou_bev_thresholds: + prediction_score_ = PredictionMetricsScore( + object_results_dict=object_results, + num_ground_truth_dict=num_ground_truth, + target_labels=self.prediction_config.target_labels, + matching_mode=MatchingMode.IOUBEV, + matching_threshold_list=iou_threshold_bev_, + ) + self.prediction_scores.append(prediction_score_) + for iou_threshold_3d_ in self.prediction_config.iou_3d_thresholds: + prediction_score_ = PredictionMetricsScore( + object_results_dict=object_results, + num_ground_truth_dict=num_ground_truth, + target_labels=self.prediction_config.target_labels, + matching_mode=MatchingMode.IOU3D, + matching_threshold_list=iou_threshold_3d_, + ) + self.prediction_scores.append(prediction_score_) + for plane_distance_threshold_ in self.prediction_config.plane_distance_thresholds: + prediction_score_ = PredictionMetricsScore( + object_results_dict=object_results, + num_ground_truth_dict=num_ground_truth, + target_labels=self.prediction_config.target_labels, + matching_mode=MatchingMode.PLANEDISTANCE, + matching_threshold_list=plane_distance_threshold_, + ) + self.prediction_scores.append(prediction_score_) diff --git a/perception_eval/perception_eval/evaluation/metrics/prediction/__init__.py b/perception_eval/perception_eval/evaluation/metrics/prediction/__init__.py index 9be33190..89eb1d13 100644 --- a/perception_eval/perception_eval/evaluation/metrics/prediction/__init__.py +++ b/perception_eval/perception_eval/evaluation/metrics/prediction/__init__.py @@ -11,3 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from .path_displacement_error import PathDisplacementError +from .soft_ap import SoftAp + +__all__ = ["PathDisplacementError", "SoftAp"] diff --git a/perception_eval/perception_eval/evaluation/metrics/prediction/path_displacement_error.py b/perception_eval/perception_eval/evaluation/metrics/prediction/path_displacement_error.py new file mode 100644 index 00000000..5d4a02dc --- /dev/null +++ b/perception_eval/perception_eval/evaluation/metrics/prediction/path_displacement_error.py @@ -0,0 +1,158 @@ +# Copyright 2022 TIER IV, Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List +from typing import Optional + +import numpy as np +from perception_eval.common.label import AutowareLabel +from perception_eval.common.threshold import get_label_threshold +from perception_eval.evaluation.matching.object_matching import MatchingMode +from perception_eval.evaluation.metrics.prediction.utils import prepare_path +from perception_eval.evaluation.result.object_result import DynamicObjectWithPerceptionResult + + +class PathDisplacementError: + """[summary] + A class to calculate path displacement errors for motion prediction task. + + Support metrics: + ADE; Average Displacement Error + FDE; Final Displacement Error + Miss Rate + Soft mAP + + Attributes: + self.ade (float) + self.fde (float) + self.miss_rate (float) + self.num_ground_truth (int) + self.target_labels (List[AutowareLabel]) + self.matching_mode (MatchingMode) + self.matching_threshold_list (List[float]) + self.top_k (Optional[int]) + self.num_waypoints (int): Number of waypoints of path. Defaults to 10. + self.miss_tolerance (float): Tolerance value for miss rate. + """ + + def __init__( + self, + object_results: List[DynamicObjectWithPerceptionResult], + num_ground_truth: int, + target_labels: List[AutowareLabel], + matching_mode: MatchingMode, + matching_threshold_list: List[float], + top_k: int = 1, + num_waypoints: int = 10, + miss_tolerance: float = 2.0, + kernel: Optional[str] = None, + ) -> None: + """[summary] + + Args: + object_results (List[DynamicObjectWithPerceptionResult]): + num_ground_truth (int): + target_labels (List[AutowareLabel]): + matching_mode (MatchingMode): + matching_threshold_list (List[float]) + top_k (int): Number of top kth confidential paths to be evaluated. Defaults to 1. + num_waypoints (int): Number of horizontal frames. Defaults to 10. + miss_tolerance (float): Tolerance value to determine miss[m]. Defaults to 2.0. + kernel (Optional[str]): Target error kernel, min, max or None. Defaults to None. + If it is specified, select the mode that total error is the smallest or largest. + Otherwise, evaluate all modes. + """ + self.num_ground_truth: int = num_ground_truth + self.target_labels: List[AutowareLabel] = target_labels + self.matching_mode: MatchingMode = matching_mode + self.matching_threshold_list: List[float] = matching_threshold_list + self.top_k: Optional[int] = top_k + self.num_waypoints: Optional[int] = num_waypoints + self.miss_tolerance: float = miss_tolerance + self.kernel: Optional[str] = kernel + + all_object_results: List[DynamicObjectWithPerceptionResult] = [] + if len(object_results) == 0 or not isinstance(object_results[0], list): + all_object_results = object_results + else: + for obj_results in object_results: + all_object_results += obj_results + self.objects_results_num: int = len(all_object_results) + + self.ade, self.fde, self.miss_rate = self._calculate_displacement_error( + all_object_results, + kernel=kernel, + ) + + def _calculate_displacement_error( + self, + object_results: List[DynamicObjectWithPerceptionResult], + kernel: Optional[str] = None, + ) -> np.ndarray: + """[summary] + Returns the displacement error. 
+ + Args: + object_results (List[DynamicObjectWithPerceptionResult]): List of DynamicObjectWithPerceptionResult. + kernel (Optional[str]): Target error kernel, min, max or None. Defaults to None. + If it is specified, select the mode that total error is the smallest or largest. + Otherwise, evaluate all modes. + + Returns: + ade (float): Average Displacement Error; ADE. + fde (float): Finale Displacement Error; FDE. + miss_rate (float): Miss rate. + """ + sum_ade, sum_fde, sum_miss = 0.0, 0.0, 0.0 + num_ade, num_fde, num_miss = 0, 0, 0 + for obj_result in object_results: + matching_threshold: float = get_label_threshold( + semantic_label=obj_result.estimated_object.semantic_label, + target_labels=self.target_labels, + threshold_list=self.matching_threshold_list, + ) + if not obj_result.is_result_correct( + matching_mode=self.matching_mode, + matching_threshold=matching_threshold, + ): + continue + estimation, ground_truth = prepare_path(obj_result, self.top_k) + # (K, T, 3) + err: np.ndarray = estimation.get_path_error( + ground_truth, + self.num_waypoints, + ) + + if len(err) == 0: + continue + + # NOTE: K, T is different for each agent + distances: np.ndarray = np.linalg.norm(err[:, :, :2], axis=-1) + if kernel == "min": + distances = distances[np.argmin(distances.sum(axis=1))].reshape(1, -1) + elif kernel == "max": + distances = distances[np.argmax(distances.sum(axis=1))].reshape(1, -1) + + sum_ade += distances.sum() + num_ade += distances.size + sum_fde += distances[:, -1].sum() + num_fde += distances[:, -1].size + sum_miss += (self.miss_tolerance <= distances).sum() + num_miss += distances.size + + ade: float = sum_ade / num_ade if 0 < num_ade else np.nan + fde: float = sum_fde / num_fde if 0 < num_fde else np.nan + miss_rate: float = sum_miss / num_miss if 0 < num_miss else np.nan + + return ade, fde, miss_rate diff --git a/perception_eval/perception_eval/evaluation/metrics/prediction/prediction_metrics_score.py b/perception_eval/perception_eval/evaluation/metrics/prediction/prediction_metrics_score.py new file mode 100644 index 00000000..f1f1cd52 --- /dev/null +++ b/perception_eval/perception_eval/evaluation/metrics/prediction/prediction_metrics_score.py @@ -0,0 +1,142 @@ +# Copyright 2022 TIER IV, Inc. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict +from typing import List + +import numpy as np +from perception_eval.common.label import AutowareLabel +from perception_eval.evaluation.matching.object_matching import MatchingMode +from perception_eval.evaluation.metrics.prediction import PathDisplacementError +from perception_eval.evaluation.metrics.prediction import SoftAp +from perception_eval.evaluation.result.object_result import DynamicObjectWithPerceptionResult + + +class PredictionMetricsScore: + """Metrics score for prediction. + + Attributes: + self.target_labels (List[AutowareLabel]): List of target labels. + self.matching_mode (MatchingMode): MatchingMode instance. 
+ self.displacements (List[PathDisplacementError]): List of PathDisplacementError instances. + self.soft_aps (SoftAp): List of SoftAp instances. + self.ade (float): Average Displacement Error;ADE score. + self.fde (float): Final Displacement Error;FDE score. + self.miss_rate (float): Miss rate. + self.soft_map (float): Soft mAP score. + """ + + def __init__( + self, + object_results_dict: Dict[AutowareLabel, List[List[DynamicObjectWithPerceptionResult]]], + num_ground_truth_dict: Dict[AutowareLabel, int], + target_labels: List[AutowareLabel], + matching_mode: MatchingMode, + matching_threshold_list: List[float], + ) -> None: + """[summary] + Args: + object_results_dict (Dict[AutowareLabel, List[List[DynamicObjectWithPerceptionResult]]): + object results divided by label for multi frame. + num_ground_truth (int): The number of ground truth. + target_labels (List[AutowareLabel]): e.g. ["car", "pedestrian", "bus"] + matching_mode (MatchingMode): The target matching mode. + matching_threshold_list (List[float]): The list of matching threshold for each category. (e.g. [0.5, 0.3, 0.5]) + """ + assert len(target_labels) == len(matching_threshold_list) + self.target_labels: List[AutowareLabel] = target_labels + self.matching_mode: MatchingMode = matching_mode + + self.displacements: List[PathDisplacementError] = [] + self.soft_aps: List[SoftAp] = [] + for target_label, matching_threshold in zip(target_labels, matching_threshold_list): + object_results = object_results_dict[target_label] + num_ground_truth = num_ground_truth_dict[target_label] + displacement_err = PathDisplacementError( + object_results=object_results, + num_ground_truth=num_ground_truth, + target_labels=[target_label], + matching_mode=self.matching_mode, + matching_threshold_list=[matching_threshold], + ) + soft_ap = SoftAp( + object_results=object_results, + num_ground_truth=num_ground_truth, + target_labels=[target_label], + matching_mode=self.matching_mode, + matching_threshold_list=[matching_threshold], + ) + self.displacements.append(displacement_err) + self.soft_aps.append(soft_ap) + self._summarize_score() + + def _summarize_score(self) -> None: + """[summary] + Summarize score + """ + ade_list, fde_list, miss_list = [], [], [] + for err in self.displacements: + if ~np.isnan(err.ade): + ade_list.append(err.ade) + if ~np.isnan(err.fde): + fde_list.append(err.fde) + if ~np.isnan(err.miss_rate): + miss_list.append(err.miss_rate) + ap_list: List[float] = [ap.ap for ap in self.soft_aps if ~np.isnan(ap.ap)] + + self.ade: float = np.mean(ade_list) if 0 < len(ade_list) else np.nan + self.fde: float = np.mean(fde_list) if 0 < len(fde_list) else np.nan + self.miss_rate: float = np.mean(miss_list) if 0 < len(miss_list) else np.nan + self.soft_map = np.mean(ap_list) if 0 < len(ap_list) else np.nan + + def __str__(self) -> str: + """__str__ method""" + + str_: str = "\n" + str_ += f"ADE: {self.ade:.3f}, FDE: {self.fde:.3f}, Miss Rate: {self.miss_rate:.3f}, Soft mAP: {self.soft_map:.3f}" + str_ += f"({self.matching_mode.value})\n" + # Table + str_ += "\n" + # label + str_ += "| Label |" + target_str: str + for err in self.displacements: + str_ += f" {err.target_labels[0].value}({err.matching_threshold_list[0]}) | " + str_ += "\n" + str_ += "| :--------: |" + for err in self.displacements: + str_ += " :---: |" + str_ += "\n" + str_ += "| Predict_num |" + for err in self.displacements: + str_ += f" {err.objects_results_num} |" + str_ += "\n" + str_ += "| ADE |" + for err in self.displacements: + str_ += f" {err.ade:.3f} | " + str_ += "\n" 
+        str_ += "| FDE |"
+        for err in self.displacements:
+            str_ += f" {err.fde:.3f} | "
+        str_ += "\n"
+        str_ += "| Miss Rate |"
+        for err in self.displacements:
+            str_ += f" {err.miss_rate:.3f} | "
+        str_ += "\n"
+        str_ += "| Soft AP |"
+        for ap in self.soft_aps:
+            str_ += f" {ap.ap:.3f} | "
+        str_ += "\n"
+
+        return str_
diff --git a/perception_eval/perception_eval/evaluation/metrics/prediction/soft_ap.py b/perception_eval/perception_eval/evaluation/metrics/prediction/soft_ap.py
new file mode 100644
index 00000000..165c2063
--- /dev/null
+++ b/perception_eval/perception_eval/evaluation/metrics/prediction/soft_ap.py
@@ -0,0 +1,222 @@
+# Copyright 2022 TIER IV, Inc.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from typing import List
+from typing import Optional
+from typing import Tuple
+
+import numpy as np
+from perception_eval.common.label import AutowareLabel
+from perception_eval.common.threshold import get_label_threshold
+from perception_eval.evaluation.matching.object_matching import MatchingMode
+from perception_eval.evaluation.metrics.prediction.utils import prepare_path
+from perception_eval.evaluation.result.object_result import DynamicObjectWithPerceptionResult
+
+
+class SoftAp:
+    """[summary]
+    A class to calculate Soft AP (average precision) for the motion prediction task.
+
+    Support metrics:
+        Soft AP
+
+    Attributes:
+        self.ap (float): Soft AP score.
+        self.num_ground_truth (int)
+        self.target_labels (List[AutowareLabel])
+        self.matching_mode (MatchingMode)
+        self.matching_threshold_list (List[float])
+        self.top_k (Optional[int])
+        self.num_waypoints (int): Number of waypoints of path. Defaults to 10.
+        self.miss_tolerance (float): Tolerance value for miss rate.
+    """
+
+    def __init__(
+        self,
+        object_results: List[DynamicObjectWithPerceptionResult],
+        num_ground_truth: int,
+        target_labels: List[AutowareLabel],
+        matching_mode: MatchingMode,
+        matching_threshold_list: List[float],
+        top_k: Optional[int] = 1,
+        num_waypoints: Optional[int] = 10,
+        miss_tolerance: float = 2.0,
+        kernel: Optional[str] = None,
+    ) -> None:
+        """[summary]
+
+        Args:
+            object_results (List[DynamicObjectWithPerceptionResult]):
+            num_ground_truth (int):
+            target_labels (List[AutowareLabel]):
+            matching_mode (MatchingMode):
+            matching_threshold_list (List[float])
+            num_waypoints (Optional[int]): Number of horizontal frames. Defaults to 10[frames].
+            top_k (Optional[int]): Number of top-k highest confidence paths to be evaluated. Defaults to 1.
+            miss_tolerance (float): Tolerance value to determine miss. Defaults to 2.0[m].
+            kernel (Optional[str]): Target error kernel, min, max or None. Defaults to None.
+ """ + + self.num_ground_truth: int = num_ground_truth + + self.target_labels: List[AutowareLabel] = target_labels + self.matching_mode: MatchingMode = matching_mode + self.matching_threshold_list: List[float] = matching_threshold_list + self.top_k: Optional[int] = top_k + self.num_waypoints: Optional[int] = num_waypoints + self.miss_tolerance: float = miss_tolerance + + all_object_results: List[DynamicObjectWithPerceptionResult] = [] + if len(object_results) == 0 or not isinstance(object_results[0], list): + all_object_results = object_results + else: + for obj_results in object_results: + all_object_results += obj_results + self.objects_results_num: int = len(all_object_results) + + # Calculate SoftAP + tp_list = self._calculate_path_tp(all_object_results, kernel=kernel) + self.ap = self._calculate_soft_ap(tp_list) + + def _calculate_path_tp( + self, + object_results: List[DynamicObjectWithPerceptionResult], + kernel: Optional[str] = None, + ): + tp_list: List[float] = [0.0 for _ in range(self.objects_results_num)] + for i, object_result in enumerate(object_results): + matching_threshold: float = get_label_threshold( + semantic_label=object_result.estimated_object.semantic_label, + target_labels=self.target_labels, + threshold_list=self.matching_threshold_list, + ) + if not object_result.is_result_correct( + matching_mode=self.matching_mode, + matching_threshold=matching_threshold, + ): + continue + estimation, ground_truth = prepare_path(object_result, self.top_k) + err: np.ndarray = estimation.get_path_error(ground_truth, self.num_waypoints) + + if len(err) == 0: + continue + + distances: np.ndarray = np.linalg.norm(err[:, :, :2], axis=-1) + if kernel == "min": + distances = distances[np.argmin(distances.sum(axis=1))].reshape(1, -1) + elif kernel == "max": + distances = distances[np.argmax(distances.sum(axis=1))].reshape(1, -1) + + if (distances < self.miss_tolerance).all(): + tp_list[i] = 1.0 + + return np.cumsum(tp_list).tolist() + + def _calculate_precision_recall_list( + self, + tp_list: List[float], + ) -> Tuple[List[float], List[float]]: + """[summary] + Calculate precision recall. 
+
+        Args:
+            tp_list (List[float]): The list of cumulative TP counts.
+
+        Returns:
+            Tuple[List[float], List[float]]: precision_list and recall_list
+
+        Example:
+            state
+                num_ground_truth = 4
+                tp_list = [1, 1, 2, 3]
+                fp_list = [0, 1, 1, 1]
+            return
+                precision_list = [1.0, 0.5, 0.67, 0.75]
+                recall_list = [0.25, 0.25, 0.5, 0.75]
+        """
+        precisions_list: List[float] = [0.0 for _ in range(len(tp_list))]
+        recalls_list: List[float] = [0.0 for _ in range(len(tp_list))]
+
+        for i in range(len(precisions_list)):
+            precisions_list[i] = float(tp_list[i]) / (i + 1)
+            if self.num_ground_truth > 0:
+                recalls_list[i] = float(tp_list[i]) / self.num_ground_truth
+            else:
+                recalls_list[i] = 0.0
+
+        return precisions_list, recalls_list
+
+    def _calculate_soft_ap(self, tp_list: List[float]) -> float:
+        """[summary]
+        Calculate AP (average precision)
+
+        Args:
+            tp_list (List[float]): The list of cumulative TP counts.
+
+        Returns:
+            float: AP
+
+        Example:
+            precision_list = [1.0, 0.5, 0.67, 0.75]
+            recall_list = [0.25, 0.25, 0.5, 0.75]
+
+            max_precision_list: List[float] = [0.75, 1.0, 1.0]
+            max_precision_recall_list: List[float] = [0.75, 0.25, 0.0]
+
+            ap = 0.75 * (0.75 - 0.25) + 1.0 * (0.25 - 0.0)
+               = 0.625
+
+        """
+        precision_list, recall_list = self._calculate_precision_recall_list(tp_list)
+
+        if len(precision_list) == 0:
+            return np.nan
+
+        max_precision_list, max_precision_recall_list = self.interpolate_precision_recall_list(
+            precision_list,
+            recall_list,
+        )
+
+        ap: float = 0.0
+        for i in range(len(max_precision_list) - 1):
+            score: float = max_precision_list[i] * (
+                max_precision_recall_list[i] - max_precision_recall_list[i + 1]
+            )
+            ap += score
+
+        return ap
+
+    def interpolate_precision_recall_list(
+        self,
+        precision_list: List[float],
+        recall_list: List[float],
+    ) -> Tuple[List[float], List[float]]:
+        """[summary]
+        Interpolate precision and recall with maximum precision value per recall bins.
+
+        Args:
+            precision_list (List[float])
+            recall_list (List[float])
+
+        Returns:
+            Tuple[List[float], List[float]]: max_precision_list and max_precision_recall_list
+        """
+        max_precision_list: List[float] = [precision_list[-1]]
+        max_precision_recall_list: List[float] = [recall_list[-1]]
+
+        for i in reversed(range(len(recall_list) - 1)):
+            if precision_list[i] > max_precision_list[-1]:
+                max_precision_list.append(precision_list[i])
+                max_precision_recall_list.append(recall_list[i])
+
+        # append min recall
+        max_precision_list.append(max_precision_list[-1])
+        max_precision_recall_list.append(0.0)
+
+        return max_precision_list, max_precision_recall_list
diff --git a/perception_eval/perception_eval/evaluation/metrics/prediction/utils.py b/perception_eval/perception_eval/evaluation/metrics/prediction/utils.py
new file mode 100644
index 00000000..267e06cb
--- /dev/null
+++ b/perception_eval/perception_eval/evaluation/metrics/prediction/utils.py
@@ -0,0 +1,49 @@
+# Copyright 2022 TIER IV, Inc.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from copy import deepcopy
+from typing import Tuple
+
+from perception_eval.common.object import DynamicObject
+from perception_eval.evaluation.result.object_result import DynamicObjectWithPerceptionResult
+
+
+def prepare_path(
+    object_result: DynamicObjectWithPerceptionResult,
+    top_k: int,
+) -> Tuple[DynamicObject, DynamicObject]:
+    """[summary]
+    Extract the top-k highest-confidence predicted paths and stack the ground truth path to match.
+    Args:
+        object_result (DynamicObjectWithPerceptionResult)
+        top_k (int)
+
+    Returns:
+        estimated_object_ (DynamicObject)
+        ground_truth_object_ (DynamicObject)
+    """
+    if object_result.ground_truth_object is None:
+        raise RuntimeError("Object result's ground truth object must be set")
+
+    estimated_object_ = deepcopy(object_result.estimated_object)
+    ground_truth_object_ = deepcopy(object_result.ground_truth_object)
+
+    estimated_object_.predicted_paths.sort(key=lambda x: x.confidence, reverse=True)
+    estimated_object_.predicted_paths = estimated_object_.predicted_paths[:top_k]
+
+    num_stack: int = min(top_k, len(estimated_object_.predicted_paths))
+    for _ in range(num_stack - 1):
+        ground_truth_object_.predicted_paths.append(ground_truth_object_.predicted_paths[0])
+
+    return estimated_object_, ground_truth_object_
diff --git a/perception_eval/perception_eval/evaluation/metrics/tracking/tracking_metrics_score.py b/perception_eval/perception_eval/evaluation/metrics/tracking/tracking_metrics_score.py
index f348016a..d6ee4687 100644
--- a/perception_eval/perception_eval/evaluation/metrics/tracking/tracking_metrics_score.py
+++ b/perception_eval/perception_eval/evaluation/metrics/tracking/tracking_metrics_score.py
@@ -12,10 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ctypes import Union
 from typing import Dict
 from typing import List
 from typing import Tuple
+from typing import Union
 
 from perception_eval.common.label import AutowareLabel
 from perception_eval.evaluation.matching.object_matching import MatchingMode
diff --git a/perception_eval/perception_eval/evaluation/result/perception_frame_result.py b/perception_eval/perception_eval/evaluation/result/perception_frame_result.py
index 3dc30ddf..a2423324 100644
--- a/perception_eval/perception_eval/evaluation/result/perception_frame_result.py
+++ b/perception_eval/perception_eval/evaluation/result/perception_frame_result.py
@@ -123,7 +123,7 @@ def evaluate_frame(
                 tracking_results[label] = [prev_results, tracking_results[label]]
             self.metrics_score.evaluate_tracking(tracking_results, num_ground_truth_dict)
         if self.metrics_score.prediction_config is not None:
-            pass
+            self.metrics_score.evaluate_prediction(object_results_dict, num_ground_truth_dict)
 
         self.pass_fail_result.evaluate(
             object_results=self.object_results,
diff --git a/perception_eval/perception_eval/manager/perception_evaluation_manager.py b/perception_eval/perception_eval/manager/perception_evaluation_manager.py
index 89fd872e..4da7b16e 100644
--- a/perception_eval/perception_eval/manager/perception_evaluation_manager.py
+++ b/perception_eval/perception_eval/manager/perception_evaluation_manager.py
@@ -189,7 +189,7 @@ def get_scene_result(self) -> MetricsScore:
         if self.evaluator_config.metrics_config.tracking_config is not None:
             scene_metrics_score.evaluate_tracking(all_frame_results, all_num_gt)
         if self.evaluator_config.metrics_config.prediction_config is not None:
-            pass
+            scene_metrics_score.evaluate_prediction(all_frame_results, all_num_gt)
 
         return scene_metrics_score
diff --git a/perception_eval/perception_eval/util/debug.py
b/perception_eval/perception_eval/util/debug.py index e086a9fb..b83b938e 100644 --- a/perception_eval/perception_eval/util/debug.py +++ b/perception_eval/perception_eval/util/debug.py @@ -169,6 +169,12 @@ def get_objects_with_difference( radians=object_.state.orientation.radians + diff_yaw, ) + predicted_positions, predicted_orientations, predicted_confidences = _get_prediction_params( + object_, + diff_distance, + diff_yaw, + ) + test_object_: DynamicObject = DynamicObject( unix_time=object_.unix_time, position=position, @@ -179,7 +185,66 @@ def get_objects_with_difference( semantic_label=object_.semantic_label, pointcloud_num=object_.pointcloud_num, uuid=object_.uuid, + predicted_positions=predicted_positions, + predicted_orientations=predicted_orientations, + predicted_confidences=predicted_confidences, ) output_objects.append(test_object_) return output_objects + + +def _get_prediction_params( + object_: DynamicObject, + diff_distance: Tuple[float, float, float] = (0.0, 0.0, 0.0), + diff_yaw: float = 0.0, +) -> Tuple[ + Optional[List[List[Tuple[float]]]], + Optional[List[List[Quaternion]]], + Optional[List[float]], +]: + """ + Get object's prediction parameters with distance and yaw difference for test. + + Args: + object_ (DynamicObject): dynamic object. + diff_distance (Tuple[float, float, float], optional): + The parameter for difference of position. Defaults to + (0.0, 0.0, 0.0). + diff_yaw (float, optional): + The parameter for difference of yaw angle. Defaults to 0.0. + + Returns: + If the attribute of dynamic object named predicted_paths is None, returns None, None, None. + predicted_positions (List[List[Tuple[float]]]): List of positions + predicted_orientations (List[List[Quaternion]]): List of quaternions. + predicted_confidences (List[float]): List of confidences. 
+    """
+    if object_.predicted_paths is None:
+        return None, None, None
+
+    predicted_positions: List[List[Tuple[float]]] = []
+    predicted_orientations: List[List[Quaternion]] = []
+    predicted_confidences: List[float] = []
+    for path in object_.predicted_paths:
+        positions = []
+        orientations = []
+        for state in path:
+            positions.append(
+                (
+                    state.position[0] + diff_distance[0],
+                    state.position[1] + diff_distance[1],
+                    state.position[2] + diff_distance[2],
+                )
+            )
+            orientations.append(
+                Quaternion(
+                    axis=state.orientation.axis,
+                    radians=state.orientation.radians + diff_yaw,
+                )
+            )
+        predicted_positions.append(positions)
+        predicted_orientations.append(orientations)
+        predicted_confidences.append(path.confidence)
+
+    return predicted_positions, predicted_orientations, predicted_confidences
diff --git a/perception_eval/test/perception_lsim.py b/perception_eval/test/perception_lsim.py
index b7733057..9d260df2 100644
--- a/perception_eval/test/perception_lsim.py
+++ b/perception_eval/test/perception_lsim.py
@@ -73,7 +73,6 @@ def __init__(
             evaluation_config_dict.update({"evaluation_task": evaluation_task})
         else:
             raise ValueError(f"Unexpected evaluation task: {evaluation_task}")
-        # TODO prediction
 
         evaluation_config: PerceptionEvaluationConfig = PerceptionEvaluationConfig(
             dataset_paths=dataset_paths,
@@ -310,6 +309,67 @@ def visualize(self, frame_result: PerceptionFrameResult):
     logging.info(score_df.to_string())
     logging.info(error_df.to_string())
 
+    # ========================================= Prediction =========================================
+    print("=" * 50 + "Start Prediction" + "=" * 50)
+    if args.use_tmpdir:
+        tmpdir = tempfile.TemporaryDirectory()
+        result_root_directory: str = tmpdir.name
+    else:
+        result_root_directory: str = "data/result/{TIME}/"
+    prediction_lsim = PerceptionLSimMoc(dataset_paths, "prediction", result_root_directory)
+
+    for ground_truth_frame in prediction_lsim.evaluator.ground_truth_frames:
+        objects_with_difference = get_objects_with_difference(
+            ground_truth_objects=ground_truth_frame.objects,
+            diff_distance=(2.3, 0.0, 0.2),
+            diff_yaw=0.2,
+            is_confidence_with_distance=False,
+        )
+        # To avoid the case where there is no object
+        if len(objects_with_difference) > 0:
+            objects_with_difference.pop(0)
+        prediction_lsim.callback(
+            ground_truth_frame.unix_time,
+            objects_with_difference,
+        )
+
+    # final result
+    prediction_final_metric_score = prediction_lsim.get_final_result()
+
+    # Debug
+    if len(prediction_lsim.evaluator.frame_results) > 0:
+        logging.info(
+            "Frame result example (frame_results[0]): "
+            f"{format_class_for_log(prediction_lsim.evaluator.frame_results[0], 1)}",
+        )
+
+        if len(prediction_lsim.evaluator.frame_results[0].object_results) > 0:
+            logging.info(
+                "Object result example (frame_results[0].object_results[0]): "
+                f"{format_class_for_log(prediction_lsim.evaluator.frame_results[0].object_results[0])}",
+            )
+
+    # Metrics config
+    logging.info(
+        "Prediction Metrics example (prediction_final_metric_score): "
+        f"{format_class_for_log(prediction_final_metric_score, len(prediction_final_metric_score.prediction_config.target_labels))}",
+    )
+
+    # Prediction metrics score
+    logging.info(
+        "Prediction result example (prediction_final_metric_score.prediction_scores[0]): "
+        f"{format_class_for_log(prediction_final_metric_score.prediction_scores[0], 100)}"
+    )
+
+    # # Visualize all frame results
+    # logging.info("Start visualizing prediction results")
+    # prediction_lsim.evaluator.visualize_all()
+
+    # # Prediction performance report
+    # prediction_analyzer = PerceptionPerformanceAnalyzer(prediction_lsim.evaluator.evaluator_config)
+    # prediction_analyzer.add(prediction_lsim.evaluator.frame_results)
+    # score_df, error_df = prediction_analyzer.analyze()
+
     # Clean up tmpdir
     if args.use_tmpdir:
         tmpdir.cleanup()