diff --git a/flexbe_core/flexbe_core/core/concurrency_container.py b/flexbe_core/flexbe_core/core/concurrency_container.py index 190da85..1f3aeec 100644 --- a/flexbe_core/flexbe_core/core/concurrency_container.py +++ b/flexbe_core/flexbe_core/core/concurrency_container.py @@ -99,12 +99,16 @@ def _execute_current_state(self): self._manual_transition_requested = None # Logger.localinfo(f"-concurrency container '{self.name}' is_controlled={self._is_controlled}" - # f' has transition request={self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC)}') + # f" with {len(self._states)} states is entering={self._entering} ") if self._is_controlled and self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC): # Special handling in concurrency container - can be either CC or one of several internal states. - command_msg = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) + command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC) if command_msg.target == self.name: + cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Using here, so clear from buffer + assert cmd_msg2 is command_msg, "Unexpected change in CMD_TRANSITION_TOPIC buffer" + Logger.localinfo(f"-concurrency container '{self.name}' is handling the transition cmd msg={command_msg}") + self._force_transition = True outcome = self.outcomes[command_msg.outcome] self._manual_transition_requested = outcome @@ -122,40 +126,46 @@ def _execute_current_state(self): self._last_outcome = outcome return outcome else: - Logger.localinfo(f"concurrency container '{self.name}' ") + Logger.localinfo(f"concurrency container '{self.name}' - storing {command_msg} transition request") self._manual_transition_requested = command_msg for state in self._states: if state.name in self._returned_outcomes and self._returned_outcomes[state.name] is not None: + # print(f" in current {self._name} : state '{state.name}' is already done.", flush=True) continue # already done with executing - if self._manual_transition_requested is not None and self._manual_transition_requested.target == state.name: - # Transition request applies to this state - # @TODO - Should we be using path not name here? - command_msg = self._manual_transition_requested - self._manual_transition_requested = None # Reset at this level - - if 0 <= command_msg.outcome < len(state.outcomes): - state._force_transition = True - outcome = state.outcomes[command_msg.outcome] - state._manual_transition_requested = outcome - self._returned_outcomes[state.name] = outcome - with UserData(reference=self._userdata, remap=self._remappings[state.name], - input_keys=state.input_keys, output_keys=state.output_keys) as userdata: - state.on_exit(userdata) - - # ConcurrencyContainer bypasses normal operatable state handling of manual request, so do that here - state._publish_outcome(outcome) - - self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, - CommandFeedback(command='transition', - args=[command_msg.target, state.name])) - Logger.localerr(f'--> Manually triggered outcome {outcome} ({command_msg.outcome}) ' - f"of state '{state.name}' from inside concurrency {self.name}") - continue - else: - Logger.localerr(f"--> Invalid outcome {command_msg.outcome} request for state '{state.name}' " - f"from inside concurrency '{self.name}'\n{state.outcomes}") + if self._manual_transition_requested is not None: + if self._manual_transition_requested.target == state.name: + # Transition request applies to this state + # @TODO - Should we be using path not name here? + command_msg = self._manual_transition_requested + cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) # Using here, so clear from buffer + assert cmd_msg2 is command_msg, "Something is up with handling of buffer for CMD_TRANSITION_TOPIC" + Logger.localinfo(f"-concurrency container '{self.name}' state '{state.name}' is handling " + f"the cmd msg={command_msg}") + self._manual_transition_requested = None # Reset at this level + + if 0 <= command_msg.outcome < len(state.outcomes): + state._force_transition = True + outcome = state.outcomes[command_msg.outcome] + state._manual_transition_requested = outcome + self._returned_outcomes[state.name] = outcome + with UserData(reference=self._userdata, remap=self._remappings[state.name], + input_keys=state.input_keys, output_keys=state.output_keys) as userdata: + state.on_exit(userdata) + + # ConcurrencyContainer bypasses normal operatable state handling of manual request, so do that here + state._publish_outcome(outcome) + + self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, + CommandFeedback(command='transition', + args=[command_msg.target, state.name])) + Logger.localerr(f'--> Manually triggered outcome {outcome} ({command_msg.outcome}) ' + f"of state '{state.name}' from inside concurrency {self.name}") + continue + else: + Logger.localerr(f"--> Invalid outcome {command_msg.outcome} request for state '{state.name}' " + f"from inside concurrency '{self.name}'\n{state.outcomes}") if (PriorityContainer.active_container is not None and not all(a == s for a, s in zip(PriorityContainer.active_container.split('/'), @@ -173,7 +183,8 @@ def _execute_current_state(self): continue # other state has priority - if state.sleep_duration <= 0: # ready to execute + if state.sleep_duration <= 0 or self._manual_transition_requested is not None: # ready to execute + # Execute if we have a pending manual transition command or state tic rate elapsed out = self._execute_single_state(state) self._returned_outcomes[state.name] = out @@ -234,7 +245,6 @@ def _execute_current_state(self): self._force_transition = False self._last_outcome = outcome - # Logger.localinfo(f"ConcurrencyContainer '{self.name}' returning outcome '{outcome}' (request inner sync)") return outcome def _execute_single_state(self, state, force_exit=False): diff --git a/flexbe_core/flexbe_core/core/manually_transitionable_state.py b/flexbe_core/flexbe_core/core/manually_transitionable_state.py index df134d1..822c2bf 100644 --- a/flexbe_core/flexbe_core/core/manually_transitionable_state.py +++ b/flexbe_core/flexbe_core/core/manually_transitionable_state.py @@ -57,18 +57,22 @@ def __init__(self, *args, **kwargs): def _manually_transitionable_execute(self, *args, **kwargs): self._manual_transition_requested = None if self._is_controlled and self._sub.has_buffered(Topics._CMD_TRANSITION_TOPIC): - command_msg = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) - self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, - CommandFeedback(command='transition', args=[command_msg.target, self.name])) - if command_msg.target != self.name: - Logger.logwarn("Requested outcome for state '%s' but active state is '%s'" % - (command_msg.target, self.name)) - else: + command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC) + if command_msg.target == self.name: + cmd_msg2 = self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) + assert cmd_msg2 is command_msg, "Unexpected change in CMD_TRANSITION_TOPIC buffer" + self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, + CommandFeedback(command='transition', args=[command_msg.target, self.name])) + self._force_transition = True outcome = self.outcomes[command_msg.outcome] self._manual_transition_requested = outcome Logger.localinfo("--> Manually triggered outcome '%s' of state '%s'" % (outcome, self.name)) return outcome + else: + Logger.loginfo(f"Requested outcome for state '{command_msg.target}' " + f" but this active state is '{self.name}' - keep looking for potential nested state") + # otherwise, return the normal outcome self._force_transition = False return self.__execute(*args, **kwargs) diff --git a/flexbe_core/flexbe_core/core/preemptable_state_machine.py b/flexbe_core/flexbe_core/core/preemptable_state_machine.py index 9f8b5aa..c0d7340 100644 --- a/flexbe_core/flexbe_core/core/preemptable_state_machine.py +++ b/flexbe_core/flexbe_core/core/preemptable_state_machine.py @@ -36,6 +36,7 @@ from flexbe_core.core.preemptable_state import PreemptableState from flexbe_core.core.topics import Topics from flexbe_core.logger import Logger +from flexbe_msgs.msg import CommandFeedback import rclpy @@ -84,8 +85,21 @@ def spin(self, userdata=None): """Spin the execute loop for preemptable portion.""" outcome = None while rclpy.ok(): + command_msg = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC) + outcome = self.execute(userdata) + if command_msg is not None: + command_msg2 = self._sub.peek_at_buffer(Topics._CMD_TRANSITION_TOPIC) + if command_msg is command_msg2: + # Execute loop went through process and did not handle the requested transition + Logger.loginfo(f"'{self.name}' did not handle transition " + f"cmd='{command_msg.target}' ({command_msg.outcome}) - toss it!") + self._pub.publish(Topics._CMD_FEEDBACK_TOPIC, + CommandFeedback(command='transition', + args=['invalid', command_msg.target])) + self._sub.get_from_buffer(Topics._CMD_TRANSITION_TOPIC) + # Store the information for safely passing to heartbeat thread deep_states = self.get_deep_states() assert isinstance(deep_states, list), f'Expecting a list here, not {deep_states}' diff --git a/flexbe_core/flexbe_core/proxy/proxy_subscriber_cached.py b/flexbe_core/flexbe_core/proxy/proxy_subscriber_cached.py index 85acbce..77765bc 100644 --- a/flexbe_core/flexbe_core/proxy/proxy_subscriber_cached.py +++ b/flexbe_core/flexbe_core/proxy/proxy_subscriber_cached.py @@ -293,12 +293,31 @@ def get_from_buffer(cls, topic): @type topic: string @param topic: The topic of interest. """ - if not ProxySubscriberCached._topics[topic]['buffered']: - Logger.warning('Attempted to access buffer of non-buffered topic!') - return None - if len(ProxySubscriberCached._topics[topic]['msg_queue']) == 0: - return None - return ProxySubscriberCached._topics[topic]['msg_queue'].popleft() + if cls.is_available(topic): + if not ProxySubscriberCached._topics[topic]['buffered']: + Logger.warning('Attempted to access buffer of non-buffered topic!') + return None + if len(ProxySubscriberCached._topics[topic]['msg_queue']) == 0: + return None + return ProxySubscriberCached._topics[topic]['msg_queue'].popleft() + return None + + @classmethod + def peek_at_buffer(cls, topic): + """ + Peek at the oldest buffered message of the given topic, but leave in queue. + + @type topic: string + @param topic: The topic of interest. + """ + if cls.is_available(topic): + if not ProxySubscriberCached._topics[topic]['buffered']: + Logger.warning('Attempted to access buffer of non-buffered topic!') + return None + if len(ProxySubscriberCached._topics[topic]['msg_queue']) == 0: + return None + return ProxySubscriberCached._topics[topic]['msg_queue'][0] + return None @classmethod def has_msg(cls, topic): @@ -320,7 +339,9 @@ def has_buffered(cls, topic): @type topic: string @param topic: The topic of interest. """ - return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0 + if cls.is_available(topic): + return len(ProxySubscriberCached._topics[topic]['msg_queue']) > 0 + return False @classmethod def remove_last_msg(cls, topic, clear_buffer=False): diff --git a/flexbe_core/test/test_core.py b/flexbe_core/test/test_core.py index 8d1dd0b..660cbf7 100755 --- a/flexbe_core/test/test_core.py +++ b/flexbe_core/test/test_core.py @@ -476,7 +476,8 @@ def test_manually_transitionable_state(self): state._sub._callback(OutcomeRequest(target='invalid', outcome=1), Topics._CMD_TRANSITION_TOPIC) outcome = self._execute(state) self.assertIsNone(outcome) - self.assertMessage(sub, fb_topic, CommandFeedback(command='transition', args=['invalid', 'subject'])) + # This state won't handle the transition request, so not message is expected + self.assertNoMessage(sub, fb_topic) self.node.get_logger().info('test_manually_transitionable_state - OK! ') def test_cross_combinations(self):