Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance for inflight and payload queue #337

Merged
merged 5 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 30 additions & 30 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ const (
avgChunkSize = 500
// minTSNOffset is the minimum offset over the cummulative TSN that we will enqueue
// irrespective of the receive buffer size
// see Association.getMaxTSNOffset
// see getMaxTSNOffset
minTSNOffset = 2000
// maxTSNOffset is the maximum offset over the cummulative TSN that we will enqueue
// irrespective of the receive buffer size
// see Association.getMaxTSNOffset
// see getMaxTSNOffset
maxTSNOffset = 40000
// maxReconfigRequests is the maximum number of reconfig requests we will keep outstanding
maxReconfigRequests = 1000
Expand Down Expand Up @@ -166,7 +166,6 @@ type Association struct {
state uint32
initialTSN uint32
myNextTSN uint32 // nextTSN
peerLastTSN uint32 // lastRcvdTSN
minTSN2MeasureRTT uint32 // for RTT measurement
willSendForwardTSN bool
willRetransmitFast bool
Expand All @@ -190,7 +189,7 @@ type Association struct {
myMaxNumInboundStreams uint16
myMaxNumOutboundStreams uint16
myCookie *paramStateCookie
payloadQueue *payloadQueue
payloadQueue *receivePayloadQueue
inflightQueue *payloadQueue
pendingQueue *pendingQueue
controlQueue *controlQueue
Expand Down Expand Up @@ -333,7 +332,7 @@ func createAssociation(config Config) *Association {
myMaxNumOutboundStreams: math.MaxUint16,
myMaxNumInboundStreams: math.MaxUint16,

payloadQueue: newPayloadQueue(),
payloadQueue: newReceivePayloadQueue(getMaxTSNOffset(maxReceiveBufferSize)),
inflightQueue: newPayloadQueue(),
pendingQueue: newPendingQueue(),
controlQueue: newControlQueue(),
Expand Down Expand Up @@ -1071,6 +1070,11 @@ func min32(a, b uint32) uint32 {
return b
}

// peerLastTSN return last received cumulative TSN
func (a *Association) peerLastTSN() uint32 {
return a.payloadQueue.getcumulativeTSN()
}

// setState atomically sets the state of the Association.
// The caller should hold the lock.
func (a *Association) setState(newState uint32) {
Expand Down Expand Up @@ -1127,13 +1131,11 @@ func (a *Association) SRTT() float64 {
}

// getMaxTSNOffset returns the maximum offset over the current cummulative TSN that
// we are willing to enqueue. Limiting the maximum offset limits the number of
// tsns we have in the payloadQueue map. This ensures that we don't use too much space in
// the map itself. This also ensures that we keep the bytes utilized in the receive
// we are willing to enqueue. This ensures that we keep the bytes utilized in the receive
// buffer within a small multiple of the user provided max receive buffer size.
func (a *Association) getMaxTSNOffset() uint32 {
func getMaxTSNOffset(maxReceiveBufferSize uint32) uint32 {
// 4 is a magic number here. There is no theory behind this.
offset := (a.maxReceiveBufferSize * 4) / avgChunkSize
offset := (maxReceiveBufferSize * 4) / avgChunkSize
if offset < minTSNOffset {
offset = minTSNOffset
}
Expand Down Expand Up @@ -1186,7 +1188,7 @@ func (a *Association) handleInit(p *packet, i *chunkInit) ([]*packet, error) {
// is set initially by taking the peer's initial TSN,
// received in the INIT or INIT ACK chunk, and
// subtracting one from it.
a.peerLastTSN = i.initialTSN - 1
a.payloadQueue.init(i.initialTSN - 1)

for _, param := range i.params {
switch v := param.(type) { // nolint:gocritic
Expand Down Expand Up @@ -1260,7 +1262,7 @@ func (a *Association) handleInitAck(p *packet, i *chunkInitAck) error {
a.myMaxNumInboundStreams = min16(i.numInboundStreams, a.myMaxNumInboundStreams)
a.myMaxNumOutboundStreams = min16(i.numOutboundStreams, a.myMaxNumOutboundStreams)
a.peerVerificationTag = i.initiateTag
a.peerLastTSN = i.initialTSN - 1
a.payloadQueue.init(i.initialTSN - 1)
if a.sourcePort != p.destinationPort ||
a.destinationPort != p.sourcePort {
a.log.Warnf("[%s] handleInitAck: port mismatch", a.name)
Expand Down Expand Up @@ -1411,7 +1413,7 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {
a.name, d.tsn, d.immediateSack, len(d.userData))
a.stats.incDATAs()

canPush := a.payloadQueue.canPush(d, a.peerLastTSN, a.getMaxTSNOffset())
canPush := a.payloadQueue.canPush(d.tsn)
if canPush {
s := a.getOrCreateStream(d.streamIdentifier, true, PayloadTypeUnknown)
if s == nil {
Expand All @@ -1423,14 +1425,14 @@ func (a *Association) handleData(d *chunkPayloadData) []*packet {

if a.getMyReceiverWindowCredit() > 0 {
// Pass the new chunk to stream level as soon as it arrives
a.payloadQueue.push(d, a.peerLastTSN)
a.payloadQueue.push(d.tsn)
s.handleData(d)
} else {
// Receive buffer is full
lastTSN, ok := a.payloadQueue.getLastTSNReceived()
if ok && sna32LT(d.tsn, lastTSN) {
a.log.Debugf("[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
a.payloadQueue.push(d, a.peerLastTSN)
a.payloadQueue.push(d.tsn)
s.handleData(d)
} else {
a.log.Debugf("[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d", a.name, d.tsn, d.streamSequenceNumber)
Expand All @@ -1454,10 +1456,9 @@ func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool)
// Meaning, if peerLastTSN+1 points to a chunk that is received,
// advance peerLastTSN until peerLastTSN+1 points to unreceived chunk.
for {
if _, popOk := a.payloadQueue.pop(a.peerLastTSN + 1); !popOk {
if popOk := a.payloadQueue.pop(false); !popOk {
break
}
a.peerLastTSN++

for _, rstReq := range a.reconfigRequests {
resp := a.resetStreamsIfAny(rstReq)
Expand All @@ -1470,7 +1471,7 @@ func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool)

hasPacketLoss := (a.payloadQueue.size() > 0)
if hasPacketLoss {
a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString(a.peerLastTSN))
a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString())
}

if (a.ackState != ackStateImmediate && !sackImmediately && !hasPacketLoss && a.ackMode == ackModeNormal) || a.ackMode == ackModeAlwaysDelay {
Expand Down Expand Up @@ -2068,8 +2069,8 @@ func (a *Association) handleForwardTSN(c *chunkForwardTSN) []*packet {
// duplicate may indicate the previous SACK was lost in the network.

a.log.Tracef("[%s] should send ack? newCumTSN=%d peerLastTSN=%d",
a.name, c.newCumulativeTSN, a.peerLastTSN)
if sna32LTE(c.newCumulativeTSN, a.peerLastTSN) {
a.name, c.newCumulativeTSN, a.peerLastTSN())
if sna32LTE(c.newCumulativeTSN, a.peerLastTSN()) {
a.log.Tracef("[%s] sending ack on Forward TSN", a.name)
a.ackState = ackStateImmediate
a.ackTimer.stop()
Expand All @@ -2088,9 +2089,8 @@ func (a *Association) handleForwardTSN(c *chunkForwardTSN) []*packet {
// chunk,

// Advance peerLastTSN
for sna32LT(a.peerLastTSN, c.newCumulativeTSN) {
a.payloadQueue.pop(a.peerLastTSN + 1) // may not exist
a.peerLastTSN++
for sna32LT(a.peerLastTSN(), c.newCumulativeTSN) {
a.payloadQueue.pop(true) // may not exist
}

// Report new peerLastTSN value and abandoned largest SSN value to
Expand Down Expand Up @@ -2143,7 +2143,7 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) {
switch p := raw.(type) {
case *paramOutgoingResetRequest:
a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name)
if a.peerLastTSN < p.senderLastTSN && len(a.reconfigRequests) >= maxReconfigRequests {
if a.peerLastTSN() < p.senderLastTSN && len(a.reconfigRequests) >= maxReconfigRequests {
// We have too many reconfig requests outstanding. Drop the request and let
// the peer retransmit. A well behaved peer should only have 1 outstanding
// reconfig request.
Expand Down Expand Up @@ -2189,9 +2189,9 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) {
// The caller should hold the lock.
func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
result := reconfigResultSuccessPerformed
if sna32LTE(p.senderLastTSN, a.peerLastTSN) {
if sna32LTE(p.senderLastTSN, a.peerLastTSN()) {
a.log.Debugf("[%s] resetStream(): senderLastTSN=%d <= peerLastTSN=%d",
a.name, p.senderLastTSN, a.peerLastTSN)
a.name, p.senderLastTSN, a.peerLastTSN())
for _, id := range p.streamIdentifiers {
s, ok := a.streams[id]
if !ok {
Expand All @@ -2206,7 +2206,7 @@ func (a *Association) resetStreamsIfAny(p *paramOutgoingResetRequest) *packet {
delete(a.reconfigRequests, p.reconfigRequestSequenceNumber)
} else {
a.log.Debugf("[%s] resetStream(): senderLastTSN=%d > peerLastTSN=%d",
a.name, p.senderLastTSN, a.peerLastTSN)
a.name, p.senderLastTSN, a.peerLastTSN())
result = reconfigResultInProgress
}

Expand Down Expand Up @@ -2280,7 +2280,7 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
break // would exceeds cwnd
}

if dataLen > a.rwnd {
if dataLen > a.RWND() {
break // no more rwnd
}

Expand Down Expand Up @@ -2454,10 +2454,10 @@ func (a *Association) generateNextRSN() uint32 {

func (a *Association) createSelectiveAckChunk() *chunkSelectiveAck {
sack := &chunkSelectiveAck{}
sack.cumulativeTSNAck = a.peerLastTSN
sack.cumulativeTSNAck = a.peerLastTSN()
sack.advertisedReceiverWindowCredit = a.getMyReceiverWindowCredit()
sack.duplicateTSN = a.payloadQueue.popDuplicates()
sack.gapAckBlocks = a.payloadQueue.getGapAckBlocks(a.peerLastTSN)
sack.gapAckBlocks = a.payloadQueue.getGapAckBlocks()
return sack
}

Expand Down
46 changes: 16 additions & 30 deletions association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,10 +1283,10 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 3,
newCumulativeTSN: prevTSN + 3,
streams: []chunkForwardTSNStream{{identifier: 0, sequence: 0}},
}

Expand All @@ -1296,7 +1296,7 @@ func TestHandleForwardTSN(t *testing.T) {
delayedAckTriggered := a.delayedAckTriggered
immediateAckTriggered := a.immediateAckTriggered
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN+3, "peerLastTSN should advance by 3 ")
assert.Equal(t, a.peerLastTSN(), prevTSN+3, "peerLastTSN should advance by 3 ")
assert.True(t, delayedAckTriggered, "delayed sack should be triggered")
assert.False(t, immediateAckTriggered, "immediate sack should NOT be triggered")
assert.Nil(t, p, "should return nil")
Expand All @@ -1308,20 +1308,13 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

// this chunk is blocked by the missing chunk at tsn=1
a.payloadQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 2,
streamIdentifier: 0,
streamSequenceNumber: 1,
userData: []byte("ABC"),
}, a.peerLastTSN)
a.payloadQueue.push(a.peerLastTSN() + 2)

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 1,
newCumulativeTSN: a.peerLastTSN() + 1,
streams: []chunkForwardTSNStream{
{identifier: 0, sequence: 1},
},
Expand All @@ -1333,7 +1326,7 @@ func TestHandleForwardTSN(t *testing.T) {
delayedAckTriggered := a.delayedAckTriggered
immediateAckTriggered := a.immediateAckTriggered
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN+2, "peerLastTSN should advance by 3")
assert.Equal(t, a.peerLastTSN(), prevTSN+2, "peerLastTSN should advance by 3")
assert.True(t, delayedAckTriggered, "delayed sack should be triggered")
assert.False(t, immediateAckTriggered, "immediate sack should NOT be triggered")
assert.Nil(t, p, "should return nil")
Expand All @@ -1345,20 +1338,13 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

// this chunk is blocked by the missing chunk at tsn=1
a.payloadQueue.push(&chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 3,
streamIdentifier: 0,
streamSequenceNumber: 1,
userData: []byte("ABC"),
}, a.peerLastTSN)
a.payloadQueue.push(a.peerLastTSN() + 3)

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN + 1,
newCumulativeTSN: a.peerLastTSN() + 1,
streams: []chunkForwardTSNStream{
{identifier: 0, sequence: 1},
},
Expand All @@ -1369,7 +1355,7 @@ func TestHandleForwardTSN(t *testing.T) {
a.lock.Lock()
immediateAckTriggered := a.immediateAckTriggered
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN+1, "peerLastTSN should advance by 1")
assert.Equal(t, a.peerLastTSN(), prevTSN+1, "peerLastTSN should advance by 1")
assert.True(t, immediateAckTriggered, "immediate sack should be triggered")

assert.Nil(t, p, "should return nil")
Expand All @@ -1381,10 +1367,10 @@ func TestHandleForwardTSN(t *testing.T) {
LoggerFactory: loggerFactory,
})
a.useForwardTSN = true
prevTSN := a.peerLastTSN
prevTSN := a.peerLastTSN()

fwdtsn := &chunkForwardTSN{
newCumulativeTSN: a.peerLastTSN, // old TSN
newCumulativeTSN: a.peerLastTSN(), // old TSN
streams: []chunkForwardTSNStream{
{identifier: 0, sequence: 1},
},
Expand All @@ -1395,7 +1381,7 @@ func TestHandleForwardTSN(t *testing.T) {
a.lock.Lock()
ackState := a.ackState
a.lock.Unlock()
assert.Equal(t, a.peerLastTSN, prevTSN, "peerLastTSN should not advance")
assert.Equal(t, a.peerLastTSN(), prevTSN, "peerLastTSN should not advance")
assert.Equal(t, ackStateImmediate, ackState, "sack should be requested")
assert.Nil(t, p, "should return nil")
})
Expand Down Expand Up @@ -1690,7 +1676,7 @@ func TestAssocCreateNewStream(t *testing.T) {
toBeIgnored := &chunkPayloadData{
beginningFragment: true,
endingFragment: true,
tsn: a.peerLastTSN + 1,
tsn: a.peerLastTSN() + 1,
streamIdentifier: newSI,
userData: []byte("ABC"),
}
Expand Down Expand Up @@ -2482,7 +2468,7 @@ func TestAssocHandleInit(t *testing.T) {
return
}
assert.NoError(t, err, "should succeed")
assert.Equal(t, init.initialTSN-1, a.peerLastTSN, "should match")
assert.Equal(t, init.initialTSN-1, a.peerLastTSN(), "should match")
assert.Equal(t, uint16(1001), a.myMaxNumOutboundStreams, "should match")
assert.Equal(t, uint16(1002), a.myMaxNumInboundStreams, "should match")
assert.Equal(t, uint32(5678), a.peerVerificationTag, "should match")
Expand Down
Loading
Loading