Skip to content

Commit

Permalink
Exclude inflight size from bufferedAmount
Browse files Browse the repository at this point in the history
Relates to #218
  • Loading branch information
enobufs committed Apr 23, 2022
1 parent 6f8c8bd commit 3b0b07a
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 18 deletions.
47 changes: 35 additions & 12 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,14 +1617,6 @@ func (a *Association) handleSack(d *chunkSelectiveAck) error {
a.onCumulativeTSNAckPointAdvanced(totalBytesAcked)
}

for si, nBytesAcked := range bytesAckedPerStream {
if s, ok := a.streams[si]; ok {
a.lock.Unlock()
s.onBufferReleased(nBytesAcked)
a.lock.Lock()
}
}

// New rwnd value
// RFC 4960 sec 6.2.1. Processing a Received SACK
// D)
Expand Down Expand Up @@ -2012,13 +2004,17 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
// is 0), the data sender can always have one DATA chunk in flight to
// the receiver if allowed by cwnd (see rule B, below).

bytesSentPerStream := map[uint16]int{}

for {
c := a.pendingQueue.peek()
if c == nil {
a.log.Debug("no more pending data")
break // no more pending data
}

dataLen := uint32(len(c.userData))
dataLen := len(c.userData)

if dataLen == 0 {
sisToReset = append(sisToReset, c.streamIdentifier)
err := a.pendingQueue.pop(c)
Expand All @@ -2028,15 +2024,24 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
continue
}

if uint32(a.inflightQueue.getNumBytes())+dataLen > a.cwnd {
if uint32(a.inflightQueue.getNumBytes())+uint32(dataLen) > a.cwnd {
a.log.Debugf("would exceed cwnd: inflight=%d, dataLen=%d cwnd=%d", a.inflightQueue.getNumBytes(), dataLen, a.cwnd)
break // would exceeds cwnd
}

if dataLen > a.rwnd {
if uint32(dataLen) > a.rwnd {
a.log.Debugf("no more rwnd: dataLen=%d rwnd=%d", dataLen, a.rwnd)
break // no more rwnd
}

a.rwnd -= dataLen
a.rwnd -= uint32(dataLen)

// Sum the number of bytes sent per stream
if amount, ok := bytesSentPerStream[c.streamIdentifier]; ok {
bytesSentPerStream[c.streamIdentifier] = amount + dataLen
} else {
bytesSentPerStream[c.streamIdentifier] = dataLen
}

a.movePendingDataChunkToInflightQueue(c)
chunks = append(chunks, c)
Expand All @@ -2047,10 +2052,28 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1
// Send zero window probe
c := a.pendingQueue.peek()
if c != nil {
dataLen := len(c.userData)

// Sum the number of bytes sent per stream
if amount, ok := bytesSentPerStream[c.streamIdentifier]; ok {
bytesSentPerStream[c.streamIdentifier] = amount + dataLen
} else {
bytesSentPerStream[c.streamIdentifier] = dataLen
}

a.movePendingDataChunkToInflightQueue(c)
chunks = append(chunks, c)
}
}

// Report the size of user data to be sent
for si, nBytesSent := range bytesSentPerStream {
if s, ok := a.streams[si]; ok {
a.lock.Unlock()
s.onBufferReleased(nBytesSent)
a.lock.Lock()
}
}
}

return chunks, sisToReset
Expand Down
17 changes: 14 additions & 3 deletions association_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !js
// +build !js

package sctp
Expand Down Expand Up @@ -1809,7 +1810,7 @@ func TestAssocCongestionControl(t *testing.T) {
}

// Repeat calling br.Tick() until the buffered amount becomes 0
for s0.BufferedAmount() > 0 && nPacketsReceived < nPacketsToSend {
for nPacketsReceived < nPacketsToSend {
for {
n = br.Tick()
if n == 0 {
Expand Down Expand Up @@ -1847,6 +1848,7 @@ func TestAssocCongestionControl(t *testing.T) {
assert.True(t, cwnd > ssthresh, "should be in congestion avoidance mode")
assert.True(t, ssthresh >= maxReceiveBufferSize, "should not be less than the initial size of 128KB")

assert.Equal(t, uint64(0), s0.BufferedAmount())
assert.Equal(t, nPacketsReceived, nPacketsToSend, "unexpected num of packets received")
assert.Equal(t, 0, s1.getNumBytesInReassemblyQueue(), "reassembly queue should be empty")

Expand Down Expand Up @@ -1896,7 +1898,7 @@ func TestAssocCongestionControl(t *testing.T) {
// 2. Wait until the sender's cwnd becomes 1*MTU (RTO occurred)
// 3. Stat reading a1's data
var hasRTOed bool
for s0.BufferedAmount() > 0 && nPacketsReceived < nPacketsToSend {
for nPacketsReceived < nPacketsToSend {
for {
n = br.Tick()
if n == 0 {
Expand Down Expand Up @@ -1942,6 +1944,7 @@ func TestAssocCongestionControl(t *testing.T) {

br.Process()

assert.Equal(t, uint64(0), s0.BufferedAmount())
assert.Equal(t, nPacketsReceived, nPacketsToSend, "unexpected num of packets received")
assert.Equal(t, 0, s1.getNumBytesInReassemblyQueue(), "reassembly queue should be empty")

Expand Down Expand Up @@ -1984,13 +1987,20 @@ func TestAssocDelayedAck(t *testing.T) {
a1.stats.reset()

// Writes data (will fragmented)
t.Logf("writing data with size %d", len(sbuf))
n, err = s0.WriteSCTP(sbuf, PayloadTypeWebRTCBinary)
assert.Nil(t, err, "WriteSCTP failed")
assert.Equal(t, n, len(sbuf), "unexpected length of received data")

// Repeat calling br.Tick() until the buffered amount becomes 0
since := time.Now()
for s0.BufferedAmount() > 0 {
for {
a0.lock.RLock()
unackedSize := a0.inflightQueue.size()
a0.lock.RUnlock()
if nPacketsReceived > 0 && unackedSize == 0 {
break
}
for {
n = br.Tick()
if n == 0 {
Expand All @@ -2009,6 +2019,7 @@ func TestAssocDelayedAck(t *testing.T) {
if !assert.Nil(t, err, "ReadSCTP failed") {
return
}
t.Logf("read data with size %d", n)
assert.Equal(t, len(sbuf), n, "unexpected length of received data")
assert.Equal(t, ppi, PayloadTypeWebRTCBinary, "unexpected ppi")

Expand Down
6 changes: 3 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ func (s *Stream) handleData(pd *chunkPayloadData) {
var readable bool
if s.reassemblyQueue.push(pd) {
readable = s.reassemblyQueue.isReadable()
s.log.Debugf("[%s] reassemblyQueue readable=%v", s.name, readable)
s.log.Tracef("[%s] reassemblyQueue readable=%v", s.name, readable)
if readable {
s.log.Debugf("[%s] readNotifier.signal()", s.name)
s.log.Tracef("[%s] readNotifier.signal()", s.name)
s.readNotifier.Signal()
s.log.Debugf("[%s] readNotifier.signal() done", s.name)
s.log.Tracef("[%s] readNotifier.signal() done", s.name)
}
}
}
Expand Down

0 comments on commit 3b0b07a

Please sign in to comment.