From d0c92a5c45788ae1f913cf32a39122b93f500b6c Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 12 Aug 2024 22:05:28 +0530 Subject: [PATCH 1/2] Provide SCTP Association OnClose callback --- sctptransport.go | 32 +++++++++++++++++-- sctptransport_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/sctptransport.go b/sctptransport.go index a468cbc98d6..8167423b7ef 100644 --- a/sctptransport.go +++ b/sctptransport.go @@ -45,6 +45,7 @@ type SCTPTransport struct { // OnStateChange func() onErrorHandler func(error) + onCloseHandler func(error) sctpAssociation *sctp.Association onDataChannelHandler func(*DataChannel) @@ -174,6 +175,7 @@ func (r *SCTPTransport) acceptDataChannels(a *sctp.Association) { dataChannels = append(dataChannels, dc.dataChannel) } r.lock.RUnlock() + ACCEPT: for { dc, err := datachannel.Accept(a, &datachannel.Config{ @@ -183,6 +185,9 @@ ACCEPT: if !errors.Is(err, io.EOF) { r.log.Errorf("Failed to accept data channel: %v", err) r.onError(err) + r.onClose(err) + } else { + r.onClose(nil) } return } @@ -230,9 +235,14 @@ ACCEPT: MaxRetransmits: maxRetransmits, }, r, r.api.settingEngine.LoggerFactory.NewLogger("ortc")) if err != nil { + // This data channel is invalid. Close it and log an error. + if err1 := dc.Close(); err1 != nil { + r.log.Errorf("Failed to close invalid data channel: %v", err1) + } r.log.Errorf("Failed to accept data channel: %v", err) r.onError(err) - return + // We've received a datachannel with invalid configuration. We can still receive other datachannels. + continue ACCEPT } <-r.onDataChannel(rtcDC) @@ -249,8 +259,7 @@ ACCEPT: } } -// OnError sets an event handler which is invoked when -// the SCTP connection error occurs. +// OnError sets an event handler which is invoked when the SCTP Association errors. func (r *SCTPTransport) OnError(f func(err error)) { r.lock.Lock() defer r.lock.Unlock() @@ -267,6 +276,23 @@ func (r *SCTPTransport) onError(err error) { } } +// OnClose sets an event handler which is invoked when the SCTP Association closes. +func (r *SCTPTransport) OnClose(f func(err error)) { + r.lock.Lock() + defer r.lock.Unlock() + r.onCloseHandler = f +} + +func (r *SCTPTransport) onClose(err error) { + r.lock.RLock() + handler := r.onCloseHandler + r.lock.RUnlock() + + if handler != nil { + go handler(err) + } +} + // OnDataChannel sets an event handler which is invoked when a data // channel message arrives from a remote peer. func (r *SCTPTransport) OnDataChannel(f func(*DataChannel)) { diff --git a/sctptransport_test.go b/sctptransport_test.go index 74313debca0..d8eb6155d7f 100644 --- a/sctptransport_test.go +++ b/sctptransport_test.go @@ -6,7 +6,13 @@ package webrtc -import "testing" +import ( + "bytes" + "testing" + "time" + + "github.com/stretchr/testify/require" +) func TestGenerateDataChannelID(t *testing.T) { sctpTransportWithChannels := func(ids []uint16) *SCTPTransport { @@ -48,3 +54,66 @@ func TestGenerateDataChannelID(t *testing.T) { } } } + +func TestSCTPTransportOnClose(t *testing.T) { + offerPC, answerPC, err := newPair() + require.NoError(t, err) + + answerPC.OnDataChannel(func(dc *DataChannel) { + dc.OnMessage(func(_ DataChannelMessage) { + if err1 := dc.Send([]byte("hello")); err1 != nil { + t.Error("failed to send message") + } + }) + }) + + recvMsg := make(chan struct{}, 1) + offerPC.OnConnectionStateChange(func(state PeerConnectionState) { + if state == PeerConnectionStateConnected { + defer func() { + offerPC.OnConnectionStateChange(nil) + }() + + dc, createErr := offerPC.CreateDataChannel(expectedLabel, nil) + if createErr != nil { + t.Errorf("Failed to create a PC pair for testing") + return + } + dc.OnMessage(func(msg DataChannelMessage) { + if !bytes.Equal(msg.Data, []byte("hello")) { + t.Error("invalid msg received") + } + recvMsg <- struct{}{} + }) + dc.OnOpen(func() { + if err1 := dc.Send([]byte("hello")); err1 != nil { + t.Error("failed to send initial msg", err1) + } + }) + } + }) + + err = signalPair(offerPC, answerPC) + require.NoError(t, err) + + select { + case <-recvMsg: + case <-time.After(5 * time.Second): + t.Fatal("timed out") + } + + // setup SCTP OnClose callback + ch := make(chan error, 1) + answerPC.SCTP().OnClose(func(err error) { + ch <- err + }) + + err = offerPC.Close() // This will trigger sctp onclose callback on remote + require.NoError(t, err) + + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatal("timed out") + } +} From 484d89ad38ea47a460fd86db2351823622897932 Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 13 Aug 2024 00:12:44 +0530 Subject: [PATCH 2/2] Fix SCTPTransport OnClose test --- sctptransport_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sctptransport_test.go b/sctptransport_test.go index d8eb6155d7f..b18fc2281cb 100644 --- a/sctptransport_test.go +++ b/sctptransport_test.go @@ -59,6 +59,8 @@ func TestSCTPTransportOnClose(t *testing.T) { offerPC, answerPC, err := newPair() require.NoError(t, err) + defer closePairNow(t, offerPC, answerPC) + answerPC.OnDataChannel(func(dc *DataChannel) { dc.OnMessage(func(_ DataChannelMessage) { if err1 := dc.Send([]byte("hello")); err1 != nil {