From 392e320750ba6529d032904e43cda47d42a11136 Mon Sep 17 00:00:00 2001 From: Anthony Welte Date: Tue, 10 Sep 2024 18:28:58 +0200 Subject: [PATCH] Add support for topic QOS for ros2topic bw, delay and hz Signed-off-by: Anthony Welte --- ros2topic/ros2topic/api/__init__.py | 78 +++++++++++++++++++++++++++++ ros2topic/ros2topic/verb/bw.py | 18 ++++--- ros2topic/ros2topic/verb/delay.py | 19 ++++--- ros2topic/ros2topic/verb/echo.py | 70 ++------------------------ ros2topic/ros2topic/verb/hz.py | 16 ++++-- 5 files changed, 119 insertions(+), 82 deletions(-) diff --git a/ros2topic/ros2topic/api/__init__.py b/ros2topic/ros2topic/api/__init__.py index 7b5b4f94a..f7aa59be4 100644 --- a/ros2topic/ros2topic/api/__init__.py +++ b/ros2topic/ros2topic/api/__init__.py @@ -21,6 +21,9 @@ from rclpy.duration import Duration from rclpy.expand_topic_name import expand_topic_name +from rclpy.qos import QoSDurabilityPolicy +from rclpy.qos import QoSPresetProfiles +from rclpy.qos import QoSReliabilityPolicy from rclpy.topic_or_service_is_hidden import topic_or_service_is_hidden from rclpy.validate_full_topic_name import validate_full_topic_name from ros2cli.node.strategy import NodeStrategy @@ -253,3 +256,78 @@ def add_qos_arguments(parser: ArgumentParser, subscribe_or_publish: str, default help=( f'Quality of service liveliness lease duration setting to {subscribe_or_publish} ' 'with (overrides liveliness lease duration value of --qos-profile option')) + +def extract_qos_arguments(args): + class QosArgs: + pass + + qos = QosArgs() + qos.qos_profile = args.qos_profile + qos.qos_reliability = args.qos_reliability + qos.qos_durability = args.qos_durability + qos.qos_depth = args.qos_depth + qos.qos_history = args.qos_history + qos.qos_liveliness = args.qos_liveliness + qos.qos_liveliness_lease_duration_seconds = args.qos_liveliness_lease_duration_seconds + + return qos + +def choose_qos(node, topic_name: str, qos_args): + if (qos_args.qos_reliability is not None or + qos_args.qos_durability is not None or + qos_args.qos_depth is not None or + qos_args.qos_history is not None or + qos_args.qos_liveliness is not None or + qos_args.qos_liveliness_lease_duration_seconds is not None): + + return qos_profile_from_short_keys( + qos_args.qos_profile, + reliability=qos_args.qos_reliability, + durability=qos_args.qos_durability, + depth=qos_args.qos_depth, + history=qos_args.qos_history, + liveliness=qos_args.qos_liveliness, + liveliness_lease_duration_s=qos_args.qos_liveliness_lease_duration_seconds) + + qos_profile = QoSPresetProfiles.get_from_short_key(qos_args.qos_profile) + reliability_reliable_endpoints_count = 0 + durability_transient_local_endpoints_count = 0 + + pubs_info = node.get_publishers_info_by_topic(topic_name) + publishers_count = len(pubs_info) + if publishers_count == 0: + return qos_profile + + for info in pubs_info: + if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE): + reliability_reliable_endpoints_count += 1 + if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL): + durability_transient_local_endpoints_count += 1 + + # If all endpoints are reliable, ask for reliable + if reliability_reliable_endpoints_count == publishers_count: + qos_profile.reliability = QoSReliabilityPolicy.RELIABLE + else: + if reliability_reliable_endpoints_count > 0: + print( + 'Some, but not all, publishers are offering ' + 'QoSReliabilityPolicy.RELIABLE. Falling back to ' + 'QoSReliabilityPolicy.BEST_EFFORT as it will connect ' + 'to all publishers' + ) + qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT + + # If all endpoints are transient_local, ask for transient_local + if durability_transient_local_endpoints_count == publishers_count: + qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL + else: + if durability_transient_local_endpoints_count > 0: + print( + 'Some, but not all, publishers are offering ' + 'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to ' + 'QoSDurabilityPolicy.VOLATILE as it will connect ' + 'to all publishers' + ) + qos_profile.durability = QoSDurabilityPolicy.VOLATILE + + return qos_profile diff --git a/ros2topic/ros2topic/verb/bw.py b/ros2topic/ros2topic/verb/bw.py index b1ad3f1cb..3a4aa815c 100644 --- a/ros2topic/ros2topic/verb/bw.py +++ b/ros2topic/ros2topic/verb/bw.py @@ -34,9 +34,11 @@ import traceback import rclpy -from rclpy.qos import qos_profile_sensor_data from ros2cli.node.direct import add_arguments as add_direct_node_arguments from ros2cli.node.direct import DirectNode +from ros2topic.api import add_qos_arguments +from ros2topic.api import extract_qos_arguments +from ros2topic.api import choose_qos from ros2topic.api import get_msg_class from ros2topic.api import positive_int from ros2topic.api import TopicNameCompleter @@ -62,19 +64,21 @@ class BwVerb(VerbExtension): def add_arguments(self, parser, cli_name): arg = parser.add_argument( - 'topic', + 'topic_name', help='Topic name to monitor for bandwidth utilization') arg.completer = TopicNameCompleter( include_hidden_topics_key='include_hidden_topics') + add_qos_arguments(parser, 'subscribe', 'sensor_data') parser.add_argument( - '--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE, + '--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE, help='maximum window size, in # of messages, for calculating rate ' f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW') add_direct_node_arguments(parser) def main(self, *, args): + qos_args = extract_qos_arguments(args) with DirectNode(args) as node: - _rostopic_bw(node.node, args.topic, window_size=args.window) + _rostopic_bw(node.node, args.topic_name, qos_args, window_size=args.window_size) class ROSTopicBandwidth(object): @@ -150,7 +154,7 @@ def print_bw(self): print(f'{bw} from {n} messages\n\tMessage size mean: {mean} min: {min_s} max: {max_s}') -def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE): +def _rostopic_bw(node, topic, qos, window_size=DEFAULT_WINDOW_SIZE): """Periodically print the received bandwidth of a topic to console until shutdown.""" # pause bw until topic is published msg_class = get_msg_class(node, topic, blocking=True, include_hidden_topics=True) @@ -158,12 +162,14 @@ def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE): node.destroy_node() return + qos_profile = choose_qos(node, topic_name=topic, qos_args=qos) + rt = ROSTopicBandwidth(node, window_size) node.create_subscription( msg_class, topic, rt.callback, - qos_profile_sensor_data, + qos_profile, raw=True ) diff --git a/ros2topic/ros2topic/verb/delay.py b/ros2topic/ros2topic/verb/delay.py index da5e63e06..0798cbc17 100644 --- a/ros2topic/ros2topic/verb/delay.py +++ b/ros2topic/ros2topic/verb/delay.py @@ -37,6 +37,9 @@ from rclpy.time import Time from ros2cli.node.direct import add_arguments as add_direct_node_arguments from ros2cli.node.direct import DirectNode +from ros2topic.api import add_qos_arguments +from ros2topic.api import extract_qos_arguments +from ros2topic.api import choose_qos from ros2topic.api import get_msg_class from ros2topic.api import positive_int from ros2topic.api import TopicNameCompleter @@ -50,12 +53,13 @@ class DelayVerb(VerbExtension): def add_arguments(self, parser, cli_name): arg = parser.add_argument( - 'topic', + 'topic_name', help='Topic name to calculate the delay for') arg.completer = TopicNameCompleter( include_hidden_topics_key='include_hidden_topics') + add_qos_arguments(parser, 'subscribe', 'sensor_data') parser.add_argument( - '--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE, + '--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE, help='window size, in # of messages, for calculating rate, ' 'string to (default: %d)' % DEFAULT_WINDOW_SIZE) add_direct_node_arguments(parser) @@ -65,9 +69,9 @@ def main(self, *, args): def main(args): + qos_args = extract_qos_arguments(args) with DirectNode(args) as node: - _rostopic_delay( - node.node, args.topic, window_size=args.window) + _rostopic_delay(node.node, args.topic_name, qos_args, window_size=args.window_size) class ROSTopicDelay(object): @@ -155,11 +159,12 @@ def print_delay(self): % (delay * 1e-9, min_delta * 1e-9, max_delta * 1e-9, std_dev * 1e-9, window)) -def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE): +def _rostopic_delay(node, topic, qos, window_size=DEFAULT_WINDOW_SIZE): """ Periodically print the publishing delay of a topic to console until shutdown. :param topic: topic name, ``str`` + :param qos: qos configuration of the subscriber :param window_size: number of messages to average over, ``unsigned_int`` :param blocking: pause delay until topic is published, ``bool`` """ @@ -170,12 +175,14 @@ def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE): node.destroy_node() return + qos_profile = choose_qos(node, topic_name=topic, qos_args=qos) + rt = ROSTopicDelay(node, window_size) node.create_subscription( msg_class, topic, rt.callback_delay, - qos_profile_sensor_data) + qos_profile) timer = node.create_timer(1, rt.print_delay) while rclpy.ok(): diff --git a/ros2topic/ros2topic/verb/echo.py b/ros2topic/ros2topic/verb/echo.py index 7f669c17c..e8e415f96 100644 --- a/ros2topic/ros2topic/verb/echo.py +++ b/ros2topic/ros2topic/verb/echo.py @@ -19,18 +19,16 @@ from rclpy.event_handler import SubscriptionEventCallbacks from rclpy.event_handler import UnsupportedEventTypeError from rclpy.node import Node -from rclpy.qos import QoSDurabilityPolicy -from rclpy.qos import QoSPresetProfiles from rclpy.qos import QoSProfile -from rclpy.qos import QoSReliabilityPolicy from rclpy.task import Future from ros2cli.helpers import unsigned_int from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments from ros2cli.node.strategy import NodeStrategy from ros2topic.api import add_qos_arguments +from ros2topic.api import extract_qos_arguments +from ros2topic.api import choose_qos from ros2topic.api import get_msg_class from ros2topic.api import positive_float -from ros2topic.api import qos_profile_from_short_keys from ros2topic.api import TopicNameCompleter from ros2topic.verb import VerbExtension from rosidl_runtime_py import message_to_csv @@ -108,67 +106,6 @@ def add_arguments(self, parser, cli_name): '--include-message-info', '-i', action='store_true', help='Shows the associated message info.') - def choose_qos(self, node, args): - - if (args.qos_reliability is not None or - args.qos_durability is not None or - args.qos_depth is not None or - args.qos_history is not None or - args.qos_liveliness is not None or - args.qos_liveliness_lease_duration_seconds is not None): - - return qos_profile_from_short_keys( - args.qos_profile, - reliability=args.qos_reliability, - durability=args.qos_durability, - depth=args.qos_depth, - history=args.qos_history, - liveliness=args.qos_liveliness, - liveliness_lease_duration_s=args.qos_liveliness_lease_duration_seconds) - - qos_profile = QoSPresetProfiles.get_from_short_key(args.qos_profile) - reliability_reliable_endpoints_count = 0 - durability_transient_local_endpoints_count = 0 - - pubs_info = node.get_publishers_info_by_topic(args.topic_name) - publishers_count = len(pubs_info) - if publishers_count == 0: - return qos_profile - - for info in pubs_info: - if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE): - reliability_reliable_endpoints_count += 1 - if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL): - durability_transient_local_endpoints_count += 1 - - # If all endpoints are reliable, ask for reliable - if reliability_reliable_endpoints_count == publishers_count: - qos_profile.reliability = QoSReliabilityPolicy.RELIABLE - else: - if reliability_reliable_endpoints_count > 0: - print( - 'Some, but not all, publishers are offering ' - 'QoSReliabilityPolicy.RELIABLE. Falling back to ' - 'QoSReliabilityPolicy.BEST_EFFORT as it will connect ' - 'to all publishers' - ) - qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT - - # If all endpoints are transient_local, ask for transient_local - if durability_transient_local_endpoints_count == publishers_count: - qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL - else: - if durability_transient_local_endpoints_count > 0: - print( - 'Some, but not all, publishers are offering ' - 'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to ' - 'QoSDurabilityPolicy.VOLATILE as it will connect ' - 'to all publishers' - ) - qos_profile.durability = QoSDurabilityPolicy.VOLATILE - - return qos_profile - def main(self, *, args): self.csv = args.csv @@ -198,7 +135,8 @@ def main(self, *, args): with NodeStrategy(args) as node: - qos_profile = self.choose_qos(node, args) + qos = extract_qos_arguments(args) + qos_profile = choose_qos(node, topic_name=args.topic_name, qos_args=qos) if args.message_type is None: message_type = get_msg_class( diff --git a/ros2topic/ros2topic/verb/hz.py b/ros2topic/ros2topic/verb/hz.py index b04b2edb2..303732e0e 100644 --- a/ros2topic/ros2topic/verb/hz.py +++ b/ros2topic/ros2topic/verb/hz.py @@ -41,9 +41,11 @@ from rclpy.clock import Clock from rclpy.clock import ClockType from rclpy.executors import ExternalShutdownException -from rclpy.qos import qos_profile_sensor_data from ros2cli.node.direct import add_arguments as add_direct_node_arguments from ros2cli.node.direct import DirectNode +from ros2topic.api import add_qos_arguments +from ros2topic.api import extract_qos_arguments +from ros2topic.api import choose_qos from ros2topic.api import get_msg_class from ros2topic.api import positive_int from ros2topic.api import TopicNameCompleter @@ -62,6 +64,7 @@ def add_arguments(self, parser, cli_name): help="Names of the ROS topic to listen to (e.g. '/chatter')") arg.completer = TopicNameCompleter( include_hidden_topics_key='include_hidden_topics') + add_qos_arguments(parser, 'subscribe', 'sensor_data') parser.add_argument( '--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE, @@ -93,8 +96,10 @@ def eval_fn(m): else: filter_expr = None + qos_args = extract_qos_arguments(args) + with DirectNode(args) as node: - _rostopic_hz(node.node, topics, window_size=args.window_size, filter_expr=filter_expr, + _rostopic_hz(node.node, topics, qos_args, window_size=args.window_size, filter_expr=filter_expr, use_wtime=args.use_wtime) @@ -278,11 +283,12 @@ def _get_ascii_table(header, cols): return table -def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False): +def _rostopic_hz(node, topics, qos, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False): """ Periodically print the publishing rate of a topic to console until shutdown. :param topics: list of topic names, ``list`` of ``str`` + :param qos: qos configuration of the subscriber :param window_size: number of messages to average over, -1 for infinite, ``int`` :param filter_expr: Python filter expression that is called with m, the message instance """ @@ -299,11 +305,13 @@ def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None print('WARNING: failed to find message type for topic [%s]' % topic) continue + qos_profile = choose_qos(node, topic_name=topic, qos_args=qos) + node.create_subscription( msg_class, topic, functools.partial(rt.callback_hz, topic=topic), - qos_profile_sensor_data) + qos_profile) if topics_len > 1: print('Subscribed to [%s]' % topic)