Skip to content

Commit

Permalink
Add E2E Test for RTX
Browse files Browse the repository at this point in the history
Assert that generation of NACKs and sending of RTX operates as expected.
  • Loading branch information
Sean-Der committed Oct 5, 2024
1 parent 32f7063 commit 1e82fbe
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 104 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/pion/datachannel v1.5.9
github.com/pion/dtls/v3 v3.0.2
github.com/pion/ice/v4 v4.0.1
github.com/pion/interceptor v0.1.34
github.com/pion/interceptor v0.1.35
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
github.com/pion/rtcp v1.2.14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0=
github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k=
github.com/pion/ice/v4 v4.0.1 h1:2d3tPoTR90F3TcGYeXUwucGlXI3hds96cwv4kjZmb9s=
github.com/pion/ice/v4 v4.0.1/go.mod h1:2dpakjpd7+74L5j3TAe6gvkbI5UIzOgAnkimm9SuHvA=
github.com/pion/interceptor v0.1.34 h1:jb1MG9LTdQ4VVCSZDUbUzjeJNngzz4dBXcr2dL+ejfA=
github.com/pion/interceptor v0.1.34/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/interceptor v0.1.35 h1:CshR7vY/1uF20GbD2/edgtcsxwkeCt3G3FUTr0MMwwM=
github.com/pion/interceptor v0.1.35/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
Expand Down
2 changes: 1 addition & 1 deletion peerconnection_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func TestICERestart_Error_Handling(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

offerPeerConnection, answerPeerConnection, wan := createVNetPair(t)
offerPeerConnection, answerPeerConnection, wan := createVNetPair(t, nil)

pushICEState := func(i ICEConnectionState) { iceStates <- i }
offerPeerConnection.OnICEConnectionStateChange(pushICEState)
Expand Down
76 changes: 75 additions & 1 deletion peerconnection_media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -322,7 +323,7 @@ func TestPeerConnection_Media_Disconnected(t *testing.T) {
m := &MediaEngine{}
assert.NoError(t, m.RegisterDefaultCodecs())

pcOffer, pcAnswer, wan := createVNetPair(t)
pcOffer, pcAnswer, wan := createVNetPair(t, nil)

keepPackets := &atomicBool{}
keepPackets.set(true)
Expand Down Expand Up @@ -1780,3 +1781,76 @@ func TestPeerConnection_Zero_PayloadType(t *testing.T) {

closePairNow(t, pcOffer, pcAnswer)
}

// Assert that NACKs work E2E with no extra configuration. If media is sent over a lossy connection
// the user gets retransmitted RTP packets with no extra configuration
func Test_PeerConnection_RTX_E2E(t *testing.T) {
defer test.TimeOut(time.Second * 30).Stop()

pcOffer, pcAnswer, wan := createVNetPair(t, nil)

wan.AddChunkFilter(func(vnet.Chunk) bool {
return rand.Intn(5) != 4 //nolint: gosec
})

track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
assert.NoError(t, err)

rtpSender, err := pcOffer.AddTrack(track)
assert.NoError(t, err)

go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
ssrc := rtpSender.GetParameters().Encodings[0].SSRC

rtxRead, rtxReadCancel := context.WithCancel(context.Background())
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
for {
pkt, attributes, readRTPErr := track.ReadRTP()
if errors.Is(readRTPErr, io.EOF) {
return
} else if pkt.PayloadType == 0 {
continue
}

assert.NotNil(t, pkt)
assert.Equal(t, pkt.SSRC, uint32(ssrc))
assert.Equal(t, pkt.PayloadType, uint8(96))

rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
rtxSSRC := attributes.Get(AttributeRtxSsrc)
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
assert.Equal(t, rtxPayloadType, uint8(97))
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))

rtxReadCancel()
}
}
})

assert.NoError(t, signalPair(pcOffer, pcAnswer))

func() {
for {
select {
case <-time.After(20 * time.Millisecond):
writeErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second})
assert.NoError(t, writeErr)
case <-rtxRead.Done():
return
}
}
}()

assert.NoError(t, wan.Stop())
closePairNow(t, pcOffer, pcAnswer)
}
93 changes: 1 addition & 92 deletions rtpreceiver_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,16 @@ package webrtc

import (
"context"
"encoding/binary"
"errors"
"io"
"testing"
"time"

"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/transport/v3/test"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/stretchr/testify/assert"
)

func TestSetRTPParameters(t *testing.T) {
sender, receiver, wan := createVNetPair(t)
sender, receiver, wan := createVNetPair(t, nil)

outgoingTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)
Expand Down Expand Up @@ -75,89 +70,3 @@ func TestSetRTPParameters(t *testing.T) {
assert.NoError(t, wan.Stop())
closePairNow(t, sender, receiver)
}

// Assert the behavior of reading a RTX with a distinct SSRC
// All the attributes should be populated and the packet unpacked
func Test_RTX_Read(t *testing.T) {
defer test.TimeOut(time.Second * 30).Stop()

pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)

track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
assert.NoError(t, err)

rtpSender, err := pcOffer.AddTrack(track)
assert.NoError(t, err)

rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
ssrc := rtpSender.GetParameters().Encodings[0].SSRC

rtxRead, rtxReadCancel := context.WithCancel(context.Background())
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
for {
pkt, attributes, readRTPErr := track.ReadRTP()
if errors.Is(readRTPErr, io.EOF) {
return
} else if pkt.PayloadType == 0 {
continue
}

assert.NoError(t, readRTPErr)
assert.NotNil(t, pkt)
assert.Equal(t, pkt.SSRC, uint32(ssrc))
assert.Equal(t, pkt.PayloadType, uint8(96))
assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD})

rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
rtxSSRC := attributes.Get(AttributeRtxSsrc)
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
assert.Equal(t, rtxPayloadType, uint8(97))
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500)

rtxReadCancel()
}
}
})

assert.NoError(t, signalPair(pcOffer, pcAnswer))

func() {
for i := uint16(0); ; i++ {
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
SSRC: uint32(ssrc),
PayloadType: 96,
SequenceNumber: i,
},
Payload: []byte{0xB, 0xA, 0xD},
}

select {
case <-time.After(20 * time.Millisecond):
// Send the original packet
err = track.WriteRTP(&pkt)
assert.NoError(t, err)

rtxPayload := []byte{0x0, 0x0, 0xB, 0xA, 0xD}
binary.BigEndian.PutUint16(rtxPayload[0:2], pkt.Header.SequenceNumber)

// Send the RTX
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
Version: 2,
SSRC: uint32(rtxSsrc),
PayloadType: 97,
SequenceNumber: i + 500,
}, rtxPayload)
assert.NoError(t, err)
case <-rtxRead.Done():
return
}
}
}()

closePairNow(t, pcOffer, pcAnswer)
}
2 changes: 1 addition & 1 deletion rtpreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func Test_RTPReceiver_SetReadDeadline(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

sender, receiver, wan := createVNetPair(t)
sender, receiver, wan := createVNetPair(t, nil)

track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion rtpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/transport/v3/test"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -157,7 +158,7 @@ func Test_RTPSender_SetReadDeadline(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

sender, receiver, wan := createVNetPair(t)
sender, receiver, wan := createVNetPair(t, &interceptor.Registry{})

track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)
Expand Down
16 changes: 11 additions & 5 deletions vnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/assert"
)

func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Router) {
func createVNetPair(t *testing.T, interceptorRegistry *interceptor.Registry) (*PeerConnection, *PeerConnection, *vnet.Router) {
// Create a root router
wan, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "1.2.3.0/24",
Expand Down Expand Up @@ -53,12 +53,18 @@ func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Route
// Start the virtual network by calling Start() on the root router
assert.NoError(t, wan.Start())

offerInterceptorRegistry := &interceptor.Registry{}
offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine), WithInterceptorRegistry(offerInterceptorRegistry)).NewPeerConnection(Configuration{})
offerOptions := []func(*API){WithSettingEngine(offerSettingEngine)}
if interceptorRegistry != nil {
offerOptions = append(offerOptions, WithInterceptorRegistry(interceptorRegistry))
}
offerPeerConnection, err := NewAPI(offerOptions...).NewPeerConnection(Configuration{})
assert.NoError(t, err)

answerInterceptorRegistry := &interceptor.Registry{}
answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine), WithInterceptorRegistry(answerInterceptorRegistry)).NewPeerConnection(Configuration{})
answerOptions := []func(*API){WithSettingEngine(answerSettingEngine)}
if interceptorRegistry != nil {
answerOptions = append(answerOptions, WithInterceptorRegistry(interceptorRegistry))
}
answerPeerConnection, err := NewAPI(answerOptions...).NewPeerConnection(Configuration{})
assert.NoError(t, err)

return offerPeerConnection, answerPeerConnection, wan
Expand Down

0 comments on commit 1e82fbe

Please sign in to comment.