Skip to content

Commit

Permalink
Fix incorrect fragmented chunk merge
Browse files Browse the repository at this point in the history
  • Loading branch information
edaniels committed Mar 6, 2024
1 parent 3f9fba4 commit 1df1932
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 6 deletions.
5 changes: 4 additions & 1 deletion association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ func createNewAssociationPair(br *test.Bridge, ackMode int, recvBufSize uint32)
handshake0Ch <- true
}()
go func() {
a1, err1 = Client(Config{
// we could have two "client"s here but it's more
// standard to have one peer starting initialization and
// another waiting for the initialization to be requested (INIT).
a1, err1 = Server(Config{
Name: "a1",
NetConn: br.GetConn1(),
MaxReceiveBufferSize: recvBufSize,
Expand Down
4 changes: 4 additions & 0 deletions chunk_payload_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,7 @@ func (p *chunkPayloadData) setAllInflight() {
}
}
}

func (p *chunkPayloadData) isFragmented() bool {
return !(p.head == nil && p.beginningFragment && p.endingFragment)
}
21 changes: 16 additions & 5 deletions reassembly_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,22 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool {
return false
}

// Check if a chunkSet with the SSN already exists
for _, set := range r.ordered {
if set.ssn == chunk.streamSequenceNumber {
cset = set
break
// Check if a fragmented chunkSet with the fragmented SSN already exists
if chunk.isFragmented() {
for _, set := range r.ordered {
// nolint:godox
// TODO: add caution around SSN wrapping here... this helps only a little bit
// by ensuring we don't add to an unfragmented cset (1 chunk). There's
// a case where if the SSN does wrap around, we may see the same SSN
// for a different chunk.

// nolint:godox
// TODO: this slice can get pretty big; it may be worth maintaining a map
// for O(1) lookups at the cost of 2x memory.
if set.ssn == chunk.streamSequenceNumber && set.chunks[0].isFragmented() {
cset = set
break
}
}
}

Expand Down
38 changes: 38 additions & 0 deletions reassembly_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,44 @@ func TestReassemblyQueue(t *testing.T) {
assert.Equal(t, 1, len(rq.unorderedChunks), "there should be one chunk kept")
assert.Equal(t, 3, rq.getNumBytes(), "num bytes mismatch")
})

t.Run("fragmented and unfragmented chunks with the same ssn", func(t *testing.T) {
rq := newReassemblyQueue(0)

orgPpi := PayloadTypeWebRTCBinary

var chunk *chunkPayloadData
var complete bool
var ssn uint16 = 6

chunk = &chunkPayloadData{
payloadType: orgPpi,
tsn: 12,
beginningFragment: true,
endingFragment: true,
streamSequenceNumber: ssn,
userData: []byte("DEF"),
}

complete = rq.push(chunk)
assert.True(t, complete, "chunk set should be complete")
assert.Equal(t, 3, rq.getNumBytes(), "num bytes mismatch")

chunk = &chunkPayloadData{
payloadType: orgPpi,
beginningFragment: true,
tsn: 11,
streamSequenceNumber: ssn,
userData: []byte("ABC"),
}

complete = rq.push(chunk)
assert.False(t, complete, "chunk set should not be complete yet")
assert.Equal(t, 6, rq.getNumBytes(), "num bytes mismatch")

assert.Equal(t, 2, len(rq.ordered), "there should be two chunks")
assert.Equal(t, 6, rq.getNumBytes(), "num bytes mismatch")
})
}

func TestChunkSet(t *testing.T) {
Expand Down

0 comments on commit 1df1932

Please sign in to comment.