Skip to content

Commit

Permalink
Fix retransmit bug for large messages causing message corruption
Browse files Browse the repository at this point in the history
Logic bug: We are looping over segments sent by the C++ library and sending them
over a python transport. If the last message is larger than the transport pause
threshold, this causes the transport to pause us. In that case, we forget to
increment the current write_index, causing us to retransmit the same message in
an infinite loop.

This is a serious bug, because it causes messages to become corrupted.
  • Loading branch information
LasseBlaauwbroek authored and haata committed Oct 16, 2023
1 parent 302a96d commit a30fd77
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions capnp/lib/capnp.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2371,6 +2371,7 @@ cdef class _AsyncIoStream:
elif self.protocol.transport is not None and hasattr(self.protocol.transport, "close"):
self.protocol.transport.close()
# Call connection_lost immediately, instead of waiting for the transport to do it.
# TODO: This might be a questionable thing to do...
self.protocol.connection_lost("Stream is closing")

async def wait_closed(self):
Expand Down Expand Up @@ -2558,15 +2559,15 @@ cdef class _PyAsyncIoStreamProtocol(DummyBaseClass, asyncio.BufferedProtocol):
self.read_reset()

cdef write_loop(self):
if not self.write_in_progress: return
if self.write_paused or not self.write_in_progress: return
cdef const ArrayPtr[const uint8_t]* piece
for i in range(self.write_index, self.write_pieces.size()):
if self.write_paused:
self.write_index = i
break
piece = &self.write_pieces[i]
view = memoryview.PyMemoryView_FromMemory(<char*>piece.begin(), piece.size(), buffer.PyBUF_READ)
self.transport.write(view)
if self.write_paused:
self.write_index = i+1
break
if not self.write_paused:
self.write_fulfiller.fulfill()
self.write_reset()
Expand Down

0 comments on commit a30fd77

Please sign in to comment.