From 3143e260990c77ace409fe6193b9d25ba5362b17 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 4 Mar 2024 20:09:52 -0800 Subject: [PATCH] remove playlist throttle --- pkg/pipeline/sink/segments.go | 29 +++++++---------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 6cda02a5..6a4939b1 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -61,7 +61,6 @@ type SegmentSink struct { closedSegments chan SegmentUpdate playlistUpdates chan SegmentUpdate - throttle core.Throttle done core.Fuse } @@ -103,7 +102,6 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg openSegmentsStartTime: make(map[string]uint64), closedSegments: make(chan SegmentUpdate, maxPendingUploads), playlistUpdates: make(chan SegmentUpdate, maxPendingUploads), - throttle: core.NewThrottle(time.Second * 2), done: core.NewFuse(), } @@ -183,35 +181,22 @@ func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error { <-update.uploadComplete s.playlistLock.Lock() + defer s.playlistLock.Unlock() + if err := s.playlist.Append(segmentStartTime, duration, update.filename); err != nil { - s.playlistLock.Unlock() return err } + if err := s.uploadPlaylist(); err != nil { + s.callbacks.OnError(err) + } if s.livePlaylist != nil { if err := s.livePlaylist.Append(segmentStartTime, duration, update.filename); err != nil { - s.playlistLock.Unlock() return err } - } - s.playlistLock.Unlock() - - // throttle playlist uploads - s.throttle(func() { - s.playlistLock.Lock() - defer s.playlistLock.Unlock() - if s.done.IsBroken() { - return - } - - if err := s.uploadPlaylist(); err != nil { + if err := s.uploadLivePlaylist(); err != nil { s.callbacks.OnError(err) } - if s.livePlaylist != nil { - if err := s.uploadLivePlaylist(); err != nil { - s.callbacks.OnError(err) - } - } - }) + } return nil }