From 735a4c6d8ae4023a295ef1d305b9e0c6e7608c70 Mon Sep 17 00:00:00 2001 From: Eric Daniels Date: Wed, 28 Feb 2024 11:28:29 -0500 Subject: [PATCH] Fix incorrect fragmented chunk merge --- association_test.go | 5 ++++- chunk_payload_data.go | 4 ++++ reassembly_queue.go | 21 ++++++++++++++++----- reassembly_queue_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 6 deletions(-) diff --git a/association_test.go b/association_test.go index ed53ed71..8a50cf29 100644 --- a/association_test.go +++ b/association_test.go @@ -265,7 +265,10 @@ func createNewAssociationPair(br *test.Bridge, ackMode int, recvBufSize uint32) handshake0Ch <- true }() go func() { - a1, err1 = Client(Config{ + // we could have two "client"s here but it's more + // standard to have one peer starting initialization and + // another waiting for the initialization to be requested (INIT). + a1, err1 = Server(Config{ Name: "a1", NetConn: br.GetConn1(), MaxReceiveBufferSize: recvBufSize, diff --git a/chunk_payload_data.go b/chunk_payload_data.go index b6f1b614..a6e50db9 100644 --- a/chunk_payload_data.go +++ b/chunk_payload_data.go @@ -206,3 +206,7 @@ func (p *chunkPayloadData) setAllInflight() { } } } + +func (p *chunkPayloadData) isFragmented() bool { + return !(p.head == nil && p.beginningFragment && p.endingFragment) +} diff --git a/reassembly_queue.go b/reassembly_queue.go index c23e6991..e0d527c0 100644 --- a/reassembly_queue.go +++ b/reassembly_queue.go @@ -155,11 +155,22 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { return false } - // Check if a chunkSet with the SSN already exists - for _, set := range r.ordered { - if set.ssn == chunk.streamSequenceNumber { - cset = set - break + // Check if a fragmented chunkSet with the fragmented SSN already exists + if chunk.isFragmented() { + for _, set := range r.ordered { + // nolint:godox + // TODO: add caution around SSN wrapping here... this helps only a little bit + // by ensuring we don't add to an unfragmented cset (1 chunk). There's + // a case where if the SSN does wrap around, we may see the same SSN + // for a different chunk. + + // nolint:godox + // TODO: this slice can get pretty big; it may be worth maintaining a map + // for O(1) lookups at the cost of 2x memory. + if set.ssn == chunk.streamSequenceNumber && set.chunks[0].isFragmented() { + cset = set + break + } } } diff --git a/reassembly_queue_test.go b/reassembly_queue_test.go index 91fd5305..02478f45 100644 --- a/reassembly_queue_test.go +++ b/reassembly_queue_test.go @@ -464,6 +464,44 @@ func TestReassemblyQueue(t *testing.T) { assert.Equal(t, 1, len(rq.unorderedChunks), "there should be one chunk kept") assert.Equal(t, 3, rq.getNumBytes(), "num bytes mismatch") }) + + t.Run("fragmented and unfragmented chunks with the same ssn", func(t *testing.T) { + rq := newReassemblyQueue(0) + + orgPpi := PayloadTypeWebRTCBinary + + var chunk *chunkPayloadData + var complete bool + var ssn uint16 = 6 + + chunk = &chunkPayloadData{ + payloadType: orgPpi, + tsn: 12, + beginningFragment: true, + endingFragment: true, + streamSequenceNumber: ssn, + userData: []byte("DEF"), + } + + complete = rq.push(chunk) + assert.True(t, complete, "chunk set should be complete") + assert.Equal(t, 3, rq.getNumBytes(), "num bytes mismatch") + + chunk = &chunkPayloadData{ + payloadType: orgPpi, + beginningFragment: true, + tsn: 11, + streamSequenceNumber: ssn, + userData: []byte("ABC"), + } + + complete = rq.push(chunk) + assert.False(t, complete, "chunk set should not be complete yet") + assert.Equal(t, 6, rq.getNumBytes(), "num bytes mismatch") + + assert.Equal(t, 2, len(rq.ordered), "there should be two chunks") + assert.Equal(t, 6, rq.getNumBytes(), "num bytes mismatch") + }) } func TestChunkSet(t *testing.T) {