From da1e4908be76c2988b57b8de02387cc4248aca69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Boczek?= Date: Thu, 19 Dec 2024 14:12:40 +0100 Subject: [PATCH] revert: executor to standard `rclpy.MultiThreadedExecutor` - with current method based on callbacks the default executor is good --- src/rai/rai/node.py | 7 +++--- src/rai/rai/utils/ros.py | 53 +++------------------------------------- 2 files changed, 7 insertions(+), 53 deletions(-) diff --git a/src/rai/rai/node.py b/src/rai/rai/node.py index cf848666..f51832e3 100644 --- a/src/rai/rai/node.py +++ b/src/rai/rai/node.py @@ -44,7 +44,6 @@ ) from std_srvs.srv import Trigger -import rai.utils.ros from rai.agents.state_based import Report, State, create_state_based_agent from rai.messages import HumanMultimodalMessage from rai.tools.ros.native import Ros2BaseTool @@ -143,7 +142,7 @@ def ros2_build_msg(msg_type: str, msg_args: Dict[str, Any]) -> Tuple[object, Typ return msg, msg_cls -class AsyncRos2ActionClient: +class Ros2ActionsHelper: def __init__(self, node: rclpy.node.Node): self.node = node @@ -414,7 +413,7 @@ def __init__( # ---------- ROS helpers ---------- self.ros_discovery_info = NodeDiscovery(self, allowlist=allowlist) - self.async_action_client = AsyncRos2ActionClient(self) + self.async_action_client = Ros2ActionsHelper(self) self.topics_handler = Ros2TopicsHandler( self, self.callback_group, self.ros_discovery_info ) @@ -447,7 +446,7 @@ def cancel_task(self) -> Union[str, bool]: # ------------- other methods ------------- def spin(self): - executor = rai.utils.ros.MultiThreadedExecutorFixed() + executor = rclpy.executors.MultiThreadedExecutor() executor.add_node(self) executor.spin() rclpy.shutdown() diff --git a/src/rai/rai/utils/ros.py b/src/rai/rai/utils/ros.py index 22092284..b692e31a 100644 --- a/src/rai/rai/utils/ros.py +++ b/src/rai/rai/utils/ros.py @@ -12,19 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple import rclpy.callback_groups import rclpy.node from rclpy.action.graph import get_action_names_and_types -from rclpy.executors import ( - ConditionReachedException, - ExternalShutdownException, - MultiThreadedExecutor, - ShutdownException, - TimeoutException, - TimeoutObject, -) class NodeDiscovery: @@ -50,8 +42,9 @@ def __init__( callback_group=rclpy.callback_groups.MutuallyExclusiveCallbackGroup(), ) - # callables (e.g. fun(x: NodeDiscovery)) that will receive the discovery info on every timer callback - # allows to register other entities that needs up-to-date discovery info + # callables (e.g. fun(x: NodeDiscovery)) that will receive the discovery + # info on every timer callback. This allows to register other entities that + # needs up-to-date discovery info if setters is None: self.setters = list() else: @@ -99,41 +92,3 @@ def dict(self): "services_and_types": self.services_and_types, "actions_and_types": self.actions_and_types, } - - -class MultiThreadedExecutorFixed(MultiThreadedExecutor): - """ - Adresses a comment: - ```python - # make a copy of the list that we iterate over while modifying it - # (https://stackoverflow.com/q/1207406/3753684) - ``` - from the rclpy implementation - """ - - def _spin_once_impl( - self, - timeout_sec: Optional[Union[float, TimeoutObject]] = None, - wait_condition: Callable[[], bool] = lambda: False, - ) -> None: - try: - handler, entity, node = self.wait_for_ready_callbacks( - timeout_sec, None, wait_condition - ) - except ExternalShutdownException: - pass - except ShutdownException: - pass - except TimeoutException: - pass - except ConditionReachedException: - pass - else: - self._executor.submit(handler) - self._futures.append(handler) - futures = self._futures.copy() - for future in futures[:]: - if future.done(): - futures.remove(future) - future.result() # raise any exceptions - self._futures = futures