Skip to content

Commit

Permalink
Add support for topic QOS for ros2topic bw, delay and hz
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Welte <[email protected]>
  • Loading branch information
TonyWelte committed Sep 17, 2024
1 parent e2fddb2 commit 392e320
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 82 deletions.
78 changes: 78 additions & 0 deletions ros2topic/ros2topic/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

from rclpy.duration import Duration
from rclpy.expand_topic_name import expand_topic_name
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSPresetProfiles
from rclpy.qos import QoSReliabilityPolicy
from rclpy.topic_or_service_is_hidden import topic_or_service_is_hidden
from rclpy.validate_full_topic_name import validate_full_topic_name
from ros2cli.node.strategy import NodeStrategy
Expand Down Expand Up @@ -253,3 +256,78 @@ def add_qos_arguments(parser: ArgumentParser, subscribe_or_publish: str, default
help=(
f'Quality of service liveliness lease duration setting to {subscribe_or_publish} '
'with (overrides liveliness lease duration value of --qos-profile option'))

def extract_qos_arguments(args):
class QosArgs:
pass

qos = QosArgs()
qos.qos_profile = args.qos_profile
qos.qos_reliability = args.qos_reliability
qos.qos_durability = args.qos_durability
qos.qos_depth = args.qos_depth
qos.qos_history = args.qos_history
qos.qos_liveliness = args.qos_liveliness
qos.qos_liveliness_lease_duration_seconds = args.qos_liveliness_lease_duration_seconds

return qos

def choose_qos(node, topic_name: str, qos_args):
if (qos_args.qos_reliability is not None or
qos_args.qos_durability is not None or
qos_args.qos_depth is not None or
qos_args.qos_history is not None or
qos_args.qos_liveliness is not None or
qos_args.qos_liveliness_lease_duration_seconds is not None):

return qos_profile_from_short_keys(
qos_args.qos_profile,
reliability=qos_args.qos_reliability,
durability=qos_args.qos_durability,
depth=qos_args.qos_depth,
history=qos_args.qos_history,
liveliness=qos_args.qos_liveliness,
liveliness_lease_duration_s=qos_args.qos_liveliness_lease_duration_seconds)

qos_profile = QoSPresetProfiles.get_from_short_key(qos_args.qos_profile)
reliability_reliable_endpoints_count = 0
durability_transient_local_endpoints_count = 0

pubs_info = node.get_publishers_info_by_topic(topic_name)
publishers_count = len(pubs_info)
if publishers_count == 0:
return qos_profile

for info in pubs_info:
if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE):
reliability_reliable_endpoints_count += 1
if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL):
durability_transient_local_endpoints_count += 1

# If all endpoints are reliable, ask for reliable
if reliability_reliable_endpoints_count == publishers_count:
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
else:
if reliability_reliable_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSReliabilityPolicy.RELIABLE. Falling back to '
'QoSReliabilityPolicy.BEST_EFFORT as it will connect '
'to all publishers'
)
qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT

# If all endpoints are transient_local, ask for transient_local
if durability_transient_local_endpoints_count == publishers_count:
qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
else:
if durability_transient_local_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to '
'QoSDurabilityPolicy.VOLATILE as it will connect '
'to all publishers'
)
qos_profile.durability = QoSDurabilityPolicy.VOLATILE

return qos_profile
18 changes: 12 additions & 6 deletions ros2topic/ros2topic/verb/bw.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import traceback

import rclpy
from rclpy.qos import qos_profile_sensor_data
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import add_qos_arguments
from ros2topic.api import extract_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import get_msg_class
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand All @@ -62,19 +64,21 @@ class BwVerb(VerbExtension):

def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'topic',
'topic_name',
help='Topic name to monitor for bandwidth utilization')
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
add_qos_arguments(parser, 'subscribe', 'sensor_data')
parser.add_argument(
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
'--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
help='maximum window size, in # of messages, for calculating rate '
f'(default: {DEFAULT_WINDOW_SIZE})', metavar='WINDOW')
add_direct_node_arguments(parser)

def main(self, *, args):
qos_args = extract_qos_arguments(args)
with DirectNode(args) as node:
_rostopic_bw(node.node, args.topic, window_size=args.window)
_rostopic_bw(node.node, args.topic_name, qos_args, window_size=args.window_size)


class ROSTopicBandwidth(object):
Expand Down Expand Up @@ -150,20 +154,22 @@ def print_bw(self):
print(f'{bw} from {n} messages\n\tMessage size mean: {mean} min: {min_s} max: {max_s}')


def _rostopic_bw(node, topic, window_size=DEFAULT_WINDOW_SIZE):
def _rostopic_bw(node, topic, qos, window_size=DEFAULT_WINDOW_SIZE):
"""Periodically print the received bandwidth of a topic to console until shutdown."""
# pause bw until topic is published
msg_class = get_msg_class(node, topic, blocking=True, include_hidden_topics=True)
if msg_class is None:
node.destroy_node()
return

qos_profile = choose_qos(node, topic_name=topic, qos_args=qos)

rt = ROSTopicBandwidth(node, window_size)
node.create_subscription(
msg_class,
topic,
rt.callback,
qos_profile_sensor_data,
qos_profile,
raw=True
)

Expand Down
19 changes: 13 additions & 6 deletions ros2topic/ros2topic/verb/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
from rclpy.time import Time
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import add_qos_arguments
from ros2topic.api import extract_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import get_msg_class
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand All @@ -50,12 +53,13 @@ class DelayVerb(VerbExtension):

def add_arguments(self, parser, cli_name):
arg = parser.add_argument(
'topic',
'topic_name',
help='Topic name to calculate the delay for')
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
add_qos_arguments(parser, 'subscribe', 'sensor_data')
parser.add_argument(
'--window', '-w', type=positive_int, default=DEFAULT_WINDOW_SIZE,
'--window', '-w', dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
help='window size, in # of messages, for calculating rate, '
'string to (default: %d)' % DEFAULT_WINDOW_SIZE)
add_direct_node_arguments(parser)
Expand All @@ -65,9 +69,9 @@ def main(self, *, args):


def main(args):
qos_args = extract_qos_arguments(args)
with DirectNode(args) as node:
_rostopic_delay(
node.node, args.topic, window_size=args.window)
_rostopic_delay(node.node, args.topic_name, qos_args, window_size=args.window_size)


class ROSTopicDelay(object):
Expand Down Expand Up @@ -155,11 +159,12 @@ def print_delay(self):
% (delay * 1e-9, min_delta * 1e-9, max_delta * 1e-9, std_dev * 1e-9, window))


def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE):
def _rostopic_delay(node, topic, qos, window_size=DEFAULT_WINDOW_SIZE):
"""
Periodically print the publishing delay of a topic to console until shutdown.
:param topic: topic name, ``str``
:param qos: qos configuration of the subscriber
:param window_size: number of messages to average over, ``unsigned_int``
:param blocking: pause delay until topic is published, ``bool``
"""
Expand All @@ -170,12 +175,14 @@ def _rostopic_delay(node, topic, window_size=DEFAULT_WINDOW_SIZE):
node.destroy_node()
return

qos_profile = choose_qos(node, topic_name=topic, qos_args=qos)

rt = ROSTopicDelay(node, window_size)
node.create_subscription(
msg_class,
topic,
rt.callback_delay,
qos_profile_sensor_data)
qos_profile)

timer = node.create_timer(1, rt.print_delay)
while rclpy.ok():
Expand Down
70 changes: 4 additions & 66 deletions ros2topic/ros2topic/verb/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@
from rclpy.event_handler import SubscriptionEventCallbacks
from rclpy.event_handler import UnsupportedEventTypeError
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSPresetProfiles
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.task import Future
from ros2cli.helpers import unsigned_int
from ros2cli.node.strategy import add_arguments as add_strategy_node_arguments
from ros2cli.node.strategy import NodeStrategy
from ros2topic.api import add_qos_arguments
from ros2topic.api import extract_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import get_msg_class
from ros2topic.api import positive_float
from ros2topic.api import qos_profile_from_short_keys
from ros2topic.api import TopicNameCompleter
from ros2topic.verb import VerbExtension
from rosidl_runtime_py import message_to_csv
Expand Down Expand Up @@ -108,67 +106,6 @@ def add_arguments(self, parser, cli_name):
'--include-message-info', '-i', action='store_true',
help='Shows the associated message info.')

def choose_qos(self, node, args):

if (args.qos_reliability is not None or
args.qos_durability is not None or
args.qos_depth is not None or
args.qos_history is not None or
args.qos_liveliness is not None or
args.qos_liveliness_lease_duration_seconds is not None):

return qos_profile_from_short_keys(
args.qos_profile,
reliability=args.qos_reliability,
durability=args.qos_durability,
depth=args.qos_depth,
history=args.qos_history,
liveliness=args.qos_liveliness,
liveliness_lease_duration_s=args.qos_liveliness_lease_duration_seconds)

qos_profile = QoSPresetProfiles.get_from_short_key(args.qos_profile)
reliability_reliable_endpoints_count = 0
durability_transient_local_endpoints_count = 0

pubs_info = node.get_publishers_info_by_topic(args.topic_name)
publishers_count = len(pubs_info)
if publishers_count == 0:
return qos_profile

for info in pubs_info:
if (info.qos_profile.reliability == QoSReliabilityPolicy.RELIABLE):
reliability_reliable_endpoints_count += 1
if (info.qos_profile.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL):
durability_transient_local_endpoints_count += 1

# If all endpoints are reliable, ask for reliable
if reliability_reliable_endpoints_count == publishers_count:
qos_profile.reliability = QoSReliabilityPolicy.RELIABLE
else:
if reliability_reliable_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSReliabilityPolicy.RELIABLE. Falling back to '
'QoSReliabilityPolicy.BEST_EFFORT as it will connect '
'to all publishers'
)
qos_profile.reliability = QoSReliabilityPolicy.BEST_EFFORT

# If all endpoints are transient_local, ask for transient_local
if durability_transient_local_endpoints_count == publishers_count:
qos_profile.durability = QoSDurabilityPolicy.TRANSIENT_LOCAL
else:
if durability_transient_local_endpoints_count > 0:
print(
'Some, but not all, publishers are offering '
'QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to '
'QoSDurabilityPolicy.VOLATILE as it will connect '
'to all publishers'
)
qos_profile.durability = QoSDurabilityPolicy.VOLATILE

return qos_profile

def main(self, *, args):

self.csv = args.csv
Expand Down Expand Up @@ -198,7 +135,8 @@ def main(self, *, args):

with NodeStrategy(args) as node:

qos_profile = self.choose_qos(node, args)
qos = extract_qos_arguments(args)
qos_profile = choose_qos(node, topic_name=args.topic_name, qos_args=qos)

if args.message_type is None:
message_type = get_msg_class(
Expand Down
16 changes: 12 additions & 4 deletions ros2topic/ros2topic/verb/hz.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
from rclpy.clock import Clock
from rclpy.clock import ClockType
from rclpy.executors import ExternalShutdownException
from rclpy.qos import qos_profile_sensor_data
from ros2cli.node.direct import add_arguments as add_direct_node_arguments
from ros2cli.node.direct import DirectNode
from ros2topic.api import add_qos_arguments
from ros2topic.api import extract_qos_arguments
from ros2topic.api import choose_qos
from ros2topic.api import get_msg_class
from ros2topic.api import positive_int
from ros2topic.api import TopicNameCompleter
Expand All @@ -62,6 +64,7 @@ def add_arguments(self, parser, cli_name):
help="Names of the ROS topic to listen to (e.g. '/chatter')")
arg.completer = TopicNameCompleter(
include_hidden_topics_key='include_hidden_topics')
add_qos_arguments(parser, 'subscribe', 'sensor_data')
parser.add_argument(
'--window', '-w',
dest='window_size', type=positive_int, default=DEFAULT_WINDOW_SIZE,
Expand Down Expand Up @@ -93,8 +96,10 @@ def eval_fn(m):
else:
filter_expr = None

qos_args = extract_qos_arguments(args)

with DirectNode(args) as node:
_rostopic_hz(node.node, topics, window_size=args.window_size, filter_expr=filter_expr,
_rostopic_hz(node.node, topics, qos_args, window_size=args.window_size, filter_expr=filter_expr,
use_wtime=args.use_wtime)


Expand Down Expand Up @@ -278,11 +283,12 @@ def _get_ascii_table(header, cols):
return table


def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False):
def _rostopic_hz(node, topics, qos, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None, use_wtime=False):
"""
Periodically print the publishing rate of a topic to console until shutdown.
:param topics: list of topic names, ``list`` of ``str``
:param qos: qos configuration of the subscriber
:param window_size: number of messages to average over, -1 for infinite, ``int``
:param filter_expr: Python filter expression that is called with m, the message instance
"""
Expand All @@ -299,11 +305,13 @@ def _rostopic_hz(node, topics, window_size=DEFAULT_WINDOW_SIZE, filter_expr=None
print('WARNING: failed to find message type for topic [%s]' % topic)
continue

qos_profile = choose_qos(node, topic_name=topic, qos_args=qos)

node.create_subscription(
msg_class,
topic,
functools.partial(rt.callback_hz, topic=topic),
qos_profile_sensor_data)
qos_profile)
if topics_len > 1:
print('Subscribed to [%s]' % topic)

Expand Down

0 comments on commit 392e320

Please sign in to comment.