Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
refactor: generate.py
Browse files Browse the repository at this point in the history
  • Loading branch information
hayato-m126 committed Oct 19, 2023
1 parent 73c59f5 commit 2aea909
Showing 1 changed file with 57 additions and 88 deletions.
145 changes: 57 additions & 88 deletions driving_log_replayer_cli/simulation/generate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import glob
import os
from pathlib import Path
import subprocess

from natsort import natsorted
import termcolor
import yaml

Expand All @@ -17,6 +14,8 @@ class TestScriptGenerator:
"radar",
)

USE_T4_DATASET = ("perception", "obstacle_segmentation", "perception_2d", "traffic_light")

def __init__(
self,
data_directory: str,
Expand All @@ -25,121 +24,100 @@ def __init__(
rate: float,
delay: float,
) -> None:
self.__data_directory = data_directory
self.__output_directory = output_directory
self.__autoware_path = autoware_path
self.__script_path = os.path.join(self.__output_directory, "run.bash")
self.__data_directory = Path(data_directory)
self.__output_directory = Path(output_directory)
self.__autoware_path = Path(autoware_path)
self.__script_path = self.__output_directory.joinpath("run.bash")
self.__rate = rate
self.__delay = delay
# os.path.join(config.output_directory, datetime.datetime.now().strftime("%Y-%m%d-%H%M%S"))が渡ってくるので被ることはない
os.makedirs(self.__output_directory)
self.__output_directory.mkdir()

symlink_dst = Path(self.__output_directory).parent.joinpath("latest").as_posix()
update_latest_dir = f"ln -snf {self.__output_directory} {symlink_dst}"
subprocess.run(update_latest_dir, shell=True)
symlink_dst = self.__output_directory.parent.joinpath("latest").as_posix()
update_symlink = f"ln -snf {self.__output_directory.as_posix()} {symlink_dst}"
subprocess.run(update_symlink, shell=True)

@property
def script_path(self) -> str:
def script_path(self) -> Path:
return self.__script_path

def run(self) -> bool:
return self.__create_script()

def __create_script(self) -> bool:
if Path(self.__output_directory).exists():
generated_cmd = ""
if self.__autoware_path != "":
setup_bash = Path(self.__autoware_path).joinpath("install", "setup.bash").as_posix()
generated_cmd += f"source {setup_bash}\n"
regex = os.path.join(self.__data_directory, "*")
dataset_paths = glob.glob(regex)
for dataset_path in natsorted(dataset_paths):
if Path(dataset_path).is_dir():
command = self.__parse_scenario(dataset_path)
if command is not None:
generated_cmd += command + "\n"
# echo return value of ros2 launch (0: ok, others: ng)
generated_cmd += "echo $?\n"
# kill zombie ros2 process
generated_cmd += 'pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n'
generated_cmd += 'pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n'
# kill rviz awk '{ print \"kill -9\", $2 }'
generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n'
generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n'
# sleep 1 sec
generated_cmd += "sleep 1\n"
with open(self.script_path, "w") as f:
f.write(generated_cmd)
return True
return False
if not self.__output_directory.exists():
return False
generated_cmd = (
f"source {self.__autoware_path.joinpath('install', 'setup.bash').as_posix()}\n"
)
for dataset_path in self.__data_directory.glob("**"):
# pathlib.Path.glob("**") detect directories only
launch_command = self.__parse_scenario(dataset_path)
if launch_command is None:
continue
# echo return value of ros2 launch (0: ok, others: ng)
generated_cmd += "echo $?\n"
# kill zombie ros2 process
generated_cmd += 'pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n'
generated_cmd += (
'pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n'
)
# kill rviz awk '{ print \"kill -9\", $2 }'
generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n'
generated_cmd += (
'pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n'
)
# sleep 1 sec
generated_cmd += "sleep 1\n"
# create shell script file
with open(self.script_path, "w") as f:
f.write(generated_cmd)
return True

def __parse_scenario(self, scenario_directory: str) -> str | None:
scenario_root = Path(scenario_directory)
scenario_output_dir = os.path.join(self.__output_directory, scenario_root.name)
os.makedirs(scenario_output_dir)
scenario_path = scenario_root.joinpath("scenario.yaml")
def __parse_scenario(self, dataset_path: Path) -> str | None:
scenario_output_dir = self.__output_directory.joinpath(dataset_path.name)
scenario_output_dir.mkdir()
scenario_path = dataset_path.joinpath("scenario.yaml")
if not scenario_path.exists():
scenario_path = scenario_root.joinpath("scenario.yml")
scenario_path = scenario_path.joinpath("scenario.yml")
if not scenario_path.exists():
termcolor.cprint(
scenario_path.as_posix() + " does not exist.",
"red",
)
return None

with open(scenario_path) as scenario_file:
with scenario_path.open("r") as scenario_file:
scenario_yaml_obj = yaml.safe_load(scenario_file)
if scenario_yaml_obj["Evaluation"]["UseCaseName"] in [
"perception",
"obstacle_segmentation",
"perception_2d",
"traffic_light",
]:
if scenario_yaml_obj["Evaluation"]["UseCaseName"] in TestScriptGenerator.USE_T4_DATASET:
return self.__create_launch_command_with_t4_dataset(
scenario_root.as_posix(),
dataset_path,
scenario_path.name,
scenario_output_dir,
scenario_yaml_obj,
)
return self.__create_launch_command(
scenario_root.as_posix(),
dataset_path,
scenario_path.name,
scenario_output_dir,
scenario_yaml_obj,
)

def __create_launch_command(
self,
scenario_root: str,
scenario_root: Path,
scenario_name: str,
scenario_output_dir: str,
scenario_output_dir: Path,
scenario_yaml_obj: dict,
) -> str | None:
map_path = ""
if "LocalMapPath" in scenario_yaml_obj:
map_path = os.path.expandvars(scenario_yaml_obj["LocalMapPath"])
if not os.path.exists(map_path):
termcolor.cprint(
"map: " + map_path + " used in scenario: " + scenario_root + " does not exist ",
"red",
)
return None
else:
termcolor.cprint("LocalMapPath is not described in your Scenario.", "red")
return None
need_annotation = scenario_yaml_obj["Evaluation"]["UseCaseName"] == "obstacle_detection"
input_bag = Path(scenario_root).joinpath("input_bag")
if not input_bag.exists():
termcolor.cprint("input_bag file" + input_bag.as_posix() + " does not exist.", "red")
return None
annotation_bag = Path(scenario_root).joinpath("annotation_bag")
if need_annotation and not annotation_bag.exists():
map_path = Path(scenario_yaml_obj.get("LocalMapPath", "/dlr_not_exist_dir"))
if not map_path.exists():
termcolor.cprint(
"annotation_bag file" + annotation_bag.as_posix() + " does not exist.",
f"map: {map_path.as_posix()} used in scenario: {scenario_root.as_posix()} does not exist ",
"red",
)
return None

input_bag = scenario_root.joinpath("input_bag")
if not input_bag.exists():
termcolor.cprint(f"input_bag file {input_bag.as_posix()} does not exist.", "red")
return None
use_case_name = scenario_yaml_obj["Evaluation"]["UseCaseName"]
launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py"
# evaluation component args
Expand All @@ -155,15 +133,6 @@ def __create_launch_command(
result_bag_path = os.path.join(scenario_output_dir, "result_bag")
launch_args += " result_bag_path:=" + result_bag_path

if need_annotation:
launch_args += " annotation_path:=" + os.path.join(scenario_root, "Annotations")
# annotation bag play args
launch_args += " annotation_bag:=" + annotation_bag.as_posix()
if "RecordRate" in scenario_yaml_obj["Annotation"]:
launch_args += " annotation_record_rate:=" + str(
scenario_yaml_obj["Annotation"]["RecordRate"],
)

# logging_simulator.launch args
launch_args += " map_path:=" + map_path
launch_args += " vehicle_model:=" + scenario_yaml_obj["VehicleModel"]
Expand Down

0 comments on commit 2aea909

Please sign in to comment.