Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ported melodic speed ups #583

Open
wants to merge 1 commit into
base: noetic-devel
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 150 additions & 50 deletions rosserial_python/src/rosserial_python/SerialClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
import sys
import threading
import time
import select
import os
import fcntl
import io
#import line_profiler as Profiler

from serial import Serial, SerialException, SerialTimeoutException

Expand Down Expand Up @@ -255,10 +260,31 @@ def listen(self):
self.startSerialClient()
rospy.loginfo("startSerialClient() exited")

def fileno(self):
return self.socket.fileno()

def startSerialClient(self):
client = SerialClient(self)
# profile = Profiler.LineProfiler()
try:
# profile.add_function(SerialClient.run)
# profile.add_function(SerialClient.setupPublisher)
# profile.add_function(SerialClient.setupSubscriber)
# profile.add_function(SerialClient.setupServiceServerPublisher)
# profile.add_function(SerialClient.setupServiceClientPublisher)
# profile.add_function(SerialClient.setupServiceServerSubscriber)
# profile.add_function(SerialClient.setupServiceClientSubscriber)
# profile.add_function(SerialClient.handleParameterRequest)
# profile.add_function(SerialClient.handleLoggingRequest)
# profile.add_function(SerialClient.tryRead)
# profile.add_function(SerialClient._tryRead)

# wrapped_run = profile(lambda: client.run())
# wrapped_run()
# profile.dump_stats('/tmp/serial.profile')
# profile.print_stats()
client.run()

except KeyboardInterrupt:
pass
except RuntimeError:
Expand Down Expand Up @@ -294,18 +320,26 @@ def write(self, data):
totalsent += self.socket.send(data[totalsent:])
except BrokenPipeError:
raise RuntimeError("RosSerialServer.write() socket connection broken")
except io.BlockingIOError as e:
sent = e.characters_written
if sent == 0:
# being a quick hack, we don't turn
# on/off poll flags and handle everything
# in a single threaded loop.
# of course, this will make a bad
# situation worse.
time.sleep(0.001)
continue

def read(self, rqsted_length):
self.msg = b''
if not self.isConnected:
return self.msg

while len(self.msg) < rqsted_length:
chunk = self.socket.recv(rqsted_length - len(self.msg))
if chunk == b'':
raise RuntimeError("RosSerialServer.read() socket connection broken")
self.msg = self.msg + chunk
return self.msg
chunk = self.socket.recv(rqsted_length)
if chunk == '':
raise RuntimeError("RosSerialServer.read() socket connection broken")
return chunk

def inWaiting(self):
try: # the caller checks just for <1, so we'll peek at just one byte
Expand All @@ -328,6 +362,18 @@ class SerialClient(object):
protocol_ver2 = b'\xfe'
protocol_ver = protocol_ver2

# Named after the Doctor Who Episode
class Blink(object):
"""
Wake up the poll and queue.get and don't let them sleep anymore.
"""
def __init__(parent):
self.parent = parent

def invoke(self):
self.parent._pipe_w.write('Wakie, Wakie'.encode())
self.parent.write_queue.put(self)

def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=False):
""" Initialize node, connect to bus, attempt to negotiate topics. """

Expand All @@ -336,6 +382,7 @@ def __init__(self, port=None, baud=57600, timeout=5.0, fix_pyserial_for_test=Fal
self.write_lock = threading.RLock()
self.write_queue = queue.Queue()
self.write_thread = None
self.byte_buffer = bytearray()

self.lastsync = rospy.Time(0)
self.lastsync_lost = rospy.Time(0)
Expand Down Expand Up @@ -425,28 +472,67 @@ def txStopRequest(self):
rospy.loginfo("Sending tx stop request")

def tryRead(self, length):
while len(self.byte_buffer) < length:
self.byte_buffer.extend(self._tryRead(length-len(self.byte_buffer), 65536))
b = self.byte_buffer[0:length]
self.byte_buffer = self.byte_buffer[length:]
return b

def _tryRead(self, length, chunk):
try:
read_start = time.time()
bytes_remaining = length
result = bytearray()
while bytes_remaining != 0 and time.time() - read_start < self.timeout:
with self.read_lock:
received = self.port.read(bytes_remaining)

# Block here until there is something to do.
waittime = self.timeout - (time.time() - read_start)
ready = []
if waittime <= 0:
break

# Hacky, but better than busy waiting.
ready = self.poll.poll(waittime * 1000)

if self.port.fileno() not in [ i[0] for i in ready ]:
break

try:
with self.read_lock:
received = self.port.read(max(chunk, bytes_remaining))
except IOError as e:
continue

if len(received) != 0:
#line_profile next target: 511 13812 1297061.0 93.9 10.5
self.last_read = rospy.Time.now()
result.extend(received)
bytes_remaining -= len(received)
if bytes_remaining < 0:
bytes_remaining = 0

if bytes_remaining != 0:
raise IOError("Returned short (expected %d bytes, received %d instead)." % (length, length - bytes_remaining))

return bytes(result)
return result
except Exception as e:
raise IOError("Serial Port read failure: %s" % e)

def run(self):
""" Forward recieved messages to appropriate publisher. """

# countdown = 1000 for profiling

# Set Non-Blocking so are read ahead in try_read won't block.
# Means we could get short writes in theory.
flag = fcntl.fcntl(self.port.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.port.fileno(), fcntl.F_SETFL, flag | os.O_NONBLOCK)

# Get a pipe object used to wake up the sleepers.
# Be careful, these are not initialized elsewhere.
(self._pipe_r, self._pipe_w) = os.pipe()
rospy.on_shutdown(lambda : SerialClient.Blink(self).invoke())

# Launch write thread.
if self.write_thread is None:
self.write_thread = threading.Thread(target=self.processWriteQueue)
Expand All @@ -456,36 +542,52 @@ def run(self):
# Handle reading.
data = ''
read_step = None
while self.write_thread.is_alive() and not rospy.is_shutdown():
if (rospy.Time.now() - self.lastsync).to_sec() > (self.timeout * 3):
if self.synced:
rospy.logerr("Lost sync with device, restarting...")
else:
rospy.logerr("Unable to sync with device; possible link problem or link software version mismatch such as hydro rosserial_python with groovy Arduino")
self.lastsync_lost = rospy.Time.now()
self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_NO_SYNC)
self.requestTopics()
self.lastsync = rospy.Time.now()

self.poll = select.poll()
self.poll.register(self.port, select.POLLIN)
self.poll.register(self._pipe_r, select.POLLIN)

while True:
# for profiling.
# countdown = countdown - 1
# if countdown <= 0:
# return
# if countdown % 100 == 0:
# rospy.logerr("countdown=%d" %countdown)

if len(self.byte_buffer) == 0:
waittime = 3 * self.timeout - (rospy.Time.now() - self.lastsync).to_sec()
ready_fds = []
if waittime > 0:
ready_fds = self.poll.poll(waittime * 1000)
if not ready_fds:
if self.synced:
rospy.logerr("Lost sync with device, restarting...")
else:
rospy.logerr("Unable to sync with device; possible link problem or linsoftware version mismatch such as hydro rosserial_python with groovy Arduino")
self.lastsync_lost = rospy.Time.now()
self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR,
ERROR_NO_SYNC)
self.requestTopics()
self.lastsync = rospy.Time.now()
continue
elif self._pipe_r in [ i[0] for i in ready_fds ]:
break

# This try-block is here because we make multiple calls to read(). Any one of them can throw
# an IOError if there's a serial problem or timeout. In that scenario, a single handler at the
# bottom attempts to reconfigure the topics.
try:
with self.read_lock:
if self.port.inWaiting() < 1:
time.sleep(0.001)
continue

# Find sync flag.
flag = [0, 0]
read_step = 'syncflag'
flag[0] = self.tryRead(1)
flag[0] = bytes(self.tryRead(1))
if (flag[0] != self.header):
continue

# Find protocol version.
read_step = 'protocol'
flag[1] = self.tryRead(1)
flag[1] = bytes(self.tryRead(1))
if flag[1] != self.protocol_ver:
self.sendDiagnostics(diagnostic_msgs.msg.DiagnosticStatus.ERROR, ERROR_MISMATCHED_PROTOCOL)
rospy.logerr("Mismatched protocol version in packet (%s): lost sync or rosserial_python is from different ros release than the rosserial client" % repr(flag[1]))
Expand Down Expand Up @@ -528,19 +630,18 @@ def run(self):

# Reada checksum for topic id and msg
read_step = 'data checksum'
chk = self.tryRead(1)
checksum = sum(array.array('B', topic_id_header + msg + chk))
chk = bytes(self.tryRead(1))
checksum = sum(topic_id_header) + sum(msg) + ord(chk)

# Validate checksum.
if checksum % 256 == 255:
self.synced = True
self.lastsync_success = rospy.Time.now()
try:
self.callbacks[topic_id](msg)
self.callbacks[topic_id](bytes(msg))
except KeyError:
rospy.logerr("Tried to publish before configured, topic id %d" % topic_id)
self.requestTopics()
time.sleep(0.001)
else:
rospy.loginfo("wrong checksum for topic id and msg")

Expand Down Expand Up @@ -775,26 +876,25 @@ def processWriteQueue(self):
Main loop for the thread that processes outgoing data to write to the serial port.
"""
while not rospy.is_shutdown():
if self.write_queue.empty():
time.sleep(0.01)
else:
data = self.write_queue.get()
while True:
try:
if isinstance(data, tuple):
topic, msg = data
self._send(topic, msg)
elif isinstance(data, bytes):
self._write(data)
else:
rospy.logerr("Trying to write invalid data type: %s" % type(data))
break
except SerialTimeoutException as exc:
rospy.logerr('Write timeout: %s' % exc)
time.sleep(1)
except RuntimeError as exc:
rospy.logerr('Write thread exception: %s' % exc)
break
data = self.write_queue.get()
while True:
try:
if isinstance(data, tuple):
topic, msg = data
self._send(topic, msg)
elif isinstance(data, bytes):
self._write(data)
elif isinstance (data, SerialClient.Blink):
break
else:
rospy.logerr("Trying to write invalid data type: %s" % type(data))
break
except SerialTimeoutException as exc:
rospy.logerr('Write timeout: %s' % exc)
time.sleep(1)
except RuntimeError as exc:
rospy.logerr('Write thread exception: %s' % exc)
break


def sendDiagnostics(self, level, msg_text):
Expand Down