From 2aea909c269cc46f33f770f99d66d24b89bf7b81 Mon Sep 17 00:00:00 2001 From: Hayato Mizushima Date: Thu, 19 Oct 2023 19:01:09 +0900 Subject: [PATCH] refactor: generate.py --- .../simulation/generate.py | 145 +++++++----------- 1 file changed, 57 insertions(+), 88 deletions(-) diff --git a/driving_log_replayer_cli/simulation/generate.py b/driving_log_replayer_cli/simulation/generate.py index 04ca7217b..990363f48 100644 --- a/driving_log_replayer_cli/simulation/generate.py +++ b/driving_log_replayer_cli/simulation/generate.py @@ -1,9 +1,6 @@ -import glob -import os from pathlib import Path import subprocess -from natsort import natsorted import termcolor import yaml @@ -17,6 +14,8 @@ class TestScriptGenerator: "radar", ) + USE_T4_DATASET = ("perception", "obstacle_segmentation", "perception_2d", "traffic_light") + def __init__( self, data_directory: str, @@ -25,61 +24,59 @@ def __init__( rate: float, delay: float, ) -> None: - self.__data_directory = data_directory - self.__output_directory = output_directory - self.__autoware_path = autoware_path - self.__script_path = os.path.join(self.__output_directory, "run.bash") + self.__data_directory = Path(data_directory) + self.__output_directory = Path(output_directory) + self.__autoware_path = Path(autoware_path) + self.__script_path = self.__output_directory.joinpath("run.bash") self.__rate = rate self.__delay = delay # os.path.join(config.output_directory, datetime.datetime.now().strftime("%Y-%m%d-%H%M%S"))が渡ってくるので被ることはない - os.makedirs(self.__output_directory) + self.__output_directory.mkdir() - symlink_dst = Path(self.__output_directory).parent.joinpath("latest").as_posix() - update_latest_dir = f"ln -snf {self.__output_directory} {symlink_dst}" - subprocess.run(update_latest_dir, shell=True) + symlink_dst = self.__output_directory.parent.joinpath("latest").as_posix() + update_symlink = f"ln -snf {self.__output_directory.as_posix()} {symlink_dst}" + subprocess.run(update_symlink, shell=True) @property - def script_path(self) -> str: + def script_path(self) -> Path: return self.__script_path def run(self) -> bool: - return self.__create_script() - - def __create_script(self) -> bool: - if Path(self.__output_directory).exists(): - generated_cmd = "" - if self.__autoware_path != "": - setup_bash = Path(self.__autoware_path).joinpath("install", "setup.bash").as_posix() - generated_cmd += f"source {setup_bash}\n" - regex = os.path.join(self.__data_directory, "*") - dataset_paths = glob.glob(regex) - for dataset_path in natsorted(dataset_paths): - if Path(dataset_path).is_dir(): - command = self.__parse_scenario(dataset_path) - if command is not None: - generated_cmd += command + "\n" - # echo return value of ros2 launch (0: ok, others: ng) - generated_cmd += "echo $?\n" - # kill zombie ros2 process - generated_cmd += 'pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n' - generated_cmd += 'pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n' - # kill rviz awk '{ print \"kill -9\", $2 }' - generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n' - generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n' - # sleep 1 sec - generated_cmd += "sleep 1\n" - with open(self.script_path, "w") as f: - f.write(generated_cmd) - return True - return False + if not self.__output_directory.exists(): + return False + generated_cmd = ( + f"source {self.__autoware_path.joinpath('install', 'setup.bash').as_posix()}\n" + ) + for dataset_path in self.__data_directory.glob("**"): + # pathlib.Path.glob("**") detect directories only + launch_command = self.__parse_scenario(dataset_path) + if launch_command is None: + continue + # echo return value of ros2 launch (0: ok, others: ng) + generated_cmd += "echo $?\n" + # kill zombie ros2 process + generated_cmd += 'pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n' + generated_cmd += ( + 'pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n' + ) + # kill rviz awk '{ print \"kill -9\", $2 }' + generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n' + generated_cmd += ( + 'pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n' + ) + # sleep 1 sec + generated_cmd += "sleep 1\n" + # create shell script file + with open(self.script_path, "w") as f: + f.write(generated_cmd) + return True - def __parse_scenario(self, scenario_directory: str) -> str | None: - scenario_root = Path(scenario_directory) - scenario_output_dir = os.path.join(self.__output_directory, scenario_root.name) - os.makedirs(scenario_output_dir) - scenario_path = scenario_root.joinpath("scenario.yaml") + def __parse_scenario(self, dataset_path: Path) -> str | None: + scenario_output_dir = self.__output_directory.joinpath(dataset_path.name) + scenario_output_dir.mkdir() + scenario_path = dataset_path.joinpath("scenario.yaml") if not scenario_path.exists(): - scenario_path = scenario_root.joinpath("scenario.yml") + scenario_path = scenario_path.joinpath("scenario.yml") if not scenario_path.exists(): termcolor.cprint( scenario_path.as_posix() + " does not exist.", @@ -87,22 +84,17 @@ def __parse_scenario(self, scenario_directory: str) -> str | None: ) return None - with open(scenario_path) as scenario_file: + with scenario_path.open("r") as scenario_file: scenario_yaml_obj = yaml.safe_load(scenario_file) - if scenario_yaml_obj["Evaluation"]["UseCaseName"] in [ - "perception", - "obstacle_segmentation", - "perception_2d", - "traffic_light", - ]: + if scenario_yaml_obj["Evaluation"]["UseCaseName"] in TestScriptGenerator.USE_T4_DATASET: return self.__create_launch_command_with_t4_dataset( - scenario_root.as_posix(), + dataset_path, scenario_path.name, scenario_output_dir, scenario_yaml_obj, ) return self.__create_launch_command( - scenario_root.as_posix(), + dataset_path, scenario_path.name, scenario_output_dir, scenario_yaml_obj, @@ -110,36 +102,22 @@ def __parse_scenario(self, scenario_directory: str) -> str | None: def __create_launch_command( self, - scenario_root: str, + scenario_root: Path, scenario_name: str, - scenario_output_dir: str, + scenario_output_dir: Path, scenario_yaml_obj: dict, ) -> str | None: - map_path = "" - if "LocalMapPath" in scenario_yaml_obj: - map_path = os.path.expandvars(scenario_yaml_obj["LocalMapPath"]) - if not os.path.exists(map_path): - termcolor.cprint( - "map: " + map_path + " used in scenario: " + scenario_root + " does not exist ", - "red", - ) - return None - else: - termcolor.cprint("LocalMapPath is not described in your Scenario.", "red") - return None - need_annotation = scenario_yaml_obj["Evaluation"]["UseCaseName"] == "obstacle_detection" - input_bag = Path(scenario_root).joinpath("input_bag") - if not input_bag.exists(): - termcolor.cprint("input_bag file" + input_bag.as_posix() + " does not exist.", "red") - return None - annotation_bag = Path(scenario_root).joinpath("annotation_bag") - if need_annotation and not annotation_bag.exists(): + map_path = Path(scenario_yaml_obj.get("LocalMapPath", "/dlr_not_exist_dir")) + if not map_path.exists(): termcolor.cprint( - "annotation_bag file" + annotation_bag.as_posix() + " does not exist.", + f"map: {map_path.as_posix()} used in scenario: {scenario_root.as_posix()} does not exist ", "red", ) return None - + input_bag = scenario_root.joinpath("input_bag") + if not input_bag.exists(): + termcolor.cprint(f"input_bag file {input_bag.as_posix()} does not exist.", "red") + return None use_case_name = scenario_yaml_obj["Evaluation"]["UseCaseName"] launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py" # evaluation component args @@ -155,15 +133,6 @@ def __create_launch_command( result_bag_path = os.path.join(scenario_output_dir, "result_bag") launch_args += " result_bag_path:=" + result_bag_path - if need_annotation: - launch_args += " annotation_path:=" + os.path.join(scenario_root, "Annotations") - # annotation bag play args - launch_args += " annotation_bag:=" + annotation_bag.as_posix() - if "RecordRate" in scenario_yaml_obj["Annotation"]: - launch_args += " annotation_record_rate:=" + str( - scenario_yaml_obj["Annotation"]["RecordRate"], - ) - # logging_simulator.launch args launch_args += " map_path:=" + map_path launch_args += " vehicle_model:=" + scenario_yaml_obj["VehicleModel"]