Skip to content

Commit

Permalink
Improve performance of inflight and payload queue
Browse files Browse the repository at this point in the history
Generate sorted slice is slow if the queue is
large when data rate is high and some packets
is lost/out-of-order. Use different queue struct
for inflight and payload queue. Since inflight
queue's chunk tsn is always consecutive so use
a queue to hold chunks and it is always ordered.
Use bitmask to hold payload tsn queue to calculate
cumulative tsn and SACK.
  • Loading branch information
cnderrauber committed Jun 27, 2024
1 parent e90e787 commit bf38694
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 252 deletions.
60 changes: 30 additions & 30 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ const (
avgChunkSize = 500
// minTSNOffset is the minimum offset over the cummulative TSN that we will enqueue
// irrespective of the receive buffer size
// see Association.getMaxTSNOffset
// see getMaxTSNOffset
minTSNOffset = 2000
// maxTSNOffset is the maximum offset over the cummulative TSN that we will enqueue
// irrespective of the receive buffer size
// see Association.getMaxTSNOffset
// see getMaxTSNOffset
maxTSNOffset = 40000
// maxReconfigRequests is the maximum number of reconfig requests we will keep outstanding
maxReconfigRequests = 1000
Expand Down Expand Up @@ -166,7 +166,6 @@ type Association struct {
state uint32
initialTSN uint32
myNextTSN uint32 // nextTSN
peerLastTSN uint32 // lastRcvdTSN
minTSN2MeasureRTT uint32 // for RTT measurement
willSendForwardTSN bool
willRetransmitFast bool
Expand All @@ -190,7 +189,7 @@ type Association struct {
myMaxNumInboundStreams uint16
myMaxNumOutboundStreams uint16
myCookie *paramStateCookie
payloadQueue *payloadQueue
payloadQueue *receivePayloadQueue
inflightQueue *payloadQueue
pendingQueue *pendingQueue
controlQueue *controlQueue
Expand Down Expand Up @@ -333,7 +332,7 @@ func createAssociation(config Config) *Association {
myMaxNumOutboundStreams: math.MaxUint16,
myMaxNumInboundStreams: math.MaxUint16,

payloadQueue: newPayloadQueue(),
payloadQueue: newReceivePayloadQueue(getMaxTSNOffset(maxReceiveBufferSize)),
inflightQueue: newPayloadQueue(),
pendingQueue: newPendingQueue(),
controlQueue: newControlQueue(),
Expand Down Expand Up @@ -1071,6 +1070,11 @@ func min32(a, b uint32) uint32 {
return b
}

// peerLastTSN return last received cumulative TSN
func (a *Association) peerLastTSN() uint32 {
return a.payloadQueue.getcumulativeTSN()
}

// setState atomically sets the state of the Association.
// The caller should hold the lock.
func (a *Association) setState(newState uint32) {
Expand Down Expand Up @@ -1127,13 +1131,11 @@ func (a *Association) SRTT() float64 {
}

// getMaxTSNOffset returns the maximum offset over the current cummulative TSN that
// we are willing to enqueue. Limiting the maximum offset limits the number of
// tsns we have in the payloadQueue map. This ensures that we don't use too much space in
// the map itself. This also ensures that we keep the bytes utilized in the receive
// we are willing to enqueue. This ensures that we keep the bytes utilized in the receive
// buffer within a small multiple of the user provided max receive buffer size.
func (a *Association) getMaxTSNOffset() uint32 {
func getMaxTSNOffset(maxReceiveBufferSize uint32) uint32 {
// 4 is a magic number here. There is no theory behind this.
offset := (a.maxReceiveBufferSize * 4) / avgChunkSize
offset := (maxReceiveBufferSize * 4) / avgChunkSize
if offset < minTSNOffset {
offset = minTSNOffset
}
Expand Down Expand Up @@ -1186,7 +1188,7 @@ func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) {
// is set initially by taking the peer's initial TSN,
// received in the INIT or INIT ACK chunk, and
// subtracting one from it.
a.peerLastTSN = i.initialTSN - 1
a.payloadQueue.init(i.initialTSN - 1)

for _, param := range i.params {
switch v := param.(type) { // nolint:gocritic
Expand Down Expand Up @@ -1260,7 +1262,7 @@ func (a *Association) handleInitAck(p *packet, i *chunkInitAck) error {
a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams)
a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams)
a.peerVerificationTag = i.initiateTag
a.peerLastTSN = i.initialTSN - 1
a.payloadQueue.init(i.initialTSN - 1)
if a.sourcePort != p.destinationPort ||
a.destinationPort != p.sourcePort {
a.log.Warnf("[%s] handleInitAck: port mismatch", a.name)
Expand Down Expand Up @@ -1411,7 +1413,7 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {
a.name, d.tsn, d.immediateSack, len(d.userData))
a.stats.incDATAs()

canPush := a.payloadQueue.canPush(d, a.peerLastTSN, a.getMaxTSNOffset())
canPush := a.payloadQueue.canPush(d.tsn)
if canPush {
s := a.getOrCreateStream(d.streamIdentifier, true, PayloadTypeUnknown)
if s == nil {
Expand All @@ -1423,14 +1425,14 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {

if a.getMyReceiverWindowCredit() > 0 {
// Pass the new chunk to stream level as soon as it arrives
a.payloadQueue.push(d, a.peerLastTSN)
a.payloadQueue.push(d.tsn)
s.handleData(d)
} else {
// Receive buffer is full
lastTSN, ok := a.payloadQueue.getLastTSNReceived()
if ok && sna32LT(d.tsn, lastTSN) {
a.log.Debugf("[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
a.payloadQueue.push(d, a.peerLastTSN)
a.payloadQueue.push(d.tsn)
s.handleData(d)
} else {
a.log.Debugf("[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
Expand All @@ -1454,10 +1456,9 @@ func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool)
// Meaning, if peerLastTSN+1 points to a chunk that is received,
// advance peerLastTSN until peerLastTSN+1 points to unreceived chunk.
for {
if _, popOk := a.payloadQueue.pop(a.peerLastTSN + 1); !popOk {
if popOk := a.payloadQueue.pop(false); !popOk {
break
}
a.peerLastTSN++

for _, rstReq := range a.reconfigRequests {
resp := a.resetStreamsIfAny(rstReq)
Expand All @@ -1470,7 +1471,7 @@ func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool)

hasPacketLoss := (a.payloadQueue.size() > 0)
if hasPacketLoss {
a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString(a.peerLastTSN))
a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString())
}

if (a.ackState != ackStateImmediate && !sackImmediately && !hasPacketLoss && a.ackMode == ackModeNormal) || a.ackMode == ackModeAlwaysDelay {
Expand Down Expand Up @@ -2068,8 +2069,8 @@ func (a *Association) handleForwardTSN(c *chunkForwardTSN) []*packet {
// duplicate may indicate the previous SACK was lost in the network.

a.log.Tracef("[%s] should send ack? newCumTSN=%d peerLastTSN=%d",
a.name, c.newCumulativeTSN, a.peerLastTSN)
if sna32LTE(c.newCumulativeTSN, a.peerLastTSN) {
a.name, c.newCumulativeTSN, a.peerLastTSN())
if sna32LTE(c.newCumulativeTSN, a.peerLastTSN()) {

Check warning on line 2073 in association.go

View check run for this annotation

Codecov / codecov/patch

association.go#L2072-L2073

Added lines #L2072 - L2073 were not covered by tests
a.log.Tracef("[%s] sending ack on Forward TSN", a.name)
a.ackState = ackStateImmediate
a.ackTimer.stop()
Expand All @@ -2088,9 +2089,8 @@ func (a *Association) handleForwardTSN(c *chunkForwardTSN) []*packet {
// chunk,

// Advance peerLastTSN
for sna32LT(a.peerLastTSN, c.newCumulativeTSN) {
a.payloadQueue.pop(a.peerLastTSN + 1) // may not exist
a.peerLastTSN++
for sna32LT(a.peerLastTSN(), c.newCumulativeTSN) {
a.payloadQueue.pop(true) // may not exist

Check warning on line 2093 in association.go

View check run for this annotation

Codecov / codecov/patch

association.go#L2092-L2093

Added lines #L2092 - L2093 were not covered by tests
}

// Report new peerLastTSN value and abandoned largest SSN value to
Expand Down Expand Up @@ -2143,7 +2143,7 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) {
switch p := raw.(type) {
case *paramOutgoingResetRequest:
a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name)
if a.peerLastTSN < p.senderLastTSN && len(a.reconfigRequests) >= maxReconfigRequests {
if a.peerLastTSN() < p.senderLastTSN && len(a.reconfigRequests) >= maxReconfigRequests {
// We have too many reconfig requests outstanding. Drop the request and let
// the peer retransmit. A well behaved peer should only have 1 outstanding
// reconfig request.
Expand Down Expand Up @@ -2189,9 +2189,9 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) {
// The caller should hold the lock.
func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
result := reconfigResultSuccessPerformed
if sna32LTE(p.senderLastTSN, a.peerLastTSN) {
if sna32LTE(p.senderLastTSN, a.peerLastTSN()) {
a.log.Debugf("[%s] resetStream(): senderLastTSN=%d <= peerLastTSN=%d",
a.name, p.senderLastTSN, a.peerLastTSN)
a.name, p.senderLastTSN, a.peerLastTSN())
for _, id := range p.streamIdentifiers {
s, ok := a.streams[id]
if !ok {
Expand All @@ -2206,7 +2206,7 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
} else {
a.log.Debugf("[%s] resetStream(): senderLastTSN=%d > peerLastTSN=%d",
a.name, p.senderLastTSN, a.peerLastTSN)
a.name, p.senderLastTSN, a.peerLastTSN())

Check warning on line 2209 in association.go

View check run for this annotation

Codecov / codecov/patch

association.go#L2209

Added line #L2209 was not covered by tests
result = reconfigResultInProgress
}

Expand Down Expand Up @@ -2280,7 +2280,7 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
break // would exceeds cwnd
}

if dataLen > a.rwnd {
if dataLen > a.RWND() {
break // no more rwnd
}

Expand Down Expand Up @@ -2454,10 +2454,10 @@ func (a *Association) generateNextRSN() uint32 {

func (a *Association) createSelectiveAckChunk() *chunkSelectiveAck {
sack := &chunkSelectiveAck{}
sack.cumulativeTSNAck = a.peerLastTSN
sack.cumulativeTSNAck = a.peerLastTSN()
sack.advertisedReceiverWindowCredit = a.getMyReceiverWindowCredit()
sack.duplicateTSN = a.payloadQueue.popDuplicates()
sack.gapAckBlocks = a.payloadQueue.getGapAckBlocks(a.peerLastTSN)
sack.gapAckBlocks = a.payloadQueue.getGapAckBlocks()
return sack
}

Expand Down
46 changes: 16 additions & 30 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,10 +1283,10 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 3,
newCumulativeTSN: prevTSN + 3,
streams: []chunkForwardTSNStream{{identifier: 0, sequence: 0}},
}

Expand All @@ -1296,7 +1296,7 @@ func TestHandleForwardTSN(t *testing.T) {
delayedAckTriggered := a.delayedAckTriggered
immediateAckTriggered := a.immediateAckTriggered
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN+3, "peerLastTSN should advance by 3 ")
assert.Equal(t, a.peerLastTSN(), prevTSN+3, "peerLastTSN should advance by 3 ")
assert.True(t, delayedAckTriggered, "delayed sack should be triggered")
assert.False(t, immediateAckTriggered, "immediate sack should NOT be triggered")
assert.Nil(t, p, "should return nil")
Expand All @@ -1308,20 +1308,13 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

// this chunk is blocked by the missing chunk at tsn=1
a.payloadQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 2,
streamIdentifier: 0,
streamSequenceNumber: 1,
userData: []byte("ABC"),
}, a.peerLastTSN)
a.payloadQueue.push(a.peerLastTSN() + 2)

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 1,
newCumulativeTSN: a.peerLastTSN() + 1,
streams: []chunkForwardTSNStream{
{identifier: 0, sequence: 1},
},
Expand All @@ -1333,7 +1326,7 @@ func TestHandleForwardTSN(t *testing.T) {
delayedAckTriggered := a.delayedAckTriggered
immediateAckTriggered := a.immediateAckTriggered
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN+2, "peerLastTSN should advance by 3")
assert.Equal(t, a.peerLastTSN(), prevTSN+2, "peerLastTSN should advance by 3")
assert.True(t, delayedAckTriggered, "delayed sack should be triggered")
assert.False(t, immediateAckTriggered, "immediate sack should NOT be triggered")
assert.Nil(t, p, "should return nil")
Expand All @@ -1345,20 +1338,13 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

// this chunk is blocked by the missing chunk at tsn=1
a.payloadQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 3,
streamIdentifier: 0,
streamSequenceNumber: 1,
userData: []byte("ABC"),
}, a.peerLastTSN)
a.payloadQueue.push(a.peerLastTSN() + 3)

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 1,
newCumulativeTSN: a.peerLastTSN() + 1,
streams: []chunkForwardTSNStream{
{identifier: 0, sequence: 1},
},
Expand All @@ -1369,7 +1355,7 @@ func TestHandleForwardTSN(t *testing.T) {
a.lock.Lock()
immediateAckTriggered := a.immediateAckTriggered
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN+1, "peerLastTSN should advance by 1")
assert.Equal(t, a.peerLastTSN(), prevTSN+1, "peerLastTSN should advance by 1")
assert.True(t, immediateAckTriggered, "immediate sack should be triggered")

assert.Nil(t, p, "should return nil")
Expand All @@ -1381,10 +1367,10 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN, // old TSN
newCumulativeTSN: a.peerLastTSN(), // old TSN
streams: []chunkForwardTSNStream{
{identifier: 0, sequence: 1},
},
Expand All @@ -1395,7 +1381,7 @@ func TestHandleForwardTSN(t *testing.T) {
a.lock.Lock()
ackState := a.ackState
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN, "peerLastTSN should not advance")
assert.Equal(t, a.peerLastTSN(), prevTSN, "peerLastTSN should not advance")
assert.Equal(t, ackStateImmediate, ackState, "sack should be requested")
assert.Nil(t, p, "should return nil")
})
Expand Down Expand Up @@ -1690,7 +1676,7 @@ func TestAssocCreateNewStream(t *testing.T) {
toBeIgnored := &chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 1,
tsn: a.peerLastTSN() + 1,
streamIdentifier: newSI,
userData: []byte("ABC"),
}
Expand Down Expand Up @@ -2482,7 +2468,7 @@ func TestAssocHandleInit(t *testing.T) {
return
}
assert.NoError(t, err, "should succeed")
assert.Equal(t, init.initialTSN-1, a.peerLastTSN, "should match")
assert.Equal(t, init.initialTSN-1, a.peerLastTSN(), "should match")
assert.Equal(t, uint16(1001), a.myMaxNumOutboundStreams, "should match")
assert.Equal(t, uint16(1002), a.myMaxNumInboundStreams, "should match")
assert.Equal(t, uint32(5678), a.peerVerificationTag, "should match")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/pion/sctp

require (
github.com/gammazero/deque v0.2.1
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
github.com/pion/transport/v3 v3.0.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
Loading

0 comments on commit bf38694

Please sign in to comment.