Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make pc.Close wait on spawned goroutines to close #2798

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type DataChannel struct {
readyState atomic.Value // DataChannelState
bufferedAmountLowThreshold uint64
detachCalled bool
readLoopActive chan struct{}

// The binaryType represents attribute MUST, on getting, return the value to
// which it was last set. On setting, if the new value is either the string
Expand Down Expand Up @@ -327,6 +328,7 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
defer d.mu.Unlock()

if !d.api.settingEngine.detach.DataChannels {
d.readLoopActive = make(chan struct{})
go d.readLoop()
}
}
Expand All @@ -350,6 +352,7 @@ func (d *DataChannel) onError(err error) {
}

func (d *DataChannel) readLoop() {
defer close(d.readLoopActive)
buffer := make([]byte, dataChannelBufferSize)
for {
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
Expand Down Expand Up @@ -449,6 +452,22 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
// Close Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer.
func (d *DataChannel) Close() error {
return d.close(false)
}

// Normally, close only stops writes from happening, so waitForReadsDone=true
// will wait for reads to be finished based on underlying SCTP association
// closure or a SCTP reset stream from the other side. This is safe to call
// with waitForReadsDone=true after tearing down a PeerConnection but not
// necessarily before. For example, if you used a vnet and dropped all packets
// right before closing the DataChannel, you'd need never see a reset stream.
func (d *DataChannel) close(waitForReadsDone bool) error {
if waitForReadsDone && d.readLoopActive != nil {
defer func() {
<-d.readLoopActive
}()
}

d.mu.Lock()
haveSctpTransport := d.dataChannel != nil
d.mu.Unlock()
Expand Down
35 changes: 31 additions & 4 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type PeerConnection struct {
idpLoginURL *string

isClosed *atomicBool
isClosedDone chan struct{}
isNegotiationNeeded *atomicBool
updateNegotiationNeededFlagOnEmptyChain *atomicBool

Expand Down Expand Up @@ -116,6 +117,7 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
ICECandidatePoolSize: 0,
},
isClosed: &atomicBool{},
isClosedDone: make(chan struct{}),
isNegotiationNeeded: &atomicBool{},
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
lastOffer: "",
Expand Down Expand Up @@ -2044,14 +2046,31 @@ func (pc *PeerConnection) writeRTCP(pkts []rtcp.Packet, _ interceptor.Attributes
return pc.dtlsTransport.WriteRTCP(pkts)
}

// Close ends the PeerConnection
// Close ends the PeerConnection.
// It will make a best effort to wait for all underlying goroutines it spawned to finish,
// except for cases that would cause deadlocks with itself.
func (pc *PeerConnection) Close() error {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
if pc.isClosed.swap(true) {
// someone else got here first but may still be closing (e.g. via DTLS close_notify)
<-pc.isClosedDone
return nil
}
defer close(pc.isClosedDone)

// Try closing everything and collect the errors
// Shutdown strategy:
// 1. Close all data channels.
// 2. All Conn close by closing their underlying Conn.
// 3. A Mux stops this chain. It won't close the underlying
// Conn if one of the endpoints is closed down. To
// continue the chain the Mux has to be closed.
pc.sctpTransport.lock.Lock()
closeErrs := make([]error, 0, 4+len(pc.sctpTransport.dataChannels))
pc.sctpTransport.lock.Unlock()

// canon steps
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
pc.signalingState.Set(SignalingStateClosed)

Expand All @@ -2061,7 +2080,6 @@ func (pc *PeerConnection) Close() error {
// 2. A Mux stops this chain. It won't close the underlying
// Conn if one of the endpoints is closed down. To
// continue the chain the Mux has to be closed.
closeErrs := make([]error, 4)
Sean-Der marked this conversation as resolved.
Show resolved Hide resolved

closeErrs = append(closeErrs, pc.api.interceptor.Close())

Expand All @@ -2088,7 +2106,6 @@ func (pc *PeerConnection) Close() error {

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
closeErrs = append(closeErrs, pc.dtlsTransport.Stop())

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
if pc.iceTransport != nil {
closeErrs = append(closeErrs, pc.iceTransport.Stop())
Expand All @@ -2097,6 +2114,13 @@ func (pc *PeerConnection) Close() error {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())

// non-canon steps
pc.sctpTransport.lock.Lock()
Copy link
Member Author

@edaniels edaniels Jul 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Sean-Der I moved these steps lower. I also made Close not block since it's not clear to me if that's allowed per the spec. AFAICT, a normal Close waits for both sides to see the reset stream and let reads come through in the meantime.

for _, d := range pc.sctpTransport.dataChannels {
closeErrs = append(closeErrs, d.close(true))
}
pc.sctpTransport.lock.Unlock()

return util.FlattenErrs(closeErrs)
}

Expand Down Expand Up @@ -2268,8 +2292,11 @@ func (pc *PeerConnection) startTransports(iceRole ICERole, dtlsRole DTLSRole, re
}

pc.dtlsTransport.internalOnCloseHandler = func() {
pc.log.Info("Closing PeerConnection from DTLS CloseNotify")
if pc.isClosed.get() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this getting fired multiple times right now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't, but it can get fired from a DTLS closenotify while Close is happening, if the timing is right.

return
}

pc.log.Info("Closing PeerConnection from DTLS CloseNotify")
go func() {
if pcClosErr := pc.Close(); pcClosErr != nil {
pc.log.Warnf("Failed to close PeerConnection from DTLS CloseNotify: %s", pcClosErr)
Expand Down
102 changes: 102 additions & 0 deletions peerconnection_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package webrtc

import (
"runtime"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -179,3 +181,103 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
t.Error("pcOffer.Close() Timeout")
}
}

func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 20)
defer lim.Stop()

report := CheckRoutinesIntolerant(t)
defer report()

pcOffer, pcAnswer, err := newPair()
if err != nil {
t.Fatal(err)
}

var dcAnswer *DataChannel
answerDataChannelOpened := make(chan struct{})
pcAnswer.OnDataChannel(func(d *DataChannel) {
// Make sure this is the data channel we were looking for. (Not the one
// created in signalPair).
if d.Label() != "data" {
return
}
dcAnswer = d
close(answerDataChannelOpened)
})

dcOffer, err := pcOffer.CreateDataChannel("data", nil)
if err != nil {
t.Fatal(err)
}

offerDataChannelOpened := make(chan struct{})
dcOffer.OnOpen(func() {
close(offerDataChannelOpened)
})

err = signalPair(pcOffer, pcAnswer)
if err != nil {
t.Fatal(err)
}

<-offerDataChannelOpened
<-answerDataChannelOpened

msgNum := 0
dcOffer.OnMessage(func(_ DataChannelMessage) {
t.Log("msg", msgNum)
msgNum++
})

// send 50 messages, then close pcOffer, and then send another 50
for i := 0; i < 100; i++ {
if i == 50 {
err = pcOffer.Close()
if err != nil {
t.Fatal(err)
}
}
_ = dcAnswer.Send([]byte("hello!"))
}

err = pcAnswer.Close()
if err != nil {
t.Fatal(err)
}
}

// CheckRoutinesIntolerant is used to check for leaked go-routines.
// It differs from test.CheckRoutines in that it won't wait at all
// for lingering goroutines. This is helpful for tests that need
// to ensure clean closure of resources.
func CheckRoutinesIntolerant(t *testing.T) func() {
return func() {
routines := getRoutines()
if len(routines) == 0 {
return
}
t.Fatalf("%s: \n%s", "Unexpected routines on test end", strings.Join(routines, "\n\n")) // nolint
}
}

func getRoutines() []string {
buf := make([]byte, 2<<20)
buf = buf[:runtime.Stack(buf, true)]
return filterRoutines(strings.Split(string(buf), "\n\n"))
}

func filterRoutines(routines []string) []string {
result := []string{}
for _, stack := range routines {
if stack == "" || // Empty
strings.Contains(stack, "testing.Main(") || // Tests
strings.Contains(stack, "testing.(*T).Run(") || // Test run
strings.Contains(stack, "getRoutines(") { // This routine
continue
}
result = append(result, stack)
}
return result
}
Loading