From e2988cf1f2c586ca2310ffb82512620b918f7131 Mon Sep 17 00:00:00 2001 From: Atsushi Watanabe Date: Wed, 28 Feb 2024 17:23:14 +0900 Subject: [PATCH] SampleBuilder: Add Flush method Flush marks all valid samples in the buffer to be popped. Useful for graceful shutdown without losing buffered data as much as possible. --- pkg/media/samplebuilder/samplebuilder.go | 11 ++++-- pkg/media/samplebuilder/samplebuilder_test.go | 39 +++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/pkg/media/samplebuilder/samplebuilder.go b/pkg/media/samplebuilder/samplebuilder.go index 9581973f589..a096f5ae949 100644 --- a/pkg/media/samplebuilder/samplebuilder.go +++ b/pkg/media/samplebuilder/samplebuilder.go @@ -144,10 +144,10 @@ func (s *SampleBuilder) purgeConsumedLocation(consume sampleSequenceLocation, fo // purgeBuffers flushes all buffers that are already consumed or those buffers // that are too late to consume. -func (s *SampleBuilder) purgeBuffers() { +func (s *SampleBuilder) purgeBuffers(flush bool) { s.purgeConsumedBuffers() - for (s.tooOld(s.filled) || (s.filled.count() > s.maxLate)) && s.filled.hasData() { + for (s.tooOld(s.filled) || (s.filled.count() > s.maxLate) || flush) && s.filled.hasData() { if s.active.empty() { // refill the active based on the filled packets s.active = s.filled @@ -188,7 +188,12 @@ func (s *SampleBuilder) Push(p *rtp.Packet) { case slCompareInside: break } - s.purgeBuffers() + s.purgeBuffers(false) +} + +// Flush marks all samples in the buffer to be popped. +func (s *SampleBuilder) Flush() { + s.purgeBuffers(true) } const secondToNanoseconds = 1000000000 diff --git a/pkg/media/samplebuilder/samplebuilder_test.go b/pkg/media/samplebuilder/samplebuilder_test.go index ceffa10dcc6..9bd86f0e841 100644 --- a/pkg/media/samplebuilder/samplebuilder_test.go +++ b/pkg/media/samplebuilder/samplebuilder_test.go @@ -509,6 +509,45 @@ func TestSampleBuilderData(t *testing.T) { assert.Equal(t, j, 0x1FFFF) } +func TestSampleBuilder_Flush(t *testing.T) { + s := New(50, &fakeDepacketizer{ + headChecker: true, + headBytes: []byte{0x01}, + }, 1) + + s.Push(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 999, Timestamp: 0}, + Payload: []byte{0x00}, + }) // Invalid packet + // Gap preventing below packets to be processed + s.Push(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 1001, Timestamp: 1, Marker: true}, + Payload: []byte{0x01, 0x11}, + }) // Valid packet + s.Push(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 1011, Timestamp: 10, Marker: true}, + Payload: []byte{0x01, 0x12}, + }) // Valid packet + + if sample := s.Pop(); sample != nil { + t.Fatal("Unexpected sample is retuned. Test precondition may be broken") + } + + s.Flush() + + samples := []*media.Sample{} + for sample := s.Pop(); sample != nil; sample = s.Pop() { + samples = append(samples, sample) + } + + expected := []*media.Sample{ + {Data: []byte{0x01, 0x11}, Duration: 9 * time.Second, PacketTimestamp: 1, PrevDroppedPackets: 2, RTPHeader: &rtp.Header{SequenceNumber: 1001, Timestamp: 1, Marker: true}}, + {Data: []byte{0x01, 0x12}, Duration: 0, PacketTimestamp: 10, PrevDroppedPackets: 9, RTPHeader: &rtp.Header{SequenceNumber: 1011, Timestamp: 10, Marker: true}}, + } + + assert.Equal(t, expected, samples) +} + func BenchmarkSampleBuilderSequential(b *testing.B) { s := New(100, &fakeDepacketizer{}, 1) b.ResetTimer()