From 9aed055e25da703346b5ce5cbede9b7d1ff75e21 Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 3 Apr 2024 14:55:13 +0530 Subject: [PATCH 1/4] Limit the number of reconfig requests --- association.go | 28 ++++++++++++++++- association_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 1 deletion(-) diff --git a/association.go b/association.go index bd79b882..a9b87c7f 100644 --- a/association.go +++ b/association.go @@ -110,6 +110,8 @@ const ( // irrespective of the receive buffer size // see Association.getMaxTSNOffset maxTSNOffset = 40000 + // maxReconfigRequests is the maximum number of reconfig requests we will keep outstanding + maxReconfigRequests = 1000 ) func getAssociationStateString(a uint32) string { @@ -2140,15 +2142,39 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) { switch p := raw.(type) { case *paramOutgoingResetRequest: a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name) + if a.peerLastTSN < p.senderLastTSN && len(a.reconfigRequests) >= maxReconfigRequests { + // We have too many reconfig requests outstanding. Drop the request and let + // the peer retransmit. A well behaved peer should only have 1 outstanding + // reconfig request. + // + // RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.1.1 + // At any given time, there MUST NOT be more than one request in flight. + // So, if the Re-configuration Timer is running and the RE-CONFIG chunk + // contains at least one request parameter, the chunk MUST be buffered. + // chrome: https://chromium.googlesource.com/external/webrtc/+/refs/heads/main/net/dcsctp/socket/stream_reset_handler.cc#271 + return nil, fmt.Errorf("too many outstanding reconfig requests: %d", len(a.reconfigRequests)) + } a.reconfigRequests[p.reconfigRequestSequenceNumber] = p resp := a.resetStreamsIfAny(p) if resp != nil { return resp, nil } return nil, nil //nolint:nilnil - case *paramReconfigResponse: a.log.Tracef("[%s] handleReconfigParam (ReconfigResponse)", a.name) + if p.result == reconfigResultInProgress { + // RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.2.7 + // + // If the Result field indicates "In progress", the timer for the + // Re-configuration Request Sequence Number is started again. If + // the timer runs out, the RE-CONFIG chunk MUST be retransmitted + // but the corresponding error counters MUST NOT be incremented. + if len(a.reconfigs) == 0 { + a.tReconfig.stop() + a.tReconfig.start(a.rtoMgr.getRTO()) + } + return nil, nil + } delete(a.reconfigs, p.reconfigResponseSequenceNumber) if len(a.reconfigs) == 0 { a.tReconfig.stop() diff --git a/association_test.go b/association_test.go index bfb8d76e..e21b8411 100644 --- a/association_test.go +++ b/association_test.go @@ -3393,3 +3393,78 @@ func TestDataChunkBundlingIntoPacket(t *testing.T) { } } } + +func TestAssociation_ReconfigRequestsLimited(t *testing.T) { + checkGoroutineLeaks(t) + + lim := test.TimeOut(time.Second * 10) + defer lim.Stop() + + a1chan, a2chan := make(chan *Association), make(chan *Association) + + udp1, udp2 := createUDPConnPair() + + go func() { + a1, err := Client(Config{ + NetConn: udp1, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + assert.NoError(t, err) + a1chan <- a1 + }() + + go func() { + a2, err := Server(Config{ + NetConn: udp2, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + assert.NoError(t, err) + a2chan <- a2 + }() + + a1, a2 := <-a1chan, <-a2chan + defer a1.Close() + defer a2.Close() + + writeStream, err := a1.OpenStream(1, PayloadTypeWebRTCString) + require.NoError(t, err) + + readStream, err := a2.OpenStream(1, PayloadTypeWebRTCString) + require.NoError(t, err) + + // exchange some data + testData := []byte("test") + _, err = writeStream.Write(testData) + require.NoError(t, err) + + buf := make([]byte, len(testData)) + _, err = readStream.Read(buf) + assert.NoError(t, err) + assert.Equal(t, testData, buf) + + a1.lock.RLock() + tsn := a1.myNextTSN + a1.lock.RUnlock() + for i := 0; i < maxReconfigRequests+100; i++ { + c := &chunkReconfig{ + paramA: ¶mOutgoingResetRequest{ + reconfigRequestSequenceNumber: 10 + uint32(i), + senderLastTSN: tsn + 10, // has to be enqueued + streamIdentifiers: []uint16{uint16(i)}, + }, + } + p := a1.createPacket([]chunk{c}) + buf, err := p.marshal(true) + require.NoError(t, err) + _, err = a1.netConn.Write(buf) + require.NoError(t, err) + if i%100 == 0 { + time.Sleep(100 * time.Millisecond) + } + } + // Let a2 process the requests + time.Sleep(2 * time.Second) + a2.lock.RLock() + require.LessOrEqual(t, len(a2.reconfigRequests), maxReconfigRequests) + a2.lock.RUnlock() +} From a3eaf5665816431eca28c760ab1bbb53d92b37f1 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 4 Apr 2024 11:18:14 +0530 Subject: [PATCH 2/4] Fix bug with timer reset --- association.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/association.go b/association.go index a9b87c7f..3938df8b 100644 --- a/association.go +++ b/association.go @@ -2169,7 +2169,7 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) { // Re-configuration Request Sequence Number is started again. If // the timer runs out, the RE-CONFIG chunk MUST be retransmitted // but the corresponding error counters MUST NOT be incremented. - if len(a.reconfigs) == 0 { + if _, ok := a.reconfigs[p.reconfigResponseSequenceNumber]; ok { a.tReconfig.stop() a.tReconfig.start(a.rtoMgr.getRTO()) } From d8ac9789a46d16f9cc33437ef706d1fe6c12d6d0 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 4 Apr 2024 19:45:46 +0530 Subject: [PATCH 3/4] Fix lint --- association.go | 3 ++- association_test.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/association.go b/association.go index 3938df8b..c7942221 100644 --- a/association.go +++ b/association.go @@ -50,6 +50,7 @@ var ( ErrChunkTypeUnhandled = errors.New("unhandled chunk type") ErrHandshakeInitAck = errors.New("handshake failed (INIT ACK)") ErrHandshakeCookieEcho = errors.New("handshake failed (COOKIE ECHO)") + ErrTooManyReconfigRequests = errors.New("too many outstanding reconfig requests") ) const ( @@ -2152,7 +2153,7 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) { // So, if the Re-configuration Timer is running and the RE-CONFIG chunk // contains at least one request parameter, the chunk MUST be buffered. // chrome: https://chromium.googlesource.com/external/webrtc/+/refs/heads/main/net/dcsctp/socket/stream_reset_handler.cc#271 - return nil, fmt.Errorf("too many outstanding reconfig requests: %d", len(a.reconfigRequests)) + return nil, fmt.Errorf("%w: %d", ErrTooManyReconfigRequests, len(a.reconfigRequests)) } a.reconfigRequests[p.reconfigRequestSequenceNumber] = p resp := a.resetStreamsIfAny(p) diff --git a/association_test.go b/association_test.go index e21b8411..8b87c952 100644 --- a/association_test.go +++ b/association_test.go @@ -3423,8 +3423,6 @@ func TestAssociation_ReconfigRequestsLimited(t *testing.T) { }() a1, a2 := <-a1chan, <-a2chan - defer a1.Close() - defer a2.Close() writeStream, err := a1.OpenStream(1, PayloadTypeWebRTCString) require.NoError(t, err) @@ -3467,4 +3465,7 @@ func TestAssociation_ReconfigRequestsLimited(t *testing.T) { a2.lock.RLock() require.LessOrEqual(t, len(a2.reconfigRequests), maxReconfigRequests) a2.lock.RUnlock() + + require.NoError(t, a1.Close()) + require.NoError(t, a2.Close()) } From eeff1eb255ab42680fff3807f860ce8775874f77 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 4 Apr 2024 16:48:02 -0700 Subject: [PATCH 4/4] Update association.go --- association.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/association.go b/association.go index c7942221..46881b1d 100644 --- a/association.go +++ b/association.go @@ -2174,7 +2174,7 @@ func (a *Association) handleReconfigParam(raw param) (*packet, error) { a.tReconfig.stop() a.tReconfig.start(a.rtoMgr.getRTO()) } - return nil, nil + return nil, nil //nolint:nilnil } delete(a.reconfigs, p.reconfigResponseSequenceNumber) if len(a.reconfigs) == 0 {