feat: add 2d attribute merger #42

Merged (3 commits) on Nov 14, 2023
Changes from all commits
16 changes: 16 additions & 0 deletions config/add_2d_attribute.yaml
@@ -0,0 +1,16 @@
task: add_2d_attribute
description:
  camera_index:
    CAM_FRONT: 0
    CAM_FRONT_RIGHT: 1
    CAM_BACK_RIGHT: 2
    CAM_BACK: 3
    CAM_BACK_LEFT: 4
    CAM_FRONT_LEFT: 5
conversion:
  input_base: ./data/input
  input_anno_file: ./data/deepen_format/2d_attribute_deepen.json
  output_base: ./data/t4_format
  dataset_corresponding:
    # input dataset_name: deepen dataset_id
    t4_dataset_1-2: dummy_dataset_id
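
The camera_index mapping ties each T4 camera directory name to the Deepen sensor_id used in the 2D annotation file. The merger introduced in this PR (perception_dataset/t4_dataset/attribute_merger.py, below) resolves the camera and frame from the image path stored in sample_data, roughly as in this sketch; the filename here is a made-up example, not taken from a real dataset:

import re

# Mapping from config/add_2d_attribute.yaml (description.camera_index).
camera_index = {
    "CAM_FRONT": 0, "CAM_FRONT_RIGHT": 1, "CAM_BACK_RIGHT": 2,
    "CAM_BACK": 3, "CAM_BACK_LEFT": 4, "CAM_FRONT_LEFT": 5,
}

# Hypothetical sample_data filename; real paths come from the T4 dataset.
filename = "data/CAM_FRONT/00012.jpg"
camera_name = filename.split("/")[1]                           # "CAM_FRONT"
frame_no = int(re.findall(r"\d+", filename.split("/")[2])[0])  # 12
sensor_id = camera_index[camera_name]                          # 0
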
16 changes: 16 additions & 0 deletions config/label/attribute.yaml
@@ -29,3 +29,19 @@ object_state:
truncation_state:
  non-truncated: [truncation_state.non-truncated]
  truncated: [truncation_state.truncated]
left_blinker:
  unknown: [left_blinker.unknown]
  on: [left_blinker.on]
  off: [left_blinker.off]
right_blinker:
  unknown: [right_blinker.unknown]
  on: [right_blinker.on]
  off: [right_blinker.off]
brake_lamp:
  unknown: [brake_lamp.unknown]
  on: [brake_lamp.on]
  off: [brake_lamp.off]
vehicle_front_or_rear_or_side:
  front: [vehicle_front_or_rear_or_side.front]
  rear: [vehicle_front_or_rear_or_side.rear]
  side: [vehicle_front_or_rear_or_side.side]
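
These new attribute names are what the merger writes into a copied dataset's annotation/attribute.json when a matched Deepen box carries them (see _update_attribute_table further down). A resulting record would look roughly like the following sketch; the token value is illustrative only:

# Illustrative attribute.json record appended by the merger.
# The token is produced by AttributeTable.get_token_from_name; the value shown is made up.
new_attribute_record = {
    "token": "0123456789abcdef0123456789abcdef",  # hypothetical token
    "name": "left_blinker.on",
    "description": "",
}
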
22 changes: 22 additions & 0 deletions perception_dataset/convert.py
@@ -141,6 +141,28 @@ def main():
        converter.convert()
        logger.info("[END] Conversion Completed")

    elif task == "add_2d_attribute":
        from perception_dataset.t4_dataset.attribute_merger import T4dataset2DAttributeMerger

        input_base = config_dict["conversion"]["input_base"]
        input_anno_file = config_dict["conversion"]["input_anno_file"]
        output_base = config_dict["conversion"]["output_base"]
        dataset_corresponding = config_dict["conversion"]["dataset_corresponding"]
        description = config_dict["description"]

        converter = T4dataset2DAttributeMerger(
            input_base=input_base,
            input_anno_file=input_anno_file,
            output_base=output_base,
            overwrite_mode=args.overwrite,
            dataset_corresponding=dataset_corresponding,
            description=description,
        )

        logger.info(f"[BEGIN] Merging T4 dataset ({input_base}) into T4 dataset ({output_base})")
        converter.convert()
        logger.info(f"[Done] Merging T4 dataset ({input_base}) into T4 dataset ({output_base})")

    else:
        raise NotImplementedError()
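
The new branch is selected by the task: add_2d_attribute key of the config above, and overwrite_mode is driven by the script's existing overwrite flag (the one the merger's error message refers to). Assuming convert.py keeps its usual config-file and overwrite command-line options (only args.overwrite is visible in this hunk), an invocation would look roughly like:

python -m perception_dataset.convert --config config/add_2d_attribute.yaml --overwrite
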

165 changes: 165 additions & 0 deletions perception_dataset/t4_dataset/attribute_merger.py
@@ -0,0 +1,165 @@
import json
from pathlib import Path
import re
import shutil
from typing import Any, Dict, List, Optional

from nuimages import NuImages
from tqdm import tqdm

from perception_dataset.deepen.deepen_to_t4_converter import DeepenToT4Converter
from perception_dataset.t4_dataset.classes.attribute import AttributeTable
from perception_dataset.utils.logger import configure_logger

logger = configure_logger(modname=__name__)


class T4dataset2DAttributeMerger(DeepenToT4Converter):
    def __init__(
        self,
        input_base: str,
        output_base: str,
        input_anno_file: str,
        overwrite_mode: bool,
        dataset_corresponding: Dict[str, str],
        description: Dict[str, Dict[str, str]],
    ):
        self._input_base = Path(input_base)
        self._output_base = Path(output_base)
        self._input_anno_file: str = input_anno_file
        self._overwrite_mode: bool = overwrite_mode
        self._t4dataset_name_to_merge: Dict[str, str] = dataset_corresponding
        self._description: Dict[str, Dict[str, str]] = description
        self._ignore_interpolate_label: bool = True

        # Initialize attribute table with empty values
        self._attribute_table = AttributeTable(
            name_to_description={},
            default_value="",
        )

    def convert(self):
        # Load Deepen annotation from JSON file
        deepen_anno_json = self._load_deepen_annotation()

        # Format Deepen annotation into a more usable structure
        scenes_anno_dict: Dict[str, Dict[str, Any]] = self._format_deepen_annotation(
            deepen_anno_json["labels"], self._description["camera_index"]
        )

        for t4dataset_name, dataset_id in self._t4dataset_name_to_merge.items():
            input_dir = self._input_base / t4dataset_name
            output_dir = self._output_base / t4dataset_name

            if not input_dir.exists():
                logger.warning(f"{input_dir} does not exist")
                continue

            is_dir_exist = output_dir.exists()
            if self._overwrite_mode or not is_dir_exist:
                shutil.rmtree(output_dir, ignore_errors=True)
                self._copy_data(input_dir, output_dir)
            else:
                raise ValueError("If you want to overwrite files, use --overwrite option.")

            # Start merging attributes
            nuim = NuImages(
                version="annotation", dataroot=self._input_base / t4dataset_name, verbose=False
            )
            out_object_ann, out_attribute = nuim.object_ann, nuim.attribute

            for each_object_ann in tqdm(out_object_ann):
                # Find corresponding annotation
                max_iou_anno = self._find_corresponding_annotation(
                    nuim, each_object_ann, scenes_anno_dict[dataset_id]
                )
                if max_iou_anno is None:
                    continue

                # Append attribute
                self._update_attribute_table(max_iou_anno, out_attribute)
                self._update_object_annotation(each_object_ann, max_iou_anno, out_attribute)

            # Save modified data to files
            object_ann_filename = output_dir / "annotation" / "object_ann.json"
            attribute_filename = output_dir / "annotation" / "attribute.json"

            with open(object_ann_filename, "w") as f:
                json.dump(out_object_ann, f, indent=4)

            with open(attribute_filename, "w") as f:
                json.dump(out_attribute, f, indent=4)

    def _load_deepen_annotation(self):
        with open(self._input_anno_file) as f:
            return json.load(f)

    def _find_corresponding_annotation(
        self, nuim: NuImages, object_ann: Dict[str, Any], scenes_anno: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Find the Deepen annotation with the largest IoU against the given object annotation.
        Args:
            nuim (NuImages): NuImages object
            object_ann (Dict[str, Any]): object annotation (an object_ann.json record)
            scenes_anno (Dict[str, Any]): Deepen annotations keyed by frame number
        Returns:
            Optional[Dict[str, Any]]: best-matching Deepen annotation, or None if no box overlaps
        """
        filename = nuim.get("sample_data", object_ann["sample_data_token"])["filename"]
        camera_name = filename.split("/")[1]
        frame_no = int(re.findall(r"\d+", filename.split("/")[2])[0])

        # find largest IoU annotation
        frame_annotations = [
            a
            for a in scenes_anno[frame_no]
            if a["sensor_id"] == self._description["camera_index"][camera_name]
        ]
        max_iou = 0
        max_iou_anno = None
        for anno in frame_annotations:
            iou = self._get_IoU(object_ann["bbox"], anno["two_d_box"])
            if iou > max_iou:
                max_iou = iou
                max_iou_anno = anno

        return max_iou_anno

    def _get_IoU(self, bbox1: List[float], bbox2: List[float]) -> float:
        """
        Calculate IoU between two bounding boxes given as [x_min, y_min, x_max, y_max].
        Args:
            bbox1 (List[float]): first bounding box
            bbox2 (List[float]): second bounding box
        Returns:
            float: IoU in [0, 1]; 0.0 if the boxes do not overlap
        """
        x1 = max(bbox1[0], bbox2[0])
        y1 = max(bbox1[1], bbox2[1])
        x2 = min(bbox1[2], bbox2[2])
        y2 = min(bbox1[3], bbox2[3])
        if x1 >= x2 or y1 >= y2:
            return 0.0
        else:
            intersection = (x2 - x1) * (y2 - y1)
            union = (
                (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
                + (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
                - intersection
            )
            return intersection / union

    def _update_attribute_table(self, max_iou_anno, out_attribute):
        attribute_names = [a["name"] for a in out_attribute]
        for attr_name in max_iou_anno["attribute_names"]:
            if attr_name not in attribute_names:
                out_attribute.append(
                    {
                        "token": self._attribute_table.get_token_from_name(name=attr_name),
                        "name": attr_name,
                        "description": "",
                    }
                )

    def _update_object_annotation(self, each_object_ann, max_iou_anno, out_attribute):
        for attr_name in max_iou_anno["attribute_names"]:
            # update object_ann
            token = [a["token"] for a in out_attribute if a["name"] == attr_name][0]
            each_object_ann["attribute_tokens"].append(token)
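
For intuition: for every object_ann record, the merger picks the Deepen 2D box in the same camera and frame with the largest IoU and copies that box's attribute names onto the record. Below is a minimal standalone sketch of the IoU criterion with made-up boxes (not part of the PR):

# Standalone illustration of the IoU computation used by _get_IoU above.
# Boxes are [x_min, y_min, x_max, y_max]; the coordinates are made up.
def iou(bbox1, bbox2):
    x1, y1 = max(bbox1[0], bbox2[0]), max(bbox1[1], bbox2[1])
    x2, y2 = min(bbox1[2], bbox2[2]), min(bbox1[3], bbox2[3])
    if x1 >= x2 or y1 >= y2:
        return 0.0
    intersection = (x2 - x1) * (y2 - y1)
    union = (
        (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
        + (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
        - intersection
    )
    return intersection / union

print(iou([0, 0, 10, 10], [5, 5, 15, 15]))    # 25 / 175 ~ 0.143; the largest-overlap candidate wins
print(iou([0, 0, 10, 10], [20, 20, 30, 30]))  # 0.0: disjoint boxes never contribute attributes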