From e977104664d195f2170331d33db1021671c747e9 Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Fri, 8 Sep 2023 09:56:15 +0900 Subject: [PATCH 1/7] refactor: localization abstract base class (#228) --- .../driving_log_replayer/evaluator.py | 282 ++++++++++++++++++ .../driving_log_replayer/node_common.py | 115 ------- .../driving_log_replayer/result.py | 12 +- .../scripts/eagleye_evaluator_node.py | 156 ++-------- .../scripts/localization_evaluator_node.py | 238 ++++----------- .../scripts/yabloc_evaluator_node.py | 155 ++-------- 6 files changed, 379 insertions(+), 579 deletions(-) create mode 100644 driving_log_replayer/driving_log_replayer/evaluator.py delete mode 100644 driving_log_replayer/driving_log_replayer/node_common.py diff --git a/driving_log_replayer/driving_log_replayer/evaluator.py b/driving_log_replayer/driving_log_replayer/evaluator.py new file mode 100644 index 000000000..b90dc3e9b --- /dev/null +++ b/driving_log_replayer/driving_log_replayer/evaluator.py @@ -0,0 +1,282 @@ +# Copyright (c) 2023 TIER IV.inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC +from abc import abstractmethod +import os +from os.path import expandvars +from typing import Dict +from typing import Optional +from typing import TYPE_CHECKING + +from autoware_adapi_v1_msgs.srv import InitializeLocalization +from geometry_msgs.msg import PoseStamped +from geometry_msgs.msg import PoseWithCovarianceStamped +from geometry_msgs.msg import TransformStamped +import numpy as np +import rclpy +from rclpy.callback_groups import MutuallyExclusiveCallbackGroup +from rclpy.clock import Clock +from rclpy.clock import ClockType +from rclpy.executors import MultiThreadedExecutor +from rclpy.node import Node +from rclpy.time import Time +from rosidl_runtime_py import message_to_ordereddict +import simplejson as json +from tf2_ros import Buffer +from tf2_ros import TransformListener +from tf_transformations import euler_from_quaternion +from tier4_localization_msgs.srv import PoseWithCovarianceStamped as PoseWithCovarianceStampedSrv +import yaml + +from driving_log_replayer.result import ResultWriter + +if TYPE_CHECKING: + from autoware_adapi_v1_msgs.msg import ResponseStatus + + +class DLREvaluator(Node, ABC): + COUNT_SHUTDOWN_NODE = 5 + + def __init__(self, name: str) -> None: + super().__init__(name) + self.declare_parameter("scenario_path", "") + self.declare_parameter("result_json_path", "") + + self._scenario_path = expandvars( + self.get_parameter("scenario_path").get_parameter_value().string_value + ) + self._result_json_path = expandvars( + self.get_parameter("result_json_path").get_parameter_value().string_value + ) + + self._scenario_yaml_obj = None + try: + with open(self._scenario_path) as scenario_file: + self._scenario_yaml_obj = yaml.safe_load(scenario_file) + except (FileNotFoundError, PermissionError, yaml.YAMLError) as e: + self.get_logger().error(f"An error occurred while loading the scenario. 
{e}") + rclpy.shutdown() + + self._condition = self._scenario_yaml_obj["Evaluation"].get("Conditions", {}) + self._result_writer = ResultWriter( + self._result_json_path, self.get_clock(), self._condition + ) + + self._tf_buffer = Buffer() + self._tf_listener = TransformListener(self._tf_buffer, self, spin_thread=True) + + self._initial_pose = DLREvaluator.set_initial_pose( + self._scenario_yaml_obj["Evaluation"].get("InitialPose", None) + ) + self.start_initialpose_service() + + self._current_time = Time().to_msg() + self._prev_time = Time().to_msg() + self._clock_stop_counter = 0 + + self._timer_group = MutuallyExclusiveCallbackGroup() + self._timer = self.create_timer( + 1.0, + self.timer_cb, + callback_group=self._timer_group, + clock=Clock(clock_type=ClockType.SYSTEM_TIME), + ) # wall timer + + def timer_cb(self) -> None: + self._current_time = self.get_clock().now().to_msg() + # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") + if self._current_time.sec > 0: + if self._initial_pose is not None: + self.call_initialpose_service() + if self._current_time == self._prev_time: + self._clock_stop_counter += 1 + else: + self._clock_stop_counter = 0 + self._prev_time = self._current_time + if self._clock_stop_counter >= DLREvaluator.COUNT_SHUTDOWN_NODE: + self._result_writer.close() + rclpy.shutdown() + + def start_initialpose_service(self) -> None: + if self._initial_pose is None: + return + self._initial_pose_running = False + self._initial_pose_success = False + self._initial_pose_client = self.create_client( + InitializeLocalization, "/api/localization/initialize" + ) + self._map_fit_client = self.create_client( + PoseWithCovarianceStampedSrv, "/map/map_height_fitter/service" + ) + + while not self._initial_pose_client.wait_for_service(timeout_sec=1.0): + self.get_logger().warning("initial pose service not available, waiting again...") + while not self._map_fit_client.wait_for_service(timeout_sec=1.0): + self.get_logger().warning("map height fitter service not available, waiting again...") + + def call_initialpose_service(self) -> None: + if not self._initial_pose_success and not self._initial_pose_running: + self.get_logger().info( + f"call initial_pose time: {self._current_time.sec}.{self._current_time.nanosec}" + ) + self._initial_pose_running = True + self._initial_pose.header.stamp = self._current_time + future_map_fit = self._map_fit_client.call_async( + PoseWithCovarianceStampedSrv.Request(pose_with_covariance=self._initial_pose) + ) + future_map_fit.add_done_callback(self.map_fit_cb) + + def map_fit_cb(self, future): + result = future.result() + if result is not None: + if result.success: + future_init_pose = self._initial_pose_client.call_async( + InitializeLocalization.Request(pose=[result.pose_with_covariance]) + ) + future_init_pose.add_done_callback(self.initial_pose_cb) + else: + # free self._initial_pose_running when the service call fails + self._initial_pose_running = False + self.get_logger().warn("map_height_height service result is fail") + else: + # free self._initial_pose_running when the service call fails + self._initial_pose_running = False + self.get_logger().error(f"Exception for service: {future.exception()}") + + def initial_pose_cb(self, future): + result = future.result() + if result is not None: + res_status: ResponseStatus = result.status + self._initial_pose_success = res_status.success + self.get_logger().info( + f"initial_pose_success: {self._initial_pose_success}" + ) # debug msg + else: + 
self.get_logger().error(f"Exception for service: {future.exception()}") + # free self._initial_pose_running + self._initial_pose_running = False + + @abstractmethod + def check_scenario(self) -> None: + """Check self._scenario_yaml_obj and if has error shutdown.""" + if self._scenario_yaml_obj is None: + rclpy.shutdown() + + @classmethod + def get_goal_pose_from_t4_dataset(cls, dataset_path: str) -> PoseStamped: + ego_pose_json_path = os.path.join(dataset_path, "annotation", "ego_pose.json") + with open(ego_pose_json_path) as ego_pose_file: + ego_pose_json = json.load(ego_pose_file) + last_ego_pose = ego_pose_json[-1] + goal_pose = PoseStamped() + goal_pose.header.frame_id = "map" + goal_pose.pose.position.x = last_ego_pose["translation"][0] + goal_pose.pose.position.y = last_ego_pose["translation"][1] + goal_pose.pose.position.z = last_ego_pose["translation"][2] + goal_pose.pose.orientation.x = last_ego_pose["rotation"][1] + goal_pose.pose.orientation.y = last_ego_pose["rotation"][2] + goal_pose.pose.orientation.z = last_ego_pose["rotation"][3] + goal_pose.pose.orientation.w = last_ego_pose["rotation"][0] + return goal_pose + + @classmethod + def transform_stamped_with_euler_angle(cls, transform_stamped: TransformStamped) -> Dict: + tf_euler = message_to_ordereddict(transform_stamped) + euler_angle = euler_from_quaternion( + [ + transform_stamped.transform.rotation.x, + transform_stamped.transform.rotation.y, + transform_stamped.transform.rotation.z, + transform_stamped.transform.rotation.w, + ] + ) + tf_euler["rotation_euler"] = { + "roll": euler_angle[0], + "pitch": euler_angle[1], + "yaw": euler_angle[2], + } + return tf_euler + + @classmethod + def set_initial_pose(cls, initial_pose: Optional[Dict]) -> Optional[PoseWithCovarianceStamped]: + if initial_pose is None: + return None + try: + ros_init_pose = PoseWithCovarianceStamped() + ros_init_pose.header.frame_id = "map" + ros_init_pose.pose.pose.position.x = float(initial_pose["position"]["x"]) + ros_init_pose.pose.pose.position.y = float(initial_pose["position"]["y"]) + ros_init_pose.pose.pose.position.z = float(initial_pose["position"]["z"]) + ros_init_pose.pose.pose.orientation.x = float(initial_pose["orientation"]["x"]) + ros_init_pose.pose.pose.orientation.y = float(initial_pose["orientation"]["y"]) + ros_init_pose.pose.pose.orientation.z = float(initial_pose["orientation"]["z"]) + ros_init_pose.pose.pose.orientation.w = float(initial_pose["orientation"]["w"]) + ros_init_pose.pose.covariance = np.array( + [ + 0.25, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.25, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.06853892326654787, + ] + ) + except KeyError: + return None + else: + return ros_init_pose + + +def evaluator_main(func): + def wrapper(): + rclpy.init() + executor = MultiThreadedExecutor() + evaluator = func() + executor.add_node(evaluator) + executor.spin() + evaluator.destroy_node() + rclpy.shutdown() + + return wrapper diff --git a/driving_log_replayer/driving_log_replayer/node_common.py b/driving_log_replayer/driving_log_replayer/node_common.py deleted file mode 100644 index 6c91fcff0..000000000 --- a/driving_log_replayer/driving_log_replayer/node_common.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright (c) 2022 TIER IV.inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -from typing import Dict -from typing import Optional - -from geometry_msgs.msg import PoseStamped -from geometry_msgs.msg import PoseWithCovarianceStamped -from geometry_msgs.msg import TransformStamped -import numpy as np -from rosidl_runtime_py import message_to_ordereddict -import simplejson as json -from tf_transformations import euler_from_quaternion - - -def get_goal_pose_from_t4_dataset(dataset_path: str) -> PoseStamped: - ego_pose_json_path = os.path.join(dataset_path, "annotation", "ego_pose.json") - with open(ego_pose_json_path) as ego_pose_file: - ego_pose_json = json.load(ego_pose_file) - last_ego_pose = ego_pose_json[-1] - goal_pose = PoseStamped() - goal_pose.header.frame_id = "map" - goal_pose.pose.position.x = last_ego_pose["translation"][0] - goal_pose.pose.position.y = last_ego_pose["translation"][1] - goal_pose.pose.position.z = last_ego_pose["translation"][2] - goal_pose.pose.orientation.x = last_ego_pose["rotation"][1] - goal_pose.pose.orientation.y = last_ego_pose["rotation"][2] - goal_pose.pose.orientation.z = last_ego_pose["rotation"][3] - goal_pose.pose.orientation.w = last_ego_pose["rotation"][0] - return goal_pose - - -def transform_stamped_with_euler_angle(transform_stamped: TransformStamped) -> Dict: - tf_euler = message_to_ordereddict(transform_stamped) - euler_angle = euler_from_quaternion( - [ - transform_stamped.transform.rotation.x, - transform_stamped.transform.rotation.y, - transform_stamped.transform.rotation.z, - transform_stamped.transform.rotation.w, - ] - ) - tf_euler["rotation_euler"] = { - "roll": euler_angle[0], - "pitch": euler_angle[1], - "yaw": euler_angle[2], - } - return tf_euler - - -def set_initial_pose(initial_pose: Optional[Dict]) -> Optional[PoseWithCovarianceStamped]: - ros_init_pose: Optional[PoseWithCovarianceStamped] = None - if initial_pose is not None: - ros_init_pose = PoseWithCovarianceStamped() - ros_init_pose.header.frame_id = "map" - ros_init_pose.pose.pose.position.x = float(initial_pose["position"]["x"]) - ros_init_pose.pose.pose.position.y = float(initial_pose["position"]["y"]) - ros_init_pose.pose.pose.position.z = float(initial_pose["position"]["z"]) - ros_init_pose.pose.pose.orientation.x = float(initial_pose["orientation"]["x"]) - ros_init_pose.pose.pose.orientation.y = float(initial_pose["orientation"]["y"]) - ros_init_pose.pose.pose.orientation.z = float(initial_pose["orientation"]["z"]) - ros_init_pose.pose.pose.orientation.w = float(initial_pose["orientation"]["w"]) - ros_init_pose.pose.covariance = np.array( - [ - 0.25, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.25, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.0, - 0.06853892326654787, - ] - ) - return ros_init_pose diff --git a/driving_log_replayer/driving_log_replayer/result.py b/driving_log_replayer/driving_log_replayer/result.py index f55671b6b..8339f3b45 100644 --- a/driving_log_replayer/driving_log_replayer/result.py +++ b/driving_log_replayer/driving_log_replayer/result.py @@ -12,6 +12,8 @@ # 
See the License for the specific language governing permissions and # limitations under the License. +from abc import ABC +from abc import abstractmethod import os import pickle from typing import Dict @@ -21,7 +23,7 @@ import simplejson as json -class ResultBase: +class ResultBase(ABC): def __init__(self) -> None: self._success = False self._summary = "NoData" @@ -36,6 +38,14 @@ def summary(self) -> str: def frame(self) -> Dict: return self._frame + @abstractmethod + def update(self) -> None: + """Update success and summary.""" + + @abstractmethod + def set_frame(self, msg) -> None: + """Set the result of one frame from the subscribe ros message.""" + class ResultWriter: def __init__(self, result_json_path: str, ros_clock: Clock, condition: Dict): diff --git a/driving_log_replayer/scripts/eagleye_evaluator_node.py b/driving_log_replayer/scripts/eagleye_evaluator_node.py index c233606aa..6f6dbf5b8 100755 --- a/driving_log_replayer/scripts/eagleye_evaluator_node.py +++ b/driving_log_replayer/scripts/eagleye_evaluator_node.py @@ -14,29 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. - -import os -from typing import TYPE_CHECKING - -from autoware_adapi_v1_msgs.srv import InitializeLocalization from diagnostic_msgs.msg import DiagnosticArray from diagnostic_msgs.msg import DiagnosticStatus -import rclpy -from rclpy.callback_groups import MutuallyExclusiveCallbackGroup -from rclpy.clock import Clock -from rclpy.clock import ClockType -from rclpy.executors import MultiThreadedExecutor -from rclpy.node import Node -from rclpy.time import Time -from tier4_localization_msgs.srv import PoseWithCovarianceStamped -import yaml - -from driving_log_replayer.node_common import set_initial_pose -from driving_log_replayer.result import ResultBase -from driving_log_replayer.result import ResultWriter -if TYPE_CHECKING: - from autoware_adapi_v1_msgs.msg import ResponseStatus +from driving_log_replayer.evaluator import DLREvaluator +from driving_log_replayer.evaluator import evaluator_main +from driving_log_replayer.result import ResultBase class EagleyeResult(ResultBase): @@ -63,7 +46,7 @@ def update(self): self._success = False self._summary = f"Failed: {summary_str}" - def add_eagleye_availability_frame(self, msg: DiagnosticArray): + def set_frame(self, msg: DiagnosticArray): for diag_status in msg.status: out_frame = {"Ego": {}} if diag_status.name != "monitor: eagleye_enu_absolute_pos_interpolate": @@ -78,57 +61,13 @@ def add_eagleye_availability_frame(self, msg: DiagnosticArray): self.update() -class EagleyeEvaluator(Node): - def __init__(self): - super().__init__("eagleye_evaluator") - self.declare_parameter("scenario_path", "") - self.declare_parameter("result_json_path", "") - - self.__timer_group = MutuallyExclusiveCallbackGroup() - - scenario_path = os.path.expandvars( - self.get_parameter("scenario_path").get_parameter_value().string_value - ) - self.__scenario_yaml_obj = None - with open(scenario_path) as scenario_file: - self.__scenario_yaml_obj = yaml.safe_load(scenario_file) - self.__result_json_path = os.path.expandvars( - self.get_parameter("result_json_path").get_parameter_value().string_value - ) +class EagleyeEvaluator(DLREvaluator): + def __init__(self, name: str): + super().__init__(name) + self.check_scenario() self.__result = EagleyeResult() - self.__result_writer = ResultWriter(self.__result_json_path, self.get_clock(), {}) - - self.__initial_pose = set_initial_pose( - 
self.__scenario_yaml_obj["Evaluation"]["InitialPose"] - ) - self.__initial_pose_running = False - self.__initial_pose_success = False - - self.__current_time = Time().to_msg() - self.__prev_time = Time().to_msg() - self.__counter = 0 - - # service client - self.__initial_pose_client = self.create_client( - InitializeLocalization, "/api/localization/initialize" - ) - self.__map_fit_client = self.create_client( - PoseWithCovarianceStamped, "/map/map_height_fitter/service" - ) - while not self.__initial_pose_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning("initial pose service not available, waiting again...") - while not self.__map_fit_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning("map height fitter service not available, waiting again...") - - self.__timer = self.create_timer( - 1.0, - self.timer_cb, - callback_group=self.__timer_group, - clock=Clock(clock_type=ClockType.SYSTEM_TIME), - ) # wall timer - self.__sub_diagnostics = self.create_subscription( DiagnosticArray, "/diagnostics", @@ -136,76 +75,17 @@ def __init__(self): 1, ) + def check_scenario(self) -> None: + pass + def diagnostics_cb(self, msg: DiagnosticArray): - self.__result.add_eagleye_availability_frame(msg) - self.__result_writer.write(self.__result) - - def timer_cb(self) -> None: - self.__current_time = self.get_clock().now().to_msg() - # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") - if self.__current_time.sec > 0: - if ( - self.__initial_pose is not None - and not self.__initial_pose_success - and not self.__initial_pose_running - ): - self.get_logger().info( - f"call initial_pose time: {self.__current_time.sec}.{self.__current_time.nanosec}" - ) - self.__initial_pose_running = True - self.__initial_pose.header.stamp = self.__current_time - future_map_fit = self.__map_fit_client.call_async( - PoseWithCovarianceStamped.Request(pose_with_covariance=self.__initial_pose) - ) - future_map_fit.add_done_callback(self.map_fit_cb) - if self.__current_time == self.__prev_time: - self.__counter += 1 - else: - self.__counter = 0 - self.__prev_time = self.__current_time - if self.__counter >= 5: - self.__result_writer.close() - rclpy.shutdown() - - def map_fit_cb(self, future): - result = future.result() - if result is not None: - if result.success: - future_init_pose = self.__initial_pose_client.call_async( - InitializeLocalization.Request(pose=[result.pose_with_covariance]) - ) - future_init_pose.add_done_callback(self.initial_pose_cb) - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().warn("map_height_height service result is fail") - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().error(f"Exception for service: {future.exception()}") - - def initial_pose_cb(self, future): - result = future.result() - if result is not None: - res_status: ResponseStatus = result.status - self.__initial_pose_success = res_status.success - self.get_logger().info( - f"initial_pose_success: {self.__initial_pose_success}" - ) # debug msg - else: - self.get_logger().error(f"Exception for service: {future.exception()}") - # free self.__initial_pose_running - self.__initial_pose_running = False - - -def main(args=None): - rclpy.init(args=args) - executor = MultiThreadedExecutor() - localization_evaluator = EagleyeEvaluator() - executor.add_node(localization_evaluator) - executor.spin() - 
localization_evaluator.destroy_node() - rclpy.shutdown() + self.__result.set_frame(msg) + self._result_writer.write(self.__result) + + +@evaluator_main +def main(): + return EagleyeEvaluator("eagleye_evaluator") if __name__ == "__main__": diff --git a/driving_log_replayer/scripts/localization_evaluator_node.py b/driving_log_replayer/scripts/localization_evaluator_node.py index 62b3214cd..4a2c81016 100755 --- a/driving_log_replayer/scripts/localization_evaluator_node.py +++ b/driving_log_replayer/scripts/localization_evaluator_node.py @@ -15,12 +15,10 @@ # limitations under the License. -import os +from functools import singledispatchmethod import statistics from typing import Dict -from typing import TYPE_CHECKING -from autoware_adapi_v1_msgs.srv import InitializeLocalization from diagnostic_msgs.msg import DiagnosticArray from example_interfaces.msg import Float64 from geometry_msgs.msg import PoseStamped @@ -28,30 +26,16 @@ from nav_msgs.msg import Odometry import numpy as np import rclpy -from rclpy.callback_groups import MutuallyExclusiveCallbackGroup -from rclpy.clock import Clock -from rclpy.clock import ClockType -from rclpy.executors import MultiThreadedExecutor -from rclpy.node import Node from rclpy.time import Duration -from rclpy.time import Time from rosidl_runtime_py import message_to_ordereddict -from tf2_ros import Buffer from tf2_ros import TransformException -from tf2_ros import TransformListener from tf_transformations import euler_from_quaternion from tier4_debug_msgs.msg import Float32Stamped from tier4_debug_msgs.msg import Int32Stamped -from tier4_localization_msgs.srv import PoseWithCovarianceStamped -import yaml -from driving_log_replayer.node_common import set_initial_pose -from driving_log_replayer.node_common import transform_stamped_with_euler_angle +from driving_log_replayer.evaluator import DLREvaluator +from driving_log_replayer.evaluator import evaluator_main from driving_log_replayer.result import ResultBase -from driving_log_replayer.result import ResultWriter - -if TYPE_CHECKING: - from autoware_adapi_v1_msgs.msg import ResponseStatus def calc_pose_lateral_distance(ndt_pose: PoseStamped, ekf_pose: Odometry): @@ -126,7 +110,12 @@ def update(self): self._success = False self._summary = f"Failed: {summary_str}" - def add_reliability_frame( + @singledispatchmethod + def set_frame(self, msg): + pass + + @set_frame.register + def set_reliability_frame( self, msg: Float32Stamped, map_to_baselink: Dict, reference: Float32Stamped ): self.__reliability_total += 1 @@ -158,7 +147,8 @@ def add_reliability_frame( self._frame = out_frame self.update() - def add_convergence_frame( + @set_frame.register + def set_convergence_frame( self, msg: PoseStamped, map_to_baselink: Dict, @@ -205,7 +195,8 @@ def add_convergence_frame( self.update() return msg_lateral_dist - def add_ndt_availability_frame(self, msg: DiagnosticArray): + @set_frame.register + def set_ndt_availability_frame(self, msg: DiagnosticArray): # Check if the NDT is available. 
Note that it does NOT check topic rate itself, but just the availability of the topic for diag_status in msg.status: if ( @@ -226,50 +217,12 @@ def add_ndt_availability_frame(self, msg: DiagnosticArray): self.update() -class LocalizationEvaluator(Node): - def __init__(self): - super().__init__("localization_evaluator") - self.declare_parameter("scenario_path", "") - self.declare_parameter("result_json_path", "") - - self.__timer_group = MutuallyExclusiveCallbackGroup() - self.__tf_buffer = Buffer() - self.__tf_listener = TransformListener(self.__tf_buffer, self, spin_thread=True) - - scenario_path = os.path.expandvars( - self.get_parameter("scenario_path").get_parameter_value().string_value - ) - self.__scenario_yaml_obj = None - with open(scenario_path) as scenario_file: - self.__scenario_yaml_obj = yaml.safe_load(scenario_file) - self.__result_json_path = os.path.expandvars( - self.get_parameter("result_json_path").get_parameter_value().string_value - ) - - self.__condition = self.__scenario_yaml_obj["Evaluation"]["Conditions"] - - self.__reliability_method = self.__condition["Reliability"]["Method"] - if self.__reliability_method not in ["TP", "NVTL"]: - self.get_logger().error( - f"Reliability Method {self.__reliability_method} is not defined." - ) - rclpy.shutdown() - - self.__result = LocalizationResult(self.__condition) +class LocalizationEvaluator(DLREvaluator): + def __init__(self, name: str): + super().__init__(name) + self.check_scenario() - self.__result_writer = ResultWriter( - self.__result_json_path, self.get_clock(), self.__condition - ) - - self.__initial_pose = set_initial_pose( - self.__scenario_yaml_obj["Evaluation"]["InitialPose"] - ) - self.__initial_pose_running = False - self.__initial_pose_success = False - - self.__current_time = Time().to_msg() - self.__prev_time = Time().to_msg() - self.__counter = 0 + self.__result = LocalizationResult(self._condition) self.__pub_lateral_distance = self.create_publisher( Float64, "localization/lateral_distance", 1 @@ -281,25 +234,6 @@ def __init__(self): self.__latest_tp: Float32Stamped = Float32Stamped() self.__latest_nvtl: Float32Stamped = Float32Stamped() - # service client - self.__initial_pose_client = self.create_client( - InitializeLocalization, "/api/localization/initialize" - ) - self.__map_fit_client = self.create_client( - PoseWithCovarianceStamped, "/map/map_height_fitter/service" - ) - while not self.__initial_pose_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning("initial pose service not available, waiting again...") - while not self.__map_fit_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning("map height fitter service not available, waiting again...") - - self.__timer = self.create_timer( - 1.0, - self.timer_cb, - callback_group=self.__timer_group, - clock=Clock(clock_type=ClockType.SYSTEM_TIME), - ) # wall timer - self.__sub_tp = self.create_subscription( Float32Stamped, "/localization/pose_estimator/transform_probability", @@ -343,6 +277,18 @@ def __init__(self): 1, ) + def check_scenario(self) -> None: + try: + self.__reliability_method = self._condition["Reliability"]["Method"] + if self.__reliability_method not in ["TP", "NVTL"]: + self.get_logger().error( + f"Reliability Method {self.__reliability_method} is not defined." 
+ ) + rclpy.shutdown() + except KeyError: + self.get_logger().error("Scenario format error") + rclpy.shutdown() + def ekf_pose_cb(self, msg: Odometry): self.__latest_ekf_pose = msg @@ -358,16 +304,18 @@ def tp_cb(self, msg: Float32Stamped): # evaluates when reliability_method is TP return try: - map_to_baselink = self.__tf_buffer.lookup_transform( + map_to_baselink = self._tf_buffer.lookup_transform( "map", "base_link", msg.stamp, Duration(seconds=0.5) ) except TransformException as ex: self.get_logger().info(f"Could not transform map to baselink: {ex}") map_to_baselink = TransformStamped() - self.__result.add_reliability_frame( - msg, transform_stamped_with_euler_angle(map_to_baselink), self.__latest_nvtl + self.__result.set_frame( + msg, + DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink), + self.__latest_nvtl, ) - self.__result_writer.write(self.__result) + self._result_writer.write(self.__result) def nvtl_cb(self, msg: Float32Stamped): self.__latest_nvtl = msg @@ -375,129 +323,43 @@ def nvtl_cb(self, msg: Float32Stamped): # evaluates when reliability_method is NVTL return try: - map_to_baselink = self.__tf_buffer.lookup_transform( + map_to_baselink = self._tf_buffer.lookup_transform( "map", "base_link", msg.stamp, Duration(seconds=0.5) ) except TransformException as ex: self.get_logger().info(f"Could not transform map to baselink: {ex}") map_to_baselink = TransformStamped() - self.__result.add_reliability_frame( - msg, transform_stamped_with_euler_angle(map_to_baselink), self.__latest_tp + self.__result.set_frame( + msg, DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink), self.__latest_tp ) - self.__result_writer.write(self.__result) + self._result_writer.write(self.__result) def pose_cb(self, msg: PoseStamped): try: - map_to_baselink = self.__tf_buffer.lookup_transform( + map_to_baselink = self._tf_buffer.lookup_transform( "map", "base_link", msg.header.stamp, Duration(seconds=0.5) ) except TransformException as ex: self.get_logger().info(f"Could not transform map to baselink: {ex}") map_to_baselink = TransformStamped() - msg_lateral_distance = self.__result.add_convergence_frame( + msg_lateral_distance = self.__result.set_frame( msg, - transform_stamped_with_euler_angle(map_to_baselink), + DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink), self.__latest_ekf_pose, self.__latest_exe_time, self.__latest_iteration_num, ) self.__pub_lateral_distance.publish(msg_lateral_distance) - self.__result_writer.write(self.__result) + self._result_writer.write(self.__result) - def timer_cb(self) -> None: - self.__current_time = self.get_clock().now().to_msg() - # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") - if self.__current_time.sec > 0: - if ( - self.__initial_pose is not None - and not self.__initial_pose_success - and not self.__initial_pose_running - ): - self.get_logger().info( - f"call initial_pose time: {self.__current_time.sec}.{self.__current_time.nanosec}" - ) - self.__initial_pose_running = True - self.__initial_pose.header.stamp = self.__current_time - future_map_fit = self.__map_fit_client.call_async( - PoseWithCovarianceStamped.Request(pose_with_covariance=self.__initial_pose) - ) - future_map_fit.add_done_callback(self.map_fit_cb) - if self.__current_time == self.__prev_time: - self.__counter += 1 - else: - self.__counter = 0 - self.__prev_time = self.__current_time - if self.__counter >= 5: - self.__result_writer.close() - rclpy.shutdown() + def diagnostics_cb(self, msg: DiagnosticArray): + 
self.__result.set_frame(msg) + self._result_writer.write(self.__result) - def map_fit_cb(self, future): - result = future.result() - if result is not None: - if result.success: - # result.pose_with_covarianceに補正済みデータが入っている - # 補正済みデータでinitialposeを投げる - # debug result.pose_with_covariance - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.position.x: {result.pose_with_covariance.pose.pose.position.x}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.position.y: {result.pose_with_covariance.pose.pose.position.y}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.position.z: {result.pose_with_covariance.pose.pose.position.z}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.x: {result.pose_with_covariance.pose.pose.orientation.x}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.y: {result.pose_with_covariance.pose.pose.orientation.y}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.z: {result.pose_with_covariance.pose.pose.orientation.z}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.w: {result.pose_with_covariance.pose.pose.orientation.w}" - # ) - future_init_pose = self.__initial_pose_client.call_async( - InitializeLocalization.Request(pose=[result.pose_with_covariance]) - ) - future_init_pose.add_done_callback(self.initial_pose_cb) - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().warn("map_height_height service result is fail") - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().error(f"Exception for service: {future.exception()}") - - def initial_pose_cb(self, future): - result = future.result() - if result is not None: - res_status: ResponseStatus = result.status - self.__initial_pose_success = res_status.success - self.get_logger().info( - f"initial_pose_success: {self.__initial_pose_success}" - ) # debug msg - else: - self.get_logger().error(f"Exception for service: {future.exception()}") - # free self.__initial_pose_running - self.__initial_pose_running = False - def diagnostics_cb(self, msg: DiagnosticArray): - self.__result.add_ndt_availability_frame(msg) - self.__result_writer.write(self.__result) - - -def main(args=None): - rclpy.init(args=args) - executor = MultiThreadedExecutor() - localization_evaluator = LocalizationEvaluator() - executor.add_node(localization_evaluator) - executor.spin() - localization_evaluator.destroy_node() - rclpy.shutdown() +@evaluator_main +def main(): + return LocalizationEvaluator("localization_evaluator") if __name__ == "__main__": diff --git a/driving_log_replayer/scripts/yabloc_evaluator_node.py b/driving_log_replayer/scripts/yabloc_evaluator_node.py index 752103694..f1344997b 100755 --- a/driving_log_replayer/scripts/yabloc_evaluator_node.py +++ b/driving_log_replayer/scripts/yabloc_evaluator_node.py @@ -15,27 +15,11 @@ # limitations under the License. 
-import os -from typing import TYPE_CHECKING - -from autoware_adapi_v1_msgs.srv import InitializeLocalization from diagnostic_msgs.msg import DiagnosticArray -import rclpy -from rclpy.callback_groups import MutuallyExclusiveCallbackGroup -from rclpy.clock import Clock -from rclpy.clock import ClockType -from rclpy.executors import MultiThreadedExecutor -from rclpy.node import Node -from rclpy.time import Time -from tier4_localization_msgs.srv import PoseWithCovarianceStamped -import yaml - -from driving_log_replayer.node_common import set_initial_pose -from driving_log_replayer.result import ResultBase -from driving_log_replayer.result import ResultWriter -if TYPE_CHECKING: - from autoware_adapi_v1_msgs.msg import ResponseStatus +from driving_log_replayer.evaluator import DLREvaluator +from driving_log_replayer.evaluator import evaluator_main +from driving_log_replayer.result import ResultBase class YabLocResult(ResultBase): @@ -62,7 +46,7 @@ def update(self): self._success = False self._summary = f"Failed: {summary_str}" - def add_yabloc_availability_frame(self, msg: DiagnosticArray): + def set_frame(self, msg: DiagnosticArray): for diag_status in msg.status: out_frame = {"Ego": {}} if diag_status.name != "yabloc_monitor: yabloc_status": @@ -78,57 +62,13 @@ def add_yabloc_availability_frame(self, msg: DiagnosticArray): self.update() -class YabLocEvaluator(Node): - def __init__(self): - super().__init__("yabloc_evaluator") - self.declare_parameter("scenario_path", "") - self.declare_parameter("result_json_path", "") - - self.__timer_group = MutuallyExclusiveCallbackGroup() - - scenario_path = os.path.expandvars( - self.get_parameter("scenario_path").get_parameter_value().string_value - ) - self.__scenario_yaml_obj = None - with open(scenario_path) as scenario_file: - self.__scenario_yaml_obj = yaml.safe_load(scenario_file) - self.__result_json_path = os.path.expandvars( - self.get_parameter("result_json_path").get_parameter_value().string_value - ) +class YabLocEvaluator(DLREvaluator): + def __init__(self, name: str): + super().__init__(name) + self.check_scenario() self.__result = YabLocResult() - self.__result_writer = ResultWriter(self.__result_json_path, self.get_clock(), {}) - - self.__initial_pose = set_initial_pose( - self.__scenario_yaml_obj["Evaluation"]["InitialPose"] - ) - self.__initial_pose_running = False - self.__initial_pose_success = False - - self.__current_time = Time().to_msg() - self.__prev_time = Time().to_msg() - self.__counter = 0 - - # service client - self.__initial_pose_client = self.create_client( - InitializeLocalization, "/api/localization/initialize" - ) - self.__map_fit_client = self.create_client( - PoseWithCovarianceStamped, "/map/map_height_fitter/service" - ) - while not self.__initial_pose_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning("initial pose service not available, waiting again...") - while not self.__map_fit_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning("map height fitter service not available, waiting again...") - - self.__timer = self.create_timer( - 1.0, - self.timer_cb, - callback_group=self.__timer_group, - clock=Clock(clock_type=ClockType.SYSTEM_TIME), - ) # wall timer - self.__sub_diagnostics = self.create_subscription( DiagnosticArray, "/diagnostics", @@ -136,76 +76,17 @@ def __init__(self): 1, ) + def check_scenario(self) -> None: + pass + def diagnostics_cb(self, msg: DiagnosticArray): - self.__result.add_yabloc_availability_frame(msg) - self.__result_writer.write(self.__result) - - def 
timer_cb(self) -> None: - self.__current_time = self.get_clock().now().to_msg() - # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") - if self.__current_time.sec > 0: - if ( - self.__initial_pose is not None - and not self.__initial_pose_success - and not self.__initial_pose_running - ): - self.get_logger().info( - f"call initial_pose time: {self.__current_time.sec}.{self.__current_time.nanosec}" - ) - self.__initial_pose_running = True - self.__initial_pose.header.stamp = self.__current_time - future_map_fit = self.__map_fit_client.call_async( - PoseWithCovarianceStamped.Request(pose_with_covariance=self.__initial_pose) - ) - future_map_fit.add_done_callback(self.map_fit_cb) - if self.__current_time == self.__prev_time: - self.__counter += 1 - else: - self.__counter = 0 - self.__prev_time = self.__current_time - if self.__counter >= 5: - self.__result_writer.close() - rclpy.shutdown() - - def map_fit_cb(self, future): - result = future.result() - if result is not None: - if result.success: - future_init_pose = self.__initial_pose_client.call_async( - InitializeLocalization.Request(pose=[result.pose_with_covariance]) - ) - future_init_pose.add_done_callback(self.initial_pose_cb) - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().warn("map_height_height service result is fail") - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().error(f"Exception for service: {future.exception()}") - - def initial_pose_cb(self, future): - result = future.result() - if result is not None: - res_status: ResponseStatus = result.status - self.__initial_pose_success = res_status.success - self.get_logger().info( - f"initial_pose_success: {self.__initial_pose_success}" - ) # debug msg - else: - self.get_logger().error(f"Exception for service: {future.exception()}") - # free self.__initial_pose_running - self.__initial_pose_running = False - - -def main(args=None): - rclpy.init(args=args) - executor = MultiThreadedExecutor() - localization_evaluator = YabLocEvaluator() - executor.add_node(localization_evaluator) - executor.spin() - localization_evaluator.destroy_node() - rclpy.shutdown() + self.__result.set_frame(msg) + self._result_writer.write(self.__result) + + +@evaluator_main +def main(): + return YabLocEvaluator("yabloc_evaluator") if __name__ == "__main__": From fd2da4e1e6070793aeae0b72fb343e069f090328 Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Fri, 8 Sep 2023 10:29:44 +0900 Subject: [PATCH 2/7] refactor: diag abc (#229) --- .../performance_diag_evaluator_node.py | 224 +++--------------- pyproject.toml | 2 +- 2 files changed, 35 insertions(+), 191 deletions(-) diff --git a/driving_log_replayer/scripts/performance_diag_evaluator_node.py b/driving_log_replayer/scripts/performance_diag_evaluator_node.py index bdd740daf..b0d396d09 100755 --- a/driving_log_replayer/scripts/performance_diag_evaluator_node.py +++ b/driving_log_replayer/scripts/performance_diag_evaluator_node.py @@ -15,42 +15,24 @@ # limitations under the License. 
-import os import re from typing import Dict from typing import List from typing import Optional from typing import Tuple -from typing import TYPE_CHECKING -from autoware_adapi_v1_msgs.srv import InitializeLocalization from diagnostic_msgs.msg import DiagnosticArray from diagnostic_msgs.msg import DiagnosticStatus from example_interfaces.msg import Byte from example_interfaces.msg import Float64 from geometry_msgs.msg import TransformStamped -import rclpy -from rclpy.callback_groups import MutuallyExclusiveCallbackGroup -from rclpy.clock import Clock -from rclpy.clock import ClockType -from rclpy.executors import MultiThreadedExecutor -from rclpy.node import Node from rclpy.time import Duration -from rclpy.time import Time from std_msgs.msg import Header -from tf2_ros import Buffer from tf2_ros import TransformException -from tf2_ros import TransformListener -from tier4_localization_msgs.srv import PoseWithCovarianceStamped -import yaml -from driving_log_replayer.node_common import set_initial_pose -from driving_log_replayer.node_common import transform_stamped_with_euler_angle +from driving_log_replayer.evaluator import DLREvaluator +from driving_log_replayer.evaluator import evaluator_main from driving_log_replayer.result import ResultBase -from driving_log_replayer.result import ResultWriter - -if TYPE_CHECKING: - from autoware_adapi_v1_msgs.msg import ResponseStatus REGEX_VISIBILITY_DIAG_NAME = "/autoware/sensing/lidar/performance_monitoring/visibility/.*" BLOCKAGE_DIAG_BASE_NAME = ( @@ -60,11 +42,10 @@ def get_diag_value(diag_status: DiagnosticStatus, key_name: str) -> str: - diag_value = "" for key_value in diag_status.values: if key_value.key == key_name: - diag_value = key_value.value - return diag_value + return key_value.value + return "" def trim_lidar_name(diag_name: str) -> str: @@ -93,19 +74,21 @@ def __init__(self, condition: Dict): self.__blockage_lidar_result[k] = True def update(self): - summary_str = f"Visibility: {self.__visibility_msg} Blockage: {self.__blockage_msg}" + if self.__visibility_result: + visibility_summary = f"Visibility (Passed): {self.__visibility_msg}" + else: + visibility_summary = f"Visibility (Failed): {self.__visibility_msg}" + if self.__blockage_result: + blockage_summary = f"Blockage (Passed): {self.__blockage_msg}" + else: + blockage_summary = f"Blockage (Failed): {self.__blockage_msg}" + summary_str = f"{visibility_summary}, {blockage_summary}" if self.__visibility_result and self.__blockage_result: self._success = True self._summary = f"Passed: {summary_str}" - elif self.__visibility_result and not self.__blockage_result: - self._success = False - self._summary = f"BlockageError: {summary_str}" - elif not self.__visibility_result and self.__blockage_result: - self._success = False - self._summary = f"VisibilityError: {summary_str}" - elif not self.__visibility_result and not self.__blockage_result: - self._success = False - self._summary = f"VisibilityAndBlockageError: {summary_str}" + else: + self._success = True + self._summary = f"Failed: {summary_str}" def summarize_diag_agg( self, msg: DiagnosticArray @@ -273,7 +256,7 @@ def update_blockage(self): if not v: self.__blockage_result = False - def add_frame( + def set_frame( self, msg: DiagnosticArray, map_to_baselink: Dict ) -> Tuple[ Optional[Float64], @@ -305,42 +288,11 @@ def add_frame( ) -class PerformanceDiagEvaluator(Node): - def __init__(self): - super().__init__("performance_diag_evaluator") - self.declare_parameter("scenario_path", "") - self.declare_parameter("result_json_path", "") 
- self.declare_parameter("localization", False) # noqa - - self.__timer_group = MutuallyExclusiveCallbackGroup() - self.__tf_buffer = Buffer() - self.__tf_listener = TransformListener(self.__tf_buffer, self, spin_thread=True) - - scenario_path = os.path.expandvars( - self.get_parameter("scenario_path").get_parameter_value().string_value - ) - self.__launch_localization = ( - self.get_parameter("scenario_path").get_parameter_value().bool_value - ) - self.__scenario_yaml_obj = None - with open(scenario_path) as scenario_file: - self.__scenario_yaml_obj = yaml.safe_load(scenario_file) - self.__result_json_path = os.path.expandvars( - self.get_parameter("result_json_path").get_parameter_value().string_value - ) - - self.__condition = self.__scenario_yaml_obj["Evaluation"]["Conditions"] - self.__result = PerformanceDiagResult(self.__condition) - - self.__result_writer = ResultWriter( - self.__result_json_path, self.get_clock(), self.__condition - ) - - self.__initial_pose = set_initial_pose( - self.__scenario_yaml_obj["Evaluation"]["InitialPose"] - ) - self.__initial_pose_running = False - self.__initial_pose_success = False +class PerformanceDiagEvaluator(DLREvaluator): + def __init__(self, name: str): + super().__init__(name) + self.check_scenario() + self.__result = PerformanceDiagResult(self._condition) self.__pub_visibility_value = self.create_publisher(Float64, "visibility/value", 1) self.__pub_visibility_level = self.create_publisher(Byte, "visibility/level", 1) @@ -348,8 +300,9 @@ def __init__(self): self.__pub_blockage_ground_ratios = {} self.__pub_blockage_sky_ratios = {} self.__pub_blockage_levels = {} + self.__diag_header_prev = Header() - for k, v in self.__condition["LiDAR"]["Blockage"].items(): + for k, v in self._condition["LiDAR"]["Blockage"].items(): if v["ScenarioType"] is not None: self.__pub_blockage_sky_ratios[k] = self.create_publisher( Float64, f"blockage/{k}/sky/ratio", 1 @@ -361,33 +314,6 @@ def __init__(self): Byte, f"blockage/{k}/level", 1 ) - self.__current_time = Time().to_msg() - self.__prev_time = Time().to_msg() - self.__diag_header_prev = Header() - self.__counter = 0 - - # service client - self.__initial_pose_client = self.create_client( - InitializeLocalization, "/api/localization/initialize" - ) - self.__map_fit_client = self.create_client( - PoseWithCovarianceStamped, "/map/map_height_fitter/service" - ) - if self.__launch_localization: - while not self.__initial_pose_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning("initial pose service not available, waiting again...") - while not self.__map_fit_client.wait_for_service(timeout_sec=1.0): - self.get_logger().warning( - "map height fitter service not available, waiting again..." 
- ) - - self.__timer = self.create_timer( - 1.0, - self.timer_cb, - callback_group=self.__timer_group, - clock=Clock(clock_type=ClockType.SYSTEM_TIME), - ) # wall timer - self.__sub_diag = self.create_subscription( DiagnosticArray, "/diagnostics_agg", @@ -395,87 +321,8 @@ def __init__(self): 1, ) - def timer_cb(self) -> None: - self.__current_time = self.get_clock().now().to_msg() - # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") - if self.__current_time.sec > 0: - if ( - self.__launch_localization - and self.__initial_pose is not None - and not self.__initial_pose_success - and not self.__initial_pose_running - ): - self.get_logger().info( - f"call initial_pose time: {self.__current_time.sec}.{self.__current_time.nanosec}" - ) - self.__initial_pose_running = True - self.__initial_pose.header.stamp = self.__current_time - future_map_fit = self.__map_fit_client.call_async( - PoseWithCovarianceStamped.Request(pose_with_covariance=self.__initial_pose) - ) - future_map_fit.add_done_callback(self.map_fit_cb) - if self.__current_time == self.__prev_time: - self.__counter += 1 - else: - self.__counter = 0 - self.__prev_time = self.__current_time - if self.__counter >= 5: - self.__result_writer.close() - rclpy.shutdown() - - def map_fit_cb(self, future): - result = future.result() - if result is not None: - if result.success: - # result.pose_with_covarianceに補正済みデータが入っている - # 補正済みデータでinitialposeを投げる - # debug result.pose_with_covariance - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.position.x: {result.pose_with_covariance.pose.pose.position.x}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.position.y: {result.pose_with_covariance.pose.pose.position.y}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.position.z: {result.pose_with_covariance.pose.pose.position.z}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.x: {result.pose_with_covariance.pose.pose.orientation.x}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.y: {result.pose_with_covariance.pose.pose.orientation.y}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.z: {result.pose_with_covariance.pose.pose.orientation.z}" - # ) - # self.get_logger().error( - # f"corrected_pose_with_covariance.pose.orientation.w: {result.pose_with_covariance.pose.pose.orientation.w}" - # ) - future_init_pose = self.__initial_pose_client.call_async( - InitializeLocalization.Request(pose=[result.pose_with_covariance]) - ) - future_init_pose.add_done_callback(self.initial_pose_cb) - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().warn("map_height_height service result is fail") - else: - # free self.__initial_pose_running when the service call fails - self.__initial_pose_running = False - self.get_logger().error(f"Exception for service: {future.exception()}") - - def initial_pose_cb(self, future): - result = future.result() - if result is not None: - res_status: ResponseStatus = result.status - self.__initial_pose_success = res_status.success - self.get_logger().info( - f"initial_pose_success: {self.__initial_pose_success}" - ) # debug msg - else: - self.get_logger().error(f"Exception for service: {future.exception()}") - # free self.__initial_pose_running - self.__initial_pose_running = False + def check_scenario(self) -> None: + pass def 
diag_cb(self, msg: DiagnosticArray) -> None: # self.get_logger().error( @@ -485,7 +332,7 @@ def diag_cb(self, msg: DiagnosticArray) -> None: return self.__diag_header_prev = msg.header try: - map_to_baselink = self.__tf_buffer.lookup_transform( + map_to_baselink = self._tf_buffer.lookup_transform( "map", "base_link", msg.header.stamp, Duration(seconds=0.5) ) except TransformException as ex: @@ -497,7 +344,9 @@ def diag_cb(self, msg: DiagnosticArray) -> None: msg_blockage_sky_ratios, msg_blockage_ground_ratios, msg_blockage_levels, - ) = self.__result.add_frame(msg, transform_stamped_with_euler_angle(map_to_baselink)) + ) = self.__result.set_frame( + msg, DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink) + ) if msg_visibility_value is not None: self.__pub_visibility_value.publish(msg_visibility_value) if msg_visibility_level is not None: @@ -511,17 +360,12 @@ def diag_cb(self, msg: DiagnosticArray) -> None: for k, v in msg_blockage_levels.items(): if v is not None: self.__pub_blockage_levels[k].publish(v) - self.__result_writer.write(self.__result) + self._result_writer.write(self.__result) -def main(args=None): - rclpy.init(args=args) - executor = MultiThreadedExecutor() - performance_diag_evaluator = PerformanceDiagEvaluator() - executor.add_node(performance_diag_evaluator) - executor.spin() - performance_diag_evaluator.destroy_node() - rclpy.shutdown() +@evaluator_main +def main(): + return PerformanceDiagEvaluator("performance_diag_evaluator") if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index f819c324d..66d73620a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ show-source = true # select = ["A", "B", "C", "D", "E", "F", "G", "I", "N", "Q", "S", "T", "W", "ANN", "ARG", "BLE", "COM", "DJ", "DTZ", "EM", "ERA", "EXE", "FBT", "ICN", "INP", "ISC", "NPY", "PD", "PGH", "PIE", "PL", "PT", "PTH", "PYI", "RET", "RSE", "RUF", "SIM", "SLF", "TCH", "TID", "TRY", "UP", "YTT"] # "G", "S", "T", "ANN", "COM", "DJ", "DTZ", "ERA", "INP", "PD", "PGH", "PL", "PTH" select = ["A", "B", "C", "D", "E", "F", "I", "N", "Q", "W", "ARG", "BLE", "EM", "EXE", "FBT", "ICN", "ISC", "NPY", "PIE", "PT", "PYI", "RET", "RSE", "RUF", "SIM", "SLF", "TCH", "TID", "TRY", "UP", "YTT"] -ignore = ["D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "D203", "D212", "D404", "D417", "E203", "E501", "Q000", "S108", "FBT001", "S602", "C901"] +ignore = ["D100", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "D203", "D212", "D404", "D417", "E203", "E501", "Q000", "S108", "FBT001", "S602", "C901", "UP006", "UP007", "UP035"] fixable=["D", "I", "EXE", "PIE"] [tool.ruff.isort] From 2570f447846cbd9fc91b8a94bcfefb88d6d177a2 Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Fri, 8 Sep 2023 10:35:56 +0900 Subject: [PATCH 3/7] refactor: perception abc (#230) --- REFACTOR.md | 4 + .../driving_log_replayer/evaluator.py | 22 +- .../scripts/perception_evaluator_node.py | 292 +++++++----------- 3 files changed, 134 insertions(+), 184 deletions(-) create mode 100644 REFACTOR.md diff --git a/REFACTOR.md b/REFACTOR.md new file mode 100644 index 000000000..0137bc5b3 --- /dev/null +++ b/REFACTOR.md @@ -0,0 +1,4 @@ +# refactor + +- [] frameをsetしてからwriteするまで更新される可能性がある。コピーを渡すほうがいいかも +- [] frameを渡したら、set_frameを自動でやって書き込みまで自動にしたい diff --git a/driving_log_replayer/driving_log_replayer/evaluator.py b/driving_log_replayer/driving_log_replayer/evaluator.py index b90dc3e9b..ff929034c 100644 --- a/driving_log_replayer/driving_log_replayer/evaluator.py +++ 
b/driving_log_replayer/driving_log_replayer/evaluator.py @@ -16,6 +16,7 @@ from abc import abstractmethod import os from os.path import expandvars +from pathlib import Path from typing import Dict from typing import Optional from typing import TYPE_CHECKING @@ -94,7 +95,24 @@ def __init__(self, name: str) -> None: clock=Clock(clock_type=ClockType.SYSTEM_TIME), ) # wall timer - def timer_cb(self) -> None: + def use_t4_dataset(self) -> None: + self.declare_parameter("t4_dataset_path", "") + self.declare_parameter("result_archive_path", "") + + result_archive_path = Path( + expandvars(self.get_parameter("result_archive_path").get_parameter_value().string_value) + ) + result_archive_path.mkdir(exist_ok=True) + + self._pkl_path = result_archive_path.joinpath("scene_result.pkl").as_posix() + self._t4_dataset_paths = [ + expandvars(self.get_parameter("t4_dataset_path").get_parameter_value().string_value) + ] + self._perception_eval_log_path = result_archive_path.parent.joinpath( + "perception_eval_log" + ).as_posix() + + def timer_cb(self, *, write_metrics_func=None) -> None: self._current_time = self.get_clock().now().to_msg() # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") if self._current_time.sec > 0: @@ -106,6 +124,8 @@ def timer_cb(self) -> None: self._clock_stop_counter = 0 self._prev_time = self._current_time if self._clock_stop_counter >= DLREvaluator.COUNT_SHUTDOWN_NODE: + if write_metrics_func is not None: + write_metrics_func() self._result_writer.close() rclpy.shutdown() diff --git a/driving_log_replayer/scripts/perception_evaluator_node.py b/driving_log_replayer/scripts/perception_evaluator_node.py index 5428d2172..0504ea7e4 100755 --- a/driving_log_replayer/scripts/perception_evaluator_node.py +++ b/driving_log_replayer/scripts/perception_evaluator_node.py @@ -16,10 +16,8 @@ import logging import os -from pathlib import Path from typing import Dict from typing import List -from typing import Optional from typing import Tuple from typing import Union @@ -44,26 +42,17 @@ from perception_eval.tool import PerceptionAnalyzer3D from perception_eval.util.logger_config import configure_logger import rclpy -from rclpy.callback_groups import MutuallyExclusiveCallbackGroup -from rclpy.clock import Clock -from rclpy.clock import ClockType -from rclpy.executors import MultiThreadedExecutor -from rclpy.node import Node from rclpy.time import Duration -from rclpy.time import Time from std_msgs.msg import ColorRGBA from std_msgs.msg import Header -from tf2_ros import Buffer from tf2_ros import TransformException -from tf2_ros import TransformListener from visualization_msgs.msg import MarkerArray -import yaml -from driving_log_replayer.node_common import transform_stamped_with_euler_angle +from driving_log_replayer.evaluator import DLREvaluator +from driving_log_replayer.evaluator import evaluator_main import driving_log_replayer.perception_eval_conversions as eval_conversions from driving_log_replayer.result import PickleWriter from driving_log_replayer.result import ResultBase -from driving_log_replayer.result import ResultWriter def get_label(classification: ObjectClassification) -> str: @@ -122,7 +111,7 @@ def update(self): self._success = False self._summary = f"Failed: {summary_str}" - def add_frame( + def set_frame( self, frame: PerceptionFrameResult, skip: int, header: Header, map_to_baselink: Dict ) -> Tuple[MarkerArray, MarkerArray]: self.__total += 1 @@ -174,163 +163,105 @@ def add_frame( self.update() return marker_ground_truth, 
marker_results - def add_final_metrics(self, final_metrics: Dict): + def set_final_metrics(self, final_metrics: Dict): self._frame = {"FinalScore": final_metrics} -class PerceptionEvaluator(Node): - def __init__(self): - super().__init__("perception_evaluator") - self.declare_parameter("scenario_path", "") - self.declare_parameter("result_json_path", "") - self.declare_parameter("t4_dataset_path", "") - self.declare_parameter("result_archive_path", "") +class PerceptionEvaluator(DLREvaluator): + def __init__(self, name: str): + super().__init__(name) + self.check_scenario() + self.use_t4_dataset() - self.__timer_group = MutuallyExclusiveCallbackGroup() - self.__tf_buffer = Buffer() - self.__tf_listener = TransformListener(self.__tf_buffer, self, spin_thread=True) + self.__result = PerceptionResult(self._condition) + self.__frame_id: FrameID = FrameID.from_value(self.__frame_id_str) - scenario_path = os.path.expandvars( - self.get_parameter("scenario_path").get_parameter_value().string_value + evaluation_config: PerceptionEvaluationConfig = PerceptionEvaluationConfig( + dataset_paths=self._t4_dataset_paths, + frame_id=self.__frame_id_str, + result_root_directory=os.path.join(self._perception_eval_log_path, "result", "{TIME}"), + evaluation_config_dict=self.__p_cfg["evaluation_config_dict"], + load_raw_data=False, ) - self.__scenario_yaml_obj = None - with open(scenario_path) as scenario_file: - self.__scenario_yaml_obj = yaml.safe_load(scenario_file) - self.__result_json_path = os.path.expandvars( - self.get_parameter("result_json_path").get_parameter_value().string_value + _ = configure_logger( + log_file_directory=evaluation_config.log_directory, + console_log_level=logging.INFO, + file_log_level=logging.INFO, ) - result_archive_path = Path( - os.path.expandvars( - self.get_parameter("result_archive_path").get_parameter_value().string_value + # どれを注目物体とするかのparam + self.__critical_object_filter_config: CriticalObjectFilterConfig = ( + CriticalObjectFilterConfig( + evaluator_config=evaluation_config, + target_labels=self.__c_cfg["target_labels"], + ignore_attributes=self.__c_cfg.get("ignore_attributes", None), + max_x_position_list=self.__c_cfg.get("max_x_position_list", None), + max_y_position_list=self.__c_cfg.get("max_y_position_list", None), + max_distance_list=self.__c_cfg.get("max_distance_list", None), + min_distance_list=self.__c_cfg.get("min_distance_list", None), + min_point_numbers=self.__c_cfg.get("min_point_numbers", None), + confidence_threshold_list=self.__c_cfg.get("confidence_threshold_list", None), + target_uuids=self.__c_cfg.get("target_uuids", None), ) ) - result_archive_path.mkdir(exist_ok=True) - - self.__pkl_path = result_archive_path.joinpath("scene_result.pkl").as_posix() - self.__t4_dataset_paths = [ - os.path.expandvars( - self.get_parameter("t4_dataset_path").get_parameter_value().string_value - ) - ] - self.__perception_eval_log_path = result_archive_path.parent.joinpath( - "perception_eval_log" - ).as_posix() + # Pass fail を決めるパラメータ + self.__frame_pass_fail_config: PerceptionPassFailConfig = PerceptionPassFailConfig( + evaluator_config=evaluation_config, + target_labels=self.__f_cfg["target_labels"], + matching_threshold_list=self.__f_cfg.get("matching_threshold_list", None), + confidence_threshold_list=self.__f_cfg.get("confidence_threshold_list", None), + ) + self.__evaluator = PerceptionEvaluationManager(evaluation_config=evaluation_config) + self.__sub_perception = self.create_subscription( + self.__msg_type, + "/perception/object_recognition/" + 
self.__topic_ns + "/objects", + self.perception_cb, + 1, + ) + self.__pub_marker_ground_truth = self.create_publisher( + MarkerArray, "marker/ground_truth", 1 + ) + self.__pub_marker_results = self.create_publisher(MarkerArray, "marker/results", 1) + self.__skip_counter = 0 + def check_scenario(self) -> None: try: - self.__condition = self.__scenario_yaml_obj["Evaluation"]["Conditions"] - self.__result = PerceptionResult(self.__condition) - - self.__result_writer = ResultWriter( - self.__result_json_path, self.get_clock(), self.__condition - ) - - p_cfg = self.__scenario_yaml_obj["Evaluation"]["PerceptionEvaluationConfig"] - c_cfg = self.__scenario_yaml_obj["Evaluation"]["CriticalObjectFilterConfig"] - f_cfg = self.__scenario_yaml_obj["Evaluation"]["PerceptionPassFailConfig"] - - self.__evaluation_task = p_cfg["evaluation_config_dict"]["evaluation_task"] - p_cfg["evaluation_config_dict"][ + self.__p_cfg = self._scenario_yaml_obj["Evaluation"]["PerceptionEvaluationConfig"] + self.__c_cfg = self._scenario_yaml_obj["Evaluation"]["CriticalObjectFilterConfig"] + self.__f_cfg = self._scenario_yaml_obj["Evaluation"]["PerceptionPassFailConfig"] + self.__evaluation_task = self.__p_cfg["evaluation_config_dict"]["evaluation_task"] + self.__p_cfg["evaluation_config_dict"][ "label_prefix" ] = "autoware" # Add a fixed value setting - - frame_id, msg_type, topic_ns = self.get_frame_id_and_msg_type() - self.__frame_id = FrameID.from_value(frame_id) - - evaluation_config: PerceptionEvaluationConfig = PerceptionEvaluationConfig( - dataset_paths=self.__t4_dataset_paths, - frame_id=frame_id, - result_root_directory=os.path.join( - self.__perception_eval_log_path, "result", "{TIME}" - ), - evaluation_config_dict=p_cfg["evaluation_config_dict"], - load_raw_data=False, - ) - _ = configure_logger( - log_file_directory=evaluation_config.log_directory, - console_log_level=logging.INFO, - file_log_level=logging.INFO, - ) - # どれを注目物体とするかのparam - self.__critical_object_filter_config: CriticalObjectFilterConfig = ( - CriticalObjectFilterConfig( - evaluator_config=evaluation_config, - target_labels=c_cfg["target_labels"], - ignore_attributes=c_cfg.get("ignore_attributes", None), - max_x_position_list=c_cfg.get("max_x_position_list", None), - max_y_position_list=c_cfg.get("max_y_position_list", None), - max_distance_list=c_cfg.get("max_distance_list", None), - min_distance_list=c_cfg.get("min_distance_list", None), - min_point_numbers=c_cfg.get("min_point_numbers", None), - confidence_threshold_list=c_cfg.get("confidence_threshold_list", None), - target_uuids=c_cfg.get("target_uuids", None), - ) - ) - # Pass fail を決めるパラメータ - self.__frame_pass_fail_config: PerceptionPassFailConfig = PerceptionPassFailConfig( - evaluator_config=evaluation_config, - target_labels=f_cfg["target_labels"], - matching_threshold_list=f_cfg.get("matching_threshold_list", None), - confidence_threshold_list=f_cfg.get("confidence_threshold_list", None), - ) - self.__evaluator = PerceptionEvaluationManager(evaluation_config=evaluation_config) - self.__sub_perception = self.create_subscription( - msg_type, - "/perception/object_recognition/" + topic_ns + "/objects", - self.perception_cb, - 1, - ) - self.__pub_marker_ground_truth = self.create_publisher( - MarkerArray, "marker/ground_truth", 1 - ) - self.__pub_marker_results = self.create_publisher(MarkerArray, "marker/results", 1) - - self.__current_time = Time().to_msg() - self.__prev_time = Time().to_msg() - - self.__counter = 0 - self.__timer = self.create_timer( - 1.0, - self.timer_cb, - 
callback_group=self.__timer_group, - clock=Clock(clock_type=ClockType.SYSTEM_TIME), - ) # wall timer - self.__skip_counter = 0 except KeyError: - # Immediate termination if the scenario does not contain the required items and is incompatible. self.get_logger().error("Scenario format error.") rclpy.shutdown() + if not self.check_evaluation_task(): + rclpy.shutdown() - def get_frame_id_and_msg_type( - self, - ) -> Optional[Tuple[str, Union[DetectedObjects, TrackedObjects], str]]: + def check_evaluation_task(self) -> bool: if self.__evaluation_task in ["detection", "fp_validation"]: - return "base_link", DetectedObjects, "detection" + self.__frame_id_str = "base_link" + self.__msg_type = DetectedObjects + self.__topic_ns = "detection" + return True if self.__evaluation_task == "tracking": - return "map", TrackedObjects, "tracking" + self.__frame_id_str = "map" + self.__msg_type = TrackedObjects + self.__topic_ns = "tracking" + return True self.get_logger().error(f"Unexpected evaluation task: {self.__evaluation_task}") - rclpy.shutdown() - return None + return False def timer_cb(self): - self.__current_time = self.get_clock().now().to_msg() - # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") - if self.__current_time.sec > 0: - if self.__current_time == self.__prev_time: - self.__counter += 1 - else: - self.__counter = 0 - self.__prev_time = self.__current_time - if self.__counter >= 5: - self.write_log() - rclpy.shutdown() - - def write_log(self): - self.__pickle_writer = PickleWriter(self.__pkl_path) + super().timer_cb(write_metrics_func=self.write_metrics) + + def write_metrics(self): + self.__pickle_writer = PickleWriter(self._pkl_path) self.__pickle_writer.dump(self.__evaluator.frame_results) if self.__evaluation_task == "fp_validation": final_metrics = self.get_fp_result() - self.__result.add_final_metrics(final_metrics) - self.__result_writer.write(self.__result) + self.__result.set_final_metrics(final_metrics) + self._result_writer.write(self.__result) else: self.get_final_result() score_dict = {} @@ -345,9 +276,8 @@ def write_log(self): error_df.groupby(level=0).apply(lambda df: df.xs(df.name).to_dict()).to_dict() ) final_metrics = {"Score": score_dict, "Error": error_dict} - self.__result.add_final_metrics(final_metrics) - self.__result_writer.write(self.__result) - self.__result_writer.close() + self.__result.set_final_metrics(final_metrics) + self._result_writer.write(self.__result) def list_dynamic_object_from_ros_msg( self, unix_time: int, objects: Union[List[DetectedObject], List[TrackedObject]] @@ -400,7 +330,7 @@ def list_dynamic_object_from_ros_msg( def perception_cb(self, msg: Union[DetectedObjects, TrackedObjects]): try: - map_to_baselink = self.__tf_buffer.lookup_transform( + map_to_baselink = self._tf_buffer.lookup_transform( "map", "base_link", msg.header.stamp, Duration(seconds=0.5) ) except TransformException as ex: @@ -412,32 +342,33 @@ def perception_cb(self, msg: Union[DetectedObjects, TrackedObjects]): ground_truth_now_frame = self.__evaluator.get_ground_truth_now_frame(unix_time) if ground_truth_now_frame is None: self.__skip_counter += 1 - else: - estimated_objects: List[DynamicObject] = self.list_dynamic_object_from_ros_msg( - unix_time, msg.objects - ) - ros_critical_ground_truth_objects = ground_truth_now_frame.objects - # critical_object_filter_configと、frame_pass_fail_configこの中で動的に変えても良い。 - # 動的に変える条件をかけるようになるまでは、初期化時に一括設定 + return - frame_result: PerceptionFrameResult = self.__evaluator.add_frame_result( - 
unix_time=unix_time, - ground_truth_now_frame=ground_truth_now_frame, - estimated_objects=estimated_objects, - ros_critical_ground_truth_objects=ros_critical_ground_truth_objects, - critical_object_filter_config=self.__critical_object_filter_config, - frame_pass_fail_config=self.__frame_pass_fail_config, - ) - # write result - marker_ground_truth, marker_results = self.__result.add_frame( - frame_result, - self.__skip_counter, - msg.header, - transform_stamped_with_euler_angle(map_to_baselink), - ) - self.__result_writer.write(self.__result) - self.__pub_marker_ground_truth.publish(marker_ground_truth) - self.__pub_marker_results.publish(marker_results) + estimated_objects: List[DynamicObject] = self.list_dynamic_object_from_ros_msg( + unix_time, msg.objects + ) + ros_critical_ground_truth_objects = ground_truth_now_frame.objects + # critical_object_filter_configと、frame_pass_fail_configこの中で動的に変えても良い。 + # 動的に変える条件をかけるようになるまでは、初期化時に一括設定 + + frame_result: PerceptionFrameResult = self.__evaluator.add_frame_result( + unix_time=unix_time, + ground_truth_now_frame=ground_truth_now_frame, + estimated_objects=estimated_objects, + ros_critical_ground_truth_objects=ros_critical_ground_truth_objects, + critical_object_filter_config=self.__critical_object_filter_config, + frame_pass_fail_config=self.__frame_pass_fail_config, + ) + # write result + marker_ground_truth, marker_results = self.__result.set_frame( + frame_result, + self.__skip_counter, + msg.header, + DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink), + ) + self._result_writer.write(self.__result) + self.__pub_marker_ground_truth.publish(marker_ground_truth) + self.__pub_marker_results.publish(marker_results) def get_final_result(self) -> MetricsScore: num_critical_fail: int = sum( @@ -508,14 +439,9 @@ def get_fp_result(self) -> Dict: } -def main(args=None): - rclpy.init(args=args) - executor = MultiThreadedExecutor() - perception_evaluator = PerceptionEvaluator() - executor.add_node(perception_evaluator) - executor.spin() - perception_evaluator.destroy_node() - rclpy.shutdown() +@evaluator_main +def main(): + return PerceptionEvaluator("perception_evaluator") if __name__ == "__main__": From 26f928dc99bbe7eead82c1b2393eb03d2a6fbba4 Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Fri, 8 Sep 2023 10:39:53 +0900 Subject: [PATCH 4/7] refactor: 2d abc (#231) --- .../driving_log_replayer/evaluator.py | 37 +++ .../scripts/perception_2d_evaluator_node.py | 305 ++++++------------ .../scripts/perception_evaluator_node.py | 42 +-- 3 files changed, 137 insertions(+), 247 deletions(-) diff --git a/driving_log_replayer/driving_log_replayer/evaluator.py b/driving_log_replayer/driving_log_replayer/evaluator.py index ff929034c..8c6169cfa 100644 --- a/driving_log_replayer/driving_log_replayer/evaluator.py +++ b/driving_log_replayer/driving_log_replayer/evaluator.py @@ -18,10 +18,12 @@ from os.path import expandvars from pathlib import Path from typing import Dict +from typing import List from typing import Optional from typing import TYPE_CHECKING from autoware_adapi_v1_msgs.srv import InitializeLocalization +from autoware_auto_perception_msgs.msg import ObjectClassification from geometry_msgs.msg import PoseStamped from geometry_msgs.msg import PoseWithCovarianceStamped from geometry_msgs.msg import TransformStamped @@ -288,6 +290,41 @@ def set_initial_pose(cls, initial_pose: Optional[Dict]) -> Optional[PoseWithCova else: return ros_init_pose + @classmethod + def get_perception_label_str(cls, classification: ObjectClassification) -> 
str: + if classification.label == ObjectClassification.UNKNOWN: + return "unknown" + if classification.label == ObjectClassification.CAR: + return "car" + if classification.label == ObjectClassification.TRUCK: + return "truck" + if classification.label == ObjectClassification.BUS: + return "bus" + if classification.label == ObjectClassification.TRAILER: + # not implemented in iv + return "trailer" + if classification.label == ObjectClassification.MOTORCYCLE: + # iv: motorbike, auto: motorbike + return "motorbike" + if classification.label == ObjectClassification.BICYCLE: + return "bicycle" + if classification.label == ObjectClassification.PEDESTRIAN: + return "pedestrian" + return "other" + + @classmethod + def get_most_probable_classification( + cls, + array_classification: List[ObjectClassification], + ) -> ObjectClassification: + highest_probability = 0.0 + highest_classification = None + for classification in array_classification: + if classification.probability >= highest_probability: + highest_probability = classification.probability + highest_classification = classification + return highest_classification + def evaluator_main(func): def wrapper(): diff --git a/driving_log_replayer/scripts/perception_2d_evaluator_node.py b/driving_log_replayer/scripts/perception_2d_evaluator_node.py index 1f042a356..0add77b1e 100755 --- a/driving_log_replayer/scripts/perception_2d_evaluator_node.py +++ b/driving_log_replayer/scripts/perception_2d_evaluator_node.py @@ -16,12 +16,9 @@ import logging import os -from pathlib import Path from typing import Dict from typing import List -from typing import Optional -from autoware_auto_perception_msgs.msg import ObjectClassification from geometry_msgs.msg import TransformStamped from perception_eval.common.object2d import DynamicObject2D from perception_eval.config import PerceptionEvaluationConfig @@ -33,63 +30,17 @@ from perception_eval.tool import PerceptionAnalyzer2D from perception_eval.util.logger_config import configure_logger import rclpy -from rclpy.callback_groups import MutuallyExclusiveCallbackGroup -from rclpy.clock import Clock -from rclpy.clock import ClockType -from rclpy.executors import MultiThreadedExecutor -from rclpy.node import Node from rclpy.time import Duration -from rclpy.time import Time from std_msgs.msg import Header -from tf2_ros import Buffer from tf2_ros import TransformException -from tf2_ros import TransformListener from tier4_perception_msgs.msg import DetectedObjectsWithFeature from tier4_perception_msgs.msg import DetectedObjectWithFeature -import yaml -from driving_log_replayer.node_common import transform_stamped_with_euler_angle +from driving_log_replayer.evaluator import DLREvaluator +from driving_log_replayer.evaluator import evaluator_main import driving_log_replayer.perception_eval_conversions as eval_conversions from driving_log_replayer.result import PickleWriter from driving_log_replayer.result import ResultBase -from driving_log_replayer.result import ResultWriter - - -def get_label(classification: ObjectClassification) -> str: - if classification.label == ObjectClassification.UNKNOWN: - return "unknown" - if classification.label == ObjectClassification.CAR: - return "car" - if classification.label == ObjectClassification.TRUCK: - return "truck" - if classification.label == ObjectClassification.BUS: - return "bus" - if classification.label == ObjectClassification.TRAILER: - # not implemented in iv - return "trailer" - if classification.label == ObjectClassification.MOTORCYCLE: - # iv: motorbike, auto: motorbike 
- return "motorbike" - if classification.label == ObjectClassification.BICYCLE: - return "bicycle" - if classification.label == ObjectClassification.PEDESTRIAN: - return "pedestrian" - # not implemented in auto - # elif classification.label == ObjectClassification.ANIMAL: - # return "animal" - return "other" - - -def get_most_probable_classification( - array_classification: List[ObjectClassification], -) -> ObjectClassification: - highest_probability = 0.0 - highest_classification = None - for classification in array_classification: - if classification.probability >= highest_probability: - highest_probability = classification.probability - highest_classification = classification - return highest_classification class Perception2DResult(ResultBase): @@ -118,7 +69,7 @@ def update(self): self._success = False self._summary = f"Failed:{summary_str}" - def add_frame( + def set_frame( self, frame: PerceptionFrameResult, skip: int, @@ -167,181 +118,126 @@ def add_frame( self._frame = out_frame self.update() - def add_final_metrics(self, final_metrics: Dict): + def set_final_metrics(self, final_metrics: Dict): self._frame = {"FinalScore": final_metrics} -class Perception2DEvaluator(Node): - def __init__(self): - super().__init__("perception_2d_evaluator") - self.declare_parameter("scenario_path", "") - self.declare_parameter("result_json_path", "") - self.declare_parameter("t4_dataset_path", "") - self.declare_parameter("result_archive_path", "") +class Perception2DEvaluator(DLREvaluator): + def __init__(self, name: str): + super().__init__(name) + self.check_scenario() + self.use_t4_dataset() - self.__timer_group = MutuallyExclusiveCallbackGroup() - self.__tf_buffer = Buffer() - self.__tf_listener = TransformListener(self.__tf_buffer, self, spin_thread=True) + self.__result = Perception2DResult(self._condition) - scenario_path = os.path.expandvars( - self.get_parameter("scenario_path").get_parameter_value().string_value + evaluation_config: PerceptionEvaluationConfig = PerceptionEvaluationConfig( + dataset_paths=self._t4_dataset_paths, + frame_id=list(self.__camera_type_dict.keys()), + result_root_directory=os.path.join(self._perception_eval_log_path, "result", "{TIME}"), + evaluation_config_dict=self.__p_cfg["evaluation_config_dict"], + load_raw_data=False, ) - self.__scenario_yaml_obj = None - with open(scenario_path) as scenario_file: - self.__scenario_yaml_obj = yaml.safe_load(scenario_file) - self.__result_json_path = os.path.expandvars( - self.get_parameter("result_json_path").get_parameter_value().string_value + _ = configure_logger( + log_file_directory=evaluation_config.log_directory, + console_log_level=logging.INFO, + file_log_level=logging.INFO, ) - result_archive_path = Path( - os.path.expandvars( - self.get_parameter("result_archive_path").get_parameter_value().string_value + # どれを注目物体とするかのparam + self.__critical_object_filter_config: CriticalObjectFilterConfig = ( + CriticalObjectFilterConfig( + evaluator_config=evaluation_config, + ignore_attributes=self.__c_cfg["ignore_attributes"], + target_labels=self.__c_cfg["target_labels"], ) ) - result_archive_path.mkdir(exist_ok=True) - - self.__pkl_path = result_archive_path.joinpath("scene_result.pkl").as_posix() - self.__t4_dataset_paths = [ - os.path.expandvars( - self.get_parameter("t4_dataset_path").get_parameter_value().string_value + # Pass fail を決めるパラメータ + self.__frame_pass_fail_config: PerceptionPassFailConfig = PerceptionPassFailConfig( + evaluator_config=evaluation_config, + target_labels=self.__f_cfg["target_labels"], + 
matching_threshold_list=self.__f_cfg["matching_threshold_list"], + ) + self.__evaluator = PerceptionEvaluationManager(evaluation_config=evaluation_config) + + self.__subscribers = {} + self.__skip_counter = {} + for camera_type, camera_no in self.__camera_type_dict.items(): + self.__subscribers[camera_type] = self.create_subscription( + DetectedObjectsWithFeature, + self.get_topic_name(camera_no), + lambda msg, local_type=camera_type: self.detected_objs_cb(msg, local_type), + 1, ) - ] - self.__perception_eval_log_path = result_archive_path.parent.joinpath( - "perception_eval_log" - ).as_posix() + self.__skip_counter[camera_type] = 0 + def check_scenario(self) -> None: try: - self.__condition = self.__scenario_yaml_obj["Evaluation"]["Conditions"] - self.__result = Perception2DResult(self.__condition) - - self.__result_writer = ResultWriter( - self.__result_json_path, self.get_clock(), self.__condition - ) - - p_cfg = self.__scenario_yaml_obj["Evaluation"]["PerceptionEvaluationConfig"] - c_cfg = self.__scenario_yaml_obj["Evaluation"]["CriticalObjectFilterConfig"] - f_cfg = self.__scenario_yaml_obj["Evaluation"]["PerceptionPassFailConfig"] - - evaluation_task = p_cfg["evaluation_config_dict"]["evaluation_task"] - p_cfg["evaluation_config_dict"][ + self.__p_cfg = self._scenario_yaml_obj["Evaluation"]["PerceptionEvaluationConfig"] + self.__c_cfg = self._scenario_yaml_obj["Evaluation"]["CriticalObjectFilterConfig"] + self.__f_cfg = self._scenario_yaml_obj["Evaluation"]["PerceptionPassFailConfig"] + self.__evaluation_task = self.__p_cfg["evaluation_config_dict"]["evaluation_task"] + self.__p_cfg["evaluation_config_dict"][ "label_prefix" ] = "autoware" # Add a fixed value setting - p_cfg["evaluation_config_dict"][ + self.__p_cfg["evaluation_config_dict"][ "count_label_number" ] = True # Add a fixed value setting - - self.__camera_type_dict = self.__condition["TargetCameras"] - if isinstance(self.__camera_type_dict, str) or len(self.__camera_type_dict) == 0: - self.get_logger().error("camera_types is not appropriate.") - rclpy.shutdown() - - if evaluation_task not in ["detection2d", "tracking2d"]: - self.get_logger().error(f"invalid evaluation task {evaluation_task}.") - rclpy.shutdown() - - evaluation_config: PerceptionEvaluationConfig = PerceptionEvaluationConfig( - dataset_paths=self.__t4_dataset_paths, - frame_id=list(self.__camera_type_dict.keys()), - result_root_directory=os.path.join( - self.__perception_eval_log_path, "result", "{TIME}" - ), - evaluation_config_dict=p_cfg["evaluation_config_dict"], - load_raw_data=False, - ) - _ = configure_logger( - log_file_directory=evaluation_config.log_directory, - console_log_level=logging.INFO, - file_log_level=logging.INFO, - ) - # どれを注目物体とするかのparam - self.__critical_object_filter_config: CriticalObjectFilterConfig = ( - CriticalObjectFilterConfig( - evaluator_config=evaluation_config, - ignore_attributes=c_cfg["ignore_attributes"], - target_labels=c_cfg["target_labels"], - ) - ) - # Pass fail を決めるパラメータ - self.__frame_pass_fail_config: PerceptionPassFailConfig = PerceptionPassFailConfig( - evaluator_config=evaluation_config, - target_labels=f_cfg["target_labels"], - matching_threshold_list=f_cfg["matching_threshold_list"], - ) - self.__evaluator = PerceptionEvaluationManager(evaluation_config=evaluation_config) - - self.__subscribers = {} - self.__skip_counter = {} - for camera_type, camera_no in self.__camera_type_dict.items(): - self.__subscribers[camera_type] = self.create_subscription( - DetectedObjectsWithFeature, - 
self.get_topic_name(evaluation_task, camera_no), - lambda msg, local_type=camera_type: self.detected_objs_cb(msg, local_type), - 1, - ) - self.__skip_counter[camera_type] = 0 - - self.__current_time = Time().to_msg() - self.__prev_time = Time().to_msg() - - self.__counter = 0 - self.__timer = self.create_timer( - 1.0, - self.timer_cb, - callback_group=self.__timer_group, - clock=Clock(clock_type=ClockType.SYSTEM_TIME), - ) # wall timer + self.__camera_type_dict = self._condition["TargetCameras"] except KeyError: - # Immediate termination if the scenario does not contain the required items and is incompatible. self.get_logger().error("Scenario format error.") rclpy.shutdown() + if not self.check_evaluation_task(): + rclpy.shutdown() + if not self.check_camera_type(): + rclpy.shutdown() + + def check_evaluation_task(self) -> bool: + if self.__evaluation_task in ["detection2d", "tracking2d"]: + return True + self.get_logger().error(f"Unexpected evaluation task: {self.__evaluation_task}.") + return False + + def check_camera_type(self) -> bool: + if isinstance(self.__camera_type_dict, str) or len(self.__camera_type_dict) == 0: + self.get_logger().error("camera_types is not appropriate.") + return False + return True - def get_topic_name(self, evaluation_task: str, camera_no: int) -> Optional[str]: - if evaluation_task == "detection2d": + def get_topic_name(self, camera_no: int) -> str: + if self.__evaluation_task == "detection2d": return f"/perception/object_recognition/detection/rois{camera_no}" - if evaluation_task == "tracking2d": - return f"/perception/object_recognition/detection/tracked/rois{camera_no}" - self.get_logger.error(f"invalid evaluation_task {evaluation_task}") - rclpy.shutdown() - return None + return f"/perception/object_recognition/detection/tracked/rois{camera_no}" # tracking2d def timer_cb(self): - self.__current_time = self.get_clock().now().to_msg() - # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") - if self.__current_time.sec > 0: - if self.__current_time == self.__prev_time: - self.__counter += 1 - else: - self.__counter = 0 - self.__prev_time = self.__current_time - if self.__counter >= 5: - self.__pickle_writer = PickleWriter(self.__pkl_path) - self.__pickle_writer.dump(self.__evaluator.frame_results) - self.get_final_result() - - analyzer = PerceptionAnalyzer2D(self.__evaluator.evaluator_config) - analyzer.add(self.__evaluator.frame_results) - score_df, conf_mat_df = analyzer.analyze() - score_dict = {} - conf_mat_dict = {} - if score_df is not None: - score_dict = score_df.to_dict() - if conf_mat_df is not None: - conf_mat_dict = conf_mat_df.to_dict() - final_metrics = {"Score": score_dict, "ConfusionMatrix": conf_mat_dict} - self.__result.add_final_metrics(final_metrics) - self.__result_writer.write(self.__result) - self.__result_writer.close() - rclpy.shutdown() + super().timer_cb(write_metrics_func=self.write_metrics) + + def write_metrics(self): + self.__pickle_writer = PickleWriter(self._pkl_path) + self.__pickle_writer.dump(self.__evaluator.frame_results) + + self.get_final_result() + score_dict = {} + conf_mat_dict = {} + analyzer = PerceptionAnalyzer2D(self.__evaluator.evaluator_config) + analyzer.add(self.__evaluator.frame_results) + score_df, conf_mat_df = analyzer.analyze() + if score_df is not None: + score_dict = score_df.to_dict() + if conf_mat_df is not None: + conf_mat_dict = conf_mat_df.to_dict() + final_metrics = {"Score": score_dict, "ConfusionMatrix": conf_mat_dict} + 
self.__result.set_final_metrics(final_metrics) + self._result_writer.write(self.__result) def list_dynamic_object_2d_from_ros_msg( self, unix_time: int, feature_objects: List[DetectedObjectWithFeature], camera_type: str ) -> List[DynamicObject2D]: estimated_objects: List[DynamicObject2D] = [] for perception_object in feature_objects: - most_probable_classification = get_most_probable_classification( + most_probable_classification = DLREvaluator.get_most_probable_classification( perception_object.object.classification ) label = self.__evaluator.evaluator_config.label_converter.convert_label( - name=get_label(most_probable_classification) + name=DLREvaluator.get_perception_label_str(most_probable_classification) ) obj_roi = perception_object.feature.roi roi = obj_roi.x_offset, obj_roi.y_offset, obj_roi.width, obj_roi.height @@ -360,7 +256,7 @@ def list_dynamic_object_2d_from_ros_msg( def detected_objs_cb(self, msg: DetectedObjectsWithFeature, camera_type: str): # self.get_logger().error(f"{camera_type} callback") try: - map_to_baselink = self.__tf_buffer.lookup_transform( + map_to_baselink = self._tf_buffer.lookup_transform( "map", "base_link", msg.header.stamp, Duration(seconds=0.5) ) except TransformException as ex: @@ -388,14 +284,14 @@ def detected_objs_cb(self, msg: DetectedObjectsWithFeature, camera_type: str): frame_pass_fail_config=self.__frame_pass_fail_config, ) # write result - self.__result.add_frame( + self.__result.set_frame( frame_result, self.__skip_counter[camera_type], msg.header, - transform_stamped_with_euler_angle(map_to_baselink), + DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink), camera_type, ) - self.__result_writer.write(self.__result) + self._result_writer.write(self.__result) def get_final_result(self) -> MetricsScore: final_metric_score = self.__evaluator.get_scene_result() @@ -405,14 +301,9 @@ def get_final_result(self) -> MetricsScore: return final_metric_score -def main(args=None): - rclpy.init(args=args) - executor = MultiThreadedExecutor() - perception_2d_evaluator = Perception2DEvaluator() - executor.add_node(perception_2d_evaluator) - executor.spin() - perception_2d_evaluator.destroy_node() - rclpy.shutdown() +@evaluator_main +def main(): + return Perception2DEvaluator("perception_2d_evaluator") if __name__ == "__main__": diff --git a/driving_log_replayer/scripts/perception_evaluator_node.py b/driving_log_replayer/scripts/perception_evaluator_node.py index 0504ea7e4..6a6881c18 100755 --- a/driving_log_replayer/scripts/perception_evaluator_node.py +++ b/driving_log_replayer/scripts/perception_evaluator_node.py @@ -23,7 +23,6 @@ from autoware_auto_perception_msgs.msg import DetectedObject from autoware_auto_perception_msgs.msg import DetectedObjects -from autoware_auto_perception_msgs.msg import ObjectClassification from autoware_auto_perception_msgs.msg import TrackedObject from autoware_auto_perception_msgs.msg import TrackedObjects from geometry_msgs.msg import TransformStamped @@ -55,43 +54,6 @@ from driving_log_replayer.result import ResultBase -def get_label(classification: ObjectClassification) -> str: - if classification.label == ObjectClassification.UNKNOWN: - return "unknown" - if classification.label == ObjectClassification.CAR: - return "car" - if classification.label == ObjectClassification.TRUCK: - return "truck" - if classification.label == ObjectClassification.BUS: - return "bus" - if classification.label == ObjectClassification.TRAILER: - # not implemented in iv - return "trailer" - if classification.label == 
ObjectClassification.MOTORCYCLE: - # iv: motorbike, auto: motorbike - return "motorbike" - if classification.label == ObjectClassification.BICYCLE: - return "bicycle" - if classification.label == ObjectClassification.PEDESTRIAN: - return "pedestrian" - # not implemented in auto - # elif classification.label == ObjectClassification.ANIMAL: - # return "animal" - return "other" - - -def get_most_probable_classification( - array_classification: List[ObjectClassification], -) -> ObjectClassification: - highest_probability = 0.0 - highest_classification = None - for classification in array_classification: - if classification.probability >= highest_probability: - highest_probability = classification.probability - highest_classification = classification - return highest_classification - - class PerceptionResult(ResultBase): def __init__(self, condition: Dict): super().__init__() @@ -284,11 +246,11 @@ def list_dynamic_object_from_ros_msg( ) -> List[DynamicObject]: estimated_objects: List[DynamicObject] = [] for perception_object in objects: - most_probable_classification = get_most_probable_classification( + most_probable_classification = DLREvaluator.get_most_probable_classification( perception_object.classification ) label = self.__evaluator.evaluator_config.label_converter.convert_label( - name=get_label(most_probable_classification) + name=DLREvaluator.get_perception_label_str(most_probable_classification) ) uuid = None From 46d80e298e2d81eec3b35be7834ac51610d3688f Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Fri, 8 Sep 2023 10:44:30 +0900 Subject: [PATCH 5/7] refactor: traffic light abc (#232) --- .../driving_log_replayer/evaluator.py | 36 +++ .../scripts/perception_2d_evaluator_node.py | 11 +- .../scripts/traffic_light_evaluator_node.py | 276 ++++++------------ 3 files changed, 120 insertions(+), 203 deletions(-) diff --git a/driving_log_replayer/driving_log_replayer/evaluator.py b/driving_log_replayer/driving_log_replayer/evaluator.py index 8c6169cfa..5b14b308c 100644 --- a/driving_log_replayer/driving_log_replayer/evaluator.py +++ b/driving_log_replayer/driving_log_replayer/evaluator.py @@ -24,6 +24,8 @@ from autoware_adapi_v1_msgs.srv import InitializeLocalization from autoware_auto_perception_msgs.msg import ObjectClassification +from autoware_auto_perception_msgs.msg import TrafficLight +from builtin_interfaces.msg import Time as Stamp from geometry_msgs.msg import PoseStamped from geometry_msgs.msg import PoseWithCovarianceStamped from geometry_msgs.msg import TransformStamped @@ -34,10 +36,12 @@ from rclpy.clock import ClockType from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node +from rclpy.time import Duration from rclpy.time import Time from rosidl_runtime_py import message_to_ordereddict import simplejson as json from tf2_ros import Buffer +from tf2_ros import TransformException from tf2_ros import TransformListener from tf_transformations import euler_from_quaternion from tier4_localization_msgs.srv import PoseWithCovarianceStamped as PoseWithCovarianceStampedSrv @@ -190,6 +194,15 @@ def initial_pose_cb(self, future): # free self._initial_pose_running self._initial_pose_running = False + def lookup_transform(self, stamp: Stamp) -> TransformStamped: + try: + return self._tf_buffer.lookup_transform( + "map", "base_link", stamp, Duration(seconds=0.5) + ) + except TransformException as ex: + self.get_logger().info(f"Could not transform map to baselink: {ex}") + return TransformStamped() + @abstractmethod def check_scenario(self) -> None: """Check 
self._scenario_yaml_obj and if has error shutdown.""" @@ -325,6 +338,29 @@ def get_most_probable_classification( highest_classification = classification return highest_classification + @classmethod + def get_traffic_light_label_str(cls, light: TrafficLight) -> str: + if light.color == TrafficLight.RED: + return "red" + if light.color == TrafficLight.AMBER: + return "yellow" + if light.color == TrafficLight.GREEN: + return "green" + return "unknown" + + @classmethod + def get_most_probable_signal( + cls, + lights: List[TrafficLight], + ) -> TrafficLight: + highest_probability = 0.0 + highest_light = None + for light in lights: + if light.confidence >= highest_probability: + highest_probability = light.confidence + highest_light = light + return highest_light + def evaluator_main(func): def wrapper(): diff --git a/driving_log_replayer/scripts/perception_2d_evaluator_node.py b/driving_log_replayer/scripts/perception_2d_evaluator_node.py index 0add77b1e..68016afcd 100755 --- a/driving_log_replayer/scripts/perception_2d_evaluator_node.py +++ b/driving_log_replayer/scripts/perception_2d_evaluator_node.py @@ -19,7 +19,6 @@ from typing import Dict from typing import List -from geometry_msgs.msg import TransformStamped from perception_eval.common.object2d import DynamicObject2D from perception_eval.config import PerceptionEvaluationConfig from perception_eval.evaluation import PerceptionFrameResult @@ -30,9 +29,7 @@ from perception_eval.tool import PerceptionAnalyzer2D from perception_eval.util.logger_config import configure_logger import rclpy -from rclpy.time import Duration from std_msgs.msg import Header -from tf2_ros import TransformException from tier4_perception_msgs.msg import DetectedObjectsWithFeature from tier4_perception_msgs.msg import DetectedObjectWithFeature @@ -255,13 +252,7 @@ def list_dynamic_object_2d_from_ros_msg( def detected_objs_cb(self, msg: DetectedObjectsWithFeature, camera_type: str): # self.get_logger().error(f"{camera_type} callback") - try: - map_to_baselink = self._tf_buffer.lookup_transform( - "map", "base_link", msg.header.stamp, Duration(seconds=0.5) - ) - except TransformException as ex: - self.get_logger().info(f"Could not transform map to baselink: {ex}") - map_to_baselink = TransformStamped() + map_to_baselink = self.lookup_transform(msg.header.stamp) unix_time: int = eval_conversions.unix_time_from_ros_msg(msg.header) # 現frameに対応するGround truthを取得 ground_truth_now_frame = self.__evaluator.get_ground_truth_now_frame(unix_time) diff --git a/driving_log_replayer/scripts/traffic_light_evaluator_node.py b/driving_log_replayer/scripts/traffic_light_evaluator_node.py index 8f86a8715..9905ddb0c 100755 --- a/driving_log_replayer/scripts/traffic_light_evaluator_node.py +++ b/driving_log_replayer/scripts/traffic_light_evaluator_node.py @@ -16,14 +16,11 @@ import logging import os -from pathlib import Path from typing import Dict from typing import List -from autoware_auto_perception_msgs.msg import TrafficLight from autoware_auto_perception_msgs.msg import TrafficSignal from autoware_auto_perception_msgs.msg import TrafficSignalArray -from geometry_msgs.msg import TransformStamped from perception_eval.common.object2d import DynamicObject2D from perception_eval.config import PerceptionEvaluationConfig from perception_eval.evaluation import PerceptionFrameResult @@ -34,46 +31,13 @@ from perception_eval.tool import PerceptionAnalyzer2D from perception_eval.util.logger_config import configure_logger import rclpy -from rclpy.callback_groups import 
MutuallyExclusiveCallbackGroup -from rclpy.clock import Clock -from rclpy.clock import ClockType -from rclpy.executors import MultiThreadedExecutor -from rclpy.node import Node -from rclpy.time import Duration -from rclpy.time import Time from std_msgs.msg import Header -from tf2_ros import Buffer -from tf2_ros import TransformException -from tf2_ros import TransformListener -import yaml -from driving_log_replayer.node_common import transform_stamped_with_euler_angle +from driving_log_replayer.evaluator import DLREvaluator +from driving_log_replayer.evaluator import evaluator_main import driving_log_replayer.perception_eval_conversions as eval_conversions from driving_log_replayer.result import PickleWriter from driving_log_replayer.result import ResultBase -from driving_log_replayer.result import ResultWriter - - -def get_label(light: TrafficLight) -> str: - if light.color == TrafficLight.RED: - return "red" - if light.color == TrafficLight.AMBER: - return "yellow" - if light.color == TrafficLight.GREEN: - return "green" - return "unknown" - - -def get_most_probable_signal( - lights: List[TrafficLight], -) -> TrafficLight: - highest_probability = 0.0 - highest_light = None - for light in lights: - if light.confidence >= highest_probability: - highest_probability = light.confidence - highest_light = light - return highest_light class TrafficLightResult(ResultBase): @@ -95,7 +59,7 @@ def update(self): self._success = False self._summary = f"Failed: {summary_str}" - def add_frame( + def set_frame( self, frame: PerceptionFrameResult, skip: int, @@ -104,7 +68,6 @@ def add_frame( ): self.__total += 1 has_objects = True - # OptionalでNoneが入る場合と、空配列の場合の2種類ありそうなので、is None判定ではなくnotで判定する if ( frame.pass_fail_result.tp_object_results == [] and frame.pass_fail_result.fp_object_results == [] @@ -137,154 +100,106 @@ def add_frame( self._frame = out_frame self.update() - def add_final_metrics(self, final_metrics: Dict): + def set_final_metrics(self, final_metrics: Dict): self._frame = {"FinalScore": final_metrics} -class TrafficLightEvaluator(Node): - def __init__(self): - super().__init__("traffic_light_evaluator") - self.declare_parameter("scenario_path", "") - self.declare_parameter("result_json_path", "") - self.declare_parameter("t4_dataset_path", "") - self.declare_parameter("result_archive_path", "") +class TrafficLightEvaluator(DLREvaluator): + def __init__(self, name: str): + super().__init__(name) + self.check_scenario() + self.use_t4_dataset() - self.__timer_group = MutuallyExclusiveCallbackGroup() - self.__tf_buffer = Buffer() - self.__tf_listener = TransformListener(self.__tf_buffer, self, spin_thread=True) + self.__result = TrafficLightResult(self._condition) - scenario_path = os.path.expandvars( - self.get_parameter("scenario_path").get_parameter_value().string_value + evaluation_config: PerceptionEvaluationConfig = PerceptionEvaluationConfig( + dataset_paths=self._t4_dataset_paths, + frame_id=self.__camera_type, + result_root_directory=os.path.join(self._perception_eval_log_path, "result", "{TIME}"), + evaluation_config_dict=self.__p_cfg["evaluation_config_dict"], + load_raw_data=False, ) - self.__scenario_yaml_obj = None - with open(scenario_path) as scenario_file: - self.__scenario_yaml_obj = yaml.safe_load(scenario_file) - self.__result_json_path = os.path.expandvars( - self.get_parameter("result_json_path").get_parameter_value().string_value + _ = configure_logger( + log_file_directory=evaluation_config.log_directory, + console_log_level=logging.INFO, + file_log_level=logging.INFO, ) - 
result_archive_path = Path( - os.path.expandvars( - self.get_parameter("result_archive_path").get_parameter_value().string_value + # どれを注目物体とするかのparam + self.__critical_object_filter_config: CriticalObjectFilterConfig = ( + CriticalObjectFilterConfig( + evaluator_config=evaluation_config, + target_labels=self.__c_cfg["target_labels"], ) ) - result_archive_path.mkdir(exist_ok=True) - - self.__pkl_path = result_archive_path.joinpath("scene_result.pkl").as_posix() - self.__t4_dataset_paths = [ - os.path.expandvars( - self.get_parameter("t4_dataset_path").get_parameter_value().string_value - ) - ] - self.__perception_eval_log_path = result_archive_path.parent.joinpath( - "perception_eval_log" - ).as_posix() + # Pass fail を決めるパラメータ + self.__frame_pass_fail_config: PerceptionPassFailConfig = PerceptionPassFailConfig( + evaluator_config=evaluation_config, + target_labels=self.__f_cfg["target_labels"], + ) + self.__evaluator = PerceptionEvaluationManager(evaluation_config=evaluation_config) + self.__sub_traffic_signals = self.create_subscription( + TrafficSignalArray, + "/perception/traffic_light_recognition/traffic_signals", + self.traffic_signals_cb, + 1, + ) + self.__skip_counter = 0 + def check_scenario(self) -> None: try: - self.__condition = self.__scenario_yaml_obj["Evaluation"]["Conditions"] - self.__result = TrafficLightResult(self.__condition) - - self.__result_writer = ResultWriter( - self.__result_json_path, self.get_clock(), self.__condition - ) - - p_cfg = self.__scenario_yaml_obj["Evaluation"]["PerceptionEvaluationConfig"] - c_cfg = self.__scenario_yaml_obj["Evaluation"]["CriticalObjectFilterConfig"] - f_cfg = self.__scenario_yaml_obj["Evaluation"]["PerceptionPassFailConfig"] - - self.__camera_type = p_cfg["camera_type"] - p_cfg["evaluation_config_dict"][ + self.__p_cfg = self._scenario_yaml_obj["Evaluation"]["PerceptionEvaluationConfig"] + self.__c_cfg = self._scenario_yaml_obj["Evaluation"]["CriticalObjectFilterConfig"] + self.__f_cfg = self._scenario_yaml_obj["Evaluation"]["PerceptionPassFailConfig"] + self.__evaluation_task = self.__p_cfg["evaluation_config_dict"]["evaluation_task"] + self.__p_cfg["evaluation_config_dict"][ "label_prefix" ] = "traffic_light" # Add a fixed value setting - p_cfg["evaluation_config_dict"][ + self.__p_cfg["evaluation_config_dict"][ "count_label_number" ] = True # Add a fixed value setting - - evaluation_config: PerceptionEvaluationConfig = PerceptionEvaluationConfig( - dataset_paths=self.__t4_dataset_paths, - frame_id=self.__camera_type, - result_root_directory=os.path.join( - self.__perception_eval_log_path, "result", "{TIME}" - ), - evaluation_config_dict=p_cfg["evaluation_config_dict"], - load_raw_data=False, - ) - _ = configure_logger( - log_file_directory=evaluation_config.log_directory, - console_log_level=logging.INFO, - file_log_level=logging.INFO, - ) - # どれを注目物体とするかのparam - self.__critical_object_filter_config: CriticalObjectFilterConfig = ( - CriticalObjectFilterConfig( - evaluator_config=evaluation_config, - target_labels=c_cfg["target_labels"], - ) - ) - # Pass fail を決めるパラメータ - self.__frame_pass_fail_config: PerceptionPassFailConfig = PerceptionPassFailConfig( - evaluator_config=evaluation_config, - target_labels=f_cfg["target_labels"], - ) - self.__evaluator = PerceptionEvaluationManager(evaluation_config=evaluation_config) - self.__sub_traffic_signals = self.create_subscription( - TrafficSignalArray, - "/perception/traffic_light_recognition/traffic_signals", - self.traffic_signals_cb, - 1, - ) - - self.__current_time = Time().to_msg() 
- self.__prev_time = Time().to_msg() - - self.__counter = 0 - self.__timer = self.create_timer( - 1.0, - self.timer_cb, - callback_group=self.__timer_group, - clock=Clock(clock_type=ClockType.SYSTEM_TIME), - ) # wall timer - self.__skip_counter = 0 + self.__camera_type = self.__p_cfg["camera_type"] except KeyError: - # Immediate termination if the scenario does not contain the required items and is incompatible. self.get_logger().error("Scenario format error.") rclpy.shutdown() + if not self.check_evaluation_task(): + rclpy.shutdown() + # if not self.check_camera_type(): + # rclpy.shutdown() + + def check_evaluation_task(self) -> bool: + if self.__evaluation_task != "classification2d": + self.get_logger().error(f"Unexpected evaluation task: {self.__evaluation_task}") + return False + return True def timer_cb(self): - self.__current_time = self.get_clock().now().to_msg() - # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}") - if self.__current_time.sec > 0: - if self.__current_time == self.__prev_time: - self.__counter += 1 - else: - self.__counter = 0 - self.__prev_time = self.__current_time - if self.__counter >= 5: - self.__pickle_writer = PickleWriter(self.__pkl_path) - self.__pickle_writer.dump(self.__evaluator.frame_results) - self.get_final_result() - analyzer = PerceptionAnalyzer2D(self.__evaluator.evaluator_config) - analyzer.add(self.__evaluator.frame_results) - score_df, conf_mat_df = analyzer.analyze() - score_dict = {} - conf_mat_dict = {} - if score_df is not None: - score_dict = score_df.to_dict() - if conf_mat_df is not None: - conf_mat_dict = conf_mat_df.to_dict() - final_metrics = {"Score": score_dict, "ConfusionMatrix": conf_mat_dict} - self.__result.add_final_metrics(final_metrics) - self.__result_writer.write(self.__result) - self.__result_writer.close() - rclpy.shutdown() + super().timer_cb(write_metrics_func=self.write_metrics) + + def write_metrics(self): + self.__pickle_writer = PickleWriter(self._pkl_path) + self.__pickle_writer.dump(self.__evaluator.frame_results) + self.get_final_result() + score_dict = {} + conf_mat_dict = {} + analyzer = PerceptionAnalyzer2D(self.__evaluator.evaluator_config) + analyzer.add(self.__evaluator.frame_results) + score_df, conf_mat_df = analyzer.analyze() + if score_df is not None: + score_dict = score_df.to_dict() + if conf_mat_df is not None: + conf_mat_dict = conf_mat_df.to_dict() + final_metrics = {"Score": score_dict, "ConfusionMatrix": conf_mat_dict} + self.__result.set_final_metrics(final_metrics) + self._result_writer.write(self.__result) def list_dynamic_object_2d_from_ros_msg( self, unix_time: int, signals: List[TrafficSignal] ) -> List[DynamicObject2D]: estimated_objects: List[DynamicObject2D] = [] for signal in signals: - most_probable_light = get_most_probable_signal(signal.lights) + most_probable_light = DLREvaluator.get_most_probable_signal(signal.lights) label = self.__evaluator.evaluator_config.label_converter.convert_label( - name=get_label(most_probable_light) + name=DLREvaluator.get_traffic_light_label_str(most_probable_light) ) estimated_object = DynamicObject2D( @@ -299,32 +214,16 @@ def list_dynamic_object_2d_from_ros_msg( return estimated_objects def traffic_signals_cb(self, msg: TrafficSignalArray): - try: - map_to_baselink = self.__tf_buffer.lookup_transform( - "map", "base_link", msg.header.stamp, Duration(seconds=0.5) - ) - except TransformException as ex: - self.get_logger().info(f"Could not transform map to baselink: {ex}") - map_to_baselink = TransformStamped() + 
map_to_baselink = self.lookup_transform(msg.header.stamp) unix_time: int = eval_conversions.unix_time_from_ros_msg(msg.header) - # 現frameに対応するGround truthを取得 ground_truth_now_frame = self.__evaluator.get_ground_truth_now_frame(unix_time) if ground_truth_now_frame is None: self.__skip_counter += 1 else: - # self.get_logger().error( - # f"debug: get ground truth: {self.__current_time.sec}.{self.__current_time.nanosec}" - # ) estimated_objects: List[DynamicObject2D] = self.list_dynamic_object_2d_from_ros_msg( unix_time, msg.signals ) - # self.get_logger().error( - # f"debug: get dynamic object 2d: {self.__current_time.sec}.{self.__current_time.nanosec}" - # ) ros_critical_ground_truth_objects = ground_truth_now_frame.objects - # critical_object_filter_configと、frame_pass_fail_configこの中で動的に変えても良い。 - # 動的に変える条件をかけるようになるまでは、初期化時に一括設定 - frame_result: PerceptionFrameResult = self.__evaluator.add_frame_result( unix_time=unix_time, ground_truth_now_frame=ground_truth_now_frame, @@ -333,17 +232,13 @@ def traffic_signals_cb(self, msg: TrafficSignalArray): critical_object_filter_config=self.__critical_object_filter_config, frame_pass_fail_config=self.__frame_pass_fail_config, ) - # self.get_logger().error( - # f"debug: get frame result: {self.__current_time.sec}.{self.__current_time.nanosec}" - # ) - # write result - self.__result.add_frame( + self.__result.set_frame( frame_result, self.__skip_counter, msg.header, - transform_stamped_with_euler_angle(map_to_baselink), + DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink), ) - self.__result_writer.write(self.__result) + self._result_writer.write(self.__result) def get_final_result(self) -> MetricsScore: final_metric_score = self.__evaluator.get_scene_result() @@ -353,14 +248,9 @@ def get_final_result(self) -> MetricsScore: return final_metric_score -def main(args=None): - rclpy.init(args=args) - executor = MultiThreadedExecutor() - traffic_light_evaluator = TrafficLightEvaluator() - executor.add_node(traffic_light_evaluator) - executor.spin() - traffic_light_evaluator.destroy_node() - rclpy.shutdown() +@evaluator_main +def main(): + return TrafficLightEvaluator("traffic_light_evaluator") if __name__ == "__main__": From 1ef40d5efcc8c0bac580ed3296801f306096d3ea Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Fri, 8 Sep 2023 10:49:03 +0900 Subject: [PATCH 6/7] refactor: lookup transform (#233) --- .../scripts/localization_evaluator_node.py | 27 +++---------------- .../scripts/perception_evaluator_node.py | 11 +------- .../performance_diag_evaluator_node.py | 11 +------- 3 files changed, 5 insertions(+), 44 deletions(-) diff --git a/driving_log_replayer/scripts/localization_evaluator_node.py b/driving_log_replayer/scripts/localization_evaluator_node.py index 4a2c81016..5650ad428 100755 --- a/driving_log_replayer/scripts/localization_evaluator_node.py +++ b/driving_log_replayer/scripts/localization_evaluator_node.py @@ -22,13 +22,10 @@ from diagnostic_msgs.msg import DiagnosticArray from example_interfaces.msg import Float64 from geometry_msgs.msg import PoseStamped -from geometry_msgs.msg import TransformStamped from nav_msgs.msg import Odometry import numpy as np import rclpy -from rclpy.time import Duration from rosidl_runtime_py import message_to_ordereddict -from tf2_ros import TransformException from tf_transformations import euler_from_quaternion from tier4_debug_msgs.msg import Float32Stamped from tier4_debug_msgs.msg import Int32Stamped @@ -303,13 +300,7 @@ def tp_cb(self, msg: Float32Stamped): if self.__reliability_method != "TP": 
             # evaluates when reliability_method is TP
             return
-        try:
-            map_to_baselink = self._tf_buffer.lookup_transform(
-                "map", "base_link", msg.stamp, Duration(seconds=0.5)
-            )
-        except TransformException as ex:
-            self.get_logger().info(f"Could not transform map to baselink: {ex}")
-            map_to_baselink = TransformStamped()
+        map_to_baselink = self.lookup_transform(msg.stamp)
         self.__result.set_frame(
             msg,
             DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink),
@@ -322,26 +313,14 @@ def nvtl_cb(self, msg: Float32Stamped):
         if self.__reliability_method != "NVTL":
             # evaluates when reliability_method is NVTL
             return
-        try:
-            map_to_baselink = self._tf_buffer.lookup_transform(
-                "map", "base_link", msg.stamp, Duration(seconds=0.5)
-            )
-        except TransformException as ex:
-            self.get_logger().info(f"Could not transform map to baselink: {ex}")
-            map_to_baselink = TransformStamped()
+        map_to_baselink = self.lookup_transform(msg.stamp)
         self.__result.set_frame(
             msg, DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink), self.__latest_tp
         )
         self._result_writer.write(self.__result)

     def pose_cb(self, msg: PoseStamped):
-        try:
-            map_to_baselink = self._tf_buffer.lookup_transform(
-                "map", "base_link", msg.header.stamp, Duration(seconds=0.5)
-            )
-        except TransformException as ex:
-            self.get_logger().info(f"Could not transform map to baselink: {ex}")
-            map_to_baselink = TransformStamped()
+        map_to_baselink = self.lookup_transform(msg.header.stamp)
         msg_lateral_distance = self.__result.set_frame(
             msg,
             DLREvaluator.transform_stamped_with_euler_angle(map_to_baselink),
diff --git a/driving_log_replayer/scripts/perception_evaluator_node.py b/driving_log_replayer/scripts/perception_evaluator_node.py
index 6a6881c18..077e76b36 100755
--- a/driving_log_replayer/scripts/perception_evaluator_node.py
+++ b/driving_log_replayer/scripts/perception_evaluator_node.py
@@ -25,7 +25,6 @@
 from autoware_auto_perception_msgs.msg import DetectedObjects
 from autoware_auto_perception_msgs.msg import TrackedObject
 from autoware_auto_perception_msgs.msg import TrackedObjects
-from geometry_msgs.msg import TransformStamped
 from perception_eval.common.object import DynamicObject
 from perception_eval.common.schema import FrameID
 from perception_eval.common.shape import Shape
@@ -41,10 +40,8 @@
 from perception_eval.tool import PerceptionAnalyzer3D
 from perception_eval.util.logger_config import configure_logger
 import rclpy
-from rclpy.time import Duration
 from std_msgs.msg import ColorRGBA
 from std_msgs.msg import Header
-from tf2_ros import TransformException
 from visualization_msgs.msg import MarkerArray

 from driving_log_replayer.evaluator import DLREvaluator
@@ -291,13 +288,7 @@ def list_dynamic_object_from_ros_msg(
         return estimated_objects

     def perception_cb(self, msg: Union[DetectedObjects, TrackedObjects]):
-        try:
-            map_to_baselink = self._tf_buffer.lookup_transform(
-                "map", "base_link", msg.header.stamp, Duration(seconds=0.5)
-            )
-        except TransformException as ex:
-            self.get_logger().info(f"Could not transform map to baselink: {ex}")
-            map_to_baselink = TransformStamped()
+        map_to_baselink = self.lookup_transform(msg.header.stamp)
         # DetectedObjects and TrackedObjects are different types, but the fields used to build estimated_object are common to both, so the same function can handle either
         unix_time: int = eval_conversions.unix_time_from_ros_msg(msg.header)
         # Get the ground truth that corresponds to the current frame
diff --git a/driving_log_replayer/scripts/performance_diag_evaluator_node.py b/driving_log_replayer/scripts/performance_diag_evaluator_node.py
index b0d396d09..7c76f85c0 100755
--- a/driving_log_replayer/scripts/performance_diag_evaluator_node.py
+++ b/driving_log_replayer/scripts/performance_diag_evaluator_node.py
@@ -25,10 +25,7 @@
 from diagnostic_msgs.msg import DiagnosticStatus
 from example_interfaces.msg import Byte
 from example_interfaces.msg import Float64
-from geometry_msgs.msg import TransformStamped
-from rclpy.time import Duration
 from std_msgs.msg import Header
-from tf2_ros import TransformException

 from driving_log_replayer.evaluator import DLREvaluator
 from driving_log_replayer.evaluator import evaluator_main
@@ -331,13 +328,7 @@ def diag_cb(self, msg: DiagnosticArray) -> None:
         if msg.header == self.__diag_header_prev:
             return
         self.__diag_header_prev = msg.header
-        try:
-            map_to_baselink = self._tf_buffer.lookup_transform(
-                "map", "base_link", msg.header.stamp, Duration(seconds=0.5)
-            )
-        except TransformException as ex:
-            self.get_logger().info(f"Could not transform map to baselink: {ex}")
-            map_to_baselink = TransformStamped()
+        map_to_baselink = self.lookup_transform(msg.header.stamp)
         (
             msg_visibility_value,
             msg_visibility_level,

From 1b1e2e7ef53e946f835f0c300fc9388a7f2c70f5 Mon Sep 17 00:00:00 2001
From: Hayato Mizushima
Date: Fri, 8 Sep 2023 14:09:00 +0900
Subject: [PATCH 7/7] refactor: obstacle segmentation abc (#234)

---
 .../driving_log_replayer/evaluator.py        |  19 +-
 .../driving_log_replayer/result.py           |  13 +-
 .../obstacle_segmentation_evaluator_node.py  | 259 +++++++-----------
 .../scripts/perception_2d_evaluator_node.py  |   7 +-
 .../scripts/perception_evaluator_node.py     |   6 +-
 .../scripts/traffic_light_evaluator_node.py  |   6 +-
 6 files changed, 126 insertions(+), 184 deletions(-)

diff --git a/driving_log_replayer/driving_log_replayer/evaluator.py b/driving_log_replayer/driving_log_replayer/evaluator.py
index 5b14b308c..e21364de2 100644
--- a/driving_log_replayer/driving_log_replayer/evaluator.py
+++ b/driving_log_replayer/driving_log_replayer/evaluator.py
@@ -14,9 +14,11 @@

 from abc import ABC
 from abc import abstractmethod
+from collections.abc import Callable
 import os
 from os.path import expandvars
 from pathlib import Path
+from typing import Any
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -47,6 +49,7 @@
 from tier4_localization_msgs.srv import PoseWithCovarianceStamped as PoseWithCovarianceStampedSrv
 import yaml

+from driving_log_replayer.result import PickleWriter
 from driving_log_replayer.result import ResultWriter

 if TYPE_CHECKING:
@@ -118,10 +121,17 @@ def use_t4_dataset(self) -> None:
             "perception_eval_log"
         ).as_posix()

-    def timer_cb(self, *, write_metrics_func=None) -> None:
+    def timer_cb(
+        self,
+        *,
+        register_loop_func: Optional[Callable] = None,
+        register_shutdown_func: Optional[Callable] = None,
+    ) -> None:
         self._current_time = self.get_clock().now().to_msg()
         # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}")
         if self._current_time.sec > 0:
+            if register_loop_func is not None:
+                register_loop_func()
             if self._initial_pose is not None:
                 self.call_initialpose_service()
             if self._current_time == self._prev_time:
@@ -130,8 +140,8 @@ def timer_cb(self, *, write_metrics_func=None) -> None:
                 self._clock_stop_counter = 0
             self._prev_time = self._current_time
             if self._clock_stop_counter >= DLREvaluator.COUNT_SHUTDOWN_NODE:
-                if write_metrics_func is not None:
-                    write_metrics_func()
+                if register_shutdown_func is not None:
+                    register_shutdown_func()
                 self._result_writer.close()
                 rclpy.shutdown()

@@ -203,6 +213,9 @@ def lookup_transform(self, stamp: Stamp) -> TransformStamped:
             self.get_logger().info(f"Could not transform map to baselink: {ex}")
             return TransformStamped()

+    def save_pkl(self, save_object: Any):
+        PickleWriter(self._pkl_path, save_object)
+
     @abstractmethod
     def check_scenario(self) -> None:
         """Check self._scenario_yaml_obj and if has error shutdown."""
diff --git a/driving_log_replayer/driving_log_replayer/result.py b/driving_log_replayer/driving_log_replayer/result.py
index 8339f3b45..6c7c16290 100644
--- a/driving_log_replayer/driving_log_replayer/result.py
+++ b/driving_log_replayer/driving_log_replayer/result.py
@@ -16,6 +16,7 @@
 from abc import abstractmethod
 import os
 import pickle
+from typing import Any
 from typing import Dict

 from rclpy.clock import Clock
@@ -94,12 +95,6 @@ def write(self, result: ResultBase):


 class PickleWriter:
-    def __init__(self, out_pkl_path: str):
-        self._pickle_file = open(os.path.expandvars(out_pkl_path), "wb")  # noqa
-
-    def dump(self, write_object):
-        pickle.dump(write_object, self._pickle_file)
-        self.close()
-
-    def close(self):
-        self._pickle_file.close()
+    def __init__(self, out_pkl_path: str, write_object: Any):
+        with open(os.path.expandvars(out_pkl_path), "wb") as pkl_file:
+            pickle.dump(write_object, pkl_file)
diff --git a/driving_log_replayer/scripts/obstacle_segmentation_evaluator_node.py b/driving_log_replayer/scripts/obstacle_segmentation_evaluator_node.py
index 8bdba8df6..a07f0d081 100755
--- a/driving_log_replayer/scripts/obstacle_segmentation_evaluator_node.py
+++ b/driving_log_replayer/scripts/obstacle_segmentation_evaluator_node.py
@@ -34,12 +34,6 @@
 from perception_eval.manager import SensingEvaluationManager
 from perception_eval.util.logger_config import configure_logger
 import rclpy
-from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
-from rclpy.clock import Clock
-from rclpy.clock import ClockType
-from rclpy.executors import MultiThreadedExecutor
-from rclpy.node import Node
-from rclpy.time import Time
 import ros2_numpy
 from rosidl_runtime_py import message_to_ordereddict
 from sensor_msgs.msg import PointCloud2
@@ -49,15 +43,13 @@
 from tier4_api_msgs.msg import AwapiAutowareStatus
 from visualization_msgs.msg import Marker
 from visualization_msgs.msg import MarkerArray
-import yaml

-from driving_log_replayer.node_common import get_goal_pose_from_t4_dataset
-from driving_log_replayer.node_common import transform_stamped_with_euler_angle
+from driving_log_replayer.evaluator import DLREvaluator
+from driving_log_replayer.evaluator import evaluator_main
 from driving_log_replayer.obstacle_segmentation_analyzer import default_config_path
 from driving_log_replayer.obstacle_segmentation_analyzer import get_graph_data
 import driving_log_replayer.perception_eval_conversions as eval_conversions
 from driving_log_replayer.result import ResultBase
-from driving_log_replayer.result import ResultWriter
 from driving_log_replayer_analyzer.data import convert_str_to_dist_type
 from driving_log_replayer_msgs.msg import ObstacleSegmentationInput
 from driving_log_replayer_msgs.msg import ObstacleSegmentationMarker
@@ -368,7 +360,7 @@ def summarize_non_detection(
         )
         return {"Result": result, "Info": info}, ros_pcd, graph_non_detection

-    def add_frame(
+    def set_frame(
         self,
         frame: SensingFrameResult,
         skip: int,
@@ -414,54 +406,27 @@ def add_frame(
         )


-class ObstacleSegmentationEvaluator(Node):
-    def __init__(self):
-        super().__init__("obstacle_segmentation_evaluator")
-        self.declare_parameter("scenario_path", "")
-        self.declare_parameter("result_json_path", "")
-        self.declare_parameter("t4_dataset_path", "")
-        self.declare_parameter("result_archive_path", "")
-        self.declare_parameter("vehicle_model", "")
+class ObstacleSegmentationEvaluator(DLREvaluator):
+    COUNT_FINISH_PUB_GOAL_POSE = 5

-        self.__timer_group = MutuallyExclusiveCallbackGroup()
+    def __init__(self, name: str):
+        super().__init__(name)
+        self.check_scenario()
+        self.use_t4_dataset()

-        scenario_path = os.path.expandvars(
-            self.get_parameter("scenario_path").get_parameter_value().string_value
-        )
-        self.__scenario_yaml_obj = None
-        with open(scenario_path) as scenario_file:
-            self.__scenario_yaml_obj = yaml.safe_load(scenario_file)
-        self.__result_json_path = os.path.expandvars(
-            self.get_parameter("result_json_path").get_parameter_value().string_value
-        )
-        self.__t4_dataset_paths = [
-            os.path.expandvars(
-                self.get_parameter("t4_dataset_path").get_parameter_value().string_value
-            )
-        ]
+        self.declare_parameter("vehicle_model", "")
         self.__vehicle_model = (
             self.get_parameter("vehicle_model").get_parameter_value().string_value
         )
-        self.__perception_eval_log_path = os.path.join(
-            os.path.dirname(self.__result_json_path), "perception_eval_log"
-        )
-        # read setting from scenario yaml
-        self.__condition = self.__scenario_yaml_obj["Evaluation"]["Conditions"]
-        self.__result = ObstacleSegmentationResult(self.__condition)
+        self.__result = ObstacleSegmentationResult(self._condition)

         self.__goal_pose_counter = 0
-        self.__goal_pose = get_goal_pose_from_t4_dataset(self.__t4_dataset_paths[0])
-        self.__result_writer = ResultWriter(
-            self.__result_json_path, self.get_clock(), self.__condition
-        )
-
-        e_cfg = self.__scenario_yaml_obj["Evaluation"]["SensingEvaluationConfig"]
-        e_cfg["evaluation_config_dict"]["label_prefix"] = "autoware"  # Add a fixed value setting
+        self.__goal_pose = DLREvaluator.get_goal_pose_from_t4_dataset(self._t4_dataset_paths[0])

         evaluation_config: SensingEvaluationConfig = SensingEvaluationConfig(
-            dataset_paths=self.__t4_dataset_paths,
+            dataset_paths=self._t4_dataset_paths,
             frame_id="base_link",
-            result_root_directory=os.path.join(self.__perception_eval_log_path, "result", "{TIME}"),
-            evaluation_config_dict=e_cfg["evaluation_config_dict"],
+            result_root_directory=os.path.join(self._perception_eval_log_path, "result", "{TIME}"),
+            evaluation_config_dict=self.__s_cfg["evaluation_config_dict"],
             load_raw_data=False,
         )
@@ -506,42 +471,34 @@ def __init__(self):
         self.__pub_graph_non_detection = self.create_publisher(
             ObstacleSegmentationMarker, "graph/non_detection", 1
         )
-
-        self.__current_time = Time().to_msg()
-        self.__prev_time = Time().to_msg()
-
-        self.__shutdown_counter = 0
-        self.__timer = self.create_timer(
-            1.0,
-            self.timer_cb,
-            callback_group=self.__timer_group,
-            clock=Clock(clock_type=ClockType.SYSTEM_TIME),
-        )  # wall timer
         self.__skip_counter = 0

+    def check_scenario(self) -> None:
+        try:
+            self.__s_cfg = self._scenario_yaml_obj["Evaluation"]["SensingEvaluationConfig"]
+            self.__s_cfg["evaluation_config_dict"][
+                "label_prefix"
+            ] = "autoware"  # Add a fixed value setting
+        except KeyError:
+            self.get_logger().error("Scenario format error.")
+            rclpy.shutdown()
+
     def timer_cb(self):
-        self.__current_time = self.get_clock().now().to_msg()
-        # self.get_logger().error(f"time: {self.__current_time.sec}.{self.__current_time.nanosec}")
-        if self.__current_time.sec > 0:
-            self.__goal_pose_counter = self.__goal_pose_counter + 1
-            if self.__goal_pose_counter <= 5:
-                self.__goal_pose.header.stamp = self.__current_time
-                self.__pub_goal_pose.publish(self.__goal_pose)
-            if self.__current_time == self.__prev_time:
-                self.__shutdown_counter += 1
-            else:
-                self.__shutdown_counter = 0
-            self.__prev_time = self.__current_time
-            if self.__shutdown_counter >= 5:
-                # self.save_pkl()
-                self.write_graph_data()
-                rclpy.shutdown()
+        super().timer_cb(
+            register_loop_func=self.publish_goal_pose, register_shutdown_func=self.write_graph_data
+        )
+
+    def publish_goal_pose(self) -> None:
+        if self.__goal_pose_counter < ObstacleSegmentationEvaluator.COUNT_FINISH_PUB_GOAL_POSE:
+            self.__goal_pose_counter += 1
+            self.__goal_pose.header.stamp = self._current_time
+            self.__pub_goal_pose.publish(self.__goal_pose)

     def write_graph_data(self):
         # close the jsonl file once and reopen it
-        self.__result_writer.close()
+        self._result_writer.close()
         jsonl_file_path = Path(
-            os.path.splitext(os.path.expandvars(self.__result_json_path))[0] + ".jsonl"
+            os.path.splitext(os.path.expandvars(self._result_json_path))[0] + ".jsonl"
         )
         # self.get_logger().error(f"jsonl file: {jsonl_file_path}")
         detection_dist, pointcloud_numpoints = get_graph_data(
@@ -565,15 +522,6 @@ def write_graph_data(self):
                     jsonl_file.write(str_last_line)
             except json.JSONDecodeError:
                 pass
-        jsonl_file.close()
-
-    def save_pkl(self):
-        # for debug
-        from driving_log_replayer.result import PickleWriter
-
-        self.__pkl_path = os.path.join(os.path.dirname(self.__result_json_path), "frame.pkl")
-        self.__pickle_writer = PickleWriter(self.__pkl_path)
-        self.__pickle_writer.dump(self.__evaluator.frame_results)

     def obstacle_segmentation_input_cb(self, msg: ObstacleSegmentationInput):
         self.__pub_marker_non_detection.publish(msg.marker_array)
@@ -585,71 +533,69 @@ def obstacle_segmentation_input_cb(self, msg: ObstacleSegmentationInput):
         # record that the frame was skipped when there is no ground truth
         if ground_truth_now_frame is None:
             self.__skip_counter += 1
-        else:
-            # create sensing_frame_config
-            frame_ok, sensing_frame_config = get_sensing_frame_config(
-                pcd_header, self.__scenario_yaml_obj
-            )
-            if not frame_ok:
-                self.__skip_counter += 1
-                return
-            numpy_pcd = ros2_numpy.numpify(msg.pointcloud)
-            pointcloud = np.zeros((numpy_pcd.shape[0], 3))
-            pointcloud[:, 0] = numpy_pcd["x"]
-            pointcloud[:, 1] = numpy_pcd["y"]
-            pointcloud[:, 2] = numpy_pcd["z"]
-
-            non_detection_areas: List[List[Tuple[float, float, float]]] = []
-            for marker in msg.marker_array.markers:
-                non_detection_area: List[Tuple[float, float, float]] = []
-                for point in marker.points:
-                    non_detection_area.append((point.x, point.y, point.z))
-                non_detection_areas.append(non_detection_area)
-
-            frame_result: SensingFrameResult = self.__evaluator.add_frame_result(
-                unix_time=unix_time,
-                ground_truth_now_frame=ground_truth_now_frame,
-                pointcloud=pointcloud,
-                non_detection_areas=non_detection_areas,
-                sensing_frame_config=sensing_frame_config,
-            )
+            return
+        # create sensing_frame_config
+        frame_ok, sensing_frame_config = get_sensing_frame_config(
+            pcd_header, self._scenario_yaml_obj
+        )
+        if not frame_ok:
+            self.__skip_counter += 1
+            return
+        numpy_pcd = ros2_numpy.numpify(msg.pointcloud)
+        pointcloud = np.zeros((numpy_pcd.shape[0], 3))
+        pointcloud[:, 0] = numpy_pcd["x"]
+        pointcloud[:, 1] = numpy_pcd["y"]
+        pointcloud[:, 2] = numpy_pcd["z"]
+
+        non_detection_areas: List[List[Tuple[float, float, float]]] = []
+        for marker in msg.marker_array.markers:
+            non_detection_area: List[Tuple[float, float, float]] = []
+            for point in marker.points:
+                non_detection_area.append((point.x, point.y, point.z))
+            non_detection_areas.append(non_detection_area)
+
+        frame_result: SensingFrameResult = self.__evaluator.add_frame_result(
+            unix_time=unix_time,
+            ground_truth_now_frame=ground_truth_now_frame,
+            pointcloud=pointcloud,
+            non_detection_areas=non_detection_areas,
+            sensing_frame_config=sensing_frame_config,
+        )

-            # write result
-            (
-                marker_detection,
-                pcd_detection,
-                graph_detection,
-                pcd_non_detection,
-                graph_non_detection,
-            ) = self.__result.add_frame(
-                frame_result,
-                self.__skip_counter,
-                transform_stamped_with_euler_angle(msg.map_to_baselink),
-                self.__latest_stop_reasons,
-                msg.topic_rate,
-                pcd_header,
-            )
-            self.__result_writer.write(self.__result)
+        # write result
+        (
+            marker_detection,
+            pcd_detection,
+            graph_detection,
+            pcd_non_detection,
+            graph_non_detection,
+        ) = self.__result.set_frame(
+            frame_result,
+            self.__skip_counter,
+            DLREvaluator.transform_stamped_with_euler_angle(msg.map_to_baselink),
+            self.__latest_stop_reasons,
+            msg.topic_rate,
+            pcd_header,
+        )
+        self._result_writer.write(self.__result)

-            topic_rate_data = ObstacleSegmentationMarker()
-            topic_rate_data.header = msg.pointcloud.header
-            topic_rate_data.status = (
-                ObstacleSegmentationMarker.OK
-                if msg.topic_rate
-                else ObstacleSegmentationMarker.ERROR
-            )
-            self.__pub_graph_topic_rate.publish(topic_rate_data)
-
-            if marker_detection is not None:
-                self.__pub_marker_detection.publish(marker_detection)
-            if pcd_detection is not None:
-                self.__pub_pcd_detection.publish(pcd_detection)
-            if graph_detection is not None:
-                self.__pub_graph_detection.publish(graph_detection)
-            if pcd_non_detection is not None:
-                self.__pub_pcd_non_detection.publish(pcd_non_detection)
-            if graph_non_detection is not None:
-                self.__pub_graph_non_detection.publish(graph_non_detection)
+        topic_rate_data = ObstacleSegmentationMarker()
+        topic_rate_data.header = msg.pointcloud.header
+        topic_rate_data.status = (
+            ObstacleSegmentationMarker.OK if msg.topic_rate else ObstacleSegmentationMarker.ERROR
+        )
+        self.__pub_graph_topic_rate.publish(topic_rate_data)
+
+        if marker_detection is not None:
+            self.__pub_marker_detection.publish(marker_detection)
+        if pcd_detection is not None:
+            self.__pub_pcd_detection.publish(pcd_detection)
+        if graph_detection is not None:
+            self.__pub_graph_detection.publish(graph_detection)
+        if pcd_non_detection is not None:
+            self.__pub_pcd_non_detection.publish(pcd_non_detection)
+        if graph_non_detection is not None:
+            self.__pub_graph_non_detection.publish(graph_non_detection)

     def awapi_status_cb(self, msg: AwapiAutowareStatus):
         self.__latest_stop_reasons = []
@@ -660,14 +606,9 @@ def awapi_status_cb(self, msg: AwapiAutowareStatus):
             self.__latest_stop_reasons.append(message_to_ordereddict(msg_reason))


-def main(args=None):
-    rclpy.init(args=args)
-    executor = MultiThreadedExecutor()
-    obstacle_segmentation_evaluator = ObstacleSegmentationEvaluator()
-    executor.add_node(obstacle_segmentation_evaluator)
-    executor.spin()
-    obstacle_segmentation_evaluator.destroy_node()
-    rclpy.shutdown()
+@evaluator_main
+def main():
+    return ObstacleSegmentationEvaluator("obstacle_segmentation_evaluator")


 if __name__ == "__main__":
diff --git a/driving_log_replayer/scripts/perception_2d_evaluator_node.py b/driving_log_replayer/scripts/perception_2d_evaluator_node.py
index 68016afcd..4c4080127 100755
--- a/driving_log_replayer/scripts/perception_2d_evaluator_node.py
+++ b/driving_log_replayer/scripts/perception_2d_evaluator_node.py
@@ -36,7 +36,6 @@
 from driving_log_replayer.evaluator import DLREvaluator
 from driving_log_replayer.evaluator import evaluator_main
 import driving_log_replayer.perception_eval_conversions as eval_conversions
-from driving_log_replayer.result import PickleWriter
 from driving_log_replayer.result import ResultBase


@@ -205,12 +204,10 @@ def get_topic_name(self, camera_no: int) -> str:
         return f"/perception/object_recognition/detection/tracked/rois{camera_no}"  # tracking2d

     def timer_cb(self):
-        super().timer_cb(write_metrics_func=self.write_metrics)
+        super().timer_cb(register_shutdown_func=self.write_metrics)

     def write_metrics(self):
-        self.__pickle_writer = PickleWriter(self._pkl_path)
-        self.__pickle_writer.dump(self.__evaluator.frame_results)
-
+        self.save_pkl(self.__evaluator.frame_results)
         self.get_final_result()
         score_dict = {}
         conf_mat_dict = {}
diff --git a/driving_log_replayer/scripts/perception_evaluator_node.py b/driving_log_replayer/scripts/perception_evaluator_node.py
index 077e76b36..3ea5c8a4c 100755
--- a/driving_log_replayer/scripts/perception_evaluator_node.py
+++ b/driving_log_replayer/scripts/perception_evaluator_node.py
@@ -47,7 +47,6 @@
 from driving_log_replayer.evaluator import DLREvaluator
 from driving_log_replayer.evaluator import evaluator_main
 import driving_log_replayer.perception_eval_conversions as eval_conversions
-from driving_log_replayer.result import PickleWriter
 from driving_log_replayer.result import ResultBase


@@ -212,11 +211,10 @@ def check_evaluation_task(self) -> bool:
         return False

     def timer_cb(self):
-        super().timer_cb(write_metrics_func=self.write_metrics)
+        super().timer_cb(register_shutdown_func=self.write_metrics)

     def write_metrics(self):
-        self.__pickle_writer = PickleWriter(self._pkl_path)
-        self.__pickle_writer.dump(self.__evaluator.frame_results)
+        self.save_pkl(self.__evaluator.frame_results)
         if self.__evaluation_task == "fp_validation":
             final_metrics = self.get_fp_result()
             self.__result.set_final_metrics(final_metrics)
diff --git a/driving_log_replayer/scripts/traffic_light_evaluator_node.py b/driving_log_replayer/scripts/traffic_light_evaluator_node.py
index 9905ddb0c..46c903173 100755
--- a/driving_log_replayer/scripts/traffic_light_evaluator_node.py
+++ b/driving_log_replayer/scripts/traffic_light_evaluator_node.py
@@ -36,7 +36,6 @@
 from driving_log_replayer.evaluator import DLREvaluator
 from driving_log_replayer.evaluator import evaluator_main
 import driving_log_replayer.perception_eval_conversions as eval_conversions
-from driving_log_replayer.result import PickleWriter
 from driving_log_replayer.result import ResultBase


@@ -173,11 +172,10 @@ def check_evaluation_task(self) -> bool:
         return True

     def timer_cb(self):
-        super().timer_cb(write_metrics_func=self.write_metrics)
+        super().timer_cb(register_shutdown_func=self.write_metrics)

     def write_metrics(self):
-        self.__pickle_writer = PickleWriter(self._pkl_path)
-        self.__pickle_writer.dump(self.__evaluator.frame_results)
+        self.save_pkl(self.__evaluator.frame_results)
         self.get_final_result()
         score_dict = {}
         conf_mat_dict = {}
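
Taken together, the two refactors above converge on a single subclassing pattern for evaluator nodes. The following minimal sketch illustrates that pattern; it is not part of the patch series. "DummyEvaluator" and the "DummyEvaluationConfig" scenario key are invented placeholders, while DLREvaluator, evaluator_main, check_scenario(), use_t4_dataset(), the timer_cb() hook arguments, and save_pkl() are the interfaces introduced in the diffs above.

#!/usr/bin/env python3

# Illustrative sketch only -- not part of the patch series. "DummyEvaluator" and
# the "DummyEvaluationConfig" scenario key are invented placeholders; the
# DLREvaluator calls mirror the refactored API shown in the diffs above.

import rclpy

from driving_log_replayer.evaluator import DLREvaluator
from driving_log_replayer.evaluator import evaluator_main


class DummyEvaluator(DLREvaluator):
    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.check_scenario()
        # Assumed here to prepare self._t4_dataset_paths and self._pkl_path,
        # as the perception-style nodes do before calling save_pkl().
        self.use_t4_dataset()
        self.__frame_results = []

    def check_scenario(self) -> None:
        # Abstract in DLREvaluator: validate the scenario object and shut down on error.
        try:
            self.__cfg = self._scenario_yaml_obj["Evaluation"]["DummyEvaluationConfig"]
        except KeyError:
            self.get_logger().error("Scenario format error.")
            rclpy.shutdown()

    def timer_cb(self) -> None:
        # Register a shutdown hook; the wall-timer bookkeeping stays in the base class.
        super().timer_cb(register_shutdown_func=self.write_metrics)

    def write_metrics(self) -> None:
        # save_pkl() wraps the one-shot PickleWriter(out_pkl_path, write_object).
        self.save_pkl(self.__frame_results)


@evaluator_main
def main():
    return DummyEvaluator("dummy_evaluator")


if __name__ == "__main__":
    main()

Registering write_metrics (or write_graph_data) through register_shutdown_func, and goal-pose publishing through register_loop_func, is what lets the subclasses above drop their hand-rolled wall timers and shutdown counters.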