feat: merge ground segmentation evaluator #55

Merged
11 commits merged on Nov 29, 2024
1 change: 1 addition & 0 deletions driving_log_replayer_v2/CMakeLists.txt
@@ -23,6 +23,7 @@ install(PROGRAMS
  scripts/perception_database_result.py
  scripts/perception_2d_evaluator_node.py
  scripts/traffic_light_evaluator_node.py
  scripts/ground_segmentation_evaluator_node.py
  scripts/obstacle_segmentation_evaluator_node.py
  scripts/performance_diag_evaluator_node.py
  scripts/localization_evaluator_node.py
100 changes: 100 additions & 0 deletions driving_log_replayer_v2/driving_log_replayer_v2/ground_segmentation.py
@@ -0,0 +1,100 @@
# Copyright (c) 2024 TIER IV.inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Literal

from pydantic import BaseModel

from driving_log_replayer_v2.result import EvaluationItem
from driving_log_replayer_v2.result import ResultBase
from driving_log_replayer_v2.scenario import number
from driving_log_replayer_v2.scenario import Scenario
from driving_log_replayer_v2_msgs.msg import GroundSegmentationEvalResult


class Condition(BaseModel):
    Method: Literal["annotated_pcd", "annotated_rosbag"]
    ground_label: number
    obstacle_label: number
    accuracy_min: number
    accuracy_max: number
    PassRate: number


class Evaluation(BaseModel):
    UseCaseName: Literal["ground_segmentation"]
    UseCaseFormatVersion: Literal["0.3.0"]
    Conditions: Condition


class GroundSegmentationScenario(Scenario):
    Evaluation: Evaluation

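For readers unfamiliar with the scenario format, the `Conditions` block these models validate could be constructed like this. A minimal sketch; the label IDs and thresholds below are illustrative, not values from this PR:

    from driving_log_replayer_v2.ground_segmentation import Condition

    # Hypothetical values for illustration; real scenarios supply their own.
    cond = Condition(
        Method="annotated_pcd",
        ground_label=6,     # label ID marking ground points in the annotation
        obstacle_label=7,   # label ID marking obstacle points
        accuracy_min=0.9,   # a frame passes when accuracy lies in [min, max]
        accuracy_max=1.0,
        PassRate=95.0,      # required share of passing frames, assuming rate() reports a percentage
    )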

@dataclass
class GroundSegmentation(EvaluationItem):
    name: str = "Ground Segmentation"

    def set_frame(self, msg: GroundSegmentationEvalResult) -> dict:
        self.condition: Condition
        self.total += 1

        frame_success = self.condition.accuracy_min <= msg.accuracy <= self.condition.accuracy_max

        if frame_success:
            self.passed += 1

        current_rate = self.rate()
        self.success = current_rate >= self.condition.PassRate
        self.summary = f"{self.name} ({self.success_str()}): {self.passed} / {self.total} -> {current_rate:.2f}"

        return {
            "GroundSegmentation": {
                "Result": {
                    "Total": self.success_str(),
                    "Frame": "Success" if frame_success else "Fail",
                },
                "Info": {
                    "TP": msg.tp,
                    "FP": msg.fp,
                    "TN": msg.tn,
                    "FN": msg.fn,
                    "Accuracy": msg.accuracy,
                    "Precision": msg.precision,
                    "Recall": msg.recall,
                    "Specificity": msg.specificity,
                    "F1-score": msg.f1_score,
                },
            },
        }


class GroundSegmentationResult(ResultBase):
    def __init__(self, condition: Condition) -> None:
        super().__init__()
        self.__ground_segmentation = GroundSegmentation(condition=condition)

    def update(self) -> None:
        summary_str = self.__ground_segmentation.summary
        if self.__ground_segmentation.success:
            self._success = True
            self._summary = f"Passed: {summary_str}"
        else:
            self._success = False
            self._summary = f"Failed: {summary_str}"

    def set_frame(self, msg: GroundSegmentationEvalResult) -> None:
        self._frame = self.__ground_segmentation.set_frame(msg)
        self.update()
@@ -0,0 +1,40 @@
# Copyright (c) 2024 TIER IV.inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

RECORD_TOPIC = """^/tf$\
|^/diagnostics$\
"""

AUTOWARE_DISABLE = {
    "localization": "false",
    "planning": "false",
    "control": "false",
}

AUTOWARE_ARGS = {"perception_mode": "lidar"}

NODE_PARAMS = {
    "vehicle_model": LaunchConfiguration("vehicle_model"),
    "evaluation_target_topic": LaunchConfiguration("evaluation_target_topic"),
}

USE_CASE_ARGS: list[DeclareLaunchArgument] = [
    DeclareLaunchArgument(
        "evaluation_target_topic",
        default_value="/perception/obstacle_segmentation/single_frame/pointcloud",
    )
]
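As a sanity check of the RECORD_TOPIC filter, the pattern matches exactly the two topics named above. An illustrative snippet, not part of this PR:

    import re

    pattern = re.compile(RECORD_TOPIC)
    assert pattern.match("/tf")
    assert pattern.match("/diagnostics")
    assert not pattern.match("/sensing/lidar/concatenated/pointcloud")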
225 changes: 225 additions & 0 deletions driving_log_replayer_v2/scripts/ground_segmentation_evaluator_node.py
@@ -0,0 +1,225 @@
#!/usr/bin/env python3

# Copyright (c) 2024 TIER IV.inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from pathlib import Path

import message_filters
import numpy as np
from rclpy.qos import qos_profile_sensor_data
import ros2_numpy
from scipy.spatial import cKDTree
from sensor_msgs.msg import PointCloud2

from driving_log_replayer_v2.evaluator import DLREvaluatorV2
from driving_log_replayer_v2.evaluator import evaluator_main
from driving_log_replayer_v2.ground_segmentation import Condition
from driving_log_replayer_v2.ground_segmentation import GroundSegmentationResult
from driving_log_replayer_v2.ground_segmentation import GroundSegmentationScenario
import driving_log_replayer_v2.perception_eval_conversions as eval_conversions
from driving_log_replayer_v2_msgs.msg import GroundSegmentationEvalResult


class GroundSegmentationEvaluator(DLREvaluatorV2):
    CLOUD_DIM = 6
    TS_DIFF_THRESH = 75000

    def __init__(self, name: str) -> None:
        super().__init__(name, GroundSegmentationScenario, GroundSegmentationResult)

        eval_condition: Condition = self._scenario.Evaluation.Conditions
        self.ground_label = eval_condition.ground_label
        self.obstacle_label = eval_condition.obstacle_label
        self.declare_parameter(
            "evaluation_target_topic",
            "/perception/obstacle_segmentation/pointcloud",
        )
        self.eval_target_topic = (
            self.get_parameter("evaluation_target_topic").get_parameter_value().string_value
        )

        if eval_condition.Method == "annotated_pcd":
            # pcd eval mode
            sample_data_path = Path(self._t4_dataset_paths[0], "annotation", "sample_data.json")
            sample_data = json.load(sample_data_path.open())
            sample_data = list(filter(lambda d: d["filename"].split(".")[-2] == "pcd", sample_data))

            self.ground_truth: dict[int, np.ndarray] = {}
            for i in range(len(sample_data)):
                pcd_file_path = Path(
                    self._t4_dataset_paths[0],
                    sample_data[i]["filename"],
                ).as_posix()
                raw_points = np.fromfile(pcd_file_path, dtype=np.float32)
                points: np.ndarray = raw_points.reshape((-1, self.CLOUD_DIM))
                self.ground_truth[int(sample_data[i]["timestamp"])] = points

            self.__sub_pointcloud = self.create_subscription(
                PointCloud2,
                self.eval_target_topic,
                self.annotated_pcd_eval_cb,
                qos_profile_sensor_data,
            )
        elif eval_condition.Method == "annotated_rosbag":
            # rosbag (AWSIM) eval mode
            self.__sub_gt_cloud = message_filters.Subscriber(
                self,
                PointCloud2,
                "/sensing/lidar/concatenated/pointcloud",
                qos_profile=qos_profile_sensor_data,
            )
            self.__sub_eval_target_cloud = message_filters.Subscriber(
                self,
                PointCloud2,
                self.eval_target_topic,
                qos_profile=qos_profile_sensor_data,
            )
            self.__sync_sub = message_filters.TimeSynchronizer(
                [self.__sub_gt_cloud, self.__sub_eval_target_cloud],
                1000,
            )
            self.__sync_sub.registerCallback(self.annotated_rosbag_eval_cb)
        else:
            err = 'The "Method" field must be set to either "annotated_rosbag" or "annotated_pcd"'
            raise ValueError(err)

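One thing worth noting: `message_filters.TimeSynchronizer` only pairs messages whose header stamps match exactly, which presumably holds here because the ground-filtered cloud keeps the stamp of its input. If that ever stops being true, an approximate matcher would be needed; a minimal sketch (not part of this PR, and the slop value is illustrative):

    # Pair clouds whose stamps differ by at most `slop` seconds.
    sync_sub = message_filters.ApproximateTimeSynchronizer(
        [self.__sub_gt_cloud, self.__sub_eval_target_cloud],
        1000,  # queue size
        0.05,  # slop: allowed timestamp difference in seconds
    )
    sync_sub.registerCallback(self.annotated_rosbag_eval_cb)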
    def annotated_pcd_eval_cb(self, msg: PointCloud2) -> None:
        unix_time: int = eval_conversions.unix_time_from_ros_msg(msg.header)
        gt_frame_ts = self.__get_gt_frame_ts(unix_time=unix_time)

        if gt_frame_ts < 0:
            return

        # get the ground truth pointcloud for this frame and
        # construct a kd-tree from the GT cloud
        gt_frame_cloud: np.ndarray = self.ground_truth[gt_frame_ts]
        kdtree = cKDTree(gt_frame_cloud[:, 0:3])

        # convert the ROS 2 pointcloud to numpy
        numpy_pcd = ros2_numpy.numpify(msg)
        pointcloud = np.zeros((numpy_pcd.shape[0], 3))
        pointcloud[:, 0] = numpy_pcd["x"]
        pointcloud[:, 1] = numpy_pcd["y"]
        pointcloud[:, 2] = numpy_pcd["z"]

        # count TP+FN and FP+TN from the GT labels, then derive TN and FN
        # by matching each evaluated point to its nearest GT point
        tp_fn = np.count_nonzero(gt_frame_cloud[:, 5] == self.ground_label)
        fp_tn = np.count_nonzero(gt_frame_cloud[:, 5] == self.obstacle_label)
        tn: int = 0
        fn: int = 0
        for p in pointcloud:
            _, idx = kdtree.query(p, k=1)
            if gt_frame_cloud[idx][5] == self.ground_label:
                fn += 1
            elif gt_frame_cloud[idx][5] == self.obstacle_label:
                tn += 1
        tp = tp_fn - fn
        fp = fp_tn - tn

        self.get_logger().info(f"TP {tp}, FP {fp}, TN {tn}, FN {fn}")

        metrics_list = self.__compute_metrics(tp, fp, tn, fn)

        frame_result = GroundSegmentationEvalResult()
        frame_result.tp = tp
        frame_result.fp = fp
        frame_result.tn = tn
        frame_result.fn = fn
        frame_result.accuracy = metrics_list[0]
        frame_result.precision = metrics_list[1]
        frame_result.recall = metrics_list[2]
        frame_result.specificity = metrics_list[3]
        frame_result.f1_score = metrics_list[4]

        self._result.set_frame(frame_result)
        self._result_writer.write_result(self._result)

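A side note on the nearest-neighbor loop above: `scipy.spatial.cKDTree.query` also accepts an (N, 3) array, so the per-point Python loop could be replaced by a single vectorized query. A sketch, assuming the same column-5 label convention (not part of this PR):

    # Vectorized counting: one KD-tree query for the whole evaluated cloud.
    _, idx = kdtree.query(pointcloud, k=1)
    matched_labels = gt_frame_cloud[idx, 5]  # GT label of each matched point
    fn = int(np.count_nonzero(matched_labels == self.ground_label))
    tn = int(np.count_nonzero(matched_labels == self.obstacle_label))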
    def annotated_rosbag_eval_cb(
        self,
        gt_cloud_msg: PointCloud2,
        eval_target_cloud_msg: PointCloud2,
    ) -> None:
        np_gt_cloud: np.ndarray = ros2_numpy.numpify(gt_cloud_msg)
        np_target_cloud: np.ndarray = ros2_numpy.numpify(eval_target_cloud_msg)

        # guard: both clouds must carry the annotation field "entity_id"
        if (
            "entity_id" not in np_gt_cloud.dtype.fields
            or "entity_id" not in np_target_cloud.dtype.fields
        ):
            self.get_logger().warn('The input PointCloud doesn\'t have a field named "entity_id"')
            return

        # ground points in the GT cloud form TP+FN; everything else is TN+FP
        tp_fn = np.count_nonzero(np_gt_cloud["entity_id"] == self.ground_label)
        tn_fp = np_gt_cloud.size - tp_fn
        # ground points still present in the filtered cloud were missed (FN);
        # the remaining non-ground points were correctly kept (TN)
        fn = np.count_nonzero(np_target_cloud["entity_id"] == self.ground_label)
        tn = np_target_cloud.size - fn

        tp = tp_fn - fn
        fp = tn_fp - tn
        self.get_logger().info(f"TP+FN = {tp_fn}, TN+FP = {tn_fp}")
        self.get_logger().info(f"TP {tp}, FP {fp}, TN {tn}, FN {fn}")
        metrics_list = self.__compute_metrics(tp, fp, tn, fn)
        frame_result = GroundSegmentationEvalResult()
        frame_result.tp = tp
        frame_result.fp = fp
        frame_result.tn = tn
        frame_result.fn = fn
        frame_result.accuracy = metrics_list[0]
        frame_result.precision = metrics_list[1]
        frame_result.recall = metrics_list[2]
        frame_result.specificity = metrics_list[3]
        frame_result.f1_score = metrics_list[4]

        self._result.set_frame(frame_result)
        self._result_writer.write_result(self._result)

    def __get_gt_frame_ts(self, unix_time: int) -> int:
        # linear scan for the GT timestamp closest to the query time
        ts_itr = iter(self.ground_truth.keys())
        ret_ts: int = int(next(ts_itr))
        min_diff: int = abs(unix_time - ret_ts)

        for _ in range(1, len(self.ground_truth)):
            sample_ts = next(ts_itr)
            diff_time = abs(unix_time - sample_ts)
            if diff_time < min_diff:
                min_diff = diff_time
                ret_ts = sample_ts

        if min_diff > self.TS_DIFF_THRESH:
            self.get_logger().warn("time diff is too big")
            return -1

        return ret_ts

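The same nearest-timestamp search could be written as a one-liner over the dict keys; a sketch, assuming microsecond timestamps so that TS_DIFF_THRESH = 75000 means 75 ms (not part of this PR):

    # Equivalent nearest-timestamp lookup with a rejection guard.
    ret_ts = min(self.ground_truth, key=lambda ts: abs(unix_time - ts))
    if abs(unix_time - ret_ts) > self.TS_DIFF_THRESH:
        ret_ts = -1  # no GT frame close enough to this message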
    def __compute_metrics(self, tp: int, fp: int, tn: int, fn: int) -> list[float]:
        # eps keeps every denominator non-zero for empty or one-class frames
        eps = 1e-10
        accuracy = float(tp + tn) / float(tp + fp + tn + fn + eps)
        precision = float(tp) / float(tp + fp + eps)
        recall = float(tp) / float(tp + fn + eps)
        specificity = float(tn) / float(tn + fp + eps)
        f1_score = 2 * (precision * recall) / (precision + recall + eps)
        return [accuracy, precision, recall, specificity, f1_score]

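For a quick sense of the numbers these formulas produce, a made-up frame (values are illustrative only):

    tp, fp, tn, fn = 900, 50, 1000, 50
    # accuracy    = (900 + 1000) / 2000 = 0.95
    # precision   = 900 / 950          ~= 0.947
    # recall      = 900 / 950          ~= 0.947
    # specificity = 1000 / 1050        ~= 0.952
    # f1_score    = 2 * (0.947 * 0.947) / (0.947 + 0.947) ~= 0.947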

@evaluator_main
def main() -> DLREvaluatorV2:
    return GroundSegmentationEvaluator("ground_segmentation_evaluator")


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions driving_log_replayer_v2_msgs/CMakeLists.txt
@@ -7,6 +7,7 @@ autoware_package()
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/ObstacleSegmentationMarker.msg"
  "msg/ObstacleSegmentationMarkerArray.msg"
  "msg/GroundSegmentationEvalResult.msg"
  DEPENDENCIES
  std_msgs
)
10 changes: 10 additions & 0 deletions driving_log_replayer_v2_msgs/msg/GroundSegmentationEvalResult.msg
@@ -0,0 +1,10 @@
uint32 tp
uint32 fp
uint32 tn
uint32 fn

float32 accuracy
float32 precision
float32 recall
float32 specificity
float32 f1_score