Skip to content

Commit

Permalink
revert: executor to standard rclpy.MultiThreadedExecutor
Browse files Browse the repository at this point in the history
- with current method based on callbacks the default executor is good
  • Loading branch information
boczekbartek committed Dec 20, 2024
1 parent 5ce3e8a commit da1e490
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 53 deletions.
7 changes: 3 additions & 4 deletions src/rai/rai/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
)
from std_srvs.srv import Trigger

import rai.utils.ros
from rai.agents.state_based import Report, State, create_state_based_agent
from rai.messages import HumanMultimodalMessage
from rai.tools.ros.native import Ros2BaseTool
Expand Down Expand Up @@ -143,7 +142,7 @@ def ros2_build_msg(msg_type: str, msg_args: Dict[str, Any]) -> Tuple[object, Typ
return msg, msg_cls


class AsyncRos2ActionClient:
class Ros2ActionsHelper:
def __init__(self, node: rclpy.node.Node):
self.node = node

Expand Down Expand Up @@ -414,7 +413,7 @@ def __init__(

# ---------- ROS helpers ----------
self.ros_discovery_info = NodeDiscovery(self, allowlist=allowlist)
self.async_action_client = AsyncRos2ActionClient(self)
self.async_action_client = Ros2ActionsHelper(self)
self.topics_handler = Ros2TopicsHandler(
self, self.callback_group, self.ros_discovery_info
)
Expand Down Expand Up @@ -447,7 +446,7 @@ def cancel_task(self) -> Union[str, bool]:

# ------------- other methods -------------
def spin(self):
executor = rai.utils.ros.MultiThreadedExecutorFixed()
executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(self)
executor.spin()
rclpy.shutdown()
Expand Down
53 changes: 4 additions & 49 deletions src/rai/rai/utils/ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Dict, List, Optional, Tuple, Union
from typing import Callable, Dict, List, Optional, Tuple

import rclpy.callback_groups
import rclpy.node
from rclpy.action.graph import get_action_names_and_types
from rclpy.executors import (
ConditionReachedException,
ExternalShutdownException,
MultiThreadedExecutor,
ShutdownException,
TimeoutException,
TimeoutObject,
)


class NodeDiscovery:
Expand All @@ -50,8 +42,9 @@ def __init__(
callback_group=rclpy.callback_groups.MutuallyExclusiveCallbackGroup(),
)

# callables (e.g. fun(x: NodeDiscovery)) that will receive the discovery info on every timer callback
# allows to register other entities that needs up-to-date discovery info
# callables (e.g. fun(x: NodeDiscovery)) that will receive the discovery
# info on every timer callback. This allows to register other entities that
# needs up-to-date discovery info
if setters is None:
self.setters = list()
else:
Expand Down Expand Up @@ -99,41 +92,3 @@ def dict(self):
"services_and_types": self.services_and_types,
"actions_and_types": self.actions_and_types,
}


class MultiThreadedExecutorFixed(MultiThreadedExecutor):
"""
Adresses a comment:
```python
# make a copy of the list that we iterate over while modifying it
# (https://stackoverflow.com/q/1207406/3753684)
```
from the rclpy implementation
"""

def _spin_once_impl(
self,
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
wait_condition: Callable[[], bool] = lambda: False,
) -> None:
try:
handler, entity, node = self.wait_for_ready_callbacks(
timeout_sec, None, wait_condition
)
except ExternalShutdownException:
pass
except ShutdownException:
pass
except TimeoutException:
pass
except ConditionReachedException:
pass
else:
self._executor.submit(handler)
self._futures.append(handler)
futures = self._futures.copy()
for future in futures[:]:
if future.done():
futures.remove(future)
future.result() # raise any exceptions
self._futures = futures

0 comments on commit da1e490

Please sign in to comment.