Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly handle non-media probes #2816

Merged
merged 1 commit into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,59 @@ func Test_InterceptorRegistry_Build(t *testing.T) {
},
})

peerConnectionA, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
assert.NoError(t, err)

peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
peerConnectionA, peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).newPair(Configuration{})
assert.NoError(t, err)

assert.Equal(t, 2, registryBuildCount)
closePairNow(t, peerConnectionA, peerConnectionB)
}

func Test_Interceptor_ZeroSSRC(t *testing.T) {
to := test.TimeOut(time.Second * 20)
defer to.Stop()

report := test.CheckRoutines(t)
defer report()

track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)

offerer, answerer, err := newPair()
assert.NoError(t, err)

_, err = offerer.AddTrack(track)
assert.NoError(t, err)

probeReceiverCreated := make(chan struct{})

go func() {
sequenceNumber := uint16(0)
for range time.NewTicker(time.Millisecond * 20).C {
track.mu.Lock()
if len(track.bindings) == 1 {
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
Version: 2,
SSRC: 0,
SequenceNumber: sequenceNumber,
}, []byte{0, 1, 2, 3, 4, 5})
assert.NoError(t, err)
}
sequenceNumber++
track.mu.Unlock()

if nonMediaBandwidthProbe, ok := answerer.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
assert.Equal(t, len(nonMediaBandwidthProbe.Tracks()), 1)
close(probeReceiverCreated)
return
}
}
}()

assert.NoError(t, signalPair(offerer, answerer))

peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
peerConnectionConnected.Wait()

<-probeReceiverCreated
closePairNow(t, offerer, answerer)
}
37 changes: 36 additions & 1 deletion peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
// should be defined (see JSEP 3.4.1).
greaterMid int

rtpTransceivers []*RTPTransceiver
rtpTransceivers []*RTPTransceiver
nonMediaBandwidthProbe atomic.Value // RTPReceiver

onSignalingStateChangeHandler func(SignalingState)
onICEConnectionStateChangeHandler atomic.Value // func(ICEConnectionState)
Expand Down Expand Up @@ -1524,6 +1525,32 @@
return true, nil
}

// Chrome sends probing traffic on SSRC 0. This reads the packets to ensure that we properly
// generate TWCC reports for it. Since this isn't actually media we don't pass this to the user
func (pc *PeerConnection) handleNonMediaBandwidthProbe() {
nonMediaBandwidthProbe, err := pc.api.NewRTPReceiver(RTPCodecTypeVideo, pc.dtlsTransport)
if err != nil {
pc.log.Errorf("handleNonMediaBandwidthProbe failed to create RTPReceiver: %v", err)
return

Check warning on line 1534 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1533-L1534

Added lines #L1533 - L1534 were not covered by tests
}

if err = nonMediaBandwidthProbe.Receive(RTPReceiveParameters{
Encodings: []RTPDecodingParameters{{RTPCodingParameters: RTPCodingParameters{}}},
}); err != nil {
pc.log.Errorf("handleNonMediaBandwidthProbe failed to start RTPReceiver: %v", err)
return

Check warning on line 1541 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1540-L1541

Added lines #L1540 - L1541 were not covered by tests
}

pc.nonMediaBandwidthProbe.Store(nonMediaBandwidthProbe)
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
for {
if _, _, err = nonMediaBandwidthProbe.readRTP(b, nonMediaBandwidthProbe.Track()); err != nil {
pc.log.Tracef("handleNonMediaBandwidthProbe read exiting: %v", err)
return
}
}
}

func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) error { //nolint:gocognit
remoteDescription := pc.RemoteDescription()
if remoteDescription == nil {
Expand Down Expand Up @@ -1656,6 +1683,11 @@
continue
}

if ssrc == 0 {
go pc.handleNonMediaBandwidthProbe()
continue
}

pc.dtlsTransport.storeSimulcastStream(stream)

if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines {
Expand Down Expand Up @@ -2072,6 +2104,9 @@
closeErrs = append(closeErrs, t.Stop())
}
}
if nonMediaBandwidthProbe, ok := pc.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
closeErrs = append(closeErrs, nonMediaBandwidthProbe.Stop())
}
pc.mu.Unlock()

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)
Expand Down
6 changes: 3 additions & 3 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
SequenceNumber: sequenceNumber,
PayloadType: 96,
Padding: true,
SSRC: uint32(i),
SSRC: uint32(i + 1),
},
Payload: []byte{0x00, 0x02},
}
Expand All @@ -1547,7 +1547,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
SSRC: uint32(i),
SSRC: uint32(i + 1),
},
Payload: []byte{0x00},
}
Expand Down Expand Up @@ -1591,7 +1591,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
Version: 2,
SequenceNumber: sequenceNumber,
PayloadType: 96,
SSRC: uint32(i),
SSRC: uint32(i + 1),
},
Payload: []byte{0x00},
}
Expand Down
2 changes: 1 addition & 1 deletion peerconnection_renegotiation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func TestPeerConnection_Renegotiation_Simulcast(t *testing.T) {
for ssrc, rid := range rids {
header := &rtp.Header{
Version: 2,
SSRC: uint32(ssrc),
SSRC: uint32(ssrc + 1),
SequenceNumber: sequenceNumber,
PayloadType: 96,
}
Expand Down
12 changes: 5 additions & 7 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@

var t *trackStreams
for idx, ts := range r.tracks {
if ts.track != nil && parameters.Encodings[i].SSRC != 0 && ts.track.SSRC() == parameters.Encodings[i].SSRC {
if ts.track != nil && ts.track.SSRC() == parameters.Encodings[i].SSRC {
t = &r.tracks[idx]
break
}
Expand All @@ -210,12 +210,10 @@
return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC)
}

if parameters.Encodings[i].SSRC != 0 {
t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
var err error
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
return err
}
t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
var err error
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
return err

Check warning on line 216 in rtpreceiver.go

View check run for this annotation

Codecov / codecov/patch

rtpreceiver.go#L216

Added line #L216 was not covered by tests
}

if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
Expand Down
Loading