diff --git a/pkg/media/media.go b/pkg/media/media.go index a7e5ed09965..ab07e31897b 100644 --- a/pkg/media/media.go +++ b/pkg/media/media.go @@ -18,7 +18,10 @@ type Sample struct { PacketTimestamp uint32 PrevDroppedPackets uint16 Metadata interface{} - RTPHeader *rtp.Header + + // RTP headers of RTP packets forming this Sample. (Optional) + // Useful for accessing RTP extensions associated to the Sample. + RTPHeaders []*rtp.Header } // Writer defines an interface to handle diff --git a/pkg/media/samplebuilder/samplebuilder.go b/pkg/media/samplebuilder/samplebuilder.go index a096f5ae949..8592803a4ac 100644 --- a/pkg/media/samplebuilder/samplebuilder.go +++ b/pkg/media/samplebuilder/samplebuilder.go @@ -48,6 +48,9 @@ type SampleBuilder struct { // allows inspecting head packets of each sample and then returns a custom metadata packetHeadHandler func(headPacket interface{}) interface{} + + // return array of RTP headers as Sample.RTPHeaders + returnRTPHeaders bool } // New constructs a new SampleBuilder. @@ -277,17 +280,18 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { // merge all the buffers into a sample data := []byte{} var metadata interface{} - var rtpHeader rtp.Header + var rtpHeaders []*rtp.Header for i := consume.head; i != consume.tail; i++ { p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload) if err != nil { return nil } - if i == consume.head { - if s.packetHeadHandler != nil { - metadata = s.packetHeadHandler(s.depacketizer) - } - rtpHeader = s.buffer[i].Header.Clone() + if i == consume.head && s.packetHeadHandler != nil { + metadata = s.packetHeadHandler(s.depacketizer) + } + if s.returnRTPHeaders { + h := s.buffer[i].Header.Clone() + rtpHeaders = append(rtpHeaders, &h) } data = append(data, p...) @@ -300,7 +304,7 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { PacketTimestamp: sampleTimestamp, PrevDroppedPackets: s.droppedPackets, Metadata: metadata, - RTPHeader: &rtpHeader, + RTPHeaders: rtpHeaders, } s.droppedPackets = 0 @@ -394,3 +398,11 @@ func WithMaxTimeDelay(maxLateDuration time.Duration) Option { o.maxLateTimestamp = uint32(int64(o.sampleRate) * totalMillis / 1000) } } + +// WithRTPHeaders enables to collect RTP headers forming a Sample. +// Useful for accessing RTP extensions associated to the Sample. +func WithRTPHeaders(enable bool) Option { + return func(o *SampleBuilder) { + o.returnRTPHeaders = enable + } +} diff --git a/pkg/media/samplebuilder/samplebuilder_test.go b/pkg/media/samplebuilder/samplebuilder_test.go index 9bd86f0e841..6fba22de696 100644 --- a/pkg/media/samplebuilder/samplebuilder_test.go +++ b/pkg/media/samplebuilder/samplebuilder_test.go @@ -17,6 +17,7 @@ type sampleBuilderTest struct { message string packets []*rtp.Packet withHeadChecker bool + withRTPHeader bool headBytes []byte samples []*media.Sample maxLate uint16 @@ -84,8 +85,8 @@ func TestSampleBuilder(t *testing.T) { {Header: rtp.Header{SequenceNumber: 5002, Timestamp: 7}, Payload: []byte{0x03}}, }, samples: []*media.Sample{ - {Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5}}, - {Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 6, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 6}}, + {Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5}, + {Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 6}, }, maxLate: 50, maxLateTimestamp: 0, @@ -102,7 +103,7 @@ func TestSampleBuilder(t *testing.T) { {Header: rtp.Header{SequenceNumber: 5012, Timestamp: 17}, Payload: []byte{0x07}}, }, samples: []*media.Sample{ - {Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5, Marker: true}}, + {Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5}, }, maxLate: 5, maxLateTimestamp: 0, @@ -134,8 +135,8 @@ func TestSampleBuilder(t *testing.T) { {Header: rtp.Header{SequenceNumber: 5012, Timestamp: 17}, Payload: []byte{0x07}}, }, samples: []*media.Sample{ - {Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5, Marker: true}}, - {Data: []byte{0x02}, Duration: time.Second * 2, PacketTimestamp: 7, PrevDroppedPackets: 1, RTPHeader: &rtp.Header{SequenceNumber: 5002, Timestamp: 7, Marker: true}}, + {Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5}, + {Data: []byte{0x02}, Duration: time.Second * 2, PacketTimestamp: 7, PrevDroppedPackets: 1}, }, maxLate: 5, maxLateTimestamp: 0, @@ -149,8 +150,8 @@ func TestSampleBuilder(t *testing.T) { {Header: rtp.Header{SequenceNumber: 5003, Timestamp: 7}, Payload: []byte{0x04}}, }, samples: []*media.Sample{ - {Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5}}, - {Data: []byte{0x02, 0x03}, Duration: time.Second, PacketTimestamp: 6, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 6}}, + {Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5}, + {Data: []byte{0x02, 0x03}, Duration: time.Second, PacketTimestamp: 6}, }, maxLate: 50, maxLateTimestamp: 0, @@ -203,11 +204,11 @@ func TestSampleBuilder(t *testing.T) { {Header: rtp.Header{SequenceNumber: 5005, Timestamp: 6}, Payload: []byte{0x06}}, }, samples: []*media.Sample{ - {Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 1}}, - {Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 2, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 2}}, - {Data: []byte{0x03}, Duration: time.Second, PacketTimestamp: 3, RTPHeader: &rtp.Header{SequenceNumber: 5002, Timestamp: 3}}, - {Data: []byte{0x04}, Duration: time.Second, PacketTimestamp: 4, RTPHeader: &rtp.Header{SequenceNumber: 5003, Timestamp: 4}}, - {Data: []byte{0x05}, Duration: time.Second, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5004, Timestamp: 5}}, + {Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1}, + {Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 2}, + {Data: []byte{0x03}, Duration: time.Second, PacketTimestamp: 3}, + {Data: []byte{0x04}, Duration: time.Second, PacketTimestamp: 4}, + {Data: []byte{0x05}, Duration: time.Second, PacketTimestamp: 5}, }, maxLate: 50, maxLateTimestamp: 0, @@ -225,7 +226,7 @@ func TestSampleBuilder(t *testing.T) { {Header: rtp.Header{SequenceNumber: 5017, Timestamp: 7001}, Payload: []byte{0x05}}, }, samples: []*media.Sample{ - {Data: []byte{0x04, 0x05}, Duration: time.Second * time.Duration(2), PacketTimestamp: 4000, PrevDroppedPackets: 13, RTPHeader: &rtp.Header{SequenceNumber: 5013, Timestamp: 4000}}, + {Data: []byte{0x04, 0x05}, Duration: time.Second * time.Duration(2), PacketTimestamp: 4000, PrevDroppedPackets: 13}, }, withHeadChecker: true, headBytes: []byte{0x04}, @@ -247,7 +248,7 @@ func TestSampleBuilder(t *testing.T) { withHeadChecker: true, headBytes: []byte{1}, samples: []*media.Sample{ - {Data: []byte{1, 2, 3}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 1}}, // first sample + {Data: []byte{1, 2, 3}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0}, // first sample }, maxLate: 50, maxLateTimestamp: 2000, @@ -265,11 +266,32 @@ func TestSampleBuilder(t *testing.T) { withHeadChecker: true, headBytes: []byte{1}, samples: []*media.Sample{ - {Data: []byte{1, 2}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 1}}, // 1st sample + {Data: []byte{1, 2}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0}, // 1st sample }, maxLate: 50, maxLateTimestamp: 2000, }, + { + message: "SampleBuilder should emit samples with RTP headers when WithRTPHeaders option is enabled", + packets: []*rtp.Packet{ + {Header: rtp.Header{SequenceNumber: 5000, Timestamp: 5}, Payload: []byte{0x01}}, + {Header: rtp.Header{SequenceNumber: 5001, Timestamp: 6}, Payload: []byte{0x02}}, + {Header: rtp.Header{SequenceNumber: 5002, Timestamp: 6}, Payload: []byte{0x03}}, + {Header: rtp.Header{SequenceNumber: 5003, Timestamp: 7}, Payload: []byte{0x04}}, + }, + samples: []*media.Sample{ + {Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5, RTPHeaders: []*rtp.Header{ + {SequenceNumber: 5000, Timestamp: 5}, + }}, + {Data: []byte{0x02, 0x03}, Duration: time.Second, PacketTimestamp: 6, RTPHeaders: []*rtp.Header{ + {SequenceNumber: 5001, Timestamp: 6}, + {SequenceNumber: 5002, Timestamp: 6}, + }}, + }, + maxLate: 50, + maxLateTimestamp: 0, + withRTPHeader: true, + }, } t.Run("Pop", func(t *testing.T) { @@ -282,6 +304,9 @@ func TestSampleBuilder(t *testing.T) { time.Millisecond*time.Duration(int64(t.maxLateTimestamp)), )) } + if t.withRTPHeader { + opts = append(opts, WithRTPHeaders(true)) + } d := &fakeDepacketizer{ headChecker: t.withHeadChecker, @@ -309,18 +334,18 @@ func TestSampleBuilderMaxLate(t *testing.T) { s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 0, Timestamp: 1}, Payload: []byte{0x01}}) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1, Timestamp: 2}, Payload: []byte{0x01}}) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 2, Timestamp: 3}, Payload: []byte{0x01}}) - assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1, RTPHeader: &rtp.Header{SequenceNumber: 0, Timestamp: 1}}, s.Pop(), "Failed to build samples before gap") + assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1}, s.Pop(), "Failed to build samples before gap") s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) - assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 2, RTPHeader: &rtp.Header{SequenceNumber: 1, Timestamp: 2}}, s.Pop(), "Failed to build samples after large gap") + assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 2}, s.Pop(), "Failed to build samples after large gap") assert.Equal((*media.Sample)(nil), s.Pop(), "Failed to build samples after large gap") s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 6000, Timestamp: 600}, Payload: []byte{0x03}}) - assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 500, PrevDroppedPackets: 4998, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 500}}, s.Pop(), "Failed to build samples after large gap") - assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 501, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 501}}, s.Pop(), "Failed to build samples after large gap") + assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 500, PrevDroppedPackets: 4998}, s.Pop(), "Failed to build samples after large gap") + assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 501}, s.Pop(), "Failed to build samples after large gap") } func TestSeqnumDistance(t *testing.T) { @@ -541,8 +566,8 @@ func TestSampleBuilder_Flush(t *testing.T) { } expected := []*media.Sample{ - {Data: []byte{0x01, 0x11}, Duration: 9 * time.Second, PacketTimestamp: 1, PrevDroppedPackets: 2, RTPHeader: &rtp.Header{SequenceNumber: 1001, Timestamp: 1, Marker: true}}, - {Data: []byte{0x01, 0x12}, Duration: 0, PacketTimestamp: 10, PrevDroppedPackets: 9, RTPHeader: &rtp.Header{SequenceNumber: 1011, Timestamp: 10, Marker: true}}, + {Data: []byte{0x01, 0x11}, Duration: 9 * time.Second, PacketTimestamp: 1, PrevDroppedPackets: 2}, + {Data: []byte{0x01, 0x12}, Duration: 0, PacketTimestamp: 10, PrevDroppedPackets: 9}, } assert.Equal(t, expected, samples)