-
Notifications
You must be signed in to change notification settings - Fork 778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[21617] Inconsistent network throughput with LARGE_DATA #5158
Comments
Point cloud publisher: #!/usr/bin/env python3
from array import array
from random import randint
from threading import Thread
import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
POINT_NUM = 180000
FIELDS = [
PointField(name="x", offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name="y", offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name="z", offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name="intensity", offset=12, datatype=PointField.FLOAT32, count=1),
]
POINT_SIZE = 16
def main():
rclpy.init()
node = Node("points")
qos = rclpy.qos.QoSProfile(
depth=10,
reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
# reliability=rclpy.qos.QoSReliabilityPolicy.BEST_EFFORT,
)
publisher = node.create_publisher(PointCloud2, "points", qos)
rate = node.create_rate(10)
counter = 0
# Numpy to array conversion is slow. Published points selected
# by random offset from the beginning of this array.
points = array("B",
np.random \
.rand(POINT_NUM * 100, len(FIELDS))
.astype(np.float32)
.tobytes())
spinner = Thread(target=rclpy.spin, args=(node,), daemon=True)
spinner.start()
while rclpy.ok():
print(counter)
counter += 1
msg = PointCloud2()
msg.height = 1
msg.width = POINT_NUM
msg.fields = FIELDS
msg.point_step = POINT_SIZE
msg.row_step = POINT_NUM * POINT_SIZE
msg.is_dense = True
msg.is_bigendian = False
offset = randint(0, POINT_NUM * 99) * POINT_SIZE
msg.data = points[offset:offset + POINT_NUM * POINT_SIZE]
timestamp = node.get_clock().now().to_msg()
msg.header = Header()
msg.header.stamp = timestamp
msg.header.frame_id = "map"
publisher.publish(msg)
rate.sleep()
if __name__ == "__main__":
main() |
Image publisher: #!/usr/bin/env python3
from array import array
from random import randint
from threading import Thread
import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Header
WIDTH = 200
HEIGHT = 200
PIXEL_SIZE = 3
TOPIC_NUM = 4
def main():
rclpy.init()
node = Node("images")
qos = rclpy.qos.QoSProfile(
depth=10,
reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
# reliability=rclpy.qos.QoSReliabilityPolicy.BEST_EFFORT,
)
image_size = WIDTH * HEIGHT * PIXEL_SIZE
pixels = array("B",
(np.random.rand(image_size * 2) * 256)
.astype(np.uint8)
.tobytes())
def publisher_thread(topic):
publisher = node.create_publisher(Image, topic, qos)
clock = node.get_clock()
rate = node.create_rate(25)
counter = 0
while rclpy.ok():
print(topic, counter)
counter += 1
msg = Image()
msg.width = WIDTH
msg.height = HEIGHT
msg.encoding = "rgb8"
msg.step = WIDTH * PIXEL_SIZE
msg.is_bigendian = False
offset = randint(0, image_size)
msg.data = pixels[offset:offset + image_size]
msg.header = Header()
msg.header.frame_id = "map"
timestamp = clock.now().to_msg()
msg.header.stamp = timestamp
publisher.publish(msg)
rate.sleep()
for i in range(1, TOPIC_NUM + 1):
topic = f"image{i}"
thread = Thread(
target=publisher_thread,
args=(topic,),
daemon=True)
thread.start()
rclpy.spin(node)
if __name__ == "__main__":
main() |
Subscriber: #!/usr/bin/env python3
import sys
from time import time
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.time import Time
from sensor_msgs.msg import PointCloud2, Image
USE_SIM_TIME = False
def main():
rclpy.init()
node_id = int(time())
node = Node(f"subscriber_{node_id}")
node.set_parameters([
Parameter("use_sim_time", Parameter.Type.BOOL, USE_SIM_TIME)
])
clock = node.get_clock()
timestamps = []
delays = []
qos = rclpy.qos.QoSProfile(
depth=10,
reliability=rclpy.qos.QoSReliabilityPolicy.RELIABLE,
# reliability=rclpy.qos.QoSReliabilityPolicy.BEST_EFFORT,
)
counter = [0]
_callback = lambda msg: callback(msg, clock, delays, timestamps, counter)
msg_type = globals()[sys.argv[1]]
topic = sys.argv[2]
subscriber = node.create_subscription(msg_type, topic, _callback, qos)
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
def callback(msg, clock, delays, timestamps, counter):
counter[0] += 1
print(counter[0])
delta = clock.now() - Time.from_msg(msg.header.stamp)
delta = delta.nanoseconds / 1e9
delays.append(delta)
mean_delay = sum(delays) / len(delays)
max_delay = max(delays)
if len(delays) == 20:
delays.pop(0)
print(f"mean: {mean_delay:.3f} s, max: {max_delay:.3f} s")
timestamp = Time.from_msg(msg.header.stamp).nanoseconds
timestamps.append(timestamp)
if len(timestamps) == 20:
timestamps.pop(0)
if len(timestamps) > 1:
delta = (timestamps[-1] - timestamps[0]) * 1e-9
msg_num = len(timestamps) - 1
rate = msg_num / delta
print(f"rate: {rate:.3f} Hz")
if __name__ == "__main__":
main() |
Such behavior observed even with BEST_EFFORT policy. |
Tried publishing from another desktop PC, connected directly through 1 Gbps. Same behavior. |
Increasing publisher's |
Hi @mode89, thanks for your report, |
Thank you! :) Today tested environment based on |
Is there an already existing issue for this?
Expected behavior
I expect amount of network traffic generated by FastDDS in LARGE_DATA mode to be almost the same as the payload (ROS message size multiplied by publishing rate).
Current behavior
Occasionally, generated network traffic exceeds estimated payload throughput.
There no consistency in how often it happens: on some days it could happen on half of the runs, on other days - maybe one out of ten runs.
Surprisingly, whenever network traffic exceeds the estimated value, it is always almost exactly 3 times higher.
Steps to reproduce
Set up two computers connected via Ethernet. Run publisher on one computer and subscriber on another computer. Publish large messages. Observe network throughput.
Fast DDS version/commit
I use ROS Jazzy, which currently bundles Fast DDS v2.14.1. I also tried ROS Humble with Fast DDS v2.14.3 - same behavior.
Platform/Architecture
Other. Please specify in Additional context section.
Transport layer
TCPv4
Additional context
In my experiments, the publisher is an embedded ARM64 platform:
Subscriber runs on a desktop PC:
Computers connected through 1 Gbps switch.
Both computers build ROS2 (with FastDDS) from source code through Nix package manager. I didn't apply any extra patches to FastDDS.
On publisher side,
net.core.wmem_max = 200000000
. On subscriber side:net.core.rmem_max = 200000000
.I emulated two kinds of payload:
XML configuration file
Relevant log output
No response
Network traffic capture
No response
The text was updated successfully, but these errors were encountered: