Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert 7c8bfbd44a and add test #2810

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions datachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type DataChannel struct {
readyState atomic.Value // DataChannelState
bufferedAmountLowThreshold uint64
detachCalled bool
readLoopActive chan struct{}

// The binaryType represents attribute MUST, on getting, return the value to
// which it was last set. On setting, if the new value is either the string
Expand Down Expand Up @@ -328,7 +327,6 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
defer d.mu.Unlock()

if !d.api.settingEngine.detach.DataChannels {
d.readLoopActive = make(chan struct{})
go d.readLoop()
}
}
Expand All @@ -352,7 +350,6 @@ func (d *DataChannel) onError(err error) {
}

func (d *DataChannel) readLoop() {
defer close(d.readLoopActive)
buffer := make([]byte, dataChannelBufferSize)
for {
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
Expand Down Expand Up @@ -452,22 +449,6 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
// Close Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer.
func (d *DataChannel) Close() error {
return d.close(false)
}

// Normally, close only stops writes from happening, so waitForReadsDone=true
// will wait for reads to be finished based on underlying SCTP association
// closure or a SCTP reset stream from the other side. This is safe to call
// with waitForReadsDone=true after tearing down a PeerConnection but not
// necessarily before. For example, if you used a vnet and dropped all packets
// right before closing the DataChannel, you'd need never see a reset stream.
func (d *DataChannel) close(waitForReadsDone bool) error {
if waitForReadsDone && d.readLoopActive != nil {
defer func() {
<-d.readLoopActive
}()
}

d.mu.Lock()
haveSctpTransport := d.dataChannel != nil
d.mu.Unlock()
Expand Down
35 changes: 4 additions & 31 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type PeerConnection struct {
idpLoginURL *string

isClosed *atomicBool
isClosedDone chan struct{}
isNegotiationNeeded *atomicBool
updateNegotiationNeededFlagOnEmptyChain *atomicBool

Expand Down Expand Up @@ -117,7 +116,6 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
ICECandidatePoolSize: 0,
},
isClosed: &atomicBool{},
isClosedDone: make(chan struct{}),
isNegotiationNeeded: &atomicBool{},
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
lastOffer: "",
Expand Down Expand Up @@ -2046,31 +2044,14 @@ func (pc *PeerConnection) writeRTCP(pkts []rtcp.Packet, _ interceptor.Attributes
return pc.dtlsTransport.WriteRTCP(pkts)
}

// Close ends the PeerConnection.
// It will make a best effort to wait for all underlying goroutines it spawned to finish,
// except for cases that would cause deadlocks with itself.
// Close ends the PeerConnection
func (pc *PeerConnection) Close() error {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
if pc.isClosed.swap(true) {
// someone else got here first but may still be closing (e.g. via DTLS close_notify)
<-pc.isClosedDone
return nil
}
defer close(pc.isClosedDone)

// Try closing everything and collect the errors
// Shutdown strategy:
// 1. Close all data channels.
// 2. All Conn close by closing their underlying Conn.
// 3. A Mux stops this chain. It won't close the underlying
// Conn if one of the endpoints is closed down. To
// continue the chain the Mux has to be closed.
pc.sctpTransport.lock.Lock()
closeErrs := make([]error, 0, 4+len(pc.sctpTransport.dataChannels))
pc.sctpTransport.lock.Unlock()

// canon steps
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
pc.signalingState.Set(SignalingStateClosed)

Expand All @@ -2080,6 +2061,7 @@ func (pc *PeerConnection) Close() error {
// 2. A Mux stops this chain. It won't close the underlying
// Conn if one of the endpoints is closed down. To
// continue the chain the Mux has to be closed.
closeErrs := make([]error, 4)

closeErrs = append(closeErrs, pc.api.interceptor.Close())

Expand All @@ -2106,6 +2088,7 @@ func (pc *PeerConnection) Close() error {

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
closeErrs = append(closeErrs, pc.dtlsTransport.Stop())

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
if pc.iceTransport != nil {
closeErrs = append(closeErrs, pc.iceTransport.Stop())
Expand All @@ -2114,13 +2097,6 @@ func (pc *PeerConnection) Close() error {
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())

// non-canon steps
pc.sctpTransport.lock.Lock()
for _, d := range pc.sctpTransport.dataChannels {
closeErrs = append(closeErrs, d.close(true))
}
pc.sctpTransport.lock.Unlock()

return util.FlattenErrs(closeErrs)
}

Expand Down Expand Up @@ -2292,11 +2268,8 @@ func (pc *PeerConnection) startTransports(iceRole ICERole, dtlsRole DTLSRole, re
}

pc.dtlsTransport.internalOnCloseHandler = func() {
if pc.isClosed.get() {
return
}

pc.log.Info("Closing PeerConnection from DTLS CloseNotify")

go func() {
if pcClosErr := pc.Close(); pcClosErr != nil {
pc.log.Warnf("Failed to close PeerConnection from DTLS CloseNotify: %s", pcClosErr)
Expand Down
102 changes: 0 additions & 102 deletions peerconnection_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
package webrtc

import (
"runtime"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -181,103 +179,3 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
t.Error("pcOffer.Close() Timeout")
}
}

func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 20)
defer lim.Stop()

report := CheckRoutinesIntolerant(t)
defer report()

pcOffer, pcAnswer, err := newPair()
if err != nil {
t.Fatal(err)
}

var dcAnswer *DataChannel
answerDataChannelOpened := make(chan struct{})
pcAnswer.OnDataChannel(func(d *DataChannel) {
// Make sure this is the data channel we were looking for. (Not the one
// created in signalPair).
if d.Label() != "data" {
return
}
dcAnswer = d
close(answerDataChannelOpened)
})

dcOffer, err := pcOffer.CreateDataChannel("data", nil)
if err != nil {
t.Fatal(err)
}

offerDataChannelOpened := make(chan struct{})
dcOffer.OnOpen(func() {
close(offerDataChannelOpened)
})

err = signalPair(pcOffer, pcAnswer)
if err != nil {
t.Fatal(err)
}

<-offerDataChannelOpened
<-answerDataChannelOpened

msgNum := 0
dcOffer.OnMessage(func(_ DataChannelMessage) {
t.Log("msg", msgNum)
msgNum++
})

// send 50 messages, then close pcOffer, and then send another 50
for i := 0; i < 100; i++ {
if i == 50 {
err = pcOffer.Close()
if err != nil {
t.Fatal(err)
}
}
_ = dcAnswer.Send([]byte("hello!"))
}

err = pcAnswer.Close()
if err != nil {
t.Fatal(err)
}
}

// CheckRoutinesIntolerant is used to check for leaked go-routines.
// It differs from test.CheckRoutines in that it won't wait at all
// for lingering goroutines. This is helpful for tests that need
// to ensure clean closure of resources.
func CheckRoutinesIntolerant(t *testing.T) func() {
return func() {
routines := getRoutines()
if len(routines) == 0 {
return
}
t.Fatalf("%s: \n%s", "Unexpected routines on test end", strings.Join(routines, "\n\n")) // nolint
}
}

func getRoutines() []string {
buf := make([]byte, 2<<20)
buf = buf[:runtime.Stack(buf, true)]
return filterRoutines(strings.Split(string(buf), "\n\n"))
}

func filterRoutines(routines []string) []string {
result := []string{}
for _, stack := range routines {
if stack == "" || // Empty
strings.Contains(stack, "testing.Main(") || // Tests
strings.Contains(stack, "testing.(*T).Run(") || // Test run
strings.Contains(stack, "getRoutines(") { // This routine
continue
}
result = append(result, stack)
}
return result
}
34 changes: 34 additions & 0 deletions peerconnection_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1606,3 +1606,37 @@ func TestPeerConnectionState(t *testing.T) {
assert.NoError(t, pc.Close())
assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState())
}

func TestPeerConnectionDeadlock(t *testing.T) {
lim := test.TimeOut(time.Second * 5)
defer lim.Stop()

report := test.CheckRoutines(t)
defer report()

closeHdlr := func(peerConnection *PeerConnection) {
peerConnection.OnICEConnectionStateChange(func(i ICEConnectionState) {
if i == ICEConnectionStateFailed || i == ICEConnectionStateClosed {
if err := peerConnection.Close(); err != nil {
assert.NoError(t, err)
}
}
})
}

pcOffer, pcAnswer, err := NewAPI().newPair(Configuration{})
assert.NoError(t, err)

assert.NoError(t, signalPair(pcOffer, pcAnswer))

onDataChannel, onDataChannelCancel := context.WithCancel(context.Background())
pcAnswer.OnDataChannel(func(*DataChannel) {
onDataChannelCancel()
})
<-onDataChannel.Done()

closeHdlr(pcOffer)
closeHdlr(pcAnswer)

closePairNow(t, pcOffer, pcAnswer)
}
Loading