Skip to content

Commit

Permalink
Add peek_at_buffer to proxy subscriber; modify processing of transiti…
Browse files Browse the repository at this point in the history
…on command for nested behaviors
  • Loading branch information
David Conner committed Jun 23, 2024
1 parent 7f6262b commit 0de608b
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 47 deletions.
74 changes: 42 additions & 32 deletions flexbe_core/flexbe_core/core/concurrency_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,16 @@ def _execute_current_state(self):

self._manual_transition_requested = None
# Logger.localinfo(f"-concurrency container '{self.name}' is_controlled={self._is_controlled}"
# f' has transition request={self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC)}')
# f" with {len(self._states)} states is entering={self._entering} ")
if self._is_controlled and self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC):
# Special handling in concurrency container - can be either CC or one of several internal states.
command_msg = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC)
command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC)

if command_msg.target == self.name:
cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Using here, so clear from buffer
assert cmd_msg2 is command_msg, "Unexpected change in CMD_TRANSITION_TOPIC buffer"
Logger.localinfo(f"-concurrency container '{self.name}' is handling the transition cmd msg={command_msg}")

self._force_transition = True
outcome = self.outcomes[command_msg.outcome]
self._manual_transition_requested = outcome
Expand All @@ -122,40 +126,46 @@ def _execute_current_state(self):
self._last_outcome = outcome
return outcome
else:
Logger.localinfo(f"concurrency container '{self.name}' ")
Logger.localinfo(f"concurrency container '{self.name}' - storing {command_msg} transition request")
self._manual_transition_requested = command_msg

for state in self._states:
if state.name in self._returned_outcomes and self._returned_outcomes[state.name] is not None:
# print(f" in current {self._name} : state '{state.name}' is already done.", flush=True)
continue # already done with executing

if self._manual_transition_requested is not None and self._manual_transition_requested.target == state.name:
# Transition request applies to this state
# @TODO - Should we be using path not name here?
command_msg = self._manual_transition_requested
self._manual_transition_requested = None # Reset at this level

if 0 <= command_msg.outcome < len(state.outcomes):
state._force_transition = True
outcome = state.outcomes[command_msg.outcome]
state._manual_transition_requested = outcome
self._returned_outcomes[state.name] = outcome
with UserData(reference=self._userdata, remap=self._remappings[state.name],
input_keys=state.input_keys, output_keys=state.output_keys) as userdata:
state.on_exit(userdata)

# ConcurrencyContainer bypasses normal operatable state handling of manual request, so do that here
state._publish_outcome(outcome)

self._pub.publish(Topics._CMD_FEEDBACK_TOPIC,
CommandFeedback(command='transition',
args=[command_msg.target, state.name]))
Logger.localerr(f'--> Manually triggered outcome {outcome} ({command_msg.outcome}) '
f"of state '{state.name}' from inside concurrency {self.name}")
continue
else:
Logger.localerr(f"--> Invalid outcome {command_msg.outcome} request for state '{state.name}' "
f"from inside concurrency '{self.name}'\n{state.outcomes}")
if self._manual_transition_requested is not None:
if self._manual_transition_requested.target == state.name:
# Transition request applies to this state
# @TODO - Should we be using path not name here?
command_msg = self._manual_transition_requested
cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Using here, so clear from buffer
assert cmd_msg2 is command_msg, "Something is up with handling of buffer for CMD_TRANSITION_TOPIC"
Logger.localinfo(f"-concurrency container '{self.name}' state '{state.name}' is handling "
f"the cmd msg={command_msg}")
self._manual_transition_requested = None # Reset at this level

if 0 <= command_msg.outcome < len(state.outcomes):
state._force_transition = True
outcome = state.outcomes[command_msg.outcome]
state._manual_transition_requested = outcome
self._returned_outcomes[state.name] = outcome
with UserData(reference=self._userdata, remap=self._remappings[state.name],
input_keys=state.input_keys, output_keys=state.output_keys) as userdata:
state.on_exit(userdata)

# ConcurrencyContainer bypasses normal operatable state handling of manual request, so do that here
state._publish_outcome(outcome)

self._pub.publish(Topics._CMD_FEEDBACK_TOPIC,
CommandFeedback(command='transition',
args=[command_msg.target, state.name]))
Logger.localerr(f'--> Manually triggered outcome {outcome} ({command_msg.outcome}) '
f"of state '{state.name}' from inside concurrency {self.name}")
continue
else:
Logger.localerr(f"--> Invalid outcome {command_msg.outcome} request for state '{state.name}' "
f"from inside concurrency '{self.name}'\n{state.outcomes}")

if (PriorityContainer.active_container is not None
and not all(a == s for a, s in zip(PriorityContainer.active_container.split('/'),
Expand All @@ -173,7 +183,8 @@ def _execute_current_state(self):

continue # other state has priority

if state.sleep_duration <= 0: # ready to execute
if state.sleep_duration <= 0 or self._manual_transition_requested is not None: # ready to execute
# Execute if we have a pending manual transition command or state tic rate elapsed
out = self._execute_single_state(state)
self._returned_outcomes[state.name] = out

Expand Down Expand Up @@ -234,7 +245,6 @@ def _execute_current_state(self):
self._force_transition = False

self._last_outcome = outcome
# Logger.localinfo(f"ConcurrencyContainer '{self.name}' returning outcome '{outcome}' (request inner sync)")
return outcome

def _execute_single_state(self, state, force_exit=False):
Expand Down
18 changes: 11 additions & 7 deletions flexbe_core/flexbe_core/core/manually_transitionable_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,22 @@ def __init__(self, *args, **kwargs):
def _manually_transitionable_execute(self, *args, **kwargs):
self._manual_transition_requested = None
if self._is_controlled and self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC):
command_msg = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC)
self._pub.publish(Topics._CMD_FEEDBACK_TOPIC,
CommandFeedback(command='transition', args=[command_msg.target, self.name]))
if command_msg.target != self.name:
Logger.logwarn("Requested outcome for state '%s' but active state is '%s'" %
(command_msg.target, self.name))
else:
command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC)
if command_msg.target == self.name:
cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC)
assert cmd_msg2 is command_msg, "Unexpected change in CMD_TRANSITION_TOPIC buffer"
self._pub.publish(Topics._CMD_FEEDBACK_TOPIC,
CommandFeedback(command='transition', args=[command_msg.target, self.name]))

self._force_transition = True
outcome = self.outcomes[command_msg.outcome]
self._manual_transition_requested = outcome
Logger.localinfo("--> Manually triggered outcome '%s' of state '%s'" % (outcome, self.name))
return outcome
else:
Logger.loginfo(f"Requested outcome for state '{command_msg.target}' "
f" but this active state is '{self.name}' - keep looking for potential nested state")

# otherwise, return the normal outcome
self._force_transition = False
return self.__execute(*args, **kwargs)
Expand Down
14 changes: 14 additions & 0 deletions flexbe_core/flexbe_core/core/preemptable_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from flexbe_core.core.preemptable_state import PreemptableState
from flexbe_core.core.topics import Topics
from flexbe_core.logger import Logger
from flexbe_msgs.msg import CommandFeedback

import rclpy

Expand Down Expand Up @@ -84,8 +85,21 @@ def spin(self, userdata=None):
"""Spin the execute loop for preemptable portion."""
outcome = None
while rclpy.ok():
command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC)

outcome = self.execute(userdata)

if command_msg is not None:
command_msg2 = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC)
if command_msg is command_msg2:
# Execute loop went through process and did not handle the requested transition
Logger.loginfo(f"'{self.name}' did not handle transition "
f"cmd='{command_msg.target}' ({command_msg.outcome}) - toss it!")
self._pub.publish(Topics._CMD_FEEDBACK_TOPIC,
CommandFeedback(command='transition',
args=['invalid', command_msg.target]))
self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC)

# Store the information for safely passing to heartbeat thread
deep_states = self.get_deep_states()
assert isinstance(deep_states, list), f'Expecting a list here, not {deep_states}'
Expand Down
35 changes: 28 additions & 7 deletions flexbe_core/flexbe_core/proxy/proxy_subscriber_cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,31 @@ def get_from_buffer(cls, topic):
@type topic: string
@param topic: The topic of interest.
"""
if not ProxySubscriberCached._topics[topic]['buffered']:
Logger.warning('Attempted to access buffer of non-buffered topic!')
return None
if len(ProxySubscriberCached._topics[topic]['msg_queue']) == 0:
return None
return ProxySubscriberCached._topics[topic]['msg_queue'].popleft()
if cls.is_available(topic):
if not ProxySubscriberCached._topics[topic]['buffered']:
Logger.warning('Attempted to access buffer of non-buffered topic!')
return None
if len(ProxySubscriberCached._topics[topic]['msg_queue']) == 0:
return None
return ProxySubscriberCached._topics[topic]['msg_queue'].popleft()
return None

@classmethod
def peek_at_buffer(cls, topic):
"""
Peek at the oldest buffered message of the given topic, but leave in queue.
@type topic: string
@param topic: The topic of interest.
"""
if cls.is_available(topic):
if not ProxySubscriberCached._topics[topic]['buffered']:
Logger.warning('Attempted to access buffer of non-buffered topic!')
return None
if len(ProxySubscriberCached._topics[topic]['msg_queue']) == 0:
return None
return ProxySubscriberCached._topics[topic]['msg_queue'][0]
return None

@classmethod
def has_msg(cls, topic):
Expand All @@ -320,7 +339,9 @@ def has_buffered(cls, topic):
@type topic: string
@param topic: The topic of interest.
"""
return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0
if cls.is_available(topic):
return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0
return False

@classmethod
def remove_last_msg(cls, topic, clear_buffer=False):
Expand Down
3 changes: 2 additions & 1 deletion flexbe_core/test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ def test_manually_transitionable_state(self):
state._sub._callback(OutcomeRequest(target='invalid', outcome=1), Topics._CMD_TRANSITION_TOPIC)
outcome = self._execute(state)
self.assertIsNone(outcome)
self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', args=['invalid', 'subject']))
# This state won't handle the transition request, so not message is expected
self.assertNoMessage(sub, fb_topic)
self.node.get_logger().info('test_manually_transitionable_state - OK! ')

def test_cross_combinations(self):
Expand Down

0 comments on commit 0de608b

Please sign in to comment.