From e8a0c0327925e4e44d7619779975c5b243558729 Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Fri, 20 Oct 2023 17:12:15 +0900 Subject: [PATCH] feat: logic --- .../simulation/generate.py | 268 ++++++++---------- 1 file changed, 124 insertions(+), 144 deletions(-) diff --git a/driving_log_replayer_cli/simulation/generate.py b/driving_log_replayer_cli/simulation/generate.py index 990363f48..eeb5d175f 100644 --- a/driving_log_replayer_cli/simulation/generate.py +++ b/driving_log_replayer_cli/simulation/generate.py @@ -1,9 +1,18 @@ +from dataclasses import dataclass +from os.path import expandvars from pathlib import Path import subprocess import termcolor import yaml +@dataclass(frozen=True) +class AutowareEssentialParameter: + map_path: Path = Path("/dlr_not_exist_path") + vehicle_model: str = "" + vehicle_id: str = "default" + sensor_model: str = "" + class TestScriptGenerator: PERCEPTION_MODES = ( @@ -41,6 +50,20 @@ def __init__( def script_path(self) -> Path: return self.__script_path + @classmethod + def clean_up_cmd(cls) -> str: + # echo return value of ros2 launch (0: ok, others: ng) + # kill zombie ros2 process + # kill rviz + # sleep 1 sec + return """echo $? + pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh + pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh + pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh + pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh + sleep 1 + """ + def run(self) -> bool: if not self.__output_directory.exists(): return False @@ -49,147 +72,124 @@ def run(self) -> bool: ) for dataset_path in self.__data_directory.glob("**"): # pathlib.Path.glob("**") detect directories only - launch_command = self.__parse_scenario(dataset_path) + launch_command = self._create_launch_cmd(dataset_path) if launch_command is None: continue - # echo return value of ros2 launch (0: ok, others: ng) - generated_cmd += "echo $?\n" - # kill zombie ros2 process - generated_cmd += 'pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n' - generated_cmd += ( - 'pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n' - ) - # kill rviz awk '{ print \"kill -9\", $2 }' - generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n' - generated_cmd += ( - 'pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n' - ) - # sleep 1 sec - generated_cmd += "sleep 1\n" + generated_cmd += launch_command + generated_cmd += TestScriptGenerator.clean_up_cmd() # create shell script file - with open(self.script_path, "w") as f: + with self.script_path.open("w") as f: f.write(generated_cmd) return True - def __parse_scenario(self, dataset_path: Path) -> str | None: + def _create_launch_cmd(self, dataset_path: Path) -> str | None: scenario_output_dir = self.__output_directory.joinpath(dataset_path.name) scenario_output_dir.mkdir() + # check scenario scenario_path = dataset_path.joinpath("scenario.yaml") if not scenario_path.exists(): scenario_path = scenario_path.joinpath("scenario.yml") - if not scenario_path.exists(): - termcolor.cprint( - scenario_path.as_posix() + " does not exist.", - "red", - ) - return None + if not scenario_path.exists(): + termcolor.cprint( + scenario_path.as_posix() + " does not exist.", + "red", + ) + return None with scenario_path.open("r") as scenario_file: scenario_yaml_obj = yaml.safe_load(scenario_file) - if scenario_yaml_obj["Evaluation"]["UseCaseName"] in TestScriptGenerator.USE_T4_DATASET: - return self.__create_launch_command_with_t4_dataset( - dataset_path, - scenario_path.name, - scenario_output_dir, - scenario_yaml_obj, - ) - return self.__create_launch_command( - dataset_path, - scenario_path.name, - scenario_output_dir, + if scenario_yaml_obj["Evaluation"]["UseCaseName"] in TestScriptGenerator.USE_T4_DATASET: + return self._cmd_use_t4_dataset( + scenario_path, scenario_yaml_obj, ) + return self._cmd_use_bag_only( + scenario_path, + scenario_yaml_obj, + ) - def __create_launch_command( + def _create_common_arg( self, - scenario_root: Path, - scenario_name: str, - scenario_output_dir: Path, + optional_arg: str, + scenario_path: Path, + output_path: Path, + input_bag: Path, + essential_param: AutowareEssentialParameter, + ) -> str: + return f"""{optional_arg}\ + scenario_path:={scenario_path.as_posix()} \ + result_json_path:={output_path.joinpath("result.json").as_posix()} \ + play_rate:={self.__rate} \ + play_delay:={self.__delay} \ + input_bag:={input_bag.as_posix()} \ + result_bag_path:={output_path.joinpath("result_bag").as_posix()} \ + map_path:={essential_param.map_path} \ + vehicle_model:={essential_param.vehicle_model} \ + sensor_model:={essential_param.sensor_model} \ + vehicle_id:={essential_param.vehicle_id} \ + rviz:=true + """ + + def _cmd_use_bag_only( + self, + scenario_path: Path, scenario_yaml_obj: dict, ) -> str | None: - map_path = Path(scenario_yaml_obj.get("LocalMapPath", "/dlr_not_exist_dir")) + dataset_path = scenario_path.parent + # check resource + map_path = Path(expandvars(scenario_yaml_obj.get("LocalMapPath", "/dlr_not_exist_path"))) if not map_path.exists(): termcolor.cprint( - f"map: {map_path.as_posix()} used in scenario: {scenario_root.as_posix()} does not exist ", + f"map: {map_path.as_posix()} used in scenario: {dataset_path.as_posix()} does not exist ", "red", ) return None - input_bag = scenario_root.joinpath("input_bag") + input_bag = dataset_path.joinpath("input_bag") if not input_bag.exists(): termcolor.cprint(f"input_bag file {input_bag.as_posix()} does not exist.", "red") return None + output_path = self.__output_directory.joinpath(dataset_path.name) use_case_name = scenario_yaml_obj["Evaluation"]["UseCaseName"] - launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py" - # evaluation component args - launch_args = " scenario_path:=" + os.path.join(scenario_root, scenario_name) - launch_args += " result_json_path:=" + os.path.join(scenario_output_dir, "result.json") - - # ros2 bag play args - launch_args += " play_rate:=" + str(self.__rate) - launch_args += " play_delay:=" + str(self.__delay) - launch_args += " input_bag:=" + input_bag.as_posix() - - # ros2 bag record args - result_bag_path = os.path.join(scenario_output_dir, "result_bag") - launch_args += " result_bag_path:=" + result_bag_path - - # logging_simulator.launch args - launch_args += " map_path:=" + map_path - launch_args += " vehicle_model:=" + scenario_yaml_obj["VehicleModel"] - launch_args += " sensor_model:=" + scenario_yaml_obj["SensorModel"] - launch_args += " vehicle_id:=" + scenario_yaml_obj["VehicleId"] - launch_args += " rviz:=true" - # diag launch localization - if scenario_yaml_obj["Evaluation"]["UseCaseName"] == "performance_diag": - launch_args += " localization:=" + str( - scenario_yaml_obj["Evaluation"]["LaunchLocalization"], - ) - if scenario_yaml_obj["Evaluation"]["UseCaseName"] == "obstacle_detection": - launch_planning = scenario_yaml_obj["Evaluation"].get("LaunchPlanning", False) - launch_args += " planning:=" + str(launch_planning) - target_pointcloud = scenario_yaml_obj["Evaluation"]["Conditions"].get( - "TargetPointCloud", - "/perception/obstacle_segmentation/pointcloud", - ) - launch_args += " target_pointcloud:=" + target_pointcloud - launch_command = launch_base_command + launch_args - if scenario_yaml_obj["Evaluation"]["UseCaseName"] == "ndt_convergence": - # I/O sync for rosbag save - launch_command += "\nsync\n" - # add ndt_convergence launch command - ndt_launch_base_command = ( - "ros2 launch ndt_convergence_evaluation ndt_convergence_evaluation.launch.py" - ) - ndt_launch_args = " rosbag_file_name:=" + result_bag_path - ndt_launch_args += " map_path:=" + map_path - ndt_launch_args += " save_dir:=" + os.path.join(scenario_output_dir, "result_archive") - launch_command += ndt_launch_base_command + ndt_launch_args + launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py " + optional_arg = "" + launch_localization = scenario_yaml_obj["Evaluation"].get("LaunchLocalization") + if launch_localization is not None: + optional_arg = f"localization:={launch_localization} " + + launch_command = launch_base_command + self._create_common_arg( + optional_arg, + scenario_path, + output_path, + input_bag, + AutowareEssentialParameter( + map_path, + scenario_yaml_obj["VehicleModel"], + scenario_yaml_obj["VehicleId"], + scenario_yaml_obj["SensorModel"], + ), + ) print("launch command generated! => " + launch_command) # noqa return launch_command - def __create_launch_command_with_t4_dataset( # noqa TODO: fix logic + def _cmd_use_t4_dataset( self, - scenario_root: str, - scenario_name: str, - scenario_output_dir: str, + scenario_path: Path, scenario_yaml_obj: dict, ) -> str | None: + dataset_path = scenario_path.parent launch_command_for_all_dataset = "" - scenario_path = os.path.join(scenario_root, scenario_name) - t4_dataset_base_path = os.path.join(scenario_root, "t4_dataset") + t4_dataset_base_path = dataset_path.joinpath("t4_dataset") t4_datasets = scenario_yaml_obj["Evaluation"]["Datasets"] is_database_evaluation = bool(len(t4_datasets) > 1) for dataset in t4_datasets: # get dataset_id key = next(iter(dataset)) # create sub directory for the dataset - output_dir_per_dataset = os.path.join(scenario_output_dir, key) - os.makedirs(output_dir_per_dataset) + output_dir_per_dataset = self.__output_directory.joinpath(dataset_path.name, key) + output_dir_per_dataset.mkdir() vehicle_id = dataset[key].get("VehicleId", "") - map_path = os.path.expandvars(dataset[key].get("LocalMapPath", "")) - t4_dataset_path = os.path.join(t4_dataset_base_path, key) - launch_sensing = str(dataset[key].get("LaunchSensing", True)) - + map_path = Path(expandvars(dataset[key].get("LocalMapPath", "/dlr_not_exist_path"))) + t4_dataset_path = t4_dataset_base_path.joinpath(key) if vehicle_id == "": termcolor.cprint( f"vehicle_id is not defined in dataset: {key}", @@ -197,18 +197,14 @@ def __create_launch_command_with_t4_dataset( # noqa TODO: fix logic ) continue - if map_path != "": - if not os.path.exists(map_path): - termcolor.cprint( - "map: " + map_path + " used in dataset: " + key + " does not exist ", - "red", - ) - continue - else: - termcolor.cprint(f"LocalMapPath is not described in {key}.", "red") + if not map_path.exists(): + termcolor.cprint( + "map: " + map_path.as_posix() + " used in dataset: " + key + " does not exist ", + "red", + ) continue - if not os.path.exists(t4_dataset_path): + if not t4_dataset_path.exists(): termcolor.cprint( f"t4_dataset: {key} does not exist", "red", @@ -217,57 +213,41 @@ def __create_launch_command_with_t4_dataset( # noqa TODO: fix logic use_case_name = scenario_yaml_obj["Evaluation"]["UseCaseName"] launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py" - # evaluation component args - - launch_args = " scenario_path:=" + scenario_path - launch_args += " result_json_path:=" + os.path.join( - output_dir_per_dataset, - "result.json", - ) - - # ros2 bag play args - launch_args += " play_rate:=" + str(self.__rate) - launch_args += " play_delay:=" + str(self.__delay) - launch_args += " input_bag:=" + os.path.join(t4_dataset_path, "input_bag") - - # ros2 bag record args - result_bag_path = os.path.join(output_dir_per_dataset, "result_bag") - launch_args += " result_bag_path:=" + result_bag_path - - # logging_simulator.launch args - launch_args += " map_path:=" + map_path - launch_args += " vehicle_model:=" + scenario_yaml_obj["VehicleModel"] - launch_args += " sensor_model:=" + scenario_yaml_obj["SensorModel"] - launch_args += " vehicle_id:=" + vehicle_id - launch_args += " rviz:=true" - if scenario_yaml_obj.get("PerceptionMode") is not None: - perception_mode: str = scenario_yaml_obj["PerceptionMode"] + optional_arg = "" + perception_mode: str | None = scenario_yaml_obj.get("PerceptionMode") + if perception_mode is not None: assert perception_mode in self.PERCEPTION_MODES, ( f"perception_mode must be chosen from {self.PERCEPTION_MODES}, " f"but got {perception_mode}" ) - launch_args += " perception_mode:=" + perception_mode - - # t4_dataset - launch_args += " t4_dataset_path:=" + t4_dataset_path - launch_args += " result_archive_path:=" + os.path.join( + optional_arg = f"perception_mode:={perception_mode} " + launch_args = self._create_common_arg( + optional_arg, + scenario_path, output_dir_per_dataset, - "result_archive", + t4_dataset_path.joinpath("input_bag"), + AutowareEssentialParameter( + map_path, + scenario_yaml_obj["VehicleModel"], + vehicle_id, + scenario_yaml_obj["SensorModel"], + ), ) - launch_args += " sensing:=" + launch_sensing + # t4_dataset + launch_args += f" t4_dataset_path:={t4_dataset_path}" + launch_args += f" result_archive_path:={output_dir_per_dataset.joinpath("result_archive")}" + launch_args += f" sensing:={dataset[key].get('LaunchSensing', True)}" launch_command = launch_base_command + launch_args + "\n" launch_command_for_all_dataset += launch_command if is_database_evaluation: - database_result_script_path = os.path.join( - self.__autoware_path, + database_result_script_path = self.__autoware_path.joinpath( "install", "driving_log_replayer", "lib", "driving_log_replayer", "perception_database_result.py", ) - database_result_command = f"python3 {database_result_script_path}" - database_result_command += f" -s {scenario_path} -r {scenario_output_dir}\n" + database_result_command = f"python3 {database_result_script_path.as_posix()} -s {scenario_path} -r {self.__output_directory.joinpath(dataset_path.name)}\n" launch_command_for_all_dataset += database_result_command print("launch command generated! => " + launch_command_for_all_dataset) # noqa return launch_command_for_all_dataset