From 9580cabc5a2e8dd1a1d050a004eb9c2d2f094106 Mon Sep 17 00:00:00 2001 From: David Conner Date: Sat, 24 Aug 2024 23:07:50 -0400 Subject: [PATCH] use state id consistently to avoid long path strings; modify preempt and published outcome to improve sync; add flexbe_outcome_listener node for simple monitoring; this breaks API with flexbe_app and requires version 4.1.0+ of the FlexBE WebUI API --- README.md | 55 +++-- flexbe_core/flexbe_core/__init__.py | 2 +- flexbe_core/flexbe_core/behavior.py | 35 ++- flexbe_core/flexbe_core/core/__init__.py | 32 ++- .../flexbe_core/core/concurrency_container.py | 58 +++-- flexbe_core/flexbe_core/core/event_state.py | 14 +- .../flexbe_core/core/lockable_state.py | 25 +- .../core/lockable_state_machine.py | 12 +- .../core/manually_transitionable_state.py | 6 +- .../flexbe_core/core/operatable_state.py | 21 +- .../core/operatable_state_machine.py | 110 +++++---- .../flexbe_core/core/preemptable_state.py | 3 +- .../core/preemptable_state_machine.py | 9 +- flexbe_core/flexbe_core/core/state.py | 3 +- flexbe_core/flexbe_core/core/state_machine.py | 4 +- flexbe_core/flexbe_core/core/state_map.py | 23 +- flexbe_core/flexbe_core/core/topics.py | 15 +- flexbe_core/test/test_core.py | 232 +++++++++++------- flexbe_core/test/test_exceptions.py | 34 ++- flexbe_core/test/test_exceptions_spin.py | 38 ++- flexbe_core/test/test_logger.py | 24 +- flexbe_core/test/test_proxies.py | 46 ++-- flexbe_mirror/flexbe_mirror/flexbe_mirror.py | 44 ++-- .../mirror_concurrency_container.py | 19 +- flexbe_mirror/flexbe_mirror/mirror_state.py | 28 ++- .../flexbe_mirror/mirror_state_machine.py | 37 +-- flexbe_msgs/CMakeLists.txt | 1 + flexbe_msgs/action/BehaviorExecution.action | 4 +- flexbe_msgs/msg/OutcomeRequest.msg | 2 +- flexbe_msgs/msg/StateMapMsg.msg | 8 + .../flexbe_onboard/flexbe_onboard.py | 13 +- flexbe_widget/CMakeLists.txt | 1 + flexbe_widget/bin/flexbe_outcome_listener | 6 + .../flexbe_widget/behavior_action_server.py | 6 +- .../flexbe_widget/behavior_launcher.py | 36 ++- .../flexbe_widget/flexbe_outcome_listener.py | 103 ++++++++ 36 files changed, 726 insertions(+), 383 deletions(-) create mode 100644 flexbe_msgs/msg/StateMapMsg.msg create mode 100644 flexbe_widget/bin/flexbe_outcome_listener create mode 100644 flexbe_widget/flexbe_widget/flexbe_outcome_listener.py diff --git a/README.md b/README.md index fb7ad55..eb03aea 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ FlexBE is a high-level behavior engine coordinating the capabilities of a robot in order to solve complex tasks. Behaviors are modeled as hierarchical state machines (HFSM) where states correspond to active actions and transitions describe the reaction to outcomes. -Main advantage over similar approaches is the good operator integration and an -intuitive user interface. + +The main advantage of FlexBE over similar approaches is the good operator integration and an intuitive user interface. Besides executing behaviors in full autonomy, the operator can restrict execution of certain transitions or trigger them manually. Furthermore, FlexBE supports modifying the whole structure of a behavior during its execution without restarting it. The user interface features a runtime control interface as well as a graphical editor for state machines. @@ -23,64 +23,73 @@ Rolling ![ROS Build Farm](https://build.ros2.org/job/Rdev__flexbe_behavior_engin ## Installation -For released versions, FlexBE is available as ` apt install` package `ros--flexbe-*` +For released versions, FlexBE is available as `apt install` package `ros--flexbe-*` To build from source, execute the following commands to install FlexBE for ROS 2 systems: - cd "ros2_ws"/src - git clone https://github.com/FlexBE/flexbe_behavior_engine.git + `cd "ros2_ws"/src` -Next, navigate to the "ros2_ws" top-level directory and build FlexBE: + `git clone https://github.com/FlexBE/flexbe_behavior_engine.git` - colcon build +Next, navigate to the "ros2_ws" top-level directory and build FlexBE: + `colcon build` ## Creating new FlexBE Behavior packages To begin, create your own repository for behavior development in the `${WORKSPACE_ROOT}/src` folder: - `ros2 run flexbe_widget create_repo [your_project_name] <--non-interactive>` + `ros2 run flexbe_widget create_repo [your_project_name] <--non-interactive>` This will clone a project template (requires internet access) that contains examples and proper package definitions, and create the ROS 2 package structure and three subfolders. For example, running + `ros2 run flexbe_widget create_repo my_project my_flexbe_project` from the `${WORKSPACE_ROOT}/src` folder will create: - * `${WORKSPACE_ROOT}/src/my_flexbe_project` - * `${WORKSPACE_ROOT}/src/my_flexbe_project/my_flexbe_project` - the ROS meta package - * `${WORKSPACE_ROOT}/src/my_flexbe_project/my_project_flexbe_behaviors` - * `${WORKSPACE_ROOT}/src/my_flexbe_project/my_project_flexbe_states` - + - `${WORKSPACE_ROOT}/src/my_flexbe_project` + - `${WORKSPACE_ROOT}/src/my_flexbe_project/my_flexbe_project` - the ROS meta package + - `${WORKSPACE_ROOT}/src/my_flexbe_project/my_project_flexbe_behaviors` + - `${WORKSPACE_ROOT}/src/my_flexbe_project/my_project_flexbe_states` These are intended to contain your custom FlexBE state implementations and HFSM-based behaviors. -This version of the flexbe_behavior_engine requires version 4.0+ of the FlexBE user interface. +This release of the FlexBE Behavior Engine requires version 4.1+ of the FlexBE UI. +This breaks compatability with the older FlexBE App and now requires use of the FlexBE WebUI tool. + +It is recommended to install the FlexBE WebUI user interface: -It is recommended to install the FlexBE user interface by following one of these steps: - * https://github.com/FlexBE/flexbe_webui.git - new Python-based webserver version (preferred) - * https://github.com/FlexBE/flexbe_app.git - classic FlexBE App (iron or ros2-devel branches) + [FlexBE WebUI](https://github.com/FlexBE/flexbe_webui.git) - Python-based webserver version ## Usage Use the following launch file for running the onboard engine: - ros2 launch flexbe_onboard behavior_onboard.launch.py + `ros2 launch flexbe_onboard behavior_onboard.launch.py` Use the following launch file for running the operator control station (requires the FlexBE App or WebUI): - ros2 launch flexbe_webui flexbe_ocs.launch.py + `ros2 launch flexbe_webui flexbe_ocs.launch.py` - > Note: replace `flexbe_webui` with `flexbe_app` to run the "classic" UI (after `ros2 run flexbe_app nwjs_install`). +During testing is is recommended to start the base nodes and the UI client separately: -Use the following launch file to run both of the above, e.g., for testing on a single computer: + `ros2 launch flexbe_webui flexbe_ocs.launch.py headless:=True` - ros2 launch flexbe_webui flexbe_full.launch.py + `ros2 run flexbe_webui webui_client` + + See the `flexbe_webui` README for more details. + + +Use the following launch file to run the entire FlexBE system, both onboard and OCS, e.g., for testing on a single computer: + + `ros2 launch flexbe_webui flexbe_full.launch.py` For running tests use: -`colcon test --ctest-args --packages-select ` + + `colcon test --ctest-args --packages-select ` ## Next Steps diff --git a/flexbe_core/flexbe_core/__init__.py b/flexbe_core/flexbe_core/__init__.py index d1d92f2..0f07353 100644 --- a/flexbe_core/flexbe_core/__init__.py +++ b/flexbe_core/flexbe_core/__init__.py @@ -43,7 +43,7 @@ from .logger import Logger # noqa: F401 from .state_logger import StateLogger # noqa: F401 -MIN_UI_VERSION = '4.0.0' # Minimum FlexBE App or UI version required to interact with this version of flexbe_core +MIN_UI_VERSION = '4.1.0' # Minimum FlexBE UI version required to interact with this version of flexbe_core # pylint: disable=R0903 diff --git a/flexbe_core/flexbe_core/behavior.py b/flexbe_core/flexbe_core/behavior.py index b1674c4..bc9cdb0 100644 --- a/flexbe_core/flexbe_core/behavior.py +++ b/flexbe_core/flexbe_core/behavior.py @@ -30,7 +30,7 @@ """This defines the superclass for all implemented behaviors.""" -from flexbe_core.core import LockableStateMachine, OperatableStateMachine, PreemptableState +from flexbe_core.core import LockableStateMachine, OperatableStateMachine, PreemptableState, StateMap from flexbe_core.logger import Logger from flexbe_msgs.msg import BehaviorSync @@ -44,6 +44,7 @@ def __init__(self): self._state_machine = None self.name = 'unnamed behavior' self.beh_id = 0 # Behavior id checksum assigned by processing the file contents + self._state_map = None self.contains = {} self._behaviors = {} @@ -51,7 +52,7 @@ def __init__(self): self._autonomy_level = 3 self._debug = False - self.requested_state_path = None + self.requested_state_id = None # Please implement this abstract method: def create(self): @@ -178,15 +179,25 @@ def set_parameter(self, name, value): def confirm(self): """Confirm that this behavior is ready for execution.""" - LockableStateMachine.path_for_switch = self.requested_state_path - - self._state_machine.confirm(self.name, self.beh_id) - - # def define_structure(self): - # """ - # Calculate all state ids and prepare the ContainerStructure message - # """ - # self._state_machine.define_structure() + self._state_map = StateMap() + self._state_machine.confirm(self.name, self.beh_id, self._state_map) + LockableStateMachine.path_for_switch = None + if self.requested_state_id is not None: + requested_state = self._state_map[self.requested_state_id] + LockableStateMachine.path_for_switch = requested_state.path + + @property + def state_map_items(self): + """Return two lists of keys and values from state map.""" + if self._state_machine is not None: + return list(zip(*self._state_map.items)) + return [], [] + + def get_state_by_id(self, st_id): + """Return state reference from state map by id.""" + if self._state_map is not None: + return self._state_map.get(st_id) + return None def execute(self): """ @@ -225,7 +236,7 @@ def prepare_for_switch(self, state): sm.replace_userdata(state_container.userdata) state_container = state_container._parent states[1].replace_state(state) # add to new state machine - self.requested_state_path = state.path # set start after switch + self.requested_state_id = state.state_id # set start after switch def get_current_states(self): """Get all currently active (sub-)states.""" diff --git a/flexbe_core/flexbe_core/core/__init__.py b/flexbe_core/flexbe_core/core/__init__.py index 0772e08..c3ae7f1 100644 --- a/flexbe_core/flexbe_core/core/__init__.py +++ b/flexbe_core/flexbe_core/core/__init__.py @@ -31,6 +31,9 @@ from .concurrency_container import ConcurrencyContainer # noqa: F401 from .event_state import EventState # noqa: F401 +from .exceptions import StateError # noqa: F401 +from .exceptions import StateMachineError # noqa: F401 +from .exceptions import UserDataError # noqa: F401 from .lockable_state import LockableState # noqa: F401 from .lockable_state_machine import LockableStateMachine # noqa: F401 from .manually_transitionable_state import ManuallyTransitionableState # noqa: F401 @@ -44,23 +47,28 @@ from .state import State # noqa: F401 from .state_machine import StateMachine # noqa: F401 from .state_map import StateMap # noqa: F401 +from .topics import Topics from .user_data import UserData # noqa: F401 __all__ = [ - 'PreemptableStateMachine', - 'OperatableStateMachine', - 'LockableStateMachine', - 'RosStateMachine', - 'StateMachine', 'ConcurrencyContainer', - 'PriorityContainer', - 'State', - 'RosState', - 'ManuallyTransitionableState', + 'EventState', 'LockableState', - 'PreemptableState', + 'LockableStateMachine', + 'ManuallyTransitionableState', 'OperatableState', - 'EventState', + 'OperatableStateMachine', + 'PreemptableState', + 'PreemptableStateMachine', + 'PriorityContainer', + 'RosState', + 'RosStateMachine', + 'State', + 'StateError', + 'StateMachine', + 'StateMachineError', 'StateMap', - 'UserData' + 'Topics', + 'UserData', + 'UserDataError' ] diff --git a/flexbe_core/flexbe_core/core/concurrency_container.py b/flexbe_core/flexbe_core/core/concurrency_container.py index 0d8d501..04fbe94 100644 --- a/flexbe_core/flexbe_core/core/concurrency_container.py +++ b/flexbe_core/flexbe_core/core/concurrency_container.py @@ -38,6 +38,7 @@ from flexbe_core.core.lockable_state_machine import LockableStateMachine from flexbe_core.core.operatable_state_machine import OperatableStateMachine from flexbe_core.core.priority_container import PriorityContainer +from flexbe_core.core.state import State from flexbe_core.core.topics import Topics from flexbe_core.core.user_data import UserData from flexbe_core.logger import Logger @@ -95,10 +96,6 @@ def get_required_autonomy(self, outcome, state): def _execute_current_state(self): """Execute the current states within this concurrency container.""" # execute all states that are done with sleeping and determine next sleep duration - if self._entering: - Logger.localerr(f"CC: Why is entering flag set here '{self.name}' of '{self.path}'?") - self.on_enter(self._userdata) - self._inner_sync_request = False # clear prior request for lower level state self._current_state = [] # Concurrency container has multiple active states so use list @@ -107,18 +104,18 @@ def _execute_current_state(self): # Special handling in concurrency container - can be either ConcurrencyContainer or one of several internal states. command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC) - if command_msg.target == self.name: + if command_msg.target == self.state_id: cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Using here, so clear from buffer assert cmd_msg2 is command_msg, 'Unexpected change in CMD_TRANSITION_TOPIC buffer' - Logger.localinfo(f"ConcurrencyContainer '{self.name}' is handling the transition cmd msg={command_msg}") + Logger.localinfo(f"ConcurrencyContainer '{self.path}' is handling the transition cmd msg={command_msg}") self._force_transition = True outcome = self.outcomes[command_msg.outcome] self._manual_transition_requested = outcome self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, CommandFeedback(command='transition', - args=[command_msg.target, self.name])) - Logger.localwarn(f"--> Manually triggered outcome {outcome} of concurrency container '{self.name}'") + args=[f'{command_msg.target}', f'{self.state_id}'])) # string[] + Logger.localwarn(f"--> Manually triggered outcome {outcome} of concurrency container '{self.path}'") self._publish_outcome(outcome) self._returned_outcomes = {} @@ -132,7 +129,7 @@ def _execute_current_state(self): if self._is_controlled and self._last_requested_outcome is not None: # We have already processed the current state and received an outcome # We are waiting on outcome confirmation from the OCS - Logger.localinfo(f"CC '{self.path}' is waiting on user to confirm outcome") + Logger.loginfo_throttle(2.0, f"CC '{self.path}' is waiting on user to confirm outcome") return None for state in self._states: @@ -141,13 +138,13 @@ def _execute_current_state(self): continue # already done with executing if self._manual_transition_requested is not None: - if self._manual_transition_requested.target == state.name: + if self._manual_transition_requested.target == state.state_id: # Transition request applies to this state # @TODO - Should we be using path not name here? command_msg = self._manual_transition_requested cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Using here, so clear from buffer assert cmd_msg2 is command_msg, 'Something is up with handling of buffer for CMD_TRANSITION_TOPIC' - Logger.localinfo(f"ConcurrencyContainer '{self.name}' state '{state.name}' is handling " + Logger.localinfo(f"ConcurrencyContainer '{self.name}' state '{state.path}' is handling " f"the cmd msg='{command_msg}'") self._manual_transition_requested = None # Reset at this level @@ -167,7 +164,7 @@ def _execute_current_state(self): self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, CommandFeedback(command='transition', - args=[command_msg.target, state.name])) + args=[f'{command_msg.target}', f'{state.state_id}'])) Logger.localerr(f'--> Manually triggered outcome {outcome} ({command_msg.outcome}) ' f"of state '{state.name}' from inside ConcurrencyContainer '{self.name}'") continue @@ -204,9 +201,9 @@ def _execute_current_state(self): # Determine concurrency outcome outcome = None - if any(self._returned_outcomes[state.name] == state._preempted_name + if any(self._returned_outcomes[state.name] == State._preempted_name for state in self._states if state.name in self._returned_outcomes): - return self._preempted_name # handle preemption if required + return State._preempted_name # handle preemption if required # check conditions for item in self._conditions: (out, cond) = item @@ -227,7 +224,7 @@ def _execute_current_state(self): if outcome != self._last_requested_outcome: self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=self.outcomes.index(outcome), - target=self.path)) + target=self.state_id)) Logger.localinfo('<-- Want result: %s > %s' % (self.path, outcome)) StateLogger.log('flexbe.operator', self, type='request', request=outcome, autonomy=self.parent.autonomy_level, @@ -236,8 +233,6 @@ def _execute_current_state(self): outcome = None elif outcome is not None and outcome in self.outcomes: # autonomy level is high enough, report the executed transition - Logger.localinfo(f"controlled CC '{self.name}' from '{self.path}'permitting outcome '{outcome}' ") - self._publish_outcome(outcome) self._force_transition = False self._last_outcome = outcome @@ -251,22 +246,27 @@ def _execute_single_state(self, state, force_exit=False): input_keys=state.input_keys, output_keys=state.output_keys) as userdata: state._inner_sync_request = False # clear any prior sync on call to individual state if force_exit: - state._entering = True + if state._exited: + Logger.localinfo(f"force exit for '{state.name}' ({state.path}) but already exited?") state.on_exit(userdata) + state._entering = True + state._exited = True + if state._last_outcome is None: + Logger.localinfo(f"preempting '{state.name}' ({state.path})") + state._last_outcome = State._preempted_name + state._publish_outcome(State._preempted_name) # Normally by EventState or StateMachine.execute else: - result = state.execute(userdata) + result = state.execute(userdata) # This is call on_exit if necessary except Exception as exc: # pylint: disable=W0703 result = None self._last_exception = exc Logger.logerr('ConcurrencyContainer: Failed to execute state %s:\n%s' % (self.current_state_label, str(exc))) import traceback # pylint: disable=C0415 Logger.localinfo(traceback.format_exc().replace('%', '%%')) - return result def on_enter(self, userdata): # pylint: disable=W0613 """Call on entering the concurrency container.""" - Logger.localinfo(f" CC on_enter for '{self.path}' ... ") super().on_enter(userdata) self._returned_outcomes = {} for state in self._states: @@ -279,17 +279,29 @@ def on_exit(self, userdata, states=None): Logger.localinfo(f"ConcurrencyContainer on_exit for '{self}'.") for state in self._states if states is None else states: if state.name in self._returned_outcomes and self._returned_outcomes[state.name] is not None: + if not state._exited: + Logger.localinfo(f"\x1b[93mCC '{self.name}' - '{state.name}' is in returned outcomes " + f"w/ '{self._returned_outcomes[state.name] }' but has not exited!\x1b[0m") continue # skip states that already exited themselves self._execute_single_state(state, force_exit=True) + self._current_state = None self._returned_outcomes = {} self._entering = True + if self._last_outcome is None: + # Publish outcome is normally invoked by StateMachine.execute + # If no outcome set, then notify that we preempted this state + self._publish_outcome(State._preempted_name) + if self._last_requested_outcome is not None: - Logger.localinfo(f"CC '{self.name}' of '{self.path}' clear prior LRO='{self._last_requested_outcome}'.") - self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=255, target=self.path)) + # Logger.localinfo(f"CC '{self.name}' of '{self.path}' clear prior LRO='{self._last_requested_outcome}'.") + self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=255, target=self.state_id)) self._last_requested_outcome = None + self._exited = True + self._entering = True # for next entry + def get_deep_states(self): """ Recursively look for the currently executing states. diff --git a/flexbe_core/flexbe_core/core/event_state.py b/flexbe_core/flexbe_core/core/event_state.py index be838b4..ed3c69e 100644 --- a/flexbe_core/flexbe_core/core/event_state.py +++ b/flexbe_core/flexbe_core/core/event_state.py @@ -87,6 +87,7 @@ def _event_execute(self, *args, **kwargs): if self._entering: self._entering = False + self._exited = False self._last_outcome = None self.on_enter(*args, **kwargs) @@ -104,9 +105,18 @@ def _event_execute(self, *args, **kwargs): self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, CommandFeedback(command='repeat')) repeat = True - if repeat or outcome is not None and not PreemptableState.preempt: - self._entering = True + if repeat or outcome is not None: + # As this is currently coded, a repeat command will immedately halt + # call on_exit, then reenter the state + # (vs. an alternative to wait until outcome and then repeat) self.on_exit(*args, **kwargs) + self._exited = True + self._entering = True # for next call + if repeat: + outcome = None # clear outcome so we stay on this state + else: + # Publish outcome for this state + self._publish_outcome(outcome) self._last_outcome = outcome return outcome diff --git a/flexbe_core/flexbe_core/core/lockable_state.py b/flexbe_core/flexbe_core/core/lockable_state.py index 2b752ae..0fe2c72 100644 --- a/flexbe_core/flexbe_core/core/lockable_state.py +++ b/flexbe_core/flexbe_core/core/lockable_state.py @@ -37,7 +37,7 @@ from flexbe_msgs.msg import CommandFeedback -from std_msgs.msg import String +from std_msgs.msg import Int32 class LockableState(ManuallyTransitionableState): @@ -99,41 +99,42 @@ def _lockable_execute(self, *args, **kwargs): def _execute_lock(self, target): """Execute lock.""" - if target in (self.path, ''): - target = self.path + + if target in (self.state_id, 0): + target = self.state_id found_target = True self._locked = True else: found_target = self._parent.lock(target) # provide feedback about lock self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, - CommandFeedback(command='lock', args=[target, target if found_target else ''])) + CommandFeedback(command='lock', args=[f'{target}', f'{target}' if found_target else '-1'])) if not found_target: - Logger.logwarn(f"Wanted to lock '{target}', but could not find it in current path '{self.path}'.") + Logger.logwarn(f"Wanted to lock state id '{target}', but could not find it in current path '{self.path}'.") else: - Logger.localinfo(f"--> Locking in state '{target}'") + Logger.localinfo(f"--> Locking in state '{target}' at '{self.path}'") def _execute_unlock(self, target): - if target == self.path or (self._locked and target == ''): - target = self.path + if target == self.state_id or (self._locked and target == 0): + target = self.state_id found_target = True self._locked = False else: found_target = self._parent.unlock(target) # provide feedback about unlock self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, - CommandFeedback(command='unlock', args=[target, target if found_target else ''])) + CommandFeedback(command='unlock', args=[f'{target}', f'{target}' if found_target else '-1'])) if not found_target: Logger.logwarn(f"Wanted to unlock '{target}', but could not find it in current path '{self.path}'.") else: - Logger.localinfo(f"--> Unlocking in state '{target}'") + Logger.localinfo(f"--> Unlocking in state '{target}' at '{self.path}'") def _enable_ros_control(self): if not self._is_controlled: super()._enable_ros_control() self._pub.create_publisher(Topics._CMD_FEEDBACK_TOPIC, CommandFeedback) - self._sub.subscribe(Topics._CMD_LOCK_TOPIC, String, inst_id=id(self)) - self._sub.subscribe(Topics._CMD_UNLOCK_TOPIC, String, inst_id=id(self)) + self._sub.subscribe(Topics._CMD_LOCK_TOPIC, Int32, inst_id=id(self)) + self._sub.subscribe(Topics._CMD_UNLOCK_TOPIC, Int32, inst_id=id(self)) def _disable_ros_control(self): if self._is_controlled: diff --git a/flexbe_core/flexbe_core/core/lockable_state_machine.py b/flexbe_core/flexbe_core/core/lockable_state_machine.py index 50ca269..11c87e5 100644 --- a/flexbe_core/flexbe_core/core/lockable_state_machine.py +++ b/flexbe_core/flexbe_core/core/lockable_state_machine.py @@ -95,25 +95,25 @@ def remove_state(self, state): # for locking - def lock(self, path): + def lock(self, st_id): """Lock this state.""" - if path == self.path: + if st_id == self.state_id: self._locked = True return True if self._parent is not None: - return self._parent.lock(path) + return self._parent.lock(st_id) return False - def unlock(self, path): + def unlock(self, st_id): """Unlock this state.""" - if path == self.path: + if st_id == self.state_id: self._locked = False return True if self._parent is not None: - return self._parent.unlock(path) + return self._parent.unlock(st_id) return False diff --git a/flexbe_core/flexbe_core/core/manually_transitionable_state.py b/flexbe_core/flexbe_core/core/manually_transitionable_state.py index 4deb24f..dfb6ba9 100644 --- a/flexbe_core/flexbe_core/core/manually_transitionable_state.py +++ b/flexbe_core/flexbe_core/core/manually_transitionable_state.py @@ -58,11 +58,11 @@ def _manually_transitionable_execute(self, *args, **kwargs): self._manual_transition_requested = None if self._is_controlled and self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC): command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC) - if command_msg.target == self.name: + if command_msg.target == self.state_id: cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) assert cmd_msg2 is command_msg, 'Unexpected change in CMD_TRANSITION_TOPIC buffer' self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, - CommandFeedback(command='transition', args=[command_msg.target, self.name])) + CommandFeedback(command='transition', args=[f'{command_msg.target}', f'{self.state_id}'])) self._force_transition = True outcome = self.outcomes[command_msg.outcome] @@ -71,7 +71,7 @@ def _manually_transitionable_execute(self, *args, **kwargs): return outcome else: Logger.loginfo(f"Requested outcome for state '{command_msg.target}' " - f" but this active state is '{self.name}' - keep looking for potential nested state") + f" but this active state is '{self.path}' - keep looking for potential nested state") # otherwise, return the normal outcome self._force_transition = False diff --git a/flexbe_core/flexbe_core/core/operatable_state.py b/flexbe_core/flexbe_core/core/operatable_state.py index c724d6e..3b3970a 100644 --- a/flexbe_core/flexbe_core/core/operatable_state.py +++ b/flexbe_core/flexbe_core/core/operatable_state.py @@ -32,6 +32,7 @@ """OperatableState.""" from flexbe_core.core.preemptable_state import PreemptableState +from flexbe_core.core.state import State from flexbe_core.core.state_map import StateMap from flexbe_core.core.topics import Topics from flexbe_core.logger import Logger @@ -66,7 +67,7 @@ def _operatable_execute(self, *args, **kwargs): if self._is_controlled: # reset previously requested outcome if applicable (not reset in on_enter/exit like OSM) if self._last_requested_outcome is not None and outcome is None: - self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=255, target=self.path)) + self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=255, target=self.state_id)) self._last_requested_outcome = None # request outcome because autonomy level is too low @@ -74,7 +75,7 @@ def _operatable_execute(self, *args, **kwargs): or outcome is not None and self.is_breakpoint): if outcome != self._last_requested_outcome: self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, - OutcomeRequest(outcome=self.outcomes.index(outcome), target=self.path)) + OutcomeRequest(outcome=self.outcomes.index(outcome), target=self.state_id)) Logger.localinfo("<-- Want result: '%s' > '%s'" % (self.path, outcome)) StateLogger.log('flexbe.operator', self, type='request', request=outcome, autonomy=self.parent.autonomy_level, @@ -84,17 +85,25 @@ def _operatable_execute(self, *args, **kwargs): # autonomy level is high enough, report the executed transition elif outcome is not None and outcome in self.outcomes: - Logger.localinfo(f"controlled State '{self.name}' from '{self.path}'permitting outcome '{outcome}' ") - self._publish_outcome(outcome) + Logger.localinfo(f"controlled State '{self.name}' from '{self.path}'permitting outcome '{outcome}' {self.__class__.__name__}") self._force_transition = False return outcome def _publish_outcome(self, outcome): """Update the UI and logs about this outcome.""" - # 0 outcome status denotes no outcome, not index so add +1 for valid outcome (subtract in mirror) + if outcome == State._preempted_name: + # special case of preempted outcome specify max outcome hash + Logger.localinfo('Publish Preempted: State result: %s > %s (%d) (%d) (%s)' + % (self.name, outcome, StateMap._MAX_OUTCOME, self.state_id, self.__class__.__name__)) + self._pub.publish(Topics._OUTCOME_TOPIC, UInt32(data=StateMap.hash(self, StateMap._MAX_OUTCOME))) + self._pub.publish(Topics._DEBUG_TOPIC, String(data='%s > %s' % (self.path, outcome))) + return + outcome_index = self.outcomes.index(outcome) - Logger.localinfo(f"State result: '{self.name}' -> '{outcome}'") + Logger.localinfo('Publish outcome: State result: %s > %s (%d) (%d) (%s)' + % (self.name, outcome, outcome_index, self.state_id, self.__class__.__name__)) + # 0 outcome status denotes no outcome, not index so add +1 for valid outcome (subtract in mirror) self._pub.publish(Topics._OUTCOME_TOPIC, UInt32(data=StateMap.hash(self, outcome_index))) self._pub.publish(Topics._DEBUG_TOPIC, String(data='%s > %s' % (self.path, outcome))) if self._force_transition: diff --git a/flexbe_core/flexbe_core/core/operatable_state_machine.py b/flexbe_core/flexbe_core/core/operatable_state_machine.py index de3b960..0c28df2 100644 --- a/flexbe_core/flexbe_core/core/operatable_state_machine.py +++ b/flexbe_core/flexbe_core/core/operatable_state_machine.py @@ -32,9 +32,11 @@ """OperatableStateMachine.""" from enum import Enum +from flexbe_core.core.exceptions import StateError, StateMachineError, UserDataError from flexbe_core.core.operatable_state import OperatableState from flexbe_core.core.preemptable_state_machine import PreemptableStateMachine from flexbe_core.core.ros_state import RosState +from flexbe_core.core.state import State from flexbe_core.core.state_map import StateMap from flexbe_core.core.topics import Topics from flexbe_core.core.user_data import UserData @@ -70,7 +72,6 @@ def __init__(self, *args, **kwargs): self._autonomy = {} self._inner_sync_request = False self._last_exception = None - self._state_map = None self._structure = None self._type = OperatableStateMachine.ContainerType.OperatableStateMachine.value @@ -108,16 +109,15 @@ def add(label, state, transitions, autonomy=None, remapping=None): PreemptableStateMachine.add(label, state, transitions, remapping) self._autonomy[label] = autonomy - def define_structure(self): + def define_structure(self, state_map): """Calculate all state ids and prepare the ContainerStructure message.""" - self._state_map = StateMap() - self._structure = self._build_structure_msg() - print(f'\x1b[94mBuilt {self._state_map}\x1b[0m', flush=True) + self._structure = self._build_structure_msg(state_map) + print(f'\x1b[94mBuilt {state_map}\x1b[0m', flush=True) - def _build_structure_msg(self): + def _build_structure_msg(self, state_map): """Create a message to describe the structure of this state machine.""" structure_msg = ContainerStructure() - container_msg = self._add_to_structure_msg(structure_msg, self._state_map) + container_msg = self._add_to_structure_msg(structure_msg, state_map) container_msg.outcomes = self.outcomes structure_msg.behavior_id = self.id return structure_msg @@ -180,18 +180,12 @@ def get_latest_status(self): # execution def _execute_current_state(self): - - if self._entering: - # On entering this state machine - Logger.localerr(f"OSM: Why is entering flag set here '{self.name}' of '{self.path}'?") - raise Exception("entering flag still set L187 of OSM - '{self.path}'") - self._manual_transition_requested = None if self._is_controlled and self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC): # Special handling in statemachine container command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC) - if command_msg.target == self.name: + if command_msg.target == self.state_id: cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Using here, so clear from buffer assert cmd_msg2 is command_msg, 'Unexpected change in CMD_TRANSITION_TOPIC buffer' Logger.localinfo(f"Statemachine '{self.name}' from '{self.path}' is " @@ -202,29 +196,31 @@ def _execute_current_state(self): self._manual_transition_requested = outcome self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, CommandFeedback(command='transition', - args=[command_msg.target, self.name])) + args=[f'{command_msg.target}', f'{self.state_id}'])) Logger.localwarn(f"--> Manually triggered outcome {outcome} of statemachine '{self.name}'") self._last_outcome = outcome - self._publish_outcome(outcome) return outcome if self._is_controlled and self._last_requested_outcome is not None: # We have already processed the current state and received an outcome # We are waiting on outcome confirmation from the OCS - Logger.localinfo(f"OSM '{self.path}' is waiting on user to confirm outcome") + Logger.loginfo_throttle(2.0, f"OSM '{self.path}' is waiting on user to confirm outcome") return None try: outcome = super()._execute_current_state() self._last_exception = None except Exception as exc: # pylint: disable=W0703 - # catch any exception and log here, but re-raise to stop behavior - outcome = None - self._last_exception = exc - Logger.logerr("Failed to execute state '%s':\n%s" % (self.current_state_label, str(exc))) + # catch any exception and log here, but re-raise to preempt behavior + Logger.logerr("Failed to execute state '%s':\n%s - %s" % (self.current_state_label, str(type(exc)), str(exc))) import traceback # pylint: disable=C0415 Logger.localinfo(traceback.format_exc().replace('%', '%%')) # Guard against exeception including format! - raise exc + outcome = None + if isinstance(exc, (StateError, StateMachineError, UserDataError)): + self._last_exception = exc + else: + self._last_exception = StateError(str(exc)) + raise self._last_exception if self._is_controlled: # request outcome because autonomy level is too low @@ -235,7 +231,7 @@ def _execute_current_state(self): if outcome != self._last_requested_outcome: self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=self.outcomes.index(outcome), - target=self.path)) + target=self.state_id)) Logger.localinfo("<-- Want result: '%s' -> '%s'" % (self.path, outcome)) StateLogger.log('flexbe.operator', self, type='request', request=outcome, autonomy=self.parent.autonomy_level, @@ -243,9 +239,7 @@ def _execute_current_state(self): self._last_requested_outcome = outcome outcome = None - if outcome is not None and outcome in self.outcomes: - Logger.localinfo(f"controlled SM '{self.name}' from '{self.path}'returning outcome '{outcome}' ") - self._publish_outcome(outcome) + if outcome is not None: self._force_transition = False self._last_outcome = outcome @@ -254,16 +248,17 @@ def _execute_current_state(self): def _publish_outcome(self, outcome): """Update the UI and logs about this outcome.""" # 0 outcome status denotes no outcome, not index so add +1 for valid outcome (subtract in mirror) - try: - outcome_index = self.outcomes.index(outcome) - except Exception as exc: # pylint: disable=W0703 - outcome_index = 0 - Logger.localerr('State Machine outcome error : %s > %s (%d) (%d) (%s)' - % (self.name, outcome, outcome_index, self._state_id, self.__class__.__name__)) - raise exc - - Logger.localinfo('State Machine result: %s > %s (%d) (%d) (%s)' - % (self.name, outcome, outcome_index, self._state_id, self.__class__.__name__)) + if outcome == State._preempted_name: + # special case of preempted outcome specify max outcome hash + Logger.localinfo('Publish Preempted: State Machine result: %s > %s (%d) (%d) (%s)' + % (self.name, outcome, StateMap._MAX_OUTCOME, self.state_id, self.__class__.__name__)) + self._pub.publish(Topics._OUTCOME_TOPIC, UInt32(data=StateMap.hash(self, StateMap._MAX_OUTCOME))) + self._pub.publish(Topics._DEBUG_TOPIC, String(data='%s > %s' % (self.path, outcome))) + return + + outcome_index = self.outcomes.index(outcome) + # Logger.localinfo('Publish outcome: State Machine result: %s > %s (%d) (%d) (%s)' + # % (self.name, outcome, outcome_index, self.state_id, self.__class__.__name__)) self._pub.publish(Topics._OUTCOME_TOPIC, UInt32(data=StateMap.hash(self, outcome_index))) self._pub.publish(Topics._DEBUG_TOPIC, String(data='%s > %s' % (self.path, outcome))) if self._force_transition: @@ -296,7 +291,7 @@ def get_required_autonomy(self, outcome, state): assert self.current_state_label == state.name, "get required autonomys in OSM state doesn't match!" return self._autonomy[self.current_state_label][outcome] except Exception: # pylint: disable=W0703 - Logger.error(f"Failure to retrieve autonomy for '{self.name}' - " + Logger.error(f"Failure to retrieve autonomy for '{self.name}' - {self.current_state_label}" f" current state label='{self.name}' outcome='{outcome}'.") Logger.localerr(f'{self._autonomy}') @@ -325,7 +320,7 @@ def destroy(self): Logger.localinfo(' state logger shutdown ...') StateLogger.shutdown() - def confirm(self, name, beh_id): + def confirm(self, name, beh_id, state_map): """ Confirm the state machine and triggers the creation of the structural message. @@ -337,11 +332,14 @@ def confirm(self, name, beh_id): @type beh_id: int @param beh_id: The behavior id of this state machine to identify it. + + @type state_map: StateMap + @param state_map: mapping of state ids to state instance """ self.set_name(name) self.id = beh_id - self.define_structure() + self.define_structure(state_map) Logger.localinfo(f"State machine '{self.name}' ({self.id}) (inst_id={id(self)}) confirmed and structure defined.") Logger.localinfo(f'--> Set up pub/sub for behavior {self.name}: {self.id} ...') @@ -446,26 +444,48 @@ def _notify_stop(self): def on_enter(self, userdata=None): # pylint: disable=W0613 """Call on entering the operatable state machine.""" - Logger.localinfo(f"OSM on enter for '{self.name}' from '{self.path}' ...") + self._last_outcome = None self._last_exception = None self._last_requested_outcome = None super().on_enter(userdata) def on_exit(self, userdata=None): """Call on exiting the statemachine.""" - Logger.localinfo(f"SM on exit for '{self.name}' from '{self.path}' ...") - self._entering = True - if self._current_state is not None: + if self._current_state is None: + if self._exited: + # This should never be true + Logger.localinfo(f"OSM on exit for '{self.name}' from '{self.path}' - " + f'and have already called on_exit? {self._current_state}, ' + f"{self._exited} '{self._last_outcome}' ...") + else: + # Current state is still active, so preempt and call on_exit + if self._current_state._exited or self._current_state._last_outcome is not None: + # This should not be true as should have cleared current state if this was true + Logger.localinfo(f"OSM on exit for '{self.name}' from '{self.path}' (exited={self._exited}) " + f"- with current='{self._current_state}' " + f"(exited={self._current_state._exited}), '{self._current_state._last_outcome}' ...") + with UserData(reference=self._userdata, input_keys=self._current_state.input_keys, output_keys=self._current_state.output_keys, remap=self._remappings[self._current_state.name]) as udata: # Pass userdata to internal states matching as defined in state_machine self._current_state.on_exit(udata) + self._current_state._exited = True + Logger.localinfo(f"preempting '{self._current_state.name}' ({self._current_state.path})") + self._current_state._publish_outcome(State._preempted_name) # Normally published by EventState.execute self._current_state._entering = True self._current_state = None + if self._last_outcome is None: + # no outcome, so notify that we preempted this state + # otherwise, outcome published by SM execute for regular outcome + self._publish_outcome(State._preempted_name) + if self._last_requested_outcome is not None: - Logger.localinfo(f"SM '{self.name}' of '{self.path}' clear prior LRO='{self._last_requested_outcome}'.") - self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=255, target=self.path)) + # Logger.localinfo(f"SM '{self.name}' of '{self.path}' clear prior LRO='{self._last_requested_outcome}'.") + self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=255, target=self.state_id)) self._last_requested_outcome = None + + self._exited = True + self._entering = True # for next entry diff --git a/flexbe_core/flexbe_core/core/preemptable_state.py b/flexbe_core/flexbe_core/core/preemptable_state.py index cbc5372..cc6c0d8 100644 --- a/flexbe_core/flexbe_core/core/preemptable_state.py +++ b/flexbe_core/flexbe_core/core/preemptable_state.py @@ -32,6 +32,7 @@ """PreemptableState.""" from flexbe_core.core.lockable_state import LockableState +from flexbe_core.core.state import State from flexbe_core.core.topics import Topics from flexbe_core.logger import Logger @@ -68,7 +69,7 @@ def _preemptable_execute(self, *args, **kwargs): if not self._is_controlled: Logger.localinfo('Behavior will be preempted') self._force_transition = True - return self._preempted_name + return State._preempted_name return self.__execute(*args, **kwargs) diff --git a/flexbe_core/flexbe_core/core/preemptable_state_machine.py b/flexbe_core/flexbe_core/core/preemptable_state_machine.py index 86f4318..684d942 100644 --- a/flexbe_core/flexbe_core/core/preemptable_state_machine.py +++ b/flexbe_core/flexbe_core/core/preemptable_state_machine.py @@ -34,6 +34,7 @@ from flexbe_core.core.lockable_state_machine import LockableStateMachine from flexbe_core.core.preemptable_state import PreemptableState +from flexbe_core.core.state import State from flexbe_core.core.topics import Topics from flexbe_core.logger import Logger @@ -51,8 +52,6 @@ class PreemptableStateMachine(LockableStateMachine): If preempted, the state machine will return the outcome preempted. """ - _preempted_name = 'preempted' - def __init__(self, *args, **kwargs): """Initialize instance.""" super().__init__(*args, **kwargs) @@ -75,12 +74,12 @@ def _preempt_cb(self, msg): @staticmethod def add(label, state, transitions=None, remapping=None): """Add state to SM.""" - transitions[PreemptableState._preempted_name] = PreemptableStateMachine._preempted_name + transitions[State._preempted_name] = State._preempted_name LockableStateMachine.add(label, state, transitions, remapping) @property def _valid_targets(self): - return super()._valid_targets + [PreemptableStateMachine._preempted_name] + return super()._valid_targets + [State._preempted_name] def spin(self, userdata=None, rclpy_context=None): """Spin the execute loop for preemptable portion.""" @@ -105,7 +104,7 @@ def spin(self, userdata=None, rclpy_context=None): f"cmd='{command_msg.target}' ({command_msg.outcome}) - toss it!") self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, CommandFeedback(command='transition', - args=['invalid', command_msg.target])) + args=['invalid', f'{command_msg.target}'])) self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Store the information for safely passing to heartbeat thread diff --git a/flexbe_core/flexbe_core/core/state.py b/flexbe_core/flexbe_core/core/state.py index a655dec..080a9c6 100644 --- a/flexbe_core/flexbe_core/core/state.py +++ b/flexbe_core/flexbe_core/core/state.py @@ -55,10 +55,11 @@ def __init__(self, *args, **kwargs): # properties of instances of a state machine self._name = None self._parent = None - self._state_id = None # Assigned with structure after all states added to behavior + self._state_id = -1 # Assigned with structure after all states added to behavior self._inner_sync_request = False # Any state can generate request, but should be rare self._type = 0 # Basic states are type 0, containers have non-zero type self._entering = True + self._exited = False # State has exited since last on_enter def __str__(self): """Return name of this state.""" diff --git a/flexbe_core/flexbe_core/core/state_machine.py b/flexbe_core/flexbe_core/core/state_machine.py index 1a52e16..230f4cc 100644 --- a/flexbe_core/flexbe_core/core/state_machine.py +++ b/flexbe_core/flexbe_core/core/state_machine.py @@ -139,6 +139,7 @@ def execute(self, userdata): if outcome: # Exit this statemachine self.on_exit(self._userdata) + self._publish_outcome(outcome) return outcome @@ -146,12 +147,13 @@ def on_enter(self, userdata): """Call on entering the state machine.""" self.assert_consistent_transitions() self._entering = False + self._exited = False self._current_state = self.initial_state self._current_state._entering = True # Force entering action self._userdata = userdata if userdata is not None else UserData() self._userdata(add_from=self._own_userdata) Logger.localinfo(f"Entering StateMachine '{self.name}' of '{self.path}' " - f"({self._state_id}) initial state='{self._current_state.name}'") + f"({self.state_id}) initial state='{self._current_state.name}' ({self.__class__.__name__})") def _execute_current_state(self): """Execute the currently active state in this SM.""" diff --git a/flexbe_core/flexbe_core/core/state_map.py b/flexbe_core/flexbe_core/core/state_map.py index 436fe2f..b5bf48a 100644 --- a/flexbe_core/flexbe_core/core/state_map.py +++ b/flexbe_core/flexbe_core/core/state_map.py @@ -38,7 +38,9 @@ class StateMap: __HASH_BITS = 31 # Keep in signed 32-bit range # 2**31 - 1 - 256 - __HASH_MASK = 0x7FFFFF00 # 23-bits keeps in signed range, and allows for 256 output encoding + __HASH_MASK = 0x7FFFFF00 # 23-bits keeps in signed range, and allows for 254 outputs to be encode + + _MAX_OUTCOME = 254 # Hash function adds one, so this is the number of outcomes including preempted def __init__(self): self._state_map = {} @@ -57,6 +59,11 @@ def __getitem__(self, index): else: return None + @property + def items(self): + """List key-value pairs (index, path).""" + return [(key, state.path) for key, state in self._state_map.items()] + @classmethod def _hash_path(cls, path): """ @@ -87,23 +94,23 @@ def get_path_hash(self, path): def add_state(self, path, state): """Define state id hash and store in map to state instance.""" hash_path = path - if state._state_id is None or state._state_id == -1: + if state.state_id is None or state.state_id == -1: state._state_id = self._hash_path(hash_path) collisions = 0 - while state._state_id in self._state_map: + while state.state_id in self._state_map: # Collision with existing state detected. Define a new state_id by extending path. collisions += 1 if collisions > 20: raise KeyError(f"Unable to avoid collisions in StateMap with '{path}'") hash_path += path state._state_id = self._hash_path(hash_path) - self._state_map[state._state_id] = state + self._state_map[state.state_id] = state self._num_collision_processed += collisions else: - if state._state_id in self._state_map: - Logger.error(f"State '{path}' : id={state._state_id} is already in map!") + if state.state_id in self._state_map: + Logger.error(f"State '{path}' : id={state.state_id} is already in map!") raise KeyError(f"Existing state in StateMap with '{path}'") - self._state_map[state._state_id] = state + self._state_map[state.state_id] = state def get_state(self, state_id): """Return reference to state given id.""" @@ -135,4 +142,4 @@ def unhash(cls, hash_code): @classmethod def hash(cls, state, outcome_index): """Convert state id and outcome to hashed identifier for outcome reports.""" - return state._state_id + 1 + outcome_index + return state.state_id + 1 + outcome_index diff --git a/flexbe_core/flexbe_core/core/topics.py b/flexbe_core/flexbe_core/core/topics.py index 3dfe7ad..ff25646 100644 --- a/flexbe_core/flexbe_core/core/topics.py +++ b/flexbe_core/flexbe_core/core/topics.py @@ -32,8 +32,9 @@ from flexbe_msgs.msg import BEStatus, BehaviorLog, BehaviorRequest, BehaviorSelection, BehaviorSync from flexbe_msgs.msg import CommandFeedback, ContainerStructure from flexbe_msgs.msg import OutcomeRequest +from flexbe_msgs.msg import StateMapMsg -from std_msgs.msg import Bool, Empty, String, UInt32, UInt8 +from std_msgs.msg import Bool, Empty, Int32, String, UInt32, UInt8 class Topics: @@ -47,7 +48,7 @@ class Topics: _CMD_LOCK_TOPIC = 'flexbe/command/lock' # OCS request to lock state _CMD_PAUSE_TOPIC = 'flexbe/command/pause' # OCS request to pause execution _CMD_PREEMPT_TOPIC = 'flexbe/command/preempt' # OCS request behavior preempt - _CMD_REPEAT_TOPIC = 'flexbe/command/repeat' # OCS request to repeat execution of state + _CMD_REPEAT_TOPIC = 'flexbe/command/repeat' # OCS request to repeat execution of a single EventState implementation _CMD_SYNC_TOPIC = 'flexbe/command/sync' # OCS request synchronization _CMD_TRANSITION_TOPIC = 'flexbe/command/transition' # OCS request transition _CMD_UNLOCK_TOPIC = 'flexbe/command/unlock' # OCS request to unlock state @@ -65,23 +66,25 @@ class Topics: _REQUEST_STRUCTURE_TOPIC = 'flexbe/request_mirror_structure' # Request state machine structure from onboard _START_BEHAVIOR_TOPIC = 'flexbe/start_behavior' # OCS or launcher command to start behavior _STATE_LOGGER_TOPIC = 'flexbe/state_logger' + _STATE_MAP_TOPIC = 'flexbe/state_map' # Mapping of state id hash codes to state path from onboard + _STATE_MAP_OCS_TOPIC = 'flexbe/mirror/state_map' # Mapping of state id hash codes to state path from mirror (should be same) _UI_VERSION_TOPIC = 'flexbe/ui_version' # OCS Version topic # Action interfaces _EXECUTE_BEHAVIOR_ACTION = 'flexbe/execute_behavior' _topic_types = {_BEHAVIOR_LOGGING_TOPIC: BehaviorLog, - _BEHAVIOR_UPDATE_TOPIC: String, + _BEHAVIOR_UPDATE_TOPIC: Int32, _CMD_ATTACH_TOPIC: UInt8, _CMD_AUTONOMY_TOPIC: UInt8, _CMD_FEEDBACK_TOPIC: CommandFeedback, - _CMD_LOCK_TOPIC: String, + _CMD_LOCK_TOPIC: Int32, _CMD_PAUSE_TOPIC: Bool, _CMD_PREEMPT_TOPIC: Empty, _CMD_REPEAT_TOPIC: Empty, _CMD_SYNC_TOPIC: Empty, _CMD_TRANSITION_TOPIC: OutcomeRequest, - _CMD_UNLOCK_TOPIC: String, + _CMD_UNLOCK_TOPIC: Int32, _DEBUG_TOPIC: String, _EXECUTE_BEHAVIOR_ACTION: BehaviorExecution, _LAUNCHER_HEARTBEAT_TOPIC: UInt32, @@ -96,6 +99,8 @@ class Topics: _REQUEST_BEHAVIOR_TOPIC: BehaviorRequest, _REQUEST_STRUCTURE_TOPIC: UInt32, _START_BEHAVIOR_TOPIC: BehaviorSelection, + _STATE_MAP_TOPIC: StateMapMsg, + _STATE_MAP_OCS_TOPIC: StateMapMsg, _UI_VERSION_TOPIC: String } diff --git a/flexbe_core/test/test_core.py b/flexbe_core/test/test_core.py index f8ad0a6..7ad2fe7 100755 --- a/flexbe_core/test/test_core.py +++ b/flexbe_core/test/test_core.py @@ -34,9 +34,9 @@ import unittest from flexbe_core import ConcurrencyContainer, EventState, OperatableStateMachine, initialize_flexbe_core -from flexbe_core.core import PreemptableState -from flexbe_core.core.exceptions import StateMachineError -from flexbe_core.core.topics import Topics +from flexbe_core.core import PreemptableState, State +from flexbe_core.core import StateMachineError +from flexbe_core.core import Topics from flexbe_core.proxy import ProxySubscriberCached, shutdown_proxies from flexbe_msgs.msg import CommandFeedback, OutcomeRequest @@ -44,7 +44,7 @@ import rclpy from rclpy.executors import MultiThreadedExecutor -from std_msgs.msg import Bool, Empty, String, UInt32 +from std_msgs.msg import Bool, Empty, Int32, String, UInt32 class CoreTestState(EventState): @@ -96,6 +96,7 @@ class ConcurrencyTestState(CoreTestState): def __init__(self): super().__init__() + # def execute(self, userdata): # outcome = super().execute(userdata) # self._node.get_logger().info(' %s - ConcurrencyTestState execute' @@ -116,6 +117,10 @@ class TestCore(unittest.TestCase): test = 0 + __TIME_SLEEP = 0.025 # Sleep time for loops + __EXECUTE_TIMEOUT_SEC = 0.025 # Timeout in executor loops for spin once + __LOOP_COUNT = 50 # Number of times to execute loops for checking (total time ~ LOOP_COUNT*(TIME_SLEEP + TIMEOUT)) + def __init__(self, *args, **kwargs): """Initialize TestCore instance.""" super().__init__(*args, **kwargs) @@ -134,29 +139,29 @@ def setUp(self): initialize_flexbe_core(self.node) - time.sleep(0.1) + time.sleep(TestCore.__TIME_SLEEP) def tearDown(self): """Tear down the TestCore test.""" self.node.get_logger().info(' shutting down core test %d ... ' % (self.test)) - for _ in range(50): + for _ in range(int(0.25*TestCore.__LOOP_COUNT)): # Allow any lingering pub/sub to clear up - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.01) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) self.node.get_logger().info(' shutting down proxies in core test %d ... ' % (self.test)) shutdown_proxies() - time.sleep(0.1) + time.sleep(TestCore.__TIME_SLEEP) self.node.get_logger().info(' destroy node in core test %d ... ' % (self.test)) self.node.destroy_node() - time.sleep(0.1) + time.sleep(TestCore.__TIME_SLEEP) self.executor.shutdown() - time.sleep(0.1) + time.sleep(TestCore.__TIME_SLEEP) # Kill it with fire to make sure not stray published topics are available rclpy.shutdown(context=self.context) - time.sleep(0.2) + time.sleep(TestCore.__TIME_SLEEP*2) def _create(self): """Create the test.""" @@ -165,6 +170,7 @@ def _create(self): state._enable_ros_control() sm = OperatableStateMachine(outcomes=['done', 'error']) sm._state_id = 1024 + sm.set_name('top-level') with sm: OperatableStateMachine.add('subject', state, transitions={'done': 'done', 'error': 'error'}, @@ -172,26 +178,56 @@ def _create(self): return state, sm def _execute(self, state): - """Execute the test.""" + """Execute the state via container.""" # self.node.get_logger().info(' execute %s ... ' % (str(state.name))) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state.last_events = [] outcome = state.parent.execute(None) # self.node.get_logger().info(' outcome = %s ... ' % (str(outcome))) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) return outcome + def _reset_sm(self, sm, state): + """Reset the state machine for another pass.""" + if sm is not None: + sm._entering = True + sm.on_enter() + if state is not None: + state._entering = True + state.on_enter(None) + + def _execute_state(self, state): + """ + Execute the state. + + For this, you may need to call on_enter and on_exit. + """ + try: + # self.node.get_logger().info(' execute %s ... ' % (str(state.name))) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.05) + + state.last_events = [] + outcome = state.execute(None) + # self.node.get_logger().info(' outcome = %s ... ' % (str(outcome))) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) + return outcome + except Exception as exc: + self.node.get_logger().error(f" exception in state execute for '{state.name}' ... ") + self.node.get_logger().info(f"{exc}") + import traceback + self.node.get_logger().info(f"{traceback.format_exc()}") + def assertMessage(self, sub, topic, msg, timeout=1): """Check message.""" - for _ in range(int(timeout * 100)): - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + for _ in range(int(timeout * TestCore.__LOOP_COUNT)): + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) if sub.has_msg(topic): received = sub.get_last_msg(topic) sub.remove_last_msg(topic) break - time.sleep(0.1) + time.sleep(TestCore.__TIME_SLEEP) else: raise AssertionError('Did not receive message on topic %s, expected:\n%s' % (topic, str(msg))) @@ -214,26 +250,29 @@ def assertMessage(self, sub, topic, msg, timeout=1): def assertNoMessage(self, sub, topic, timeout=1): """Assert no message received.""" - for _ in range(int(timeout * 100)): - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + if not sub.has_msg(topic): + self.node.get_logger().info(f"Wait to verify no message arrives for '{topic}' ...") + for _ in range(int(timeout * TestCore.__LOOP_COUNT)): + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) if sub.has_msg(topic): received = sub.get_last_msg(topic) sub.remove_last_msg(topic) raise AssertionError('Should not receive message on topic %s, but got:\n%s' % (topic, str(received))) - time.sleep(0.1) + time.sleep(TestCore.__TIME_SLEEP) + self.node.get_logger().info(f" No message arrived for '{topic}' - good!") # Test Cases def test_event_state(self): """Test event state.""" self.node.get_logger().info('test_event_state ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state, sm = self._create() fb_topic = Topics._CMD_FEEDBACK_TOPIC - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) sub = ProxySubscriberCached({fb_topic: CommandFeedback}, inst_id=id(self)) time.sleep(0.2) @@ -284,7 +323,7 @@ def test_operatable_state(self): """Test operatable state.""" self.node.get_logger().info('test_operatable_state ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state, sm = self._create() self.node.get_logger().info('test_operatable_state - ProxySubscribe request ...') out_topic = Topics._OUTCOME_TOPIC @@ -293,53 +332,57 @@ def test_operatable_state(self): # wait for pub/sub end_time = time.time() + 1 while time.time() < end_time: - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) - time.sleep(0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) + time.sleep(TestCore.__TIME_SLEEP) + + state.on_enter(None) # return outcome in full autonomy, no request - self.node.get_logger().info('test_operatable_state - request outcome on full autonomy, no request ...') + self.node.get_logger().info(f'test_operatable_state - autonomy level={OperatableStateMachine.autonomy_level} ' + f'request outcome on full autonomy, no request ...') state.result = 'error' - self._execute(state) + self._execute_state(state) # executing single state to we only get the state outputs self.assertNoMessage(sub, req_topic) - self.assertMessage(sub, out_topic, UInt32(data=(2 + state._state_id))) # Test presumes outcome = 1 + offset + self.assertMessage(sub, out_topic, UInt32(data=(2 + state.state_id))) # Test presumes outcome = 1 + offset # request outcome on same autonomy and clear request on loopback self.node.get_logger().info('test_operatable_state - request outcome on autonomy level=2 ...') + self._reset_sm(sm, state) OperatableStateMachine.autonomy_level = 2 - self._execute(state) + self._execute_state(state) # executing single state to we only get the state outputs self.assertNoMessage(sub, out_topic) - self.assertMessage(sub, req_topic, OutcomeRequest(outcome=1, target='/subject')) + self.assertMessage(sub, req_topic, OutcomeRequest(outcome=1, target=CoreTestState._set_state_id)) state.result = None - self._execute(state) - self.assertMessage(sub, req_topic, OutcomeRequest(outcome=255, target='/subject')) + self._execute_state(state) # executing single state to we only get the state outputs + self.assertMessage(sub, req_topic, OutcomeRequest(outcome=255, target=CoreTestState._set_state_id)) # still return other outcomes self.node.get_logger().info('test_operatable_state - still return other outcomes') state.result = 'done' - self._execute(state) + self._execute_state(state) # executing single state to we only get the state outputs self.assertNoMessage(sub, req_topic) - self.assertMessage(sub, out_topic, UInt32(data=(1 + state._state_id))) # Test presumes outcome = 0 + offset + self.assertMessage(sub, out_topic, UInt32(data=(1 + state.state_id))) # Test presumes outcome = 0 + offset # request outcome on lower autonomy, return outcome after level increase self.node.get_logger().info('test_operatable_state - lower autonomy level=0') OperatableStateMachine.autonomy_level = 0 - self._execute(state) + self._execute_state(state) # executing single state to we only get the state outputs self.assertNoMessage(sub, out_topic) - self.assertMessage(sub, req_topic, OutcomeRequest(outcome=0, target='/subject')) + self.assertMessage(sub, req_topic, OutcomeRequest(outcome=0, target=CoreTestState._set_state_id)) OperatableStateMachine.autonomy_level = 3 self.node.get_logger().info('test_operatable_state -autonomy level=3') - self._execute(state) - self.assertMessage(sub, out_topic, UInt32(data=(1 + state._state_id))) # Test presumes outcome = 0 + offset + self._execute_state(state) + self.assertMessage(sub, out_topic, UInt32(data=(1 + state.state_id))) # Test presumes outcome = 0 + offset self.node.get_logger().info('test_operatable_state - OK! ') def test_preemptable_state(self): """Test preemptable state.""" self.node.get_logger().info('test_preemptable_state ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state, sm = self._create() fb_topic = Topics._CMD_FEEDBACK_TOPIC @@ -349,15 +392,15 @@ def test_preemptable_state(self): # wait for pub/sub end_time = time.time() + 1 while time.time() < end_time: - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) - time.sleep(0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) + time.sleep(TestCore.__TIME_SLEEP) # time.sleep(0.2) # preempt when trigger variable is set PreemptableState.preempt = True self.node.get_logger().info('test_preemptable_state - preempt = True ...') outcome = self._execute(state) - self.assertEqual(outcome, PreemptableState._preempted_name) + self.assertEqual(outcome, State._preempted_name) self.assertRaises(StateMachineError, lambda: state.parent.current_state) self.node.get_logger().info('test_preemptable_state - preempt = False ...') @@ -370,7 +413,7 @@ def test_preemptable_state(self): state._sub._callback(Empty(), Topics._CMD_PREEMPT_TOPIC) outcome = self._execute(state) - self.assertEqual(outcome, PreemptableState._preempted_name) + self.assertEqual(outcome, State._preempted_name) self.assertRaises(StateMachineError, lambda: state.parent.current_state) self.assertMessage(sub, fb_topic, CommandFeedback(command='preempt')) @@ -381,7 +424,7 @@ def test_lockable_state(self): """Test lockable state.""" self.node.get_logger().info('test_lockable_state ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state, sm = self._create() fb_topic = Topics._CMD_FEEDBACK_TOPIC sub = ProxySubscriberCached({fb_topic: CommandFeedback}, inst_id=id(self)) @@ -389,55 +432,55 @@ def test_lockable_state(self): # lock and unlock as commanded, return outcome after unlock self.node.get_logger().info(' test lock on command ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) - state._sub._callback(String(data='/subject'), Topics._CMD_LOCK_TOPIC) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) + state._sub._callback(Int32(data=state.state_id), Topics._CMD_LOCK_TOPIC) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state.result = 'done' self.node.get_logger().info(' execute state after lock ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) outcome = self._execute(state) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) self.node.get_logger().info(' check results of test lock after execute ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) self.assertIsNone(outcome) self.assertTrue(state._locked) - self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/subject', '/subject'])) + self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=[f'{state.state_id}', f'{state.state_id}'])) state.result = None self.node.get_logger().info(' test unlock on command ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) - state._sub._callback(String(data='/subject'), Topics._CMD_UNLOCK_TOPIC) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) + state._sub._callback(Int32(data=state.state_id), Topics._CMD_UNLOCK_TOPIC) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) outcome = self._execute(state) self.assertEqual(outcome, 'done') - self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['/subject', '/subject'])) + self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=[f'{state.state_id}', f'{state.state_id}'])) # lock and unlock without target self.node.get_logger().info(' test lock and unlock without target ... ') - state._sub._callback(String(data=''), Topics._CMD_LOCK_TOPIC) + state._sub._callback(Int32(data=0), Topics._CMD_LOCK_TOPIC) state.result = 'done' outcome = self._execute(state) self.assertIsNone(outcome) - self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/subject', '/subject'])) - state._sub._callback(String(data=''), Topics._CMD_UNLOCK_TOPIC) + self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=[f'{state.state_id}', f'{state.state_id}'])) + state._sub._callback(Int32(data=0), Topics._CMD_UNLOCK_TOPIC) outcome = self._execute(state) self.assertEqual(outcome, 'done') - self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['/subject', '/subject'])) + self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=[f'{state.state_id}', f'{state.state_id}'])) # reject invalid lock command self.node.get_logger().info(' test reject invalid lock command ... ') - state._sub._callback(String(data='/invalid'), Topics._CMD_LOCK_TOPIC) + state._sub._callback(Int32(data=12345678), Topics._CMD_LOCK_TOPIC) # give invalid state id outcome = self._execute(state) self.assertEqual(outcome, 'done') - self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['/invalid', ''])) + self.assertMessage(sub, fb_topic, CommandFeedback(command='lock', args=['12345678', '-1'])) # reject generic unlock command when not locked self.node.get_logger().info(' test reject invalid unlock when not locked command ... ') - state._sub._callback(String(data=''), Topics._CMD_UNLOCK_TOPIC) + state._sub._callback(Int32(data=0), Topics._CMD_UNLOCK_TOPIC) self._execute(state) - self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['', ''])) + self.assertMessage(sub, fb_topic, CommandFeedback(command='unlock', args=['0', '-1'])) # do not transition out of locked container self.node.get_logger().info(' test do not transition out of locked container ... ') @@ -454,20 +497,22 @@ def test_manually_transitionable_state(self): """Test manually transitionable state.""" self.node.get_logger().info('test_manually_transitionable_state ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state, sm = self._create() fb_topic = Topics._CMD_FEEDBACK_TOPIC sub = ProxySubscriberCached({fb_topic: CommandFeedback}, inst_id=id(self)) time.sleep(0.2) # return requested outcome - state._sub._callback(OutcomeRequest(target='subject', outcome=1), Topics._CMD_TRANSITION_TOPIC) + state._sub._callback(OutcomeRequest(target=CoreTestState._set_state_id, outcome=1), Topics._CMD_TRANSITION_TOPIC) outcome = self._execute(state) self.assertEqual(outcome, 'error') - self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', args=['subject', 'subject'])) + self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', + args=[f'{CoreTestState._set_state_id}', + f'{CoreTestState._set_state_id}'])) # reject outcome request for different target - state._sub._callback(OutcomeRequest(target='invalid', outcome=1), Topics._CMD_TRANSITION_TOPIC) + state._sub._callback(OutcomeRequest(target=42, outcome=1), Topics._CMD_TRANSITION_TOPIC) outcome = self._execute(state) self.assertIsNone(outcome) # This state won't handle the transition request, so no message is expected @@ -478,7 +523,7 @@ def test_cross_combinations(self): """Test cross combinations.""" self.node.get_logger().info('test_cross_combinations ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) state, sm = self._create() # manual transition works on low autonomy @@ -486,7 +531,7 @@ def test_cross_combinations(self): state.result = 'error' outcome = self._execute(state) self.assertIsNone(outcome) - state._sub._callback(OutcomeRequest(target='subject', outcome=0), Topics._CMD_TRANSITION_TOPIC) + state._sub._callback(OutcomeRequest(target=CoreTestState._set_state_id, outcome=0), Topics._CMD_TRANSITION_TOPIC) outcome = self._execute(state) self.assertEqual(outcome, 'done') OperatableStateMachine.autonomy_level = 3 @@ -494,13 +539,13 @@ def test_cross_combinations(self): # manual transition blocked by lock self.node.get_logger().info('test_cross_combinations - check manual transition blocked by lock ... ') - state._sub._callback(String(data='/subject'), Topics._CMD_LOCK_TOPIC) + state._sub._callback(Int32(data=state.state_id), Topics._CMD_LOCK_TOPIC) outcome = self._execute(state) self.assertIsNone(outcome) - state._sub._callback(OutcomeRequest(target='subject', outcome=1), Topics._CMD_TRANSITION_TOPIC) + state._sub._callback(OutcomeRequest(target=CoreTestState._set_state_id, outcome=1), Topics._CMD_TRANSITION_TOPIC) outcome = self._execute(state) self.assertIsNone(outcome) - state._sub._callback(String(data='/subject'), Topics._CMD_UNLOCK_TOPIC) + state._sub._callback(Int32(data=state.state_id), Topics._CMD_UNLOCK_TOPIC) outcome = self._execute(state) self.assertEqual(outcome, 'error') @@ -512,21 +557,21 @@ def test_cross_combinations(self): self.assertIsNone(outcome) state._sub._callback(Empty(), Topics._CMD_PREEMPT_TOPIC) outcome = self._execute(state) - self.assertEqual(outcome, PreemptableState._preempted_name) + self.assertEqual(outcome, State._preempted_name) PreemptableState.preempt = False OperatableStateMachine.autonomy_level = 3 state.result = None # preempt also works when locked self.node.get_logger().info('test_cross_combinations - verify preempt works when locked ... ') - state._sub._callback(String(data='/subject'), Topics._CMD_LOCK_TOPIC) + state._sub._callback(Int32(data=state.state_id), Topics._CMD_LOCK_TOPIC) outcome = self._execute(state) self.assertIsNone(outcome) state._sub._callback(Empty(), Topics._CMD_PREEMPT_TOPIC) outcome = self._execute(state) - self.assertEqual(outcome, PreemptableState._preempted_name) + self.assertEqual(outcome, State._preempted_name) PreemptableState.preempt = False - state._sub._callback(String(data='/subject'), Topics._CMD_UNLOCK_TOPIC) + state._sub._callback(Int32(data=state.state_id), Topics._CMD_UNLOCK_TOPIC) outcome = self._execute(state) self.assertIsNone(outcome) self.node.get_logger().info('test_cross_combinations - OK! ') @@ -534,7 +579,7 @@ def test_cross_combinations(self): def test_concurrency_container(self): """Test CC.""" self.node.get_logger().info('test_concurrency_container ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) cc = ConcurrencyContainer(outcomes=['done', 'error'], conditions=[ ('error', [('main', 'error')]), @@ -555,7 +600,7 @@ def test_concurrency_container(self): autonomy={'done': 1, 'error': 2}) # self.node.get_logger().info(' after setting up OSM - call execute CC ... ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) # all states are called with their correct rate cc.execute(None) @@ -668,7 +713,7 @@ def test_user_data(self): """Test user data.""" self.node.get_logger().info('test_user_data ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestCore.__EXECUTE_TIMEOUT_SEC) class TestUserdataState(EventState): """Local Test state.""" @@ -677,6 +722,8 @@ def __init__(self, out_content='test_data'): super(TestUserdataState, self).__init__(outcomes=['done'], input_keys=['data_in'], output_keys=['data_out']) self.data = None self._out_content = out_content + CoreTestState._set_state_id = CoreTestState._set_state_id + 1 + self._state_id = CoreTestState._set_state_id def execute(self, userdata): self._node.get_logger().warn('\033[0m%s\n%s' % (self.path, str(userdata))) # log for manual inspection @@ -689,26 +736,35 @@ def execute(self, userdata): inner_sm.userdata.own = 'own_data' with inner_sm: OperatableStateMachine.add('own_state', TestUserdataState('inner_data'), transitions={'done': 'outside_state'}, - remapping={'data_in': 'own', 'data_out': 'sm_out'}) + remapping={'data_in': 'own', 'data_out': 'sm_out'}, + autonomy={'done': 0}) OperatableStateMachine.add('outside_state', TestUserdataState(), transitions={'done': 'internal_state'}, - remapping={'data_in': 'sm_in', 'data_out': 'data_in'}) + remapping={'data_in': 'sm_in', 'data_out': 'data_in'}, + autonomy={'done': 0}) OperatableStateMachine.add('internal_state', TestUserdataState(), transitions={'done': 'done'}, - remapping={}) + remapping={}, + autonomy={'done': 0}) sm = OperatableStateMachine(outcomes=['done']) sm._state_id = 8192 sm.userdata.outside = 'outside_data' + sm.set_name("outer_sm") # No one sets this name, while .add sets names of inner states with sm: OperatableStateMachine.add('before_state', TestUserdataState(), transitions={'done': 'inner_sm'}, - remapping={'data_in': 'outside'}) + remapping={'data_in': 'outside'}, + autonomy={'done': 0}) OperatableStateMachine.add('inner_sm', inner_sm, transitions={'done': 'after_state'}, - remapping={'sm_in': 'outside'}) + remapping={'sm_in': 'outside'}, + autonomy={'done': 0}) OperatableStateMachine.add('after_state', TestUserdataState('last_data'), transitions={'done': 'modify_state'}, - remapping={'data_in': 'sm_out'}) + remapping={'data_in': 'sm_out'}, + autonomy={'done': 0}) OperatableStateMachine.add('modify_state', TestUserdataState(), transitions={'done': 'final_state'}, - remapping={'data_out': 'outside', 'data_in': 'outside'}) + remapping={'data_out': 'outside', 'data_in': 'outside'}, + autonomy={'done': 0}) OperatableStateMachine.add('final_state', TestUserdataState(), transitions={'done': 'done'}, - remapping={'data_in': 'data_out'}) + remapping={'data_in': 'data_out'}, + autonomy={'done': 0}) # can pass userdata to state and get it from state sm.execute(None) diff --git a/flexbe_core/test/test_exceptions.py b/flexbe_core/test/test_exceptions.py index 63432b9..e67417a 100755 --- a/flexbe_core/test/test_exceptions.py +++ b/flexbe_core/test/test_exceptions.py @@ -45,6 +45,8 @@ class TestExceptions(unittest.TestCase): """Test FlexBE Exception handling.""" test = 0 + __EXECUTE_TIMEOUT_SEC=0.025 + __TIME_SLEEP = 0.05 # Sleep time for loops def __init__(self, *args, **kwargs): """Initialize TestExceptions instance.""" @@ -69,24 +71,24 @@ def tearDown(self): self.node.get_logger().info(' shutting down proxies in core test %d ... ' % (self.test)) shutdown_proxies() - time.sleep(0.1) + time.sleep(TestExceptions.__TIME_SLEEP) self.node.get_logger().info(' destroy node in core test %d ... ' % (self.test)) self.node.destroy_node() - time.sleep(0.1) + time.sleep(TestExceptions.__TIME_SLEEP) self.executor.shutdown() - time.sleep(0.1) + time.sleep(TestExceptions.__TIME_SLEEP) # Kill it with fire to make sure not stray published topics are available rclpy.shutdown(context=self.context) - time.sleep(0.2) + time.sleep(TestExceptions.__TIME_SLEEP*2) def test_invalid_outcome(self): """Test invalid outcome.""" self.node.get_logger().info('test_invalid_outcome ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptions.__EXECUTE_TIMEOUT_SEC) class ReturnInvalidOutcomeState(EventState): """Local Test state definition.""" @@ -101,7 +103,9 @@ def execute(self, userdata): try: sm = OperatableStateMachine(outcomes=['done']) with sm: - OperatableStateMachine.add('state', ReturnInvalidOutcomeState(), transitions={'done': 'done'}) + OperatableStateMachine.add('state', ReturnInvalidOutcomeState(), + transitions={'done': 'done'}, + autonomy={'done': 0}) outcome = sm.execute(None) except StateError as exc: self.node.get_logger().info(f" sm had expected exception '{exc}'") @@ -114,7 +118,7 @@ def test_invalid_transition(self): """Test invalid transition.""" self.node.get_logger().info('test_invalid_transition ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptions.__EXECUTE_TIMEOUT_SEC) class ReturnDoneState(EventState): """Local Test state definition.""" @@ -127,7 +131,9 @@ def execute(self, userdata): inner_sm = OperatableStateMachine(outcomes=['done']) with inner_sm: - OperatableStateMachine.add('state', ReturnDoneState(), transitions={'done': 'invalid'}) + OperatableStateMachine.add('state', ReturnDoneState(), + transitions={'done': 'invalid'}, + autonomy={'done': 0}) sm = OperatableStateMachine(outcomes=['done']) with sm: OperatableStateMachine.add('inner', inner_sm, transitions={'done': 'done'}) @@ -146,7 +152,7 @@ def test_invalid_userdata_input(self): """Test invalid user data.""" self.node.get_logger().info('test_invalid_userdata ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptions.__EXECUTE_TIMEOUT_SEC) class AccessInvalidInputState(EventState): """Local Test state definition.""" @@ -160,7 +166,9 @@ def execute(self, userdata): sm = OperatableStateMachine(outcomes=['done']) with sm: - OperatableStateMachine.add('state', AccessInvalidInputState(), transitions={'done': 'done'}) + OperatableStateMachine.add('state', AccessInvalidInputState(), + transitions={'done': 'done'}, + autonomy={'done': 0}) outcome = None try: @@ -176,7 +184,7 @@ def test_invalid_userdata_output(self): """Test invalid userdata output.""" self.node.get_logger().info('test_invalid_userdata_output ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptions.__EXECUTE_TIMEOUT_SEC) class SetInvalidOutputState(EventState): """Local Test state definition.""" @@ -206,7 +214,7 @@ def test_missing_userdata(self): """Test missing userdata.""" self.node.get_logger().info('test_missing_userdata ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptions.__EXECUTE_TIMEOUT_SEC) class AccessValidInputState(EventState): """Local Test state definition.""" @@ -236,7 +244,7 @@ def test_modify_input_key(self): """Test modify input key.""" self.node.get_logger().info('test_modify_input_key ...! ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptions.__EXECUTE_TIMEOUT_SEC) class ModifyInputKeyState(EventState): """Local Test state definition.""" diff --git a/flexbe_core/test/test_exceptions_spin.py b/flexbe_core/test/test_exceptions_spin.py index adde1f5..cc087f3 100644 --- a/flexbe_core/test/test_exceptions_spin.py +++ b/flexbe_core/test/test_exceptions_spin.py @@ -45,6 +45,8 @@ class TestExceptionsSpin(unittest.TestCase): """Test FlexBE Exception handling.""" test = 0 + __EXECUTE_TIMEOUT_SEC=0.025 + __TIME_SLEEP = 0.05 # Sleep time for loops def __init__(self, *args, **kwargs): """Initialize TestExceptionsSpin instance.""" @@ -67,34 +69,35 @@ def setUp(self): def tearDown(self): """Tear down the TestExceptionsSpin test.""" self.node.get_logger().info(' shutting down exceptions test %d (%d) ... ' % (self.test, self.context.ok())) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=2*TestExceptionsSpin.__EXECUTE_TIMEOUT_SEC) self.node.get_logger().info(' shutting down proxies in core test %d ... ' % (self.test)) shutdown_proxies() - time.sleep(0.1) + time.sleep(TestExceptionsSpin.__TIME_SLEEP) self.node.get_logger().info(' destroy node in core test %d ... ' % (self.test)) self.node.destroy_node() - time.sleep(0.1) + time.sleep(TestExceptionsSpin.__TIME_SLEEP) self.executor.shutdown() - time.sleep(0.1) + time.sleep(TestExceptionsSpin.__TIME_SLEEP) # Kill it with fire to make sure not stray published topics are available rclpy.shutdown(context=self.context) - time.sleep(0.2) + time.sleep(TestExceptionsSpin.__TIME_SLEEP*2) def test_invalid_outcome(self): """Test invalid outcome.""" self.node.get_logger().info('test_invalid_outcome ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptionsSpin.__EXECUTE_TIMEOUT_SEC) class ReturnInvalidOutcomeState(EventState): """Local Test state definition.""" def __init__(self): super().__init__(outcomes=['done']) + self._state_id = 256 def execute(self, userdata): self._node.get_logger().info('test_invalid_outcome - execute with invalid outcome ...') @@ -105,6 +108,8 @@ def on_enter(self, userdata): super().on_enter(userdata) sm = OperatableStateMachine(name='test_invalid_outcome', outcomes=['done']) + sm._state_id = 1024 + with sm: OperatableStateMachine.add('state', ReturnInvalidOutcomeState(), transitions={'done': 'done'}) outcome = sm.spin(None, rclpy_context=self.context) @@ -117,21 +122,24 @@ def test_invalid_transition(self): """Test invalid transition.""" self.node.get_logger().info('test_invalid_transition ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptionsSpin.__EXECUTE_TIMEOUT_SEC) class ReturnDoneState(EventState): """Local Test state definition.""" def __init__(self): super().__init__(outcomes=['done']) + self._state_id = 256 def execute(self, userdata): return 'done' inner_sm = OperatableStateMachine(outcomes=['done']) + inner_sm._state_id = 512 with inner_sm: OperatableStateMachine.add('state', ReturnDoneState(), transitions={'done': 'invalid'}) sm = OperatableStateMachine(name='test_invalid_transition', outcomes=['done']) + sm._state_id = 1024 with sm: OperatableStateMachine.add('inner', inner_sm, transitions={'done': 'done'}) @@ -145,19 +153,21 @@ def test_invalid_userdata_input(self): """Test invalid user data.""" self.node.get_logger().info('test_invalid_userdata ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptionsSpin.__EXECUTE_TIMEOUT_SEC) class AccessInvalidInputState(EventState): """Local Test state definition.""" def __init__(self): super().__init__(outcomes=['done'], input_keys=['input']) + self._state_id = 256 def execute(self, userdata): print(userdata.invalid) return 'done' sm = OperatableStateMachine(name='test_invalid_userdata_input', outcomes=['done']) + sm._state_id = 1023 with sm: OperatableStateMachine.add('state', AccessInvalidInputState(), transitions={'done': 'done'}) @@ -171,19 +181,21 @@ def test_invalid_userdata_output(self): """Test invalid userdata output.""" self.node.get_logger().info('test_invalid_userdata_output ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptionsSpin.__EXECUTE_TIMEOUT_SEC) class SetInvalidOutputState(EventState): """Local Test state definition.""" def __init__(self): super().__init__(outcomes=['done'], output_keys=['output']) + self._state_id = 256 def execute(self, userdata): userdata.invalid = False return 'done' sm = OperatableStateMachine(name='test_invalid_userdata_output', outcomes=['done']) + sm._state_id = 1024 with sm: OperatableStateMachine.add('state', SetInvalidOutputState(), transitions={'done': 'done'}) @@ -197,19 +209,21 @@ def test_missing_userdata(self): """Test missing userdata.""" self.node.get_logger().info('test_missing_userdata ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptionsSpin.__EXECUTE_TIMEOUT_SEC) class AccessValidInputState(EventState): """Local Test state definition.""" def __init__(self): super().__init__(outcomes=['done'], input_keys=['missing']) + self._state_id = 256 def execute(self, userdata): print(userdata.missing) return 'done' sm = OperatableStateMachine(name='test_missing_userdata', outcomes=['done']) + sm._state_id = 1024 with sm: OperatableStateMachine.add('state', AccessValidInputState(), transitions={'done': 'done'}) @@ -223,19 +237,21 @@ def test_modify_input_key(self): """Test modify input key.""" self.node.get_logger().info('test_modify_input_key ...! ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestExceptionsSpin.__EXECUTE_TIMEOUT_SEC) class ModifyInputKeyState(EventState): """Local Test state definition.""" def __init__(self): super().__init__(outcomes=['done'], input_keys=['only_input']) + self._state_id = 256 def execute(self, userdata): userdata.only_input['new'] = 'not_allowed' return 'done' sm = OperatableStateMachine(name='test_modify_input_key', outcomes=['done']) + sm._state_id = 1024 sm.userdata.only_input = {'existing': 'is_allowed'} with sm: OperatableStateMachine.add('state', ModifyInputKeyState(), transitions={'done': 'done'}) diff --git a/flexbe_core/test/test_logger.py b/flexbe_core/test/test_logger.py index 49a93f5..a3b14e5 100644 --- a/flexbe_core/test/test_logger.py +++ b/flexbe_core/test/test_logger.py @@ -45,6 +45,8 @@ class TestLogger(unittest.TestCase): """Test FlexBE Logger handling.""" test = 0 + __EXECUTE_TIMEOUT_SEC=0.025 + __TIME_SLEEP = 0.05 # Sleep time for loops def __init__(self, *args, **kwargs): """Initialize TestLogger instance.""" @@ -64,22 +66,22 @@ def setUp(self): def tearDown(self): """Tear down the test.""" self.node.get_logger().info(' shutting down logger test %d (%d) ... ' % (self.test, self.context.ok())) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestLogger.__EXECUTE_TIMEOUT_SEC) self.node.get_logger().info(' shutting down proxies in logger test %d ... ' % (self.test)) shutdown_proxies() - time.sleep(0.1) + time.sleep(TestLogger.__TIME_SLEEP) self.node.get_logger().info(' destroy node in core test %d ... ' % (self.test)) self.node.destroy_node() - time.sleep(0.1) + time.sleep(TestLogger.__TIME_SLEEP) self.executor.shutdown() - time.sleep(0.1) + time.sleep(TestLogger.__TIME_SLEEP) # Kill it with fire to make sure not stray published topics are available rclpy.shutdown(context=self.context) - time.sleep(0.2) + time.sleep(TestLogger.__TIME_SLEEP*2) def test_throttle_logger_one(self): """Test throttle logger one.""" @@ -88,7 +90,7 @@ def test_throttle_logger_one(self): self.node.declare_parameter('throttle_logging_clear_ratio', 0.25) initialize_flexbe_core(self.node) # Update the logger node - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestLogger.__EXECUTE_TIMEOUT_SEC) class ThrottleSingleLog(EventState): """Local Test state definition.""" @@ -128,7 +130,7 @@ def test_throttle_logger_err_multi(self): self.node.declare_parameter('throttle_logging_clear_ratio', 0.35) initialize_flexbe_core(self.node) # Update the logger node - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestLogger.__EXECUTE_TIMEOUT_SEC) class ThrottleMultiLog(EventState): """Local Test state definition.""" @@ -169,7 +171,7 @@ def test_throttle_logger_multiple_params(self): initialize_flexbe_core(self.node) # Update the logger node - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestLogger.__EXECUTE_TIMEOUT_SEC) class ThrottleMultiLog(EventState): """Local Test state definition.""" @@ -200,7 +202,7 @@ def execute(self, userdata): while outcome is None: outcome = sm.execute(None) self.assertTrue(1 < len(Logger._last_logged) <= Logger.MAX_LAST_LOGGED_SIZE) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.001) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestLogger.__EXECUTE_TIMEOUT_SEC*0.5) self.assertEqual(outcome, 'done') self.assertEqual(state_instance._trials, 0) @@ -214,7 +216,7 @@ def test_throttle_logger_multiple(self): self.node.declare_parameter('throttle_logging_clear_ratio', 0.22) initialize_flexbe_core(self.node) # Update the logger node - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestLogger.__EXECUTE_TIMEOUT_SEC) class ThrottleMultiLog(EventState): """Local Test state definition.""" @@ -245,7 +247,7 @@ def execute(self, userdata): while outcome is None: outcome = sm.execute(None) self.assertTrue(1 < len(Logger._last_logged) <= Logger.MAX_LAST_LOGGED_SIZE) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.001) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestLogger.__EXECUTE_TIMEOUT_SEC*.5) self.assertEqual(outcome, 'done') self.assertEqual(state_instance._trials, 0) diff --git a/flexbe_core/test/test_proxies.py b/flexbe_core/test/test_proxies.py index 49f801a..560e337 100755 --- a/flexbe_core/test/test_proxies.py +++ b/flexbe_core/test/test_proxies.py @@ -54,6 +54,8 @@ class TestProxies(unittest.TestCase): """Test the FlexBE proxies.""" test = 0 + __EXECUTE_TIMEOUT_SEC=0.025 + __TIME_SLEEP = 0.05 # Sleep time for loops def __init__(self, *args, **kwargs): """Initialize TestProxies instance.""" @@ -76,28 +78,28 @@ def setUp(self): def tearDown(self): """Tear down the test.""" self.node.get_logger().info(' shutting down proxies test %d (%d) ... ' % (self.test, self.context.ok())) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.node.get_logger().info(' shutting down proxies in core test %d ... ' % (self.test)) shutdown_proxies() - time.sleep(0.1) + time.sleep(TestProxies.__TIME_SLEEP) self.node.get_logger().info(' destroy node in core test %d ... ' % (self.test)) self.node.destroy_node() - time.sleep(0.1) + time.sleep(TestProxies.__TIME_SLEEP) self.executor.shutdown() - time.sleep(0.1) + time.sleep(TestProxies.__TIME_SLEEP) # Kill it with fire to make sure not stray published topics are available rclpy.shutdown(context=self.context) - time.sleep(0.5) + time.sleep(TestProxies.__TIME_SLEEP*5) def test_publish_subscribe(self): """Test publish and subscribe.""" self.node.get_logger().info('test_publish_subscribe ...') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) ProxyPublisher.initialize(self.node) ProxySubscriberCached.initialize(self.node) @@ -106,23 +108,23 @@ def test_publish_subscribe(self): self.node.get_logger().info('test_publish_subscribe - define publishers ...') pub = ProxyPublisher({topic1: String}) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) pub = ProxyPublisher({topic2: String}) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.node.get_logger().info(' subscribe topic1 only ...') sub = ProxySubscriberCached({topic1: String}, inst_id=id(self)) - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.assertTrue(pub.is_available(topic1)) # cannot call wait given spin_once structure # self.assertTrue(pub.wait_for_any(topic1)) # self.assertFalse(pub.wait_for_any(topic2)) for _ in range(50): - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.01) - time.sleep(0.04) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) + time.sleep(TestProxies.__TIME_SLEEP) self.assertTrue(pub.number_of_subscribers(topic1) > 0) self.assertFalse(pub.number_of_subscribers(topic2) > 0) @@ -134,8 +136,8 @@ def test_publish_subscribe(self): self.node.get_logger().info(' subscribe topic2 ...') sub = ProxySubscriberCached({topic2: String}, inst_id=id(self)) for _ in range(50): - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.01) - time.sleep(0.05) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) + time.sleep(TestProxies.__TIME_SLEEP) self.assertTrue(pub.number_of_subscribers(topic1) > 0) self.assertTrue(pub.number_of_subscribers(topic2) > 0) @@ -157,7 +159,7 @@ def test_publish_subscribe(self): self.node.get_logger().info(' listen for two messages ...') end_time = time.time() + 5 while time.time() < end_time: - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.assertTrue(sub.has_msg(topic1)) self.assertEqual(sub.get_last_msg(topic1).data, '1') @@ -186,8 +188,8 @@ def test_subscribe_buffer(self): sub.enable_buffer(topic1) # No wait in this setup - self.assertTrue(pub.wait_for_any(topic1)) for _ in range(10): - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.01) - time.sleep(0.05) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) + time.sleep(TestProxies.__TIME_SLEEP) self.assertTrue(pub.number_of_subscribers(topic1) > 0) @@ -202,7 +204,7 @@ def test_subscribe_buffer(self): # make sure messages can be received end_time = time.time() + 3 while time.time() < end_time: - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.assertTrue(sub.has_msg(topic1)) self.assertTrue(sub.has_buffered(topic1)) @@ -215,7 +217,7 @@ def test_subscribe_buffer(self): # make sure message can be received end_time = time.time() + 3 while time.time() < end_time: - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.assertEqual(sub.get_from_buffer(topic1).data, '2') self.assertEqual(sub.get_from_buffer(topic1).data, '3') @@ -244,7 +246,7 @@ def server_callback(request, response): srv.call_async(topic1, Trigger.Request()) end_time = time.time() + 10 while time.time() < end_time: - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.assertTrue(srv.done(topic1)) @@ -265,7 +267,7 @@ def test_action_client(self): topic1 = '/action_1' def execute_cb(goal_handle): - time.sleep(0.1) + time.sleep(TestProxies.__TIME_SLEEP) if goal_handle.is_cancel_requested: goal_handle.canceled() return BehaviorExecution.Result() @@ -291,7 +293,7 @@ def execute_cb(goal_handle): while time.time() < end_time and not client.has_result(topic1): status = client.get_status(topic1) # self.node.get_logger().info(f' get status = {status} ') - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.assertTrue(client.is_active(topic1) or client.has_result(topic1)) self.assertTrue(client.has_result(topic1)) @@ -315,7 +317,7 @@ def execute_cb(goal_handle): # end_time = time.time() + 2 while not client.has_result(topic1): - rclpy.spin_once(self.node, executor=self.executor, timeout_sec=0.1) + rclpy.spin_once(self.node, executor=self.executor, timeout_sec=TestProxies.__EXECUTE_TIMEOUT_SEC) self.assertTrue(client.is_active(topic1) or client.has_result(topic1)) status = client.get_status(topic1) # self.node.get_logger().info(f' check status = {status} ') diff --git a/flexbe_mirror/flexbe_mirror/flexbe_mirror.py b/flexbe_mirror/flexbe_mirror/flexbe_mirror.py index c6a5881..fdce862 100644 --- a/flexbe_mirror/flexbe_mirror/flexbe_mirror.py +++ b/flexbe_mirror/flexbe_mirror/flexbe_mirror.py @@ -44,11 +44,11 @@ def set_thread_name(name): from flexbe_core import Logger, MIN_UI_VERSION, initialize_flexbe_core from flexbe_core.core import LockableStateMachine, OperatableStateMachine -from flexbe_core.core import PreemptableState, PreemptableStateMachine, StateMap +from flexbe_core.core import PreemptableState, State, StateMap from flexbe_core.core.topics import Topics from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached -from flexbe_msgs.msg import BEStatus, BehaviorSync, ContainerStructure +from flexbe_msgs.msg import BEStatus, BehaviorSync, ContainerStructure, StateMapMsg from rclpy.clock import Clock from rclpy.node import Node @@ -123,12 +123,14 @@ def __init__(self): # Use proxy publisher/subscriber for access in states # but just initialize here once for all - self._beh_update_pub = ProxyPublisher({Topics._BEHAVIOR_UPDATE_TOPIC: String}) + self._beh_update_pub = ProxyPublisher({Topics._BEHAVIOR_UPDATE_TOPIC: Int32}) self._outcome_sub = ProxySubscriberCached() self._outcome_sub.subscribe(Topics._OUTCOME_TOPIC, UInt32, inst_id=id(self)) self._outcome_sub.enable_buffer(Topics._OUTCOME_TOPIC) + self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_OCS_TOPIC, 2) + # no clean way to wait for publisher to be ready... Logger.loginfo('--> Mirror - setting up publishers and subscribers ...') threading.Event().wait(1.0) # Give publishers time to initialize @@ -143,10 +145,10 @@ def _version_callback(self, msg): vui = FlexbeMirror._parse_version(msg.data) vex = FlexbeMirror._parse_version(MIN_UI_VERSION) if vui < vex: - Logger.logwarn('FlexBE App needs to be updated!\n' + Logger.logwarn('FlexBE WebUI needs to be updated!\n' f'Mirror requires at least version {MIN_UI_VERSION}, ' f' but you have {msg.data}\n' - 'Please update the flexbe_app software.') + 'Please update the flexbe_webui software.') @staticmethod def _parse_version(v): @@ -155,7 +157,7 @@ def _parse_version(v): offset = 1 for n in reversed(v.split('.')): result += int(n) * offset - offset *= 100 + offset *= 1000 return result def get_elapsed_str(self, start_time): @@ -417,16 +419,16 @@ def _stop_mirror(self, msg, start_time): Logger.logwarn('Onboard behavior stop request (from sync)!') elif msg.code == BEStatus.FINISHED: Logger.loginfo('Onboard behavior finished successfully.') - self._beh_update_pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, String()) + self._beh_update_pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, Int32(data=-1)) elif msg.code == BEStatus.SWITCHING: self._starting_path = None Logger.loginfo('Onboard performing behavior switch.') elif msg.code == BEStatus.READY: Logger.loginfo('Onboard engine just started, stopping currently running mirror.') - self._beh_update_pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, String()) + self._beh_update_pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, Int32(data=-1)) else: Logger.logwarn('Onboard behavior failed!') - self._beh_update_pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, String()) + self._beh_update_pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, Int32(data=-1)) self._wait_stop_running(start_time) @@ -753,7 +755,7 @@ def _execute_mirror(self, start_time): Logger.loginfo('Starting mirror in state ' + self._starting_path) self._starting_path = None - result = PreemptableStateMachine._preempted_name + result = State._preempted_name try: result = self._sm.spin(start_time, self._state_map) Logger.localinfo(f"Mirror finished spin with result '{result}' after {self.get_elapsed_str(start_time)} s") @@ -767,7 +769,7 @@ def _execute_mirror(self, start_time): # so just try a simple print print('\n(_execute_mirror Traceback): Caught exception on preempt:\n%s' % str(exc), flush=True) print(traceback.format_exc().replace('%', '%%'), flush=True) - result = PreemptableStateMachine._preempted_name + result = State._preempted_name self._active_thread_start = None self._running = False @@ -788,8 +790,9 @@ def _mirror_state_machine(self, msg): self._add_node(msg, root) if self._sm: - Logger.localinfo(f"Constructed mirror for behavior id ='{self._sm.id}' - begin validation ...") + Logger.localinfo('---------------------------------') self._sm.id = msg.behavior_id + Logger.localinfo(f"Constructed mirror for behavior id ='{self._sm.id}' - begin validation ...") # verify checksums of all states for con_msg in msg.containers: if con_msg.path.find('/') != -1: @@ -801,6 +804,16 @@ def _mirror_state_machine(self, msg): f'({con_msg.state_id}) {con_msg.path}') else: raise KeyError(f'State id {con_msg.state_id} not found in {self._state_map}!') + + try: + state_ids, state_paths = list(zip(*self._state_map.items)) + state_map_msg = StateMapMsg(behavior_id=self._sm.id, + state_ids=state_ids, + state_paths=[path.replace('_mirror', '') for path in state_paths]) + self._state_map_pub.publish(state_map_msg) # Used by the WebUI + except Exception as exc: + Logger.localerr(f'Failed to publish state map: {exc}') + end = time.time() Logger.localinfo(f"Validated constructed mirror for behavior id ='{self._sm.id}' in {end - start} seconds !") return # success here @@ -809,9 +822,6 @@ def _mirror_state_machine(self, msg): except Exception as exc: Logger.localwarn(f"_mirror_statemachine Exception: '{type(exc)}' - {exc}") - # self.get_logger().warn(f"{traceback.format_exc().replace('%', '%%')}") - # print(f"\x1b[93m{self._state_map}", flush=True) - # print('------------------\x1b[0m', flush=True) if self._sm is not None: Logger.localwarn(f' destroy constructed SM id={self._sm.id} - failed validation!') self._sm.destroy() @@ -846,7 +856,7 @@ def _add_node(self, msg, path): else: sm = MirrorStateMachine(container_name, path, outcomes=sm_outcomes) - self._state_map.add_state(path, sm) + self._state_map.add_state(path, sm) # also calculates the state id given path assert sm.state_id == container.state_id, ('Failed to validate container state_id ' f'= {sm.state_id} vs. {container.state_id}') @@ -860,9 +870,7 @@ def _add_node(self, msg, path): MirrorStateMachine.add(container_name + '_mirror', sm, transitions=container_transitions) else: # Add instance attributes to top-level state machine - sm._state_map = self._state_map sm._total_loop_count = 0 - self._sm = sm else: diff --git a/flexbe_mirror/flexbe_mirror/mirror_concurrency_container.py b/flexbe_mirror/flexbe_mirror/mirror_concurrency_container.py index b47201f..505ecad 100644 --- a/flexbe_mirror/flexbe_mirror/mirror_concurrency_container.py +++ b/flexbe_mirror/flexbe_mirror/mirror_concurrency_container.py @@ -30,7 +30,8 @@ """Simplified state machine for use with FlexBE UI State machine mirror.""" from flexbe_core import Logger -from flexbe_core.core import StateMachine +from flexbe_core.core import State, StateMachine +from flexbe_core.core import StateMap from flexbe_mirror.mirror_state import MirrorState from flexbe_mirror.mirror_state_machine import MirrorStateMachine @@ -56,7 +57,7 @@ def on_enter_mirror(self, userdata): state._entering = True # force state to handle enter on first execute state._last_execution = None - MirrorState.publish_update(self._target_path) + MirrorState.publish_update(self.state_id) def on_exit_mirror(self, userdata, desired_outcome=-1, states=None): """Exit state and prepare for next entry (outcome -1 means preempt).""" @@ -70,7 +71,11 @@ def on_exit_mirror(self, userdata, desired_outcome=-1, states=None): self._current_state = None self._returned_outcomes = {} if desired_outcome != -1: - self._last_outcome = self.outcomes[desired_outcome] + if desired_outcome == StateMap._MAX_OUTCOME: + self._last_outcome = State._preempted_name + else: + self._last_outcome = self.outcomes[desired_outcome] + MirrorState.publish_update(self.state_id + 255) # publish that we "entered" container to exit return self._last_outcome def execute_mirror(self, userdata): @@ -86,7 +91,7 @@ def execute_mirror(self, userdata): # Handle outcome of this internal SM if self._last_outcome is not None: Logger.localwarn(f"Mirror SM concurrency execute for '{self.name.replace('_mirror', '')}' of " - f" '{self.path.replace('_mirror', '')}' ({self._state_id}) : " + f" '{self.path.replace('_mirror', '')}' ({self.state_id}) : " f"Already processed outcome='{self._last_outcome}' for " f'outcome index={MirrorState._last_state_outcome} - reprocessing anyway') @@ -118,7 +123,7 @@ def _execute_current_state_mirror(self, userdata): if len(self._current_state) == 0: # No unexited states in concurrent, so notify we have returned to concurrent level # Logger.localinfo(f"Inside CC '{self}' - no active internal - publish update '{self._target_path}'") - MirrorState.publish_update(self._target_path) # Notify back at top-level before exit + MirrorState.publish_update(self.state_id) # Notify back at top-level before exit return None def get_deep_states(self): @@ -139,9 +144,9 @@ def get_deep_states(self): else: deep_states.append(state) elif self._current_state is not None: - Logger.localerr(f"MirrorConcurrentContainer.get_deep_states '{self.name}' ({self._current_state._state_id})\n" + Logger.localerr(f"MirrorConcurrentContainer.get_deep_states '{self.name}' ({self._current_state.state_id})\n" f" - current state is NOT a list! Error type='{type(self._current_state)}'") - Logger.localerr(f" '{self._current_state.name}' ({self._current_state._state_id})") + Logger.localerr(f" '{self._current_state.name}' ({self._current_state.state_id})") raise TypeError(f"MirrorConcurrentContainer.get_deep_states '{self.name}' - " f"current state is NOT a list! Errror type='{type(self._current_state)}'") return deep_states diff --git a/flexbe_mirror/flexbe_mirror/mirror_state.py b/flexbe_mirror/flexbe_mirror/mirror_state.py index d6b7d9d..7558654 100644 --- a/flexbe_mirror/flexbe_mirror/mirror_state.py +++ b/flexbe_mirror/flexbe_mirror/mirror_state.py @@ -31,11 +31,12 @@ from flexbe_core import EventState from flexbe_core import Logger -from flexbe_core.core.preemptable_state import PreemptableState -from flexbe_core.core.topics import Topics +from flexbe_core.core import State +from flexbe_core.core import StateMap +from flexbe_core.core import Topics from flexbe_core.proxy import ProxyPublisher -from std_msgs.msg import String +from std_msgs.msg import Int32 class MirrorState(EventState): @@ -49,7 +50,7 @@ class MirrorState(EventState): # Hold data from last outcome message being processed by mirror _last_state_id = None _last_state_outcome = None - _last_target_path = None + _last_target_id = None _pub = None @@ -57,21 +58,21 @@ def __init__(self, target_name, target_path, given_outcomes, outcome_autonomy): """Initialize MirrorState instance.""" # pylint: disable=unused-argument super().__init__(outcomes=given_outcomes) - self._outcomes.append(PreemptableState._preempted_name) # Add preempted to outcomes list (for -1 outcome) + # self._outcomes.append(State._preempted_name) # Add preempted to outcomes list (for -1 outcome) self._target_name = target_name self._target_path = '/' + '/'.join(target_path.split('/')[1:]) # Drop top-level name - MirrorState._last_target_path = None # reset any time that we build a new state machine + MirrorState._last_target_id = None # reset any time that we build a new state machine if MirrorState._pub is None: # Allow access to standard proxies initialied by flexbe_mirror MirrorState._pub = ProxyPublisher() @classmethod - def publish_update(cls, target_path): + def publish_update(cls, target_id): """Publish latest deep state for UI control.""" - if target_path != MirrorState._last_target_path: - MirrorState._last_target_path = target_path - MirrorState._pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, String(data=target_path)) + if target_id != MirrorState._last_target_id: + MirrorState._last_target_id = target_id + MirrorState._pub.publish(Topics._BEHAVIOR_UPDATE_TOPIC, Int32(data=target_id)) def execute_mirror(self, userdata): """Execute the mirror state.""" @@ -97,13 +98,16 @@ def on_enter_mirror(self, userdata): self._entering = False self._last_outcome = None self._last_execution = None - MirrorState.publish_update(self._target_path) + MirrorState.publish_update(self.state_id) def on_exit_mirror(self, userdata, desired_outcome): """Exit mirror state.""" try: if desired_outcome != -1: - self._last_outcome = self.outcomes[desired_outcome] + if desired_outcome == StateMap._MAX_OUTCOME: + self._last_outcome = State._preempted_name + else: + self._last_outcome = self.outcomes[desired_outcome] return self._last_outcome except Exception as exc: # pylint: disable=W0703 Logger.localerr(f"Error: MirrorState execute for '{self.name}': " diff --git a/flexbe_mirror/flexbe_mirror/mirror_state_machine.py b/flexbe_mirror/flexbe_mirror/mirror_state_machine.py index 6d7f2dc..a28d699 100644 --- a/flexbe_mirror/flexbe_mirror/mirror_state_machine.py +++ b/flexbe_mirror/flexbe_mirror/mirror_state_machine.py @@ -33,10 +33,11 @@ from flexbe_core import Logger from flexbe_core.core import PreemptableState, PreemptableStateMachine +from flexbe_core.core import State +from flexbe_core.core import StateError from flexbe_core.core import StateMachine from flexbe_core.core import StateMap -from flexbe_core.core.exceptions import StateError -from flexbe_core.core.topics import Topics +from flexbe_core.core import Topics from flexbe_core.proxy import ProxySubscriberCached from flexbe_mirror.mirror_state import MirrorState @@ -77,11 +78,11 @@ def spin(self, start_time, state_map): self._last_deep_states_list = None # Force change to send behavior update loop_count = 0 self._total_loop_count = 0 # Attribute only added to top-level SM - outcome = PreemptableState._preempted_name + outcome = State._preempted_name if self._current_state is None: self.on_enter_mirror(userdata) Logger.localinfo(f"Mirror: set initial state for top-level '{self.name}' ({self.state_id}) ({self.id}) " - f" with state = {self._current_state.name}' ({self._current_state._state_id})") + f" with state = {self._current_state.name}' ({self._current_state.state_id})") while rclpy.ok() and not PreemptableState.preempt: self._total_loop_count += 1 @@ -112,7 +113,7 @@ def spin(self, start_time, state_map): MirrorState._last_state_id = None # Flag that the message was handled if MirrorState._last_state_outcome is not None: outcome = self.on_exit_mirror(userdata, MirrorState._last_state_outcome) - MirrorState.publish_update(self._target_path) # Notify back at top-level before exit + MirrorState.publish_update(self.state_id) # Notify back at top-level before exit MirrorState._last_state_outcome = None # Flag that the message was handled # Logger.localinfo(f' top-level outcome {outcome} for {state_id} ' # f'in thread started at {start_time.nanoseconds}') @@ -129,7 +130,7 @@ def spin(self, start_time, state_map): if MirrorState._last_state_id is not None or MirrorState._last_state_outcome is not None: # This should not happen unless system is out of sync - Logger.logwarn(f"MirrorStateMachine '{self.name}' ({self._state_id}) spin() - " + Logger.logwarn(f"MirrorStateMachine '{self.name}' ({self.state_id}) spin() - " f'no state handled outcome from {MirrorState._last_state_id} ' f'outcome index={MirrorState._last_state_outcome}') @@ -141,7 +142,7 @@ def spin(self, start_time, state_map): self._last_deep_states_list = deep_states if outcome is not None: - Logger.localinfo(f"MirrorStateMachine '{self.name}' ({self._state_id}) spin() - outcome = {outcome}" + Logger.localinfo(f"MirrorStateMachine '{self.name}' ({self.state_id}) spin() - outcome = {outcome}" ' - wait for confirming top-level outcome message!') else: @@ -191,8 +192,8 @@ def _execute_current_state_mirror(self, userdata): target = self._transitions[self._current_state.name][outcome] self._current_state = self._labels.get(target) # Get the new state if self._current_state is None: - Logger.localinfo(f"SM {self.name.replace('_mirror', '')} is done, but wait for outcome message.") - MirrorState.publish_update(self._target_path) # Notify back at sm-level before exit + # Logger.localinfo(f"SM {self.name.replace('_mirror', '')} is done, but wait for outcome message.") + MirrorState.publish_update(self.state_id) # Notify back at sm-level before exit return None else: # Logger.localinfo(f"SM {self.name.replace('_mirror', '')} transitioning " @@ -201,10 +202,10 @@ def _execute_current_state_mirror(self, userdata): return None except KeyError as exc: err_msg = f"Returned outcome '{outcome}' is not registered as a transition from '{self._current_state}'" - Logger.localerr(f"Mirror SM execute for '{self.name}' ({self._state_id}): {err_msg}") - Logger.localinfo(f' {self.name} ({self._state_id}) - labels={self._labels}') - Logger.localinfo(f' {self.name} ({self._state_id}) - transitions={self._transitions}') - Logger.localinfo(f' {self.name} ({self._state_id}) - outcomes={self._outcomes}') + Logger.localerr(f"Mirror SM execute for '{self.name}' ({self.state_id}): {err_msg}") + Logger.localinfo(f' {self.name} ({self.state_id}) - labels={self._labels}') + Logger.localinfo(f' {self.name} ({self.state_id}) - transitions={self._transitions}') + Logger.localinfo(f' {self.name} ({self.state_id}) - outcomes={self._outcomes}') raise StateError(err_msg) from exc # we handle internal SM transitions using mirror outcome messages @@ -220,7 +221,7 @@ def execute_mirror(self, userdata): # Logger.localinfo(f"Handling outcome of SM '{self.name.replace('_mirror', '')}' " # f"of '{self.path.replace('_mirror', '')}' ...") if self._last_outcome is not None: - Logger.localwarn(f"Mirror SM execute for '{self.name}' ({self._state_id}) : " + Logger.localwarn(f"Mirror SM execute for '{self.name}' ({self.state_id}) : " f'Already processed outcome={self._last_outcome} for ' f'outcome index={MirrorState._last_state_outcome} - reprocessing anyway') @@ -276,7 +277,7 @@ def on_enter_mirror(self, userdata): self._last_outcome = None self._current_state._entering = True # force state to handle enter on first execute self._userdata = None # not used in mirror - MirrorState.publish_update(self._target_path) + MirrorState.publish_update(self.state_id) def on_exit_mirror(self, userdata, desired_outcome=-1): """Exit mirror statemachine.""" @@ -285,9 +286,13 @@ def on_exit_mirror(self, userdata, desired_outcome=-1): self._current_state._entering = True self._current_state.on_exit_mirror(userdata, -1) # Preempted if desired_outcome != -1: - self._last_outcome = self.outcomes[desired_outcome] + if desired_outcome == StateMap._MAX_OUTCOME: + self._last_outcome = State._preempted_name + else: + self._last_outcome = self.outcomes[desired_outcome] self._current_state = None self._entering = True + MirrorState.publish_update(self.state_id + 255) # publish that we "entered" container to exit return self._last_outcome except Exception: # pylint: disable=W0703 Logger.localerr(f"Error: MirrorStateMachine execute for '{self.name}': " diff --git a/flexbe_msgs/CMakeLists.txt b/flexbe_msgs/CMakeLists.txt index 767742a..67bb3db 100644 --- a/flexbe_msgs/CMakeLists.txt +++ b/flexbe_msgs/CMakeLists.txt @@ -17,6 +17,7 @@ set(msg_files "msg/OutcomeCondition.msg" "msg/OutcomeRequest.msg" "msg/StateInstantiation.msg" + "msg/StateMapMsg.msg" "msg/SynthesisErrorCodes.msg" "msg/SynthesisRequest.msg" "msg/UICommand.msg" diff --git a/flexbe_msgs/action/BehaviorExecution.action b/flexbe_msgs/action/BehaviorExecution.action index fe60536..94b92aa 100644 --- a/flexbe_msgs/action/BehaviorExecution.action +++ b/flexbe_msgs/action/BehaviorExecution.action @@ -18,5 +18,5 @@ string outcome --- -# path of the current state -string current_state +# state id of the current state +int32 current_state diff --git a/flexbe_msgs/msg/OutcomeRequest.msg b/flexbe_msgs/msg/OutcomeRequest.msg index 620e985..8842292 100644 --- a/flexbe_msgs/msg/OutcomeRequest.msg +++ b/flexbe_msgs/msg/OutcomeRequest.msg @@ -1,2 +1,2 @@ uint8 outcome -string target \ No newline at end of file +int32 target # state id \ No newline at end of file diff --git a/flexbe_msgs/msg/StateMapMsg.msg b/flexbe_msgs/msg/StateMapMsg.msg new file mode 100644 index 0000000..ebe2036 --- /dev/null +++ b/flexbe_msgs/msg/StateMapMsg.msg @@ -0,0 +1,8 @@ +# Mapping between numeric ids and state path in given behavior + +int32 behavior_id # Behavior ID assigned in ContainerStructure.msg + +# Ordered lists of corresponding state id hash code and state path +int32[] state_ids +string[] state_paths + diff --git a/flexbe_onboard/flexbe_onboard/flexbe_onboard.py b/flexbe_onboard/flexbe_onboard/flexbe_onboard.py index fac98c1..d6172f1 100644 --- a/flexbe_onboard/flexbe_onboard/flexbe_onboard.py +++ b/flexbe_onboard/flexbe_onboard/flexbe_onboard.py @@ -55,7 +55,7 @@ def set_thread_name(name): from flexbe_core.core.topics import Topics from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached -from flexbe_msgs.msg import BEStatus, BehaviorSelection, BehaviorSync, CommandFeedback, UserdataInfo +from flexbe_msgs.msg import BEStatus, BehaviorSelection, BehaviorSync, CommandFeedback, StateMapMsg, UserdataInfo from flexbe_msgs.srv import GetUserdata import rclpy @@ -93,6 +93,7 @@ def __init__(self): # only at onboard level self._heartbeat_pub = self.create_publisher(BehaviorSync, Topics._ONBOARD_HEARTBEAT_TOPIC, 10) self._status_pub = self.create_publisher(BEStatus, Topics._ONBOARD_STATUS_TOPIC, 10) + self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_TOPIC, 2) latching_qos = QoSProfile(depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL) self._version_sub = self.create_subscription(String, Topics._UI_VERSION_TOPIC, @@ -315,7 +316,15 @@ def _behavior_execution(self, beh_sel_msg): self.be.confirm() Logger.localinfo(f' behavior {self.be.name}: {self.be.beh_id} confirmation.') - args = [self.be.requested_state_path] if self.be.requested_state_path is not None else [] + # Publish behavior state map as a debugging aid (match to OCS side published by launcher and mirror) + state_ids, state_paths = be.state_map_items + state_map_msg = StateMapMsg(behavior_id=be.beh_id, + state_ids=state_ids, + state_paths=state_paths) + self._state_map_pub.publish(state_map_msg) + + # Publish start status + args = [self.be.requested_state_id] if self.be.requested_state_id is not None else [] Logger.localinfo(f'Behavior Engine - behavior {self.be.name}: {self.be.beh_id} BEStatus STARTED.') self._status_pub.publish(BEStatus(stamp=self.get_clock().now().to_msg(), behavior_id=self.be.beh_id, diff --git a/flexbe_widget/CMakeLists.txt b/flexbe_widget/CMakeLists.txt index 5abb5d1..f216901 100644 --- a/flexbe_widget/CMakeLists.txt +++ b/flexbe_widget/CMakeLists.txt @@ -11,6 +11,7 @@ install(PROGRAMS bin/breakpoint bin/create_repo bin/evaluate_logs + bin/flexbe_outcome_listener DESTINATION lib/${PROJECT_NAME} ) diff --git a/flexbe_widget/bin/flexbe_outcome_listener b/flexbe_widget/bin/flexbe_outcome_listener new file mode 100644 index 0000000..34e9ec2 --- /dev/null +++ b/flexbe_widget/bin/flexbe_outcome_listener @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from flexbe_widget.flexbe_outcome_listener import flexbe_outcome_listener_main + +if __name__ == '__main__': + flexbe_outcome_listener_main() diff --git a/flexbe_widget/flexbe_widget/behavior_action_server.py b/flexbe_widget/flexbe_widget/behavior_action_server.py index ae4c79b..81054b7 100644 --- a/flexbe_widget/flexbe_widget/behavior_action_server.py +++ b/flexbe_widget/flexbe_widget/behavior_action_server.py @@ -45,7 +45,7 @@ from rosidl_runtime_py import get_interface_path -from std_msgs.msg import Empty, String +from std_msgs.msg import Empty, Int32 import yaml @@ -65,7 +65,7 @@ def __init__(self, node): self._pub = self._node.create_publisher(BehaviorSelection, Topics._START_BEHAVIOR_TOPIC, 100) self._preempt_pub = self._node.create_publisher(Empty, Topics._CMD_PREEMPT_TOPIC, 100) self._status_pub = self._node.create_subscription(BEStatus, Topics._ONBOARD_STATUS_TOPIC, self._status_cb, 100) - self._state_pub = self._node.create_subscription(String, Topics._BEHAVIOR_UPDATE_TOPIC, self._state_cb, 100) + self._state_pub = self._node.create_subscription(Int32, Topics._BEHAVIOR_UPDATE_TOPIC, self._state_cb, 100) self._as = ActionServer(self._node, BehaviorExecution, Topics._EXECUTE_BEHAVIOR_ACTION, @@ -200,4 +200,4 @@ def _state_cb(self, msg): self._current_state = msg.data if self._current_goal and self._current_goal.is_active: self._current_goal.publish_feedback(BehaviorExecution.Feedback(current_state=self._current_state)) - self._node.get_logger().loginfo("Current state: '%s'" % self._current_state) + self._node.get_logger().loginfo('Current state id = %d' % self._current_state) diff --git a/flexbe_widget/flexbe_widget/behavior_launcher.py b/flexbe_widget/flexbe_widget/behavior_launcher.py index 85a643f..082e88b 100644 --- a/flexbe_widget/flexbe_widget/behavior_launcher.py +++ b/flexbe_widget/flexbe_widget/behavior_launcher.py @@ -46,6 +46,7 @@ from flexbe_msgs.msg import BEStatus, BehaviorModification, BehaviorRequest from flexbe_msgs.msg import BehaviorSelection, BehaviorSync from flexbe_msgs.msg import ContainerStructure +from flexbe_msgs.msg import StateMapMsg import rclpy from rclpy.node import Node @@ -77,6 +78,7 @@ def __init__(self): self._status_pub = self.create_publisher(BEStatus, Topics._ONBOARD_STATUS_TOPIC, 100) self._mirror_pub = self.create_publisher(ContainerStructure, Topics._MIRROR_STRUCTURE_TOPIC, 100) self._heartbeat_pub = self.create_publisher(Int32, Topics._LAUNCHER_HEARTBEAT_TOPIC, 2) + self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_OCS_TOPIC, 2) self._behavior_lib = BehaviorLibrary(self) @@ -120,10 +122,6 @@ def _request_callback(self, msg): if not self._ready_event.is_set(): Logger.logerr('Behavior engine is not ready - cannot process start request!') else: - # self._ready_event.clear() # require a new ready signal after publishing - # thread = threading.Thread(target=self._process_request, args=[msg]) - # thread.daemon = True - # thread.start() # Not waiting in process request, so safe to not block callback self._process_request(msg) @@ -166,13 +164,18 @@ def _process_request(self, msg): be_selection.arg_keys = msg.arg_keys be_selection.arg_values = msg.arg_values - container_map = StateMap() - be_structure = ContainerStructure() - be_structure.containers = msg.structure - for container in be_structure.containers: - # self.get_logger().info(f'BELauncher: request_callback: adding container {container.path} ...') - container_map.add_state(container.path, container) - # self.get_logger().info(f"BELauncher: request_callback: {msg.structure}") + state_map = StateMap() + try: + be_structure = ContainerStructure() + be_structure.containers = msg.structure + for container in be_structure.containers: + state_map.add_state(container.path, container) + self.get_logger().info(f'Built Statemachine {state_map}') + except Exception as exc: + self.get_logger().info(f"Failed to build state map for container {behavior['name']} ") + self.get_logger().info(f'{exc}') + self.get_logger().info(f'{state_map}') + raise exc try: be_filepath_new = self._behavior_lib.get_sourcecode_filepath(be_key) @@ -210,6 +213,17 @@ def _process_request(self, msg): new_content=content)) be_selection.behavior_id = zlib.adler32(be_content_new.encode()) & 0x7fffffff + + try: + state_ids, state_paths = list(zip(*state_map.items)) + state_map_msg = StateMapMsg(behavior_id=be_selection.behavior_id, + state_ids=state_ids, + state_paths=state_paths) + self._state_map_pub.publish(state_map_msg) # Used by the WebUI + + except Exception as exc: + self.get_logger().warn(f'Failed to publish state map from launcher!\n{exc}') + if msg.autonomy_level != 255: be_structure.behavior_id = be_selection.behavior_id self._mirror_pub.publish(be_structure) diff --git a/flexbe_widget/flexbe_widget/flexbe_outcome_listener.py b/flexbe_widget/flexbe_widget/flexbe_outcome_listener.py new file mode 100644 index 0000000..a9d3190 --- /dev/null +++ b/flexbe_widget/flexbe_widget/flexbe_outcome_listener.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python + +# Copyright 2024 Christopher Newport University +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the Christopher Newport University nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import rclpy +from rclpy.node import Node + +from flexbe_core.core import StateMap, Topics + +class FlexbeOutcomeListener(Node): + """Simple lister to echo FlexBE state machine information.""" + def __init__(self): + super().__init__('flexbe_outcome_listener') + + + # Suppress the node name and severity information (ChatGPT led me astray) + # import logging + # formatter = logging.Formatter('[%(asctime)s] %(message)s') + # console_handler = logging.StreamHandler() + # console_handler.setFormatter(formatter) + + # self.get_logger().handlers.clear() + # self.get_logger().addHandler(console_handler) + + self._state_map = None + self._state_map_sub = self.create_subscription( + Topics._topic_types[Topics._OUTCOME_TOPIC], + Topics._OUTCOME_TOPIC, + self._outcome_callback, + 10) + self._state_map_sub # prevent unused variable warning + + self._state_map_sub = self.create_subscription( + Topics._topic_types[Topics._STATE_MAP_OCS_TOPIC], + Topics._STATE_MAP_OCS_TOPIC, + self._state_map_callback, + 10) + self._state_map_sub # prevent unused variable warning + + def _outcome_callback(self, msg): + if self._state_map is None: + self.get_logger().info(f"Outcome msg hash value {msg.data:11d}") + return + + state_id, outcome = StateMap.unhash(msg.data) + path = f"'{self._state_map.get(state_id, 'unknown')}'" + self.get_logger().info(f"Outcome {outcome:2d} from {state_id:11d} {path:60s}") + + def _state_map_callback(self, msg): + self.get_logger().info(f"New state map received for {msg.behavior_id}") + + state_map = {} + for id, path in zip(msg.state_ids, msg.state_paths): + self.get_logger().info(f" adding {id:11d} at '{path}'") + state_map[id] = path + + self._state_map = state_map + +def flexbe_outcome_listener_main(args=None): + + print('Suggest: export RCUTILS_CONSOLE_OUTPUT_FORMAT="[{time}] {message}" in terminal') + print(' Run this before starting behavior to get full state path informatio from state map.') + rclpy.init(args=args) + + outcomes = FlexbeOutcomeListener() + + rclpy.spin(outcomes) + + # Destroy the node explicitly + # (optional - otherwise it will be done automatically + # when the garbage collector destroys the node object) + outcomes.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + flexbe_outcome_listener_main() \ No newline at end of file