diff --git a/src/isar/apis/models/start_mission_definition.py b/src/isar/apis/models/start_mission_definition.py index f09e1df9..7aca5e49 100644 --- a/src/isar/apis/models/start_mission_definition.py +++ b/src/isar/apis/models/start_mission_definition.py @@ -11,6 +11,7 @@ from robot_interface.models.inspection.inspection import Inspection from robot_interface.models.mission.mission import Mission from robot_interface.models.mission.task import ( + TASKS, DockingProcedure, Localize, RecordAudio, @@ -65,10 +66,10 @@ class StartMissionDefinition(BaseModel): def to_isar_mission(start_mission_definition: StartMissionDefinition) -> Mission: - isar_tasks: List[Task] = [] + isar_tasks: List[TASKS] = [] for start_mission_task_definition in start_mission_definition.tasks: - task: Task = create_isar_task(start_mission_task_definition) + task: TASKS = create_isar_task(start_mission_task_definition) if start_mission_task_definition.id: task.id = start_mission_task_definition.id isar_tasks.append(task) @@ -114,7 +115,7 @@ def to_isar_mission(start_mission_definition: StartMissionDefinition) -> Mission return isar_mission -def check_for_duplicate_ids(items: List[Task]): +def check_for_duplicate_ids(items: List[TASKS]): duplicate_ids = get_duplicate_ids(items=items) if len(duplicate_ids) > 0: raise MissionPlannerError( @@ -213,7 +214,7 @@ def create_dock_task() -> DockingProcedure: return DockingProcedure(behavior="dock") -def get_duplicate_ids(items: List[Task]) -> List[str]: +def get_duplicate_ids(items: List[TASKS]) -> List[str]: unique_ids: List[str] = [] duplicate_ids: List[str] = [] for item in items: diff --git a/src/isar/mission_planner/local_planner.py b/src/isar/mission_planner/local_planner.py index 2b8e7819..cad87f8e 100644 --- a/src/isar/mission_planner/local_planner.py +++ b/src/isar/mission_planner/local_planner.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import List +from typing import List, Optional from alitra import Frame from injector import inject @@ -14,6 +14,7 @@ from isar.services.readers.base_reader import BaseReader, BaseReaderError from robot_interface.models.mission.mission import Mission from robot_interface.models.mission.task import ( + TASK_CLASSES, DockingProcedure, Localize, MoveArm, @@ -52,39 +53,6 @@ def get_mission(self, mission_id) -> Mission: def read_mission_from_file(mission_path: Path) -> Mission: mission_dict: dict = BaseReader.read_json(location=mission_path) - mission_tasks: List[Task] = [] - task_dataclass: Task = None - - for task in mission_dict["tasks"]: - if task["type"] == "return_to_home": - task_dataclass = ReturnToHome - elif task["type"] == "localize": - task_dataclass = Localize - elif task["type"] == "move_arm": - task_dataclass = MoveArm - elif task["type"] == "take_image": - task_dataclass = TakeImage - elif task["type"] == "take_thermal_image": - task_dataclass = TakeThermalImage - elif task["type"] == "take_video": - task_dataclass = TakeVideo - elif task["type"] == "take_thermal_video": - task_dataclass = TakeThermalVideo - elif task["type"] == "record_audio": - task_dataclass = RecordAudio - elif task["type"] == "docking_procedure": - task_dataclass = DockingProcedure - - if task_dataclass: - task: Task = BaseReader.dict_to_dataclass( - dataclass_dict=task, - target_dataclass=task_dataclass, - cast_config=[Frame], - strict_config=True, - ) - mission_tasks.append(task) - - mission_dict["tasks"] = [] mission: Mission = BaseReader.dict_to_dataclass( dataclass_dict=mission_dict, target_dataclass=Mission, @@ -92,8 +60,6 @@ def read_mission_from_file(mission_path: Path) -> Mission: strict_config=True, ) - mission.tasks = mission_tasks - return mission def get_predefined_missions(self) -> dict: diff --git a/src/isar/mission_planner/sequential_task_selector.py b/src/isar/mission_planner/sequential_task_selector.py index c563eb64..6edb7ff5 100644 --- a/src/isar/mission_planner/sequential_task_selector.py +++ b/src/isar/mission_planner/sequential_task_selector.py @@ -4,7 +4,7 @@ TaskSelectorInterface, TaskSelectorStop, ) -from robot_interface.models.mission.task import Task +from robot_interface.models.mission.task import TASKS, Task class SequentialTaskSelector(TaskSelectorInterface): @@ -12,7 +12,7 @@ def __init__(self) -> None: super().__init__() self._iterator: Iterator = None - def initialize(self, tasks: List[Task]) -> None: + def initialize(self, tasks: List[TASKS]) -> None: super().initialize(tasks=tasks) self._iterator = iter(self.tasks) diff --git a/src/isar/mission_planner/task_selector_interface.py b/src/isar/mission_planner/task_selector_interface.py index d3d34f0e..15c075c2 100644 --- a/src/isar/mission_planner/task_selector_interface.py +++ b/src/isar/mission_planner/task_selector_interface.py @@ -1,18 +1,18 @@ from abc import ABCMeta, abstractmethod from typing import List -from robot_interface.models.mission.task import Task +from robot_interface.models.mission.task import TASKS, Task class TaskSelectorInterface(metaclass=ABCMeta): def __init__(self) -> None: - self.tasks: List[Task] = None + self.tasks: List[TASKS] = None - def initialize(self, tasks: List[Task]) -> None: + def initialize(self, tasks: List[TASKS]) -> None: self.tasks = tasks @abstractmethod - def next_task(self) -> Task: + def next_task(self) -> TASKS: """ Returns ------- diff --git a/src/robot_interface/models/mission/mission.py b/src/robot_interface/models/mission/mission.py index f985d252..4fb03a49 100644 --- a/src/robot_interface/models/mission/mission.py +++ b/src/robot_interface/models/mission/mission.py @@ -5,13 +5,13 @@ from robot_interface.models.exceptions.robot_exceptions import ErrorMessage from robot_interface.models.mission.status import MissionStatus -from robot_interface.models.mission.task import Task +from robot_interface.models.mission.task import TASKS from robot_interface.utilities.uuid_string_factory import uuid4_string @dataclass class Mission: - tasks: List[Task] + tasks: List[TASKS] id: str = field(default_factory=uuid4_string, init=True) name: str = "" start_pose: Optional[Pose] = None diff --git a/src/robot_interface/models/mission/task.py b/src/robot_interface/models/mission/task.py index ccc9d8c0..92b67c32 100644 --- a/src/robot_interface/models/mission/task.py +++ b/src/robot_interface/models/mission/task.py @@ -1,6 +1,6 @@ from dataclasses import dataclass, field from enum import Enum -from typing import Iterator, Literal, Optional, Type +from typing import Iterator, Literal, Optional, Type, Union from alitra import Pose, Position @@ -182,3 +182,16 @@ class RecordAudio(InspectionTask): @staticmethod def get_inspection_type() -> Type[Inspection]: return Audio + + +TASKS = Union[ + ReturnToHome, + Localize, + MoveArm, + TakeImage, + TakeThermalImage, + TakeVideo, + TakeThermalVideo, + RecordAudio, + DockingProcedure, +] diff --git a/tests/isar/models/test_start_mission_definition.py b/tests/isar/models/test_start_mission_definition.py index a8dd75c2..dee0e983 100644 --- a/tests/isar/models/test_start_mission_definition.py +++ b/tests/isar/models/test_start_mission_definition.py @@ -11,7 +11,7 @@ to_isar_mission, ) from robot_interface.models.mission.mission import Mission -from robot_interface.models.mission.task import Task +from robot_interface.models.mission.task import TASKS, Task task_1: Task = Task(tag_id=None, id="123") task_2: Task = Task(tag_id=None, id="123") @@ -33,7 +33,7 @@ ), ], ) -def test_duplicate_id_check(item_list: List[Task], expected_boolean: bool): +def test_duplicate_id_check(item_list: List[TASKS], expected_boolean: bool): duplicates: List[str] = get_duplicate_ids(item_list) has_duplicates: bool = len(duplicates) > 0 assert has_duplicates == expected_boolean