Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
feat: logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hayato-m126 committed Oct 20, 2023
1 parent 2aea909 commit e8a0c03
Showing 1 changed file with 124 additions and 144 deletions.
268 changes: 124 additions & 144 deletions driving_log_replayer_cli/simulation/generate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
from dataclasses import dataclass
from os.path import expandvars
from pathlib import Path
import subprocess

import termcolor
import yaml

@dataclass(frozen=True)
class AutowareEssentialParameter:
map_path: Path = Path("/dlr_not_exist_path")
vehicle_model: str = ""
vehicle_id: str = "default"
sensor_model: str = ""


class TestScriptGenerator:
PERCEPTION_MODES = (
Expand Down Expand Up @@ -41,6 +50,20 @@ def __init__(
def script_path(self) -> Path:
return self.__script_path

@classmethod
def clean_up_cmd(cls) -> str:
# echo return value of ros2 launch (0: ok, others: ng)
# kill zombie ros2 process
# kill rviz
# sleep 1 sec
return """echo $?
pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh
pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh
pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh
pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh
sleep 1
"""

def run(self) -> bool:
if not self.__output_directory.exists():
return False
Expand All @@ -49,166 +72,139 @@ def run(self) -> bool:
)
for dataset_path in self.__data_directory.glob("**"):
# pathlib.Path.glob("**") detect directories only
launch_command = self.__parse_scenario(dataset_path)
launch_command = self._create_launch_cmd(dataset_path)
if launch_command is None:
continue
# echo return value of ros2 launch (0: ok, others: ng)
generated_cmd += "echo $?\n"
# kill zombie ros2 process
generated_cmd += 'pgrep ros | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n'
generated_cmd += (
'pgrep ros | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n'
)
# kill rviz awk '{ print \"kill -9\", $2 }'
generated_cmd += 'pgrep rviz | awk \'{ print "kill -9 $(pgrep -P ", $1, ") > /dev/null 2>&1" }\' | sh\n'
generated_cmd += (
'pgrep rviz | awk \'{ print "kill -9 ", $1, " > /dev/null 2>&1" }\' | sh\n'
)
# sleep 1 sec
generated_cmd += "sleep 1\n"
generated_cmd += launch_command
generated_cmd += TestScriptGenerator.clean_up_cmd()
# create shell script file
with open(self.script_path, "w") as f:
with self.script_path.open("w") as f:
f.write(generated_cmd)
return True

def __parse_scenario(self, dataset_path: Path) -> str | None:
def _create_launch_cmd(self, dataset_path: Path) -> str | None:
scenario_output_dir = self.__output_directory.joinpath(dataset_path.name)
scenario_output_dir.mkdir()
# check scenario
scenario_path = dataset_path.joinpath("scenario.yaml")
if not scenario_path.exists():
scenario_path = scenario_path.joinpath("scenario.yml")
if not scenario_path.exists():
termcolor.cprint(
scenario_path.as_posix() + " does not exist.",
"red",
)
return None
if not scenario_path.exists():
termcolor.cprint(
scenario_path.as_posix() + " does not exist.",
"red",
)
return None

with scenario_path.open("r") as scenario_file:
scenario_yaml_obj = yaml.safe_load(scenario_file)
if scenario_yaml_obj["Evaluation"]["UseCaseName"] in TestScriptGenerator.USE_T4_DATASET:
return self.__create_launch_command_with_t4_dataset(
dataset_path,
scenario_path.name,
scenario_output_dir,
scenario_yaml_obj,
)
return self.__create_launch_command(
dataset_path,
scenario_path.name,
scenario_output_dir,
if scenario_yaml_obj["Evaluation"]["UseCaseName"] in TestScriptGenerator.USE_T4_DATASET:
return self._cmd_use_t4_dataset(
scenario_path,
scenario_yaml_obj,
)
return self._cmd_use_bag_only(
scenario_path,
scenario_yaml_obj,
)

def __create_launch_command(
def _create_common_arg(
self,
scenario_root: Path,
scenario_name: str,
scenario_output_dir: Path,
optional_arg: str,
scenario_path: Path,
output_path: Path,
input_bag: Path,
essential_param: AutowareEssentialParameter,
) -> str:
return f"""{optional_arg}\
scenario_path:={scenario_path.as_posix()} \
result_json_path:={output_path.joinpath("result.json").as_posix()} \
play_rate:={self.__rate} \
play_delay:={self.__delay} \
input_bag:={input_bag.as_posix()} \
result_bag_path:={output_path.joinpath("result_bag").as_posix()} \
map_path:={essential_param.map_path} \
vehicle_model:={essential_param.vehicle_model} \
sensor_model:={essential_param.sensor_model} \
vehicle_id:={essential_param.vehicle_id} \
rviz:=true
"""

def _cmd_use_bag_only(
self,
scenario_path: Path,
scenario_yaml_obj: dict,
) -> str | None:
map_path = Path(scenario_yaml_obj.get("LocalMapPath", "/dlr_not_exist_dir"))
dataset_path = scenario_path.parent
# check resource
map_path = Path(expandvars(scenario_yaml_obj.get("LocalMapPath", "/dlr_not_exist_path")))
if not map_path.exists():
termcolor.cprint(
f"map: {map_path.as_posix()} used in scenario: {scenario_root.as_posix()} does not exist ",
f"map: {map_path.as_posix()} used in scenario: {dataset_path.as_posix()} does not exist ",
"red",
)
return None
input_bag = scenario_root.joinpath("input_bag")
input_bag = dataset_path.joinpath("input_bag")
if not input_bag.exists():
termcolor.cprint(f"input_bag file {input_bag.as_posix()} does not exist.", "red")
return None
output_path = self.__output_directory.joinpath(dataset_path.name)
use_case_name = scenario_yaml_obj["Evaluation"]["UseCaseName"]
launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py"
# evaluation component args
launch_args = " scenario_path:=" + os.path.join(scenario_root, scenario_name)
launch_args += " result_json_path:=" + os.path.join(scenario_output_dir, "result.json")

# ros2 bag play args
launch_args += " play_rate:=" + str(self.__rate)
launch_args += " play_delay:=" + str(self.__delay)
launch_args += " input_bag:=" + input_bag.as_posix()

# ros2 bag record args
result_bag_path = os.path.join(scenario_output_dir, "result_bag")
launch_args += " result_bag_path:=" + result_bag_path

# logging_simulator.launch args
launch_args += " map_path:=" + map_path
launch_args += " vehicle_model:=" + scenario_yaml_obj["VehicleModel"]
launch_args += " sensor_model:=" + scenario_yaml_obj["SensorModel"]
launch_args += " vehicle_id:=" + scenario_yaml_obj["VehicleId"]
launch_args += " rviz:=true"
# diag launch localization
if scenario_yaml_obj["Evaluation"]["UseCaseName"] == "performance_diag":
launch_args += " localization:=" + str(
scenario_yaml_obj["Evaluation"]["LaunchLocalization"],
)
if scenario_yaml_obj["Evaluation"]["UseCaseName"] == "obstacle_detection":
launch_planning = scenario_yaml_obj["Evaluation"].get("LaunchPlanning", False)
launch_args += " planning:=" + str(launch_planning)
target_pointcloud = scenario_yaml_obj["Evaluation"]["Conditions"].get(
"TargetPointCloud",
"/perception/obstacle_segmentation/pointcloud",
)
launch_args += " target_pointcloud:=" + target_pointcloud
launch_command = launch_base_command + launch_args
if scenario_yaml_obj["Evaluation"]["UseCaseName"] == "ndt_convergence":
# I/O sync for rosbag save
launch_command += "\nsync\n"
# add ndt_convergence launch command
ndt_launch_base_command = (
"ros2 launch ndt_convergence_evaluation ndt_convergence_evaluation.launch.py"
)
ndt_launch_args = " rosbag_file_name:=" + result_bag_path
ndt_launch_args += " map_path:=" + map_path
ndt_launch_args += " save_dir:=" + os.path.join(scenario_output_dir, "result_archive")
launch_command += ndt_launch_base_command + ndt_launch_args
launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py "
optional_arg = ""
launch_localization = scenario_yaml_obj["Evaluation"].get("LaunchLocalization")
if launch_localization is not None:
optional_arg = f"localization:={launch_localization} "

launch_command = launch_base_command + self._create_common_arg(
optional_arg,
scenario_path,
output_path,
input_bag,
AutowareEssentialParameter(
map_path,
scenario_yaml_obj["VehicleModel"],
scenario_yaml_obj["VehicleId"],
scenario_yaml_obj["SensorModel"],
),
)
print("launch command generated! => " + launch_command) # noqa
return launch_command

def __create_launch_command_with_t4_dataset( # noqa TODO: fix logic
def _cmd_use_t4_dataset(
self,
scenario_root: str,
scenario_name: str,
scenario_output_dir: str,
scenario_path: Path,
scenario_yaml_obj: dict,
) -> str | None:
dataset_path = scenario_path.parent
launch_command_for_all_dataset = ""
scenario_path = os.path.join(scenario_root, scenario_name)
t4_dataset_base_path = os.path.join(scenario_root, "t4_dataset")
t4_dataset_base_path = dataset_path.joinpath("t4_dataset")
t4_datasets = scenario_yaml_obj["Evaluation"]["Datasets"]
is_database_evaluation = bool(len(t4_datasets) > 1)
for dataset in t4_datasets:
# get dataset_id
key = next(iter(dataset))
# create sub directory for the dataset
output_dir_per_dataset = os.path.join(scenario_output_dir, key)
os.makedirs(output_dir_per_dataset)
output_dir_per_dataset = self.__output_directory.joinpath(dataset_path.name, key)
output_dir_per_dataset.mkdir()
vehicle_id = dataset[key].get("VehicleId", "")
map_path = os.path.expandvars(dataset[key].get("LocalMapPath", ""))
t4_dataset_path = os.path.join(t4_dataset_base_path, key)
launch_sensing = str(dataset[key].get("LaunchSensing", True))

map_path = Path(expandvars(dataset[key].get("LocalMapPath", "/dlr_not_exist_path")))
t4_dataset_path = t4_dataset_base_path.joinpath(key)
if vehicle_id == "":
termcolor.cprint(
f"vehicle_id is not defined in dataset: {key}",
"red",
)
continue

if map_path != "":
if not os.path.exists(map_path):
termcolor.cprint(
"map: " + map_path + " used in dataset: " + key + " does not exist ",
"red",
)
continue
else:
termcolor.cprint(f"LocalMapPath is not described in {key}.", "red")
if not map_path.exists():
termcolor.cprint(
"map: " + map_path.as_posix() + " used in dataset: " + key + " does not exist ",
"red",
)
continue

if not os.path.exists(t4_dataset_path):
if not t4_dataset_path.exists():
termcolor.cprint(
f"t4_dataset: {key} does not exist",
"red",
Expand All @@ -217,57 +213,41 @@ def __create_launch_command_with_t4_dataset( # noqa TODO: fix logic

use_case_name = scenario_yaml_obj["Evaluation"]["UseCaseName"]
launch_base_command = f"ros2 launch driving_log_replayer {use_case_name}.launch.py"
# evaluation component args

launch_args = " scenario_path:=" + scenario_path
launch_args += " result_json_path:=" + os.path.join(
output_dir_per_dataset,
"result.json",
)

# ros2 bag play args
launch_args += " play_rate:=" + str(self.__rate)
launch_args += " play_delay:=" + str(self.__delay)
launch_args += " input_bag:=" + os.path.join(t4_dataset_path, "input_bag")

# ros2 bag record args
result_bag_path = os.path.join(output_dir_per_dataset, "result_bag")
launch_args += " result_bag_path:=" + result_bag_path

# logging_simulator.launch args
launch_args += " map_path:=" + map_path
launch_args += " vehicle_model:=" + scenario_yaml_obj["VehicleModel"]
launch_args += " sensor_model:=" + scenario_yaml_obj["SensorModel"]
launch_args += " vehicle_id:=" + vehicle_id
launch_args += " rviz:=true"
if scenario_yaml_obj.get("PerceptionMode") is not None:
perception_mode: str = scenario_yaml_obj["PerceptionMode"]
optional_arg = ""
perception_mode: str | None = scenario_yaml_obj.get("PerceptionMode")
if perception_mode is not None:
assert perception_mode in self.PERCEPTION_MODES, (
f"perception_mode must be chosen from {self.PERCEPTION_MODES}, "
f"but got {perception_mode}"
)
launch_args += " perception_mode:=" + perception_mode

# t4_dataset
launch_args += " t4_dataset_path:=" + t4_dataset_path
launch_args += " result_archive_path:=" + os.path.join(
optional_arg = f"perception_mode:={perception_mode} "
launch_args = self._create_common_arg(
optional_arg,
scenario_path,
output_dir_per_dataset,
"result_archive",
t4_dataset_path.joinpath("input_bag"),
AutowareEssentialParameter(
map_path,
scenario_yaml_obj["VehicleModel"],
vehicle_id,
scenario_yaml_obj["SensorModel"],
),
)
launch_args += " sensing:=" + launch_sensing
# t4_dataset
launch_args += f" t4_dataset_path:={t4_dataset_path}"
launch_args += f" result_archive_path:={output_dir_per_dataset.joinpath("result_archive")}"
launch_args += f" sensing:={dataset[key].get('LaunchSensing', True)}"
launch_command = launch_base_command + launch_args + "\n"
launch_command_for_all_dataset += launch_command
if is_database_evaluation:
database_result_script_path = os.path.join(
self.__autoware_path,
database_result_script_path = self.__autoware_path.joinpath(
"install",
"driving_log_replayer",
"lib",
"driving_log_replayer",
"perception_database_result.py",
)
database_result_command = f"python3 {database_result_script_path}"
database_result_command += f" -s {scenario_path} -r {scenario_output_dir}\n"
database_result_command = f"python3 {database_result_script_path.as_posix()} -s {scenario_path} -r {self.__output_directory.joinpath(dataset_path.name)}\n"
launch_command_for_all_dataset += database_result_command
print("launch command generated! => " + launch_command_for_all_dataset) # noqa
return launch_command_for_all_dataset

0 comments on commit e8a0c03

Please sign in to comment.