
Commit
update with state map and changes to concurrent handling; allow removing action clients and service callers
David Conner committed Aug 15, 2023
1 parent b1422c2 commit 0e00e01
Showing 53 changed files with 2,395 additions and 927 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/flexbe_ci.yml
@@ -13,15 +13,15 @@ jobs:
# - os: ubuntu-22.04
# ros: humble
# python: python3
# ci_branch: ros2-devel
# ci_branch: humble-pre-release
- os: ubuntu-22.04
ros: iron
python: python3
ci_branch: ros2-devel
ci_branch: rolling-pre-release
- os: ubuntu-22.04
ros: rolling
python: python3
ci_branch: ros2-devel
ci_branch: rolling-pre-release

runs-on: ${{ matrix.os }}
env:
6 changes: 5 additions & 1 deletion README.md
@@ -11,10 +11,14 @@ The user interface features a runtime control interface as well as a graphical editor

Please refer to the FlexBE Homepage ([flexbe.github.io](http://flexbe.github.io)) for further information, tutorials, application examples, and much more.

![FlexBE CI](https://github.com/FlexBE/flexbe_behavior_engine/workflows/FlexBE%20CI/badge.svg?branch=ros2-devel)
You may also want to check out the quick start tutorial demonstrations at [FlexBE Turtlesim Demo](https://github.com/FlexBE/flexbe_turtlesim_demo).

![FlexBE CI](https://github.com/FlexBE/flexbe_behavior_engine/workflows/FlexBE%20CI/badge.svg?branch=rolling-pre-release)

Humble ![ROS Build Farm](https://build.ros2.org/job/Hdev__flexbe_behavior_engine__ubuntu_jammy_amd64/badge/icon)

Iron ![ROS Build Farm](https://build.ros2.org/job/Idev__flexbe_behavior_engine__ubuntu_jammy_amd64/badge/icon)

Rolling ![ROS Build Farm](https://build.ros2.org/job/Rdev__flexbe_behavior_engine__ubuntu_jammy_amd64/badge/icon)

## Installation
5 changes: 5 additions & 0 deletions flexbe_behavior_engine/CHANGELOG.rst
@@ -1,6 +1,11 @@
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Changelog for package flexbe_behavior_engine
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Forthcoming
-----------
* flake8 cleanup
* support state id and concurrency handling

2.3.3 (2023-08-09)
------------------

19 changes: 19 additions & 0 deletions flexbe_core/CHANGELOG.rst
@@ -1,6 +1,24 @@
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Changelog for package flexbe_core
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Forthcoming
-----------
* add options to remove service callers and action clients
* proxy publisher dictionary usage
* use modern style names with deprecated flag
* add sync lock to proxies
* added code to remove subscriptions and publishers with sync lock
* use deque for msg buffer and add a lock to prevent modifications during callbacks (e.g., when a thread starts or finishes); see the sketch after this list
* add hash for StateMap outcome index to standardize handling
* reinitialize existing state machine instead of rebuilding on sync (1000x faster)
* update with standardized topic handling
* update ui version handling
* OperatableStateMachine is now a pseudo manually-transitionable state (TODO - separate logic to shadow state design)
* add is_controlled check to avoid attempts at duplicate subscriptions and cleanup
* onboard side coded to send new BehaviorSync and 'mirror/outcome'
* adding state_id handling; pre-building ContainerStructure to set IDs
* flake8, pep257 and codestyle checks
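The deque-with-lock entry above is easiest to see in code. The following is a minimal sketch of that pattern only, using a hypothetical BufferedTopic wrapper rather than the actual flexbe_core proxy classes:

# Illustrative sketch only (not part of this commit); class and method names are hypothetical.
from collections import deque
from threading import Lock


class BufferedTopic:
    """Buffer incoming messages so executor callbacks cannot race with consumers."""

    def __init__(self):
        self._buffer = deque()
        self._lock = Lock()

    def callback(self, msg):
        # Called from the executor thread when a new message arrives.
        with self._lock:
            self._buffer.append(msg)

    def has_buffered(self):
        with self._lock:
            return len(self._buffer) > 0

    def get_from_buffer(self):
        # Pop the oldest message, or None if empty; the lock prevents concurrent mutation.
        with self._lock:
            return self._buffer.popleft() if self._buffer else None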

2.3.3 (2023-08-09)
------------------
* destroy sub/pub/client in executor thread
@@ -27,6 +45,7 @@ Changelog for package flexbe_core
* clean up some spam to FlexBE app console
* include package name in behavior request (requires flexbe_app 3.1+) to allow duplicate behavior names in packages


2.2.0 (2023-06-29)
------------------
* Modify to used behavior_id (checksum) and behavior_key consistently
2 changes: 2 additions & 0 deletions flexbe_core/flexbe_core/__init__.py
@@ -45,6 +45,8 @@
from .logger import Logger # noqa: F401
from .state_logger import StateLogger # noqa: F401

MIN_UI_VERSION = '4.0.0' # Minimum FlexBE App or UI version required to interact with this version of flexbe_core

# pylint: disable=R0903


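The MIN_UI_VERSION constant added above is a plain semantic-version string. A hedged sketch of how such a minimum could be compared against the version reported by the FlexBE App follows; ui_version_ok is a hypothetical helper, not the actual check performed by flexbe_core:

# Illustrative sketch only (not part of this commit); the real version check may differ.
MIN_UI_VERSION = '4.0.0'  # value added in flexbe_core/__init__.py above


def _as_tuple(version):
    return tuple(int(part) for part in version.split('.'))


def ui_version_ok(reported, required=MIN_UI_VERSION):
    """Return True if the reported UI version meets the required minimum."""
    return _as_tuple(reported) >= _as_tuple(required)


assert ui_version_ok('4.1.2')
assert not ui_version_ok('3.1.0')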
24 changes: 16 additions & 8 deletions flexbe_core/flexbe_core/behavior.py
@@ -181,6 +181,12 @@ def confirm(self):

self._state_machine.confirm(self.name, self.beh_id)

# def define_structure(self):
# """
# Calculate all state ids and prepare the ContainerStructure message
# """
# self._state_machine.define_structure()

def execute(self):
"""
Execute the behavior.
@@ -220,16 +226,18 @@ def prepare_for_switch(self, state):
states[1].replace_state(state) # add to new state machine
self.requested_state_path = state.path # set start after switch

def get_current_state(self):
return self._state_machine.get_deep_state()
def get_current_states(self):
return self._state_machine.get_deep_states()

def get_locked_state(self):
state = self._state_machine.get_deep_state()
while state is not None:
if state.is_locked():
return state

state = state._parent
"""Return the first state designated as locked."""
states = self._state_machine.get_deep_states()
for state in states:
while state is not None:
if state.is_locked():
return state

state = state._parent
return None

@classmethod
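The switch from get_current_state() to get_current_states() above reflects that concurrency can leave several leaf states active at once, so get_locked_state() now scans every active branch and walks up each parent chain. A self-contained sketch of that search, with stand-in FakeState objects rather than real FlexBE states:

# Illustrative sketch only (not part of this commit); FakeState stands in for real states.
class FakeState:
    def __init__(self, name, locked=False, parent=None):
        self.name = name
        self._locked = locked
        self._parent = parent

    def is_locked(self):
        return self._locked


def find_locked(active_states):
    """Mirror the get_locked_state loop over a list of deep states."""
    for state in active_states:
        node = state
        while node is not None:
            if node.is_locked():
                return node
            node = node._parent
    return None


root = FakeState('root', locked=True)
leaves = [FakeState('a', parent=root), FakeState('b', parent=root)]
print(find_locked(leaves).name)  # -> 'root'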
2 changes: 2 additions & 0 deletions flexbe_core/flexbe_core/core/__init__.py
@@ -46,6 +46,7 @@
from .operatable_state import OperatableState # noqa: F401
from .event_state import EventState # noqa: F401

from .state_map import StateMap # noqa: F401
from .user_data import UserData # noqa: F401

__all__ = [
@@ -63,5 +64,6 @@
'PreemptableState',
'OperatableState',
'EventState',
'StateMap',
'UserData'
]
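StateMap is newly exported here; per the changelog it standardizes state id and outcome-index handling via hashing. The following is only a hypothetical sketch of the idea; the real StateMap API and hashing scheme in flexbe_core may differ:

# Illustrative sketch only (not part of this commit); names and hashing scheme are hypothetical.
import zlib


class SimpleStateMap:
    """Map stable integer ids (hashed from state paths) to state objects."""

    def __init__(self):
        self._states = {}

    @staticmethod
    def hash_path(path):
        # crc32 yields a deterministic 32-bit id for a given state path
        return zlib.crc32(path.encode('utf-8'))

    def add(self, path, state):
        state_id = self.hash_path(path)
        self._states[state_id] = state
        return state_id

    def get(self, state_id):
        return self._states.get(state_id)


smap = SimpleStateMap()
sid = smap.add('/behavior/container/state_a', object())
assert smap.get(sid) is not None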
180 changes: 156 additions & 24 deletions flexbe_core/flexbe_core/core/concurrency_container.py
@@ -34,12 +34,17 @@
It synchronizes its current state with the mirror and supports some control mechanisms.
"""
from flexbe_core.logger import Logger
from flexbe_core.core.user_data import UserData
from flexbe_core.core.event_state import EventState
from flexbe_core.core.lockable_state_machine import LockableStateMachine
from flexbe_core.core.operatable_state import OperatableState
from flexbe_core.core.operatable_state_machine import OperatableStateMachine
from flexbe_core.core.priority_container import PriorityContainer
from flexbe_core.core.topics import Topics
from flexbe_core.core.user_data import UserData
from flexbe_core.logger import Logger
from flexbe_core.state_logger import StateLogger

from flexbe_core.core.operatable_state_machine import OperatableStateMachine
from flexbe_msgs.msg import CommandFeedback, OutcomeRequest


class ConcurrencyContainer(OperatableStateMachine):
@@ -53,6 +58,9 @@ def __init__(self, conditions=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._conditions = conditions if conditions else {}
self._returned_outcomes = {}
self._current_state = None
self._type = OperatableStateMachine.ContainerType.ConcurrencyContainer.value
self._manual_transition_requested = None

@property
def sleep_duration(self):
@@ -63,28 +71,111 @@ def sleep_duration(self):

return sleep_dur

@property
def current_state(self):
"""Return current state of Concurrency container, which is itself and list of active states."""
return self

@property
def current_state_label(self):
"""Return current state name of Concurrency container. which is itself."""
return self.name

def get_required_autonomy(self, outcome, state):
try:
assert state in self._current_state, "get required autonomy in ConcurrencyContainer - state doesn't match!"
return self._autonomy[state.name][outcome]
except Exception as exc:
Logger.error(f"Failure to retrieve autonomy for '{self.name}' in CC - "
f" current state label='{self.name}' state='{state.name}' outcome='{outcome}'.")
Logger.localerr(f"{type(exc)} - {exc}\n\n {self._current_state}")
Logger.localerr(f"{self._autonomy}")

def _execute_current_state(self):
# execute all states that are done with sleeping and determine next sleep duration
self._inner_sync_request = False # clear prior request for lower level state
self._current_state = [] # Concurrency container has multiple active states so use list

self._manual_transition_requested = None
# Logger.localinfo(f"-concurrency container {self.name} is_controlled={self._is_controlled}"
# f" has transition request={self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC)}")
if self._is_controlled and self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC):
# Special handling in concurrency container - can be either CC or one of several internal states.
command_msg = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC)

if command_msg.target == self.name:
self._force_transition = True
outcome = self.outcomes[command_msg.outcome]
self._manual_transition_requested = outcome
self._pub.publish(Topics._CMD_FEEDBACK_TOPIC,
CommandFeedback(command="transition",
args=[command_msg.target, self.name]))
Logger.localwarn(f"--> Manually triggered outcome {outcome} of concurrency container {self.name}")
self.on_exit(self.userdata,
states=[s for s in self._states if (s.name not in self._returned_outcomes
or self._returned_outcomes[s.name] is None)])
self._returned_outcomes = {}
self._current_state = None
self._last_outcome = outcome
return outcome
else:
Logger.localinfo(f"concurrency container {self.name} ")
self._manual_transition_requested = command_msg

for state in self._states:
if state.name in self._returned_outcomes and self._returned_outcomes[state.name] is not None:
continue # already done with executing

if self._manual_transition_requested is not None and self._manual_transition_requested.target == state.name:
# Transition request applies to this state
# @TODO - Should we be using path not name here?
command_msg = self._manual_transition_requested
self._manual_transition_requested = None # Reset at this level

if 0 <= command_msg.outcome < len(state.outcomes):
state._force_transition = True
outcome = state.outcomes[command_msg.outcome]
state._manual_transition_requested = outcome
self._returned_outcomes[state.name] = outcome
with UserData(reference=self._userdata, remap=self._remappings[state.name],
input_keys=state.input_keys, output_keys=state.output_keys) as userdata:
state.on_exit(userdata)

# ConcurrencyContainer bypasses normal operatable state handling of manual request, so do that here
state._publish_outcome(outcome)

self._pub.publish(Topics._CMD_FEEDBACK_TOPIC,
CommandFeedback(command="transition",
args=[command_msg.target, state.name]))
Logger.localerr(f"--> Manually triggered outcome {outcome} ({command_msg.outcome}) "
f"of state {state.name} from inside concurrency {self.name}")
continue
else:
Logger.localerr(f"--> Invalid outcome {command_msg.outcome} request for state {state.name} "
f"from inside concurrency {self.name}\n{state.outcomes}")

if (PriorityContainer.active_container is not None
and not all(a == s for a, s in zip(PriorityContainer.active_container.split('/'),
state.path.split('/')))):
if isinstance(state, EventState):
# Base state not a container
state._notify_skipped()
elif state.get_deep_state() is not None:
state.get_deep_state()._notify_skipped()
continue # other state has priority

# this state must be a container
deep_states = state.get_deep_states()
if deep_states is not None:
for dpst in deep_states:
dpst._notify_skipped()

continue # other state has priority

if state.sleep_duration <= 0: # ready to execute
out = self._execute_single_state(state)
self._returned_outcomes[state.name] = out
if out:
# Track any state with outcome as the current state
self._current_state = state

# Track any state that remains as being currently active
self._current_state.append(state)

# we want to pass sync requests back up to parent,
self._inner_sync_request = self._inner_sync_request or state._inner_sync_request
@@ -111,9 +202,36 @@ def _execute_current_state(self):
self._returned_outcomes = {}
# right now, going out of a concurrency container may break sync
# thus, as a quick fix, explicitly request sync again on any output
self._inner_sync_request = True
# self._inner_sync_request = True
self._current_state = None
Logger.localwarn('ConcurrencyContainer %s returning outcome %s (request inner sync)' % (self.name, str(outcome)))

if self._is_controlled:
# reset previously requested outcome if applicable
if self._last_requested_outcome is not None and outcome is None:
self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC, OutcomeRequest(outcome=255, target=self.path))
self._last_requested_outcome = None

# request outcome because autonomy level is too low
if (not self._force_transition and self.parent is not None
and (not self.parent.is_transition_allowed(self.name, outcome)
or outcome is not None and self.is_breakpoint)):
if outcome != self._last_requested_outcome:
self._pub.publish(Topics._OUTCOME_REQUEST_TOPIC,
OutcomeRequest(outcome=self.outcomes.index(outcome),
target=self.path))
Logger.localinfo("<-- Want result: %s > %s" % (self.name, outcome))
StateLogger.log('flexbe.operator', self, type='request', request=outcome,
autonomy=self.parent.autonomy_level,
required=self.parent.get_required_autonomy(outcome, self))
self._last_requested_outcome = outcome
outcome = None
elif outcome is not None and outcome in self.outcomes:
# autonomy level is high enough, report the executed transition
self._publish_outcome(outcome)
self._force_transition = False

self._last_outcome = outcome
# Logger.localinfo(f"ConcurrencyContainer '{self.name}' returning outcome '{outcome}' (request inner sync)")
return outcome

def _execute_single_state(self, state, force_exit=False):
@@ -136,21 +254,8 @@ def _execute_single_state(self, state, force_exit=False):

return result

def _enable_ros_control(self):
state = self._states[0]
if isinstance(state, EventState):
state._enable_ros_control()
if isinstance(state, OperatableStateMachine):
state._enable_ros_control()

def _disable_ros_control(self):
state = self._states[0]
if isinstance(state, EventState):
state._disable_ros_control()
if isinstance(state, OperatableStateMachine):
state._disable_ros_control()

def on_enter(self, userdata): # pylint: disable=W0613
super().on_enter(userdata)
for state in self._states:
# Force on_enter at state level (userdata passed by _execute_single_state)
state._entering = True # force state to handle enter on first execute
Expand All @@ -162,3 +267,30 @@ def on_exit(self, userdata, states=None):
continue # skip states that already exited themselves
self._execute_single_state(state, force_exit=True)
self._current_state = None

def get_deep_states(self):
"""
Recursively look for the currently executing states.
Traverse all state machines down to the terminal child state that is not a container.
EXCEPT for ConcurrencyContainers. Those are both active state and container.
@return: The list of active states (not state machines)
"""
deep_states = [self] # Concurrency acts as both state and container for this purpose
if isinstance(self._current_state, list):
for state in self._current_state:
# Internal states (after skipping concurrency container self)
if isinstance(state, LockableStateMachine):
deep_states.extend(state.get_deep_states())
else:
deep_states.append(state)
# Logger.localinfo(f"Concurrent get_deep_states: {self.name} {[state.path for state in deep_states]}")
return deep_states
elif self._current_state is not None:
Logger.localerr(f"ConcurrentContainer.get_deep_states {self.name} - current state is NOT a list!")
raise TypeError(f"ConcurrentContainer.get_deep_states {self.name} - current state is NOT a list!")
# Otherwise, either haven't fully entered, or all have returned outcomes

return deep_states
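To make the docstring above concrete, here is a toy model of the flattening behaviour: a concurrency container reports itself plus every active leaf below it, while a plain container reports only its single active branch. The classes are stand-ins, not the real FlexBE ones:

# Illustrative sketch only (not part of this commit); toy classes mimic the described flattening.
class Leaf:
    def __init__(self, name):
        self.name = name

    def get_deep_states(self):
        return [self]


class Machine:
    """Plain container: reports only the active branch below it."""

    def __init__(self, name, active_child):
        self.name = name
        self._active_child = active_child

    def get_deep_states(self):
        return self._active_child.get_deep_states()


class Concurrent:
    """Concurrency container: reports itself plus every active branch."""

    def __init__(self, name, active_children):
        self.name = name
        self._active_children = active_children

    def get_deep_states(self):
        deep = [self]
        for child in self._active_children:
            deep.extend(child.get_deep_states())
        return deep


cc = Concurrent('cc', [Leaf('a'), Machine('sm', Leaf('b'))])
print([s.name for s in cc.get_deep_states()])  # -> ['cc', 'a', 'b']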
