Skip to content

Commit

Permalink
add 2d attribute merger (#42)
Browse files Browse the repository at this point in the history
Signed-off-by: Shunsuke Miura <[email protected]>
  • Loading branch information
miursh authored Nov 14, 2023
1 parent a88fad2 commit 2965b4a
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 0 deletions.
16 changes: 16 additions & 0 deletions config/add_2d_attribute.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
task: add_2d_attribute
description:
camera_index:
CAM_FRONT: 0
CAM_FRONT_RIGHT: 1
CAM_BACK_RIGHT: 2
CAM_BACK: 3
CAM_BACK_LEFT: 4
CAM_FRONT_LEFT: 5
conversion:
input_base: ./data/input
input_anno_file: ./data/deepen_format/2d_attribute_deepen.json
output_base: ./data/t4_format
dataset_corresponding:
# input dataset_name: deepen dataset_id
t4_dataset_1-2: dummy_dataset_id
16 changes: 16 additions & 0 deletions config/label/attribute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,19 @@ object_state:
truncation_state:
non-truncated: [truncation_state.non-truncated]
truncated: [truncation_state.truncated]
left_blinker:
unknown: [left_blinker.unknown]
on: [left_blinker.on]
off: [left_blinker.off]
right_blinker:
unknown: [right_blinker.unknown]
on: [right_blinker.on]
off: [right_blinker.off]
brake_lamp:
unknown: [brake_lamp.unknown]
on: [brake_lamp.on]
off: [brake_lamp.off]
vehicle_front_or_rear_or_side:
front: [vehicle_front_or_rear_or_side.front]
rear: [vehicle_front_or_rear_or_side.rear]
side: [vehicle_front_or_rear_or_side.side]
22 changes: 22 additions & 0 deletions perception_dataset/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ def main():
converter.convert()
logger.info("[END] Conversion Completed")

elif task == "add_2d_attribute":
from perception_dataset.t4_dataset.attribute_merger import T4dataset2DAttributeMerger

input_base = config_dict["conversion"]["input_base"]
input_anno_file = config_dict["conversion"]["input_anno_file"]
output_base = config_dict["conversion"]["output_base"]
dataset_corresponding = config_dict["conversion"]["dataset_corresponding"]
description = config_dict["description"]

converter = T4dataset2DAttributeMerger(
input_base=input_base,
input_anno_file=input_anno_file,
output_base=output_base,
overwrite_mode=args.overwrite,
dataset_corresponding=dataset_corresponding,
description=description,
)

logger.info(f"[BEGIN] Merging T4 dataset ({input_base}) into T4 dataset ({output_base})")
converter.convert()
logger.info(f"[Done] Merging T4 dataset ({input_base}) into T4 dataset ({output_base})")

else:
raise NotImplementedError()

Expand Down
165 changes: 165 additions & 0 deletions perception_dataset/t4_dataset/attribute_merger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import json
from pathlib import Path
import re
import shutil
from typing import Any, Dict, List, Optional

from nuimages import NuImages
from tqdm import tqdm

from perception_dataset.deepen.deepen_to_t4_converter import DeepenToT4Converter
from perception_dataset.t4_dataset.classes.attribute import AttributeTable
from perception_dataset.utils.logger import configure_logger

logger = configure_logger(modname=__name__)


class T4dataset2DAttributeMerger(DeepenToT4Converter):
    """Merge 2D attribute labels from a Deepen-format annotation file into T4 datasets.

    For each configured T4 dataset, the merger copies the dataset from
    ``input_base`` to ``output_base``, then matches every 2D object annotation
    (``object_ann``) against the Deepen annotations of the same camera/frame by
    largest bounding-box IoU, and appends the matched attributes to the copied
    dataset's ``annotation/object_ann.json`` and ``annotation/attribute.json``.

    NOTE(review): ``super().__init__()`` is never called — only inherited
    *methods* of ``DeepenToT4Converter`` (e.g. ``_copy_data``,
    ``_format_deepen_annotation``) are used. Confirm the base class carries no
    required instance state.
    """

    def __init__(
        self,
        input_base: str,
        output_base: str,
        input_anno_file: str,
        overwrite_mode: bool,
        dataset_corresponding: Dict[str, str],
        description: Dict[str, Any],
    ):
        """
        Args:
            input_base: Directory containing the input T4 datasets.
            output_base: Directory the merged T4 datasets are written to.
            input_anno_file: Path to the Deepen-format 2D attribute JSON file.
            overwrite_mode: If True, any existing output dataset directory is
                removed and re-created; if False and the output exists, a
                ``ValueError`` is raised during ``convert``.
            dataset_corresponding: Maps T4 dataset name -> Deepen dataset id
                (ids are strings, e.g. ``"dummy_dataset_id"`` in the sample config).
            description: Conversion description; must contain a ``"camera_index"``
                mapping from camera name (e.g. ``"CAM_FRONT"``) to sensor id.
        """
        self._input_base = Path(input_base)
        self._output_base = Path(output_base)
        self._input_anno_file: str = input_anno_file
        self._overwrite_mode: bool = overwrite_mode
        self._t4dataset_name_to_merge: Dict[str, str] = dataset_corresponding
        self._description: Dict[str, Any] = description
        # Interpolated Deepen labels are always ignored by this merger.
        self._ignore_interpolate_label: bool = True

        # Initialize attribute table with empty values
        self._attribute_table = AttributeTable(
            name_to_description={},
            default_value="",
        )

    def convert(self):
        """Copy each configured T4 dataset and merge Deepen 2D attributes into it.

        Raises:
            ValueError: if an output dataset directory already exists and
                ``overwrite_mode`` is False.
        """
        # Load Deepen annotation from JSON file
        deepen_anno_json = self._load_deepen_annotation()

        # Format Deepen annotation into a more usable structure
        # (_format_deepen_annotation is inherited from DeepenToT4Converter;
        # the result is keyed by Deepen dataset id).
        scenes_anno_dict: Dict[str, Dict[str, Any]] = self._format_deepen_annotation(
            deepen_anno_json["labels"], self._description["camera_index"]
        )

        for t4dataset_name, dataset_id in self._t4dataset_name_to_merge.items():
            input_dir = self._input_base / t4dataset_name
            output_dir = self._output_base / t4dataset_name

            # Best-effort: skip datasets that are configured but not present.
            if not input_dir.exists():
                logger.warning(f"{input_dir} does not exist")
                continue

            # Refuse to clobber an existing output unless --overwrite was given.
            is_dir_exist = output_dir.exists()
            if self._overwrite_mode or not is_dir_exist:
                shutil.rmtree(output_dir, ignore_errors=True)
                self._copy_data(input_dir, output_dir)
            else:
                raise ValueError("If you want to overwrite files, use --overwrite option.")

            # Start merging attributes
            # NOTE: annotations are loaded from the *input* dataset; the merged
            # result is written to the output copy below.
            nuim = NuImages(
                version="annotation", dataroot=self._input_base / t4dataset_name, verbose=False
            )
            # These lists are mutated in place by the helpers below and then
            # serialized as the merged annotation files.
            out_object_ann, out_attribute = nuim.object_ann, nuim.attribute

            for each_object_ann in tqdm(out_object_ann):
                # Find corresponding annotation (largest-IoU Deepen box in the
                # same camera/frame); None means no overlapping box was found.
                max_iou_anno = self._find_corresponding_annotation(
                    nuim, each_object_ann, scenes_anno_dict[dataset_id]
                )
                if max_iou_anno is None:
                    continue

                # Append attribute: register unseen attribute names, then link
                # their tokens to this object annotation.
                self._update_attribute_table(max_iou_anno, out_attribute)
                self._update_object_annotation(each_object_ann, max_iou_anno, out_attribute)

            # Save modified data to files
            object_ann_filename = output_dir / "annotation" / "object_ann.json"
            attribute_filename = output_dir / "annotation" / "attribute.json"

            with open(object_ann_filename, "w") as f:
                json.dump(out_object_ann, f, indent=4)

            with open(attribute_filename, "w") as f:
                json.dump(out_attribute, f, indent=4)

    def _load_deepen_annotation(self) -> Dict[str, Any]:
        """Load and return the Deepen annotation JSON (expects a top-level "labels" key)."""
        with open(self._input_anno_file) as f:
            return json.load(f)

    def _find_corresponding_annotation(
        self, nuim: NuImages, object_ann: Dict[str, Any], scenes_anno: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Find corresponding annotation in Deepen annotation
        Args:
            nuim (NuImages): NuImages object
            object_ann (Dict[str, Any]): object annotation
            scenes_anno (Dict[str, Any]): Deepen annotation
        Returns:
            The Deepen annotation with the largest bbox IoU in the same
            camera/frame, or None when no annotation overlaps (IoU > 0).
        """
        # Camera name and frame number are recovered from the sample_data
        # filename, assumed to look like "<dir>/<camera_name>/<frame>....".
        # TODO(review): confirm this path layout holds for all datasets.
        filename = nuim.get("sample_data", object_ann["sample_data_token"])["filename"]
        camera_name = filename.split("/")[1]
        frame_no = int(re.findall(r"\d+", filename.split("/")[2])[0])

        # find largest IoU annotation among boxes of the same sensor/frame
        frame_annotations = [
            a
            for a in scenes_anno[frame_no]
            if a["sensor_id"] == self._description["camera_index"][camera_name]
        ]
        max_iou = 0
        max_iou_anno = None
        for anno in frame_annotations:
            iou = self._get_IoU(object_ann["bbox"], anno["two_d_box"])
            if iou > max_iou:
                max_iou = iou
                max_iou_anno = anno

        return max_iou_anno

    def _get_IoU(self, bbox1: List[float], bbox2: List[float]) -> float:
        """
        Calculate IoU between two bounding boxes
        Args:
            bbox1 (List[float]): bounding box 1, as [x_min, y_min, x_max, y_max]
            bbox2 (List[float]): bounding box 2, as [x_min, y_min, x_max, y_max]
        Returns:
            Intersection-over-union in [0, 1]; 0 when the boxes do not overlap.
        """
        # Intersection rectangle corners.
        x1 = max(bbox1[0], bbox2[0])
        y1 = max(bbox1[1], bbox2[1])
        x2 = min(bbox1[2], bbox2[2])
        y2 = min(bbox1[3], bbox2[3])
        if x1 >= x2 or y1 >= y2:
            # No overlap; also guards the division below against a zero union.
            return 0
        else:
            intersection = (x2 - x1) * (y2 - y1)
            union = (
                (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
                + (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
                - intersection
            )
            return intersection / union

    def _update_attribute_table(
        self, max_iou_anno: Dict[str, Any], out_attribute: List[Dict[str, Any]]
    ) -> None:
        """Append any attribute names of the matched annotation that are not yet
        present in ``out_attribute`` (tokens come from the shared AttributeTable)."""
        attribute_names = [a["name"] for a in out_attribute]
        for attr_name in max_iou_anno["attribute_names"]:
            if attr_name not in attribute_names:
                out_attribute.append(
                    {
                        "token": self._attribute_table.get_token_from_name(name=attr_name),
                        "name": attr_name,
                        "description": "",
                    }
                )

    def _update_object_annotation(
        self,
        each_object_ann: Dict[str, Any],
        max_iou_anno: Dict[str, Any],
        out_attribute: List[Dict[str, Any]],
    ) -> None:
        """Link the matched annotation's attribute tokens to this object annotation.

        NOTE(review): tokens are appended without a duplicate check — if an
        attribute token is already present on ``each_object_ann`` it will be
        added again. Confirm upstream guarantees uniqueness.
        """
        for attr_name in max_iou_anno["attribute_names"]:
            # update object_ann (name -> token lookup in the merged table;
            # _update_attribute_table has already ensured the name exists).
            token = [a["token"] for a in out_attribute if a["name"] == attr_name][0]
            each_object_ann["attribute_tokens"].append(token)

0 comments on commit 2965b4a

Please sign in to comment.