diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 6f6b3bf4c57..a30c38f6bfd 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -11,7 +11,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ["1.14", "1.15"] + go: ["1.15", "1.16"] fail-fast: false name: Go ${{ matrix.go }} steps: @@ -34,7 +34,8 @@ jobs: - name: Setup go-acc run: | - go get -u github.com/ory/go-acc + go get github.com/ory/go-acc + git checkout go.mod go.sum - name: Run test run: | @@ -53,7 +54,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: ["1.14", "1.15"] + go: ["1.15", "1.16"] fail-fast: false name: Go i386 ${{ matrix.go }} steps: @@ -109,7 +110,7 @@ jobs: - name: Download Go run: curl -sSfL https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz | tar -C ~ -xzf - env: - GO_VERSION: 1.15 + GO_VERSION: 1.16 - name: Set Go Root run: echo "GOROOT=${HOME}/go" >> $GITHUB_ENV diff --git a/examples/broadcast/README.md b/examples/broadcast/README.md index f27ac87a205..f9544b38acc 100644 --- a/examples/broadcast/README.md +++ b/examples/broadcast/README.md @@ -20,11 +20,19 @@ Run `broadcast` OR run `main.go` in `github.com/pion/webrtc/examples/broadcast` ### Start a publisher * Click `Publish a Broadcast` -* `curl localhost:8080/sdp -d "YOUR SDP"`. The `broadcast` application will respond with an offer, paste this into the second input field. Then press `Start Session` +* Copy the string in the first input labelled `Browser base64 Session Description` +* Run `curl localhost:8080/sdp -d "$BROWSER_OFFER"`. `$BROWSER_OFFER` is the value you copied in the last step. +* The `broadcast` terminal application will respond with an answer, paste this into the second input field in your browser. +* Press `Start Session` +* The connection state will be printed in the terminal and under `logs` in the browser. ### Join the broadcast * Click `Join a Broadcast` -* `curl localhost:8080/sdp -d "YOUR SDP"`. The `broadcast` application will respond with an offer, paste this into the second input field. Then press `Start Session` +* Copy the string in the first input labelled `Browser base64 Session Description` +* Run `curl localhost:8080/sdp -d "$BROWSER_OFFER"`. `$BROWSER_OFFER` is the value you copied in the last step. +* The `broadcast` terminal application will respond with an answer, paste this into the second input field in your browser. +* Press `Start Session` +* The connection state will be printed in the terminal and under `logs` in the browser. You can change the listening port using `-port 8011` diff --git a/examples/rtp-to-webrtc/README.md b/examples/rtp-to-webrtc/README.md index cdfecaa8b00..065d2db5fb3 100644 --- a/examples/rtp-to-webrtc/README.md +++ b/examples/rtp-to-webrtc/README.md @@ -35,13 +35,13 @@ gst-launch-1.0 videotestsrc ! video/x-raw,width=640,height=480,format=I420 ! vp8 #### ffmpeg ``` -ffmpeg -re -f lavfi -i testsrc=size=640x480:rate=30 -vcodec libvpx -cpu-used 5 -deadline 1 -g 10 -error-resilient 1 -auto-alt-ref 1 -f rtp rtp://127.0.0.1:5004 +ffmpeg -re -f lavfi -i testsrc=size=640x480:rate=30 -vcodec libvpx -cpu-used 5 -deadline 1 -g 10 -error-resilient 1 -auto-alt-ref 1 -f rtp rtp://127.0.0.1:5004?pkt_size=1200 ``` If you wish to send audio replace both occurrences of `vp8` in `main.go` then run ``` -ffmpeg -f lavfi -i "sine=frequency=1000" -c:a libopus -b:a 48000 -sample_fmt s16p -ssrc 1 -payload_type 111 -f rtp -max_delay 0 -application lowdelay rtp:/127.0.0.1:5004 +ffmpeg -f lavfi -i "sine=frequency=1000" -c:a libopus -b:a 48000 -sample_fmt s16p -ssrc 1 -payload_type 111 -f rtp -max_delay 0 -application lowdelay rtp:/127.0.0.1:5004?pkt_size=1200 ``` ### Input rtp-to-webrtc's SessionDescription into your browser diff --git a/fmtp.go b/fmtp.go new file mode 100644 index 00000000000..f82dcb820a6 --- /dev/null +++ b/fmtp.go @@ -0,0 +1,37 @@ +package webrtc + +import ( + "strings" +) + +type fmtp map[string]string + +// parseFmtp parses fmtp string. +func parseFmtp(line string) fmtp { + f := fmtp{} + for _, p := range strings.Split(line, ";") { + pp := strings.SplitN(strings.TrimSpace(p), "=", 2) + key := strings.ToLower(pp[0]) + var value string + if len(pp) > 1 { + value = pp[1] + } + f[key] = value + } + return f +} + +// fmtpConsist checks that two FTMP parameters are not inconsistent. +func fmtpConsist(a, b fmtp) bool { + for k, v := range a { + if vb, ok := b[k]; ok && !strings.EqualFold(vb, v) { + return false + } + } + for k, v := range b { + if va, ok := a[k]; ok && !strings.EqualFold(va, v) { + return false + } + } + return true +} diff --git a/fmtp_test.go b/fmtp_test.go new file mode 100644 index 00000000000..3f0a498e78b --- /dev/null +++ b/fmtp_test.go @@ -0,0 +1,107 @@ +package webrtc + +import ( + "reflect" + "testing" +) + +func TestParseFmtp(t *testing.T) { + testCases := map[string]struct { + input string + expected fmtp + }{ + "OneParam": { + input: "key-name=value", + expected: fmtp{ + "key-name": "value", + }, + }, + "OneParamWithWhiteSpeces": { + input: "\tkey-name=value ", + expected: fmtp{ + "key-name": "value", + }, + }, + "TwoParams": { + input: "key-name=value;key2=value2", + expected: fmtp{ + "key-name": "value", + "key2": "value2", + }, + }, + "TwoParamsWithWhiteSpeces": { + input: "key-name=value; \n\tkey2=value2 ", + expected: fmtp{ + "key-name": "value", + "key2": "value2", + }, + }, + } + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + f := parseFmtp(testCase.input) + if !reflect.DeepEqual(testCase.expected, f) { + t.Errorf("Expected Fmtp params: %v, got: %v", testCase.expected, f) + } + }) + } +} + +func TestFmtpConsist(t *testing.T) { + consistString := map[bool]string{true: "consist", false: "inconsist"} + + testCases := map[string]struct { + a, b string + consist bool + }{ + "Equal": { + a: "key1=value1;key2=value2;key3=value3", + b: "key1=value1;key2=value2;key3=value3", + consist: true, + }, + "EqualWithWhitespaceVariants": { + a: "key1=value1;key2=value2;key3=value3", + b: " key1=value1; \nkey2=value2;\t\nkey3=value3", + consist: true, + }, + "EqualWithCase": { + a: "key1=value1;key2=value2;key3=value3", + b: "key1=value1;key2=Value2;Key3=value3", + consist: true, + }, + "OneHasExtraParam": { + a: "key1=value1;key2=value2;key3=value3", + b: "key1=value1;key2=value2;key3=value3;key4=value4", + consist: true, + }, + "Inconsistent": { + a: "key1=value1;key2=value2;key3=value3", + b: "key1=value1;key2=different_value;key3=value3", + consist: false, + }, + "Inconsistent_OneHasExtraParam": { + a: "key1=value1;key2=value2;key3=value3;key4=value4", + b: "key1=value1;key2=different_value;key3=value3", + consist: false, + }, + } + for name, testCase := range testCases { + testCase := testCase + check := func(t *testing.T, a, b string) { + c := fmtpConsist(parseFmtp(a), parseFmtp(b)) + if c != testCase.consist { + t.Errorf( + "'%s' and '%s' are expected to be %s, but treated as %s", + a, b, consistString[testCase.consist], consistString[c], + ) + } + } + t.Run(name, func(t *testing.T) { + check(t, testCase.a, testCase.b) + }) + t.Run(name+"_Reversed", func(t *testing.T) { + check(t, testCase.b, testCase.a) + }) + } +} diff --git a/go.mod b/go.mod index c7f17d7be89..858299df9b0 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/pion/datachannel v1.4.21 github.com/pion/dtls/v2 v2.0.8 github.com/pion/ice/v2 v2.0.16 - github.com/pion/interceptor v0.0.11 + github.com/pion/interceptor v0.0.12 github.com/pion/logging v0.2.2 github.com/pion/randutil v0.1.0 github.com/pion/rtcp v1.2.6 diff --git a/go.sum b/go.sum index 3d855d69958..d4cafd95fc6 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/pion/dtls/v2 v2.0.8 h1:reGe8rNIMfO/UAeFLqO61tl64t154Qfkr4U3Gzu1tsg= github.com/pion/dtls/v2 v2.0.8/go.mod h1:QuDII+8FVvk9Dp5t5vYIMTo7hh7uBkra+8QIm7QGm10= github.com/pion/ice/v2 v2.0.16 h1:K6bzD8ef9vMKbGMTHaUweHXEyuNGnvr2zdqKoLKZPn0= github.com/pion/ice/v2 v2.0.16/go.mod h1:SJNJzC27gDZoOW0UoxIoC8Hf2PDxG28hQyNdSexDu38= -github.com/pion/interceptor v0.0.11 h1:YIEMghiTZYb88q6awk3N/8WUU5P+aVyKMuU3YZfTSmI= -github.com/pion/interceptor v0.0.11/go.mod h1:qzeuWuD/ZXvPqOnxNcnhWfkCZ2e1kwwslicyyPnhoK4= +github.com/pion/interceptor v0.0.12 h1:eC1iVneBIAQJEfaNAfDqAncJWhMDAnaXPRCJsltdokE= +github.com/pion/interceptor v0.0.12/go.mod h1:qzeuWuD/ZXvPqOnxNcnhWfkCZ2e1kwwslicyyPnhoK4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.5 h1:Q2oj/JB3NqfzY9xGZ1fPzZzK7sDSD8rZPOvcIQ10BCw= diff --git a/interceptor_test.go b/interceptor_test.go index c0ed41b9cc1..f7b9ce69ab4 100644 --- a/interceptor_test.go +++ b/interceptor_test.go @@ -5,44 +5,18 @@ package webrtc // import ( "context" + "sync/atomic" "testing" "time" "github.com/pion/interceptor" + mock_interceptor "github.com/pion/interceptor/pkg/mock" "github.com/pion/rtp" "github.com/pion/transport/test" "github.com/pion/webrtc/v3/pkg/media" "github.com/stretchr/testify/assert" ) -type testInterceptor struct { - interceptor.NoOp - - t *testing.T -} - -func (t *testInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { - return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - // set extension on outgoing packet - header.Extension = true - header.ExtensionProfile = 0xBEDE - assert.NoError(t.t, header.SetExtension(2, []byte("foo"))) - - return writer.Write(header, payload, attributes) - }) -} - -func (t *testInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { - return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { - if a == nil { - a = interceptor.Attributes{} - } - - a.Set("attribute", "value") - return reader.Read(b, a) - }) -} - // E2E test of the features of Interceptors // * Assert an extension can be set on an outbound packet // * Assert an extension can be read on an outbound packet @@ -59,7 +33,28 @@ func TestPeerConnection_Interceptor(t *testing.T) { assert.NoError(t, m.RegisterDefaultCodecs()) ir := &interceptor.Registry{} - ir.Add(&testInterceptor{t: t}) + ir.Add(&mock_interceptor.Interceptor{ + BindLocalStreamFn: func(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + // set extension on outgoing packet + header.Extension = true + header.ExtensionProfile = 0xBEDE + assert.NoError(t, header.SetExtension(2, []byte("foo"))) + + return writer.Write(header, payload, attributes) + }) + }, + BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + if a == nil { + a = interceptor.Attributes{} + } + + a.Set("attribute", "value") + return reader.Read(b, a) + }) + }, + }) pc, err := NewAPI(WithMediaEngine(m), WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{}) assert.NoError(t, err) @@ -104,3 +99,113 @@ func TestPeerConnection_Interceptor(t *testing.T) { closePairNow(t, offerer, answerer) } + +func Test_Interceptor_BindUnbind(t *testing.T) { + lim := test.TimeOut(time.Second * 10) + defer lim.Stop() + + report := test.CheckRoutines(t) + defer report() + + m := &MediaEngine{} + assert.NoError(t, m.RegisterDefaultCodecs()) + + var ( + cntBindRTCPReader uint32 + cntBindRTCPWriter uint32 + cntBindLocalStream uint32 + cntUnbindLocalStream uint32 + cntBindRemoteStream uint32 + cntUnbindRemoteStream uint32 + cntClose uint32 + ) + mockInterceptor := &mock_interceptor.Interceptor{ + BindRTCPReaderFn: func(reader interceptor.RTCPReader) interceptor.RTCPReader { + atomic.AddUint32(&cntBindRTCPReader, 1) + return reader + }, + BindRTCPWriterFn: func(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + atomic.AddUint32(&cntBindRTCPWriter, 1) + return writer + }, + BindLocalStreamFn: func(i *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + atomic.AddUint32(&cntBindLocalStream, 1) + return writer + }, + UnbindLocalStreamFn: func(i *interceptor.StreamInfo) { + atomic.AddUint32(&cntUnbindLocalStream, 1) + }, + BindRemoteStreamFn: func(i *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + atomic.AddUint32(&cntBindRemoteStream, 1) + return reader + }, + UnbindRemoteStreamFn: func(i *interceptor.StreamInfo) { + atomic.AddUint32(&cntUnbindRemoteStream, 1) + }, + CloseFn: func() error { + atomic.AddUint32(&cntClose, 1) + return nil + }, + } + ir := &interceptor.Registry{} + ir.Add(mockInterceptor) + + sender, receiver, err := NewAPI(WithMediaEngine(m), WithInterceptorRegistry(ir)).newPair(Configuration{}) + assert.NoError(t, err) + + track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: "video/vp8"}, "video", "pion") + assert.NoError(t, err) + + _, err = sender.AddTrack(track) + assert.NoError(t, err) + + receiverReady, receiverReadyFn := context.WithCancel(context.Background()) + receiver.OnTrack(func(track *TrackRemote, _ *RTPReceiver) { + _, _, readErr := track.ReadRTP() + assert.NoError(t, readErr) + receiverReadyFn() + }) + + assert.NoError(t, signalPair(sender, receiver)) + + ticker := time.NewTicker(time.Millisecond * 20) + defer ticker.Stop() + func() { + for { + select { + case <-receiverReady.Done(): + return + case <-ticker.C: + // Send packet to make receiver track actual creates RTPReceiver. + assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0xAA}, Duration: time.Second})) + } + } + }() + + closePairNow(t, sender, receiver) + + // Bind/UnbindLocal/RemoteStream should be called from one side. + if cnt := atomic.LoadUint32(&cntBindLocalStream); cnt != 1 { + t.Errorf("BindLocalStreamFn is expected to be called once, but called %d times", cnt) + } + if cnt := atomic.LoadUint32(&cntUnbindLocalStream); cnt != 1 { + t.Errorf("UnbindLocalStreamFn is expected to be called once, but called %d times", cnt) + } + if cnt := atomic.LoadUint32(&cntBindRemoteStream); cnt != 1 { + t.Errorf("BindRemoteStreamFn is expected to be called once, but called %d times", cnt) + } + if cnt := atomic.LoadUint32(&cntUnbindRemoteStream); cnt != 1 { + t.Errorf("UnbindRemoteStreamFn is expected to be called once, but called %d times", cnt) + } + + // BindRTCPWriter/Reader and Close should be called from both side. + if cnt := atomic.LoadUint32(&cntBindRTCPWriter); cnt != 2 { + t.Errorf("BindRTCPWriterFn is expected to be called twice, but called %d times", cnt) + } + if cnt := atomic.LoadUint32(&cntBindRTCPReader); cnt != 2 { + t.Errorf("BindRTCPReaderFn is expected to be called twice, but called %d times", cnt) + } + if cnt := atomic.LoadUint32(&cntClose); cnt != 2 { + t.Errorf("CloseFn is expected to be called twice, but called %d times", cnt) + } +} diff --git a/internal/mux/mux.go b/internal/mux/mux.go index 83aaa26bc39..00bf6ac24ea 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -2,9 +2,12 @@ package mux import ( + "errors" + "io" "net" "sync" + "github.com/pion/ice/v2" "github.com/pion/logging" "github.com/pion/transport/packetio" ) @@ -104,12 +107,19 @@ func (m *Mux) readLoop() { buf := make([]byte, m.bufferSize) for { n, err := m.nextConn.Read(buf) - if err != nil { + switch { + case errors.Is(err, io.EOF), errors.Is(err, ice.ErrClosed): + return + case errors.Is(err, io.ErrShortBuffer), errors.Is(err, packetio.ErrTimeout): + m.log.Errorf("mux: failed to read from packetio.Buffer %s\n", err.Error()) + continue + case err != nil: + m.log.Errorf("mux: ending readLoop packetio.Buffer error %s\n", err.Error()) return } - err = m.dispatch(buf[:n]) - if err != nil { + if err = m.dispatch(buf[:n]); err != nil { + m.log.Errorf("mux: ending readLoop dispatch error %s\n", err.Error()) return } } diff --git a/internal/mux/mux_test.go b/internal/mux/mux_test.go index 87b19400351..e3d9ed590cb 100644 --- a/internal/mux/mux_test.go +++ b/internal/mux/mux_test.go @@ -1,14 +1,19 @@ package mux import ( + "io" "net" "testing" "time" "github.com/pion/logging" + "github.com/pion/transport/packetio" "github.com/pion/transport/test" + "github.com/stretchr/testify/assert" ) +const testPipeBufferSize = 8192 + func TestStressDuplex(t *testing.T) { // Limit runtime in case of deadlocks lim := test.TimeOut(time.Second * 20) @@ -34,40 +39,26 @@ func stressDuplex(t *testing.T) { MsgCount: 100, } - err := test.StressDuplex(ca, cb, opt) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, test.StressDuplex(ca, cb, opt)) } func pipeMemory() (*Endpoint, net.Conn, func(*testing.T)) { // In memory pipe ca, cb := net.Pipe() - matchAll := func([]byte) bool { - return true - } - - config := Config{ + m := NewMux(Config{ Conn: ca, - BufferSize: 8192, + BufferSize: testPipeBufferSize, LoggerFactory: logging.NewDefaultLoggerFactory(), - } + }) - m := NewMux(config) - e := m.NewEndpoint(matchAll) + e := m.NewEndpoint(MatchAll) m.RemoveEndpoint(e) - e = m.NewEndpoint(matchAll) + e = m.NewEndpoint(MatchAll) stop := func(t *testing.T) { - err := cb.Close() - if err != nil { - t.Fatal(err) - } - err = m.Close() - if err != nil { - t.Fatal(err) - } + assert.NoError(t, cb.Close()) + assert.NoError(t, m.Close()) } return e, cb, stop @@ -76,28 +67,78 @@ func pipeMemory() (*Endpoint, net.Conn, func(*testing.T)) { func TestNoEndpoints(t *testing.T) { // In memory pipe ca, cb := net.Pipe() - err := cb.Close() - if err != nil { - panic("Failed to close network pipe") - } + assert.NoError(t, cb.Close()) - config := Config{ + m := NewMux(Config{ Conn: ca, - BufferSize: 8192, + BufferSize: testPipeBufferSize, LoggerFactory: logging.NewDefaultLoggerFactory(), - } + }) + assert.NoError(t, m.dispatch(make([]byte, 1))) + assert.NoError(t, m.Close()) + assert.NoError(t, ca.Close()) +} - m := NewMux(config) - err = m.dispatch(make([]byte, 1)) - if err != nil { - t.Fatal(err) - } - err = m.Close() - if err != nil { - t.Fatalf("Failed to close empty mux") - } - err = ca.Close() - if err != nil { - panic("Failed to close network pipe") - } +type muxErrorConnReadResult struct { + err error + data []byte +} + +// muxErrorConn +type muxErrorConn struct { + net.Conn + readResults []muxErrorConnReadResult +} + +func (m *muxErrorConn) Read(b []byte) (n int, err error) { + err = m.readResults[0].err + copy(b, m.readResults[0].data) + n = len(m.readResults[0].data) + + m.readResults = m.readResults[1:] + return +} + +/* Don't end the mux readLoop for packetio.ErrTimeout or io.ErrShortBuffer, assert the following + * io.ErrShortBuffer and packetio.ErrTimeout don't end the read loop + * io.EOF ends the loop + + pion/webrtc#1720 +*/ +func TestNonFatalRead(t *testing.T) { + expectedData := []byte("expectedData") + + // In memory pipe + ca, cb := net.Pipe() + assert.NoError(t, cb.Close()) + + conn := &muxErrorConn{ca, []muxErrorConnReadResult{ + // Non-fatal timeout error + {packetio.ErrTimeout, nil}, + {nil, expectedData}, + {io.ErrShortBuffer, nil}, + {nil, expectedData}, + {io.EOF, nil}, + }} + + m := NewMux(Config{ + Conn: conn, + BufferSize: testPipeBufferSize, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + + e := m.NewEndpoint(MatchAll) + + buff := make([]byte, testPipeBufferSize) + n, err := e.Read(buff) + assert.NoError(t, err) + assert.Equal(t, buff[:n], expectedData) + + n, err = e.Read(buff) + assert.NoError(t, err) + assert.Equal(t, buff[:n], expectedData) + + <-m.closedCh + assert.NoError(t, m.Close()) + assert.NoError(t, ca.Close()) } diff --git a/internal/mux/muxfunc.go b/internal/mux/muxfunc.go index 2ab585bf9af..32b46f0ed31 100644 --- a/internal/mux/muxfunc.go +++ b/internal/mux/muxfunc.go @@ -13,11 +13,6 @@ func MatchAll(b []byte) bool { return true } -// MatchNone always returns false -func MatchNone(b []byte) bool { - return false -} - // MatchRange is a MatchFunc that accepts packets with the first byte in [lower..upper] func MatchRange(lower, upper byte) MatchFunc { return func(buf []byte) bool { @@ -43,30 +38,12 @@ func MatchRange(lower, upper byte) MatchFunc { // | [128..191] -+--> forward to RTP/RTCP // +----------------+ -// MatchSTUN is a MatchFunc that accepts packets with the first byte in [0..3] -// as defied in RFC7983 -func MatchSTUN(b []byte) bool { - return MatchRange(0, 3)(b) -} - -// MatchZRTP is a MatchFunc that accepts packets with the first byte in [16..19] -// as defied in RFC7983 -func MatchZRTP(b []byte) bool { - return MatchRange(16, 19)(b) -} - // MatchDTLS is a MatchFunc that accepts packets with the first byte in [20..63] // as defied in RFC7983 func MatchDTLS(b []byte) bool { return MatchRange(20, 63)(b) } -// MatchTURN is a MatchFunc that accepts packets with the first byte in [64..79] -// as defied in RFC7983 -func MatchTURN(b []byte) bool { - return MatchRange(64, 79)(b) -} - // MatchSRTPOrSRTCP is a MatchFunc that accepts packets with the first byte in [128..191] // as defied in RFC7983 func MatchSRTPOrSRTCP(b []byte) bool { diff --git a/mediaengine.go b/mediaengine.go index bc3ed999eb3..540f20a8af3 100644 --- a/mediaengine.go +++ b/mediaengine.go @@ -340,8 +340,9 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo codecs = m.audioCodecs } - if strings.HasPrefix(remoteCodec.RTPCodecCapability.SDPFmtpLine, "apt=") { - payloadType, err := strconv.Atoi(strings.TrimPrefix(remoteCodec.RTPCodecCapability.SDPFmtpLine, "apt=")) + remoteFmtp := parseFmtp(remoteCodec.RTPCodecCapability.SDPFmtpLine) + if apt, hasApt := remoteFmtp["apt"]; hasApt { + payloadType, err := strconv.Atoi(apt) if err != nil { return codecMatchNone, err } diff --git a/peerconnection.go b/peerconnection.go index e837959432e..c3748eef518 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1044,7 +1044,8 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error { } } - if t == nil { + switch { + case t == nil: receiver, err := pc.api.NewRTPReceiver(kind, pc.dtlsTransport) if err != nil { return err @@ -1058,10 +1059,14 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error { t = pc.newRTPTransceiver(receiver, nil, localDirection, kind) pc.onNegotiationNeeded() - } else if direction == RTPTransceiverDirectionRecvonly { + case direction == RTPTransceiverDirectionRecvonly: if t.Direction() == RTPTransceiverDirectionSendrecv { t.setDirection(RTPTransceiverDirectionSendonly) } + case direction == RTPTransceiverDirectionSendrecv: + if t.Direction() == RTPTransceiverDirectionSendonly { + t.setDirection(RTPTransceiverDirectionSendrecv) + } } if t.Mid() == "" { diff --git a/peerconnection_renegotiation_test.go b/peerconnection_renegotiation_test.go index 70c00f80f4e..90f6b10d924 100644 --- a/peerconnection_renegotiation_test.go +++ b/peerconnection_renegotiation_test.go @@ -60,6 +60,85 @@ func sdpMidHasSsrc(offer SessionDescription, mid string, ssrc SSRC) bool { return false } +func TestPeerConnection_Renegotiation_AddRecvonlyTransceiver(t *testing.T) { + type testCase struct { + name string + answererSends bool + } + + testCases := []testCase{ + // Assert the following behaviors: + // - Offerer can add a recvonly transceiver + // - During negotiation, answerer peer adds an inactive (or sendonly) transceiver + // - Offerer can add a track + // - Answerer can receive the RTP packets. + {"add recvonly, then receive from answerer", false}, + // Assert the following behaviors: + // - Offerer can add a recvonly transceiver + // - During negotiation, answerer peer adds an inactive (or sendonly) transceiver + // - Answerer can add a track to the existing sendonly transceiver + // - Offerer can receive the RTP packets. + {"add recvonly, then send to answerer", true}, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + lim := test.TimeOut(time.Second * 30) + defer lim.Stop() + + report := test.CheckRoutines(t) + defer report() + + pcOffer, pcAnswer, err := newPair() + if err != nil { + t.Fatal(err) + } + + _, err = pcOffer.AddTransceiverFromKind( + RTPCodecTypeVideo, + RtpTransceiverInit{ + Direction: RTPTransceiverDirectionRecvonly, + }, + ) + assert.NoError(t, err) + + assert.NoError(t, signalPair(pcOffer, pcAnswer)) + + localTrack, err := NewTrackLocalStaticSample( + RTPCodecCapability{MimeType: "video/VP8"}, "track-one", "stream-one", + ) + require.NoError(t, err) + + if tc.answererSends { + _, err = pcAnswer.AddTrack(localTrack) + } else { + _, err = pcOffer.AddTrack(localTrack) + } + + require.NoError(t, err) + + onTrackFired, onTrackFiredFunc := context.WithCancel(context.Background()) + + if tc.answererSends { + pcOffer.OnTrack(func(track *TrackRemote, r *RTPReceiver) { + onTrackFiredFunc() + }) + } else { + pcAnswer.OnTrack(func(track *TrackRemote, r *RTPReceiver) { + onTrackFiredFunc() + }) + } + + assert.NoError(t, signalPair(pcOffer, pcAnswer)) + + sendVideoUntilDone(onTrackFired.Done(), t, []*TrackLocalStaticSample{localTrack}) + + closePairNow(t, pcOffer, pcAnswer) + }) + } +} + /* * Assert the following behaviors * - We are able to call AddTrack after signaling diff --git a/renovate.json b/renovate.json index 4400fd9b235..f84608c5136 100644 --- a/renovate.json +++ b/renovate.json @@ -7,6 +7,10 @@ ], "commitBody": "Generated by renovateBot", "packageRules": [ + { + "matchUpdateTypes": ["minor", "patch", "pin", "digest"], + "automerge": true + }, { "packagePatterns": ["^golang.org/x/"], "schedule": ["on the first day of the month"] diff --git a/rtpcodec.go b/rtpcodec.go index da61918596f..d566d715cfe 100644 --- a/rtpcodec.go +++ b/rtpcodec.go @@ -97,23 +97,23 @@ const ( // Used for lookup up a codec in an existing list to find a match // Returns codecMatchExact, codecMatchPartial, or codecMatchNone func codecParametersFuzzySearch(needle RTPCodecParameters, haystack []RTPCodecParameters) (RTPCodecParameters, codecMatchType) { + needleFmtp := parseFmtp(needle.RTPCodecCapability.SDPFmtpLine) + // First attempt to match on MimeType + SDPFmtpLine - // Exact matches means fmtp line cannot be empty for _, c := range haystack { // remote mime type will prefix with media kind,so check with suffix if (strings.EqualFold(c.RTPCodecCapability.MimeType, needle.RTPCodecCapability.MimeType) || strings.HasSuffix(needle.RTPCodecCapability.MimeType, c.RTPCodecCapability.MimeType)) && - c.RTPCodecCapability.SDPFmtpLine == needle.RTPCodecCapability.SDPFmtpLine { + fmtpConsist(needleFmtp, parseFmtp(c.RTPCodecCapability.SDPFmtpLine)) { return c, codecMatchExact } } - // Fallback to just MimeType if either haystack or needle does not have fmtpline set + // Fallback to just MimeType for _, c := range haystack { // remote mime type will prefix with media kind,so check with suffix - if (strings.EqualFold(c.RTPCodecCapability.MimeType, needle.RTPCodecCapability.MimeType) || - strings.HasSuffix(needle.RTPCodecCapability.MimeType, c.RTPCodecCapability.MimeType)) && - (c.RTPCodecCapability.SDPFmtpLine == "" || needle.RTPCodecCapability.SDPFmtpLine == "") { + if strings.EqualFold(c.RTPCodecCapability.MimeType, needle.RTPCodecCapability.MimeType) || + strings.HasSuffix(needle.RTPCodecCapability.MimeType, c.RTPCodecCapability.MimeType) { return c, codecMatchPartial } } diff --git a/rtpreceiver.go b/rtpreceiver.go index 1f06cd59611..977d7fecc52 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -26,6 +26,8 @@ const ( type trackStreams struct { track *TrackRemote + streamInfo interceptor.StreamInfo + rtpReadStream *srtp.ReadStreamSRTP rtpInterceptor interceptor.RTPReader @@ -161,18 +163,17 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error { codec = globalParams.Codecs[0].RTPCodecCapability } - streamInfo := createStreamInfo("", parameters.Encodings[0].SSRC, 0, codec, globalParams.HeaderExtensions) - + t.streamInfo = createStreamInfo("", parameters.Encodings[0].SSRC, 0, codec, globalParams.HeaderExtensions) // fec & rtx if parameters.Encodings[0].FecSSRC != 0 { - streamInfo.Attributes.Set(IctFECSSRCAttr, uint32(parameters.Encodings[0].FecSSRC)) + t.streamInfo.Attributes.Set(IctFECSSRCAttr, uint32(parameters.Encodings[0].FecSSRC)) } if parameters.Encodings[0].RtxSSRC != 0 { - streamInfo.Attributes.Set(IctRTXSSRCAttr, uint32(parameters.Encodings[0].RtxSSRC)) + t.streamInfo.Attributes.Set(IctRTXSSRCAttr, uint32(parameters.Encodings[0].RtxSSRC)) } var err error - if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[0].SSRC, streamInfo); err != nil { + if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[0].SSRC, t.streamInfo); err != nil { return err } @@ -321,6 +322,7 @@ func (r *RTPReceiver) Stop() error { } err = util.FlattenErrs(errs) + r.api.interceptor.UnbindRemoteStream(&r.tracks[i].streamInfo) } default: } @@ -368,11 +370,11 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, ssrc SSRC) r.tracks[i].track.codec = params.Codecs[0] r.tracks[i].track.params = params r.tracks[i].track.ssrc = ssrc - streamInfo := createStreamInfo("", ssrc, params.Codecs[0].PayloadType, params.Codecs[0].RTPCodecCapability, params.HeaderExtensions) + r.tracks[i].streamInfo = createStreamInfo("", ssrc, params.Codecs[0].PayloadType, params.Codecs[0].RTPCodecCapability, params.HeaderExtensions) r.tracks[i].track.mu.Unlock() var err error - if r.tracks[i].rtpReadStream, r.tracks[i].rtpInterceptor, r.tracks[i].rtcpReadStream, r.tracks[i].rtcpInterceptor, err = r.streamsForSSRC(ssrc, streamInfo); err != nil { + if r.tracks[i].rtpReadStream, r.tracks[i].rtpInterceptor, r.tracks[i].rtcpReadStream, r.tracks[i].rtcpInterceptor, err = r.streamsForSSRC(ssrc, r.tracks[i].streamInfo); err != nil { return nil, err } diff --git a/rtpsender.go b/rtpsender.go index 3b363d85b9d..9fb87037ef8 100644 --- a/rtpsender.go +++ b/rtpsender.go @@ -20,6 +20,7 @@ type RTPSender struct { srtpStream *srtpWriterFuture rtcpInterceptor interceptor.RTCPReader + streamInfo interceptor.StreamInfo context TrackLocalContext @@ -199,15 +200,14 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error { } r.context.params.Codecs = []RTPCodecParameters{codec} - streamInfo := createStreamInfo(r.id, parameters.Encodings[0].SSRC, codec.PayloadType, codec.RTPCodecCapability, parameters.HeaderExtensions) + r.streamInfo = createStreamInfo(r.id, parameters.Encodings[0].SSRC, codec.PayloadType, codec.RTPCodecCapability, parameters.HeaderExtensions) if r.fec != 0 { - streamInfo.Attributes.Set(IctFECSSRCAttr, uint32(r.fec)) + r.streamInfo.Attributes.Set(IctFECSSRCAttr, uint32(r.fec)) } if r.rtx != 0 { - streamInfo.Attributes.Set(IctRTXSSRCAttr, uint32(r.rtx)) + r.streamInfo.Attributes.Set(IctRTXSSRCAttr, uint32(r.rtx)) } - - rtpInterceptor := r.api.interceptor.BindLocalStream(&streamInfo, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + rtpInterceptor := r.api.interceptor.BindLocalStream(&r.streamInfo, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { return r.srtpStream.WriteRTP(header, payload) })) writeStream.interceptor.Store(rtpInterceptor) @@ -236,6 +236,8 @@ func (r *RTPSender) Stop() error { return err } + r.api.interceptor.UnbindLocalStream(&r.streamInfo) + return r.srtpStream.Close() } diff --git a/rtptransceiver.go b/rtptransceiver.go index ee9de2fd265..087cc7d0e34 100644 --- a/rtptransceiver.go +++ b/rtptransceiver.go @@ -115,6 +115,12 @@ func (t *RTPTransceiver) setSendingTrack(track TrackLocal) error { t.setDirection(RTPTransceiverDirectionSendonly) case track == nil && t.Direction() == RTPTransceiverDirectionSendrecv: t.setDirection(RTPTransceiverDirectionRecvonly) + case track != nil && t.Direction() == RTPTransceiverDirectionSendonly: + // Handle the case where a sendonly transceiver was added by a negotiation + // initiated by remote peer. For example a remote peer added a transceiver + // with direction recvonly. + case track != nil && t.Direction() == RTPTransceiverDirectionSendrecv: + // Similar to above, but for sendrecv transceiver. case track == nil && t.Direction() == RTPTransceiverDirectionSendonly: t.setDirection(RTPTransceiverDirectionInactive) default: