From 2965b4a6fcf1c05490d9700cea21db33241202f1 Mon Sep 17 00:00:00 2001
From: Shunsuke Miura <37187849+miursh@users.noreply.github.com>
Date: Tue, 14 Nov 2023 16:25:03 +0900
Subject: [PATCH] add 2d attribute merger (#42)

Signed-off-by: Shunsuke Miura
---
 config/add_2d_attribute.yaml                  |  16 ++
 config/label/attribute.yaml                   |  16 ++
 perception_dataset/convert.py                 |  22 +++
 .../t4_dataset/attribute_merger.py            | 165 ++++++++++++++++++
 4 files changed, 219 insertions(+)
 create mode 100644 config/add_2d_attribute.yaml
 create mode 100644 perception_dataset/t4_dataset/attribute_merger.py

diff --git a/config/add_2d_attribute.yaml b/config/add_2d_attribute.yaml
new file mode 100644
index 00000000..b8ca7b32
--- /dev/null
+++ b/config/add_2d_attribute.yaml
@@ -0,0 +1,16 @@
+task: add_2d_attribute
+description:
+  camera_index:
+    CAM_FRONT: 0
+    CAM_FRONT_RIGHT: 1
+    CAM_BACK_RIGHT: 2
+    CAM_BACK: 3
+    CAM_BACK_LEFT: 4
+    CAM_FRONT_LEFT: 5
+conversion:
+  input_base: ./data/input
+  input_anno_file: ./data/deepen_format/2d_attribute_deepen.json
+  output_base: ./data/t4_format
+  dataset_corresponding:
+    # input dataset_name: deepen dataset_id
+    t4_dataset_1-2: dummy_dataset_id
diff --git a/config/label/attribute.yaml b/config/label/attribute.yaml
index 701c6ad2..271e8da0 100644
--- a/config/label/attribute.yaml
+++ b/config/label/attribute.yaml
@@ -29,3 +29,19 @@ object_state:
 truncation_state:
   non-truncated: [truncation_state.non-truncated]
   truncated: [truncation_state.truncated]
+left_blinker:
+  unknown: [left_blinker.unknown]
+  on: [left_blinker.on]
+  off: [left_blinker.off]
+right_blinker:
+  unknown: [right_blinker.unknown]
+  on: [right_blinker.on]
+  off: [right_blinker.off]
+brake_lamp:
+  unknown: [brake_lamp.unknown]
+  on: [brake_lamp.on]
+  off: [brake_lamp.off]
+vehicle_front_or_rear_or_side:
+  front: [vehicle_front_or_rear_or_side.front]
+  rear: [vehicle_front_or_rear_or_side.rear]
+  side: [vehicle_front_or_rear_or_side.side]
diff --git a/perception_dataset/convert.py b/perception_dataset/convert.py
index 01ac22e6..42ba0662 100644
--- a/perception_dataset/convert.py
+++ b/perception_dataset/convert.py
@@ -141,6 +141,28 @@ def main():
         converter.convert()
         logger.info("[END] Conversion Completed")
 
+    elif task == "add_2d_attribute":
+        from perception_dataset.t4_dataset.attribute_merger import T4dataset2DAttributeMerger
+
+        input_base = config_dict["conversion"]["input_base"]
+        input_anno_file = config_dict["conversion"]["input_anno_file"]
+        output_base = config_dict["conversion"]["output_base"]
+        dataset_corresponding = config_dict["conversion"]["dataset_corresponding"]
+        description = config_dict["description"]
+
+        converter = T4dataset2DAttributeMerger(
+            input_base=input_base,
+            input_anno_file=input_anno_file,
+            output_base=output_base,
+            overwrite_mode=args.overwrite,
+            dataset_corresponding=dataset_corresponding,
+            description=description,
+        )
+
+        logger.info(f"[BEGIN] Merging T4 dataset ({input_base}) into T4 dataset ({output_base})")
+        converter.convert()
+        logger.info(f"[Done] Merging T4 dataset ({input_base}) into T4 dataset ({output_base})")
+
     else:
         raise NotImplementedError()
 
diff --git a/perception_dataset/t4_dataset/attribute_merger.py b/perception_dataset/t4_dataset/attribute_merger.py
new file mode 100644
index 00000000..4fe59c39
--- /dev/null
+++ b/perception_dataset/t4_dataset/attribute_merger.py
@@ -0,0 +1,165 @@
+import json
+from pathlib import Path
+import re
+import shutil
+from typing import Any, Dict, List, Optional
+
+from nuimages import NuImages
+from tqdm import tqdm
+
+from perception_dataset.deepen.deepen_to_t4_converter import DeepenToT4Converter
+from perception_dataset.t4_dataset.classes.attribute import AttributeTable
+from perception_dataset.utils.logger import configure_logger
+
+logger = configure_logger(modname=__name__)
+
+
+class T4dataset2DAttributeMerger(DeepenToT4Converter):
+    def __init__(
+        self,
+        input_base: str,
+        output_base: str,
+        input_anno_file: str,
+        overwrite_mode: bool,
+        dataset_corresponding: Dict[str, str],
+        description: Dict[str, Dict[str, str]],
+    ):
+        self._input_base = Path(input_base)
+        self._output_base = Path(output_base)
+        self._input_anno_file: str = input_anno_file
+        self._overwrite_mode: bool = overwrite_mode
+        self._t4dataset_name_to_merge: Dict[str, str] = dataset_corresponding
+        self._description: Dict[str, Dict[str, str]] = description
+        self._ignore_interpolate_label: bool = True
+
+        # Initialize attribute table with empty values
+        self._attribute_table = AttributeTable(
+            name_to_description={},
+            default_value="",
+        )
+
+    def convert(self):
+        # Load Deepen annotation from JSON file
+        deepen_anno_json = self._load_deepen_annotation()
+
+        # Format Deepen annotation into a more usable structure
+        scenes_anno_dict: Dict[str, Dict[str, Any]] = self._format_deepen_annotation(
+            deepen_anno_json["labels"], self._description["camera_index"]
+        )
+
+        for t4dataset_name, dataset_id in self._t4dataset_name_to_merge.items():
+            input_dir = self._input_base / t4dataset_name
+            output_dir = self._output_base / t4dataset_name
+
+            if not input_dir.exists():
+                logger.warning(f"{input_dir} does not exist")
+                continue
+
+            is_dir_exist = output_dir.exists()
+            if self._overwrite_mode or not is_dir_exist:
+                shutil.rmtree(output_dir, ignore_errors=True)
+                self._copy_data(input_dir, output_dir)
+            else:
+                raise ValueError(f"{output_dir} already exists. Use the --overwrite option to overwrite it.")
+
+            # Start merging attributes
+            nuim = NuImages(
+                version="annotation", dataroot=self._input_base / t4dataset_name, verbose=False
+            )
+            out_object_ann, out_attribute = nuim.object_ann, nuim.attribute
+
+            for each_object_ann in tqdm(out_object_ann):
+                # Find corresponding annotation
+                max_iou_anno = self._find_corresponding_annotation(
+                    nuim, each_object_ann, scenes_anno_dict[dataset_id]
+                )
+                if max_iou_anno is None:
+                    continue
+
+                # Append attribute
+                self._update_attribute_table(max_iou_anno, out_attribute)
+                self._update_object_annotation(each_object_ann, max_iou_anno, out_attribute)
+
+            # Save modified data to files
+            object_ann_filename = output_dir / "annotation" / "object_ann.json"
+            attribute_filename = output_dir / "annotation" / "attribute.json"
+
+            with open(object_ann_filename, "w") as f:
+                json.dump(out_object_ann, f, indent=4)
+
+            with open(attribute_filename, "w") as f:
+                json.dump(out_attribute, f, indent=4)
+
+    def _load_deepen_annotation(self) -> Dict[str, Any]:
+        with open(self._input_anno_file) as f:
+            return json.load(f)
+
+    def _find_corresponding_annotation(
+        self, nuim: NuImages, object_ann: Dict[str, Any], scenes_anno: Dict[str, Any]
+    ) -> Optional[Dict[str, Any]]:
+        """
+        Find the Deepen annotation with the largest bbox IoU; returns None if no box overlaps.
+        Args:
+            nuim (NuImages): NuImages object
+            object_ann (Dict[str, Any]): object annotation
+            scenes_anno (Dict[str, Any]): Deepen annotation
+        """
+        filename = nuim.get("sample_data", object_ann["sample_data_token"])["filename"]
+        camera_name = filename.split("/")[1]
+        frame_no = int(re.findall(r"\d+", filename.split("/")[2])[0])
+
+        # find largest IoU annotation
+        frame_annotations = [
+            a
+            for a in scenes_anno[frame_no]
+            if a["sensor_id"] == self._description["camera_index"][camera_name]
+        ]
+        max_iou = 0.0
+        max_iou_anno = None
+        for anno in frame_annotations:
+            iou = self._get_IoU(object_ann["bbox"], anno["two_d_box"])
+            if iou > max_iou:
+                max_iou = iou
+                max_iou_anno = anno
+
+        return max_iou_anno
+
+    def _get_IoU(self, bbox1: List[float], bbox2: List[float]) -> float:
+        """
+        Calculate IoU between two bounding boxes
+        Args:
+            bbox1 (List[float]): first box as [x1, y1, x2, y2]
+            bbox2 (List[float]): second box as [x1, y1, x2, y2]
+        """
+        x1 = max(bbox1[0], bbox2[0])
+        y1 = max(bbox1[1], bbox2[1])
+        x2 = min(bbox1[2], bbox2[2])
+        y2 = min(bbox1[3], bbox2[3])
+        if x1 >= x2 or y1 >= y2:
+            return 0.0
+        else:
+            intersection = (x2 - x1) * (y2 - y1)
+            union = (
+                (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
+                + (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
+                - intersection
+            )
+            return intersection / union
+
+    def _update_attribute_table(self, max_iou_anno: Dict[str, Any], out_attribute: List[Dict[str, Any]]):
+        attribute_names = [a["name"] for a in out_attribute]
+        for attr_name in max_iou_anno["attribute_names"]:
+            if attr_name not in attribute_names:
+                out_attribute.append(
+                    {
+                        "token": self._attribute_table.get_token_from_name(name=attr_name),
+                        "name": attr_name,
+                        "description": "",
+                    }
+                )
+
+    def _update_object_annotation(self, each_object_ann: Dict[str, Any], max_iou_anno: Dict[str, Any], out_attribute: List[Dict[str, Any]]):
+        for attr_name in max_iou_anno["attribute_names"]:
+            # update object_ann
+            token = [a["token"] for a in out_attribute if a["name"] == attr_name][0]
+            each_object_ann["attribute_tokens"].append(token)
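
The merge hinges on _get_IoU: each nuImages object_ann box is paired with the Deepen box of largest IoU on the same camera and frame. A self-contained sketch of that formula, assuming boxes as [x1, y1, x2, y2]; the name get_iou is illustrative only, not part of the patch:

    from typing import List

    def get_iou(bbox1: List[float], bbox2: List[float]) -> float:
        # Intersection rectangle (empty when the boxes do not overlap).
        x1, y1 = max(bbox1[0], bbox2[0]), max(bbox1[1], bbox2[1])
        x2, y2 = min(bbox1[2], bbox2[2]), min(bbox1[3], bbox2[3])
        if x1 >= x2 or y1 >= y2:
            return 0.0
        intersection = (x2 - x1) * (y2 - y1)
        # Union = sum of both areas minus the double-counted intersection.
        union = (
            (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
            + (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
            - intersection
        )
        return intersection / union

    # Identical boxes give 1.0; a half-overlapping box gives 1/3.
    assert get_iou([0, 0, 10, 10], [0, 0, 10, 10]) == 1.0
    assert abs(get_iou([0, 0, 10, 10], [5, 0, 15, 10]) - 1 / 3) < 1e-9

Because max_iou starts at 0.0 and the comparison is strict, a detection with no overlapping Deepen box yields None from _find_corresponding_annotation and is skipped by the `if max_iou_anno is None: continue` guard in convert().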
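To exercise the task end to end, the add_2d_attribute branch in convert.py above can be mirrored directly. A minimal driver sketch, assuming PyYAML for loading and the config/add_2d_attribute.yaml added by this patch; in the repo itself the values arrive via config_dict and args.overwrite in main():

    import yaml

    from perception_dataset.t4_dataset.attribute_merger import T4dataset2DAttributeMerger

    # Load the task config shipped with this patch (PyYAML assumed).
    with open("config/add_2d_attribute.yaml") as f:
        config_dict = yaml.safe_load(f)

    converter = T4dataset2DAttributeMerger(
        input_base=config_dict["conversion"]["input_base"],
        input_anno_file=config_dict["conversion"]["input_anno_file"],
        output_base=config_dict["conversion"]["output_base"],
        overwrite_mode=True,  # what --overwrite sets in main()
        dataset_corresponding=config_dict["conversion"]["dataset_corresponding"],
        description=config_dict["description"],
    )
    converter.convert()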