From de74bd7850bd025cb7ab7826232cb2fa47cb374a Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 2 Oct 2023 12:29:58 -0700 Subject: [PATCH 1/2] segments rework --- pkg/pipeline/sink/segments.go | 261 ++++++++++++++++++---------------- pkg/pipeline/watch.go | 22 +-- 2 files changed, 150 insertions(+), 133 deletions(-) diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 251193ae..902c503e 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -45,22 +45,29 @@ type SegmentSink struct { conf *config.PipelineConfig callbacks *gstreamer.Callbacks - playlist m3u8.PlaylistWriter - livePlaylist m3u8.PlaylistWriter - initialized bool - startTime time.Time - startRunningTime uint64 + playlist m3u8.PlaylistWriter + livePlaylist m3u8.PlaylistWriter + segmentLock sync.Mutex + infoLock sync.Mutex + playlistLock sync.Mutex + + initialized bool + startTime time.Time + outputType types.OutputType + startRunningTime uint64 openSegmentsStartTime map[string]uint64 - openSegmentsLock sync.Mutex - endedSegments chan SegmentUpdate - done core.Fuse + closedSegments chan SegmentUpdate + playlistUpdates chan SegmentUpdate + throttle core.Throttle + done core.Fuse } type SegmentUpdate struct { - endTime uint64 - filename string + endTime uint64 + filename string + uploadComplete chan struct{} } func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks) (*SegmentSink, error) { @@ -79,6 +86,11 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg } } + outputType := o.OutputType + if outputType == types.OutputTypeHLS { + outputType = types.OutputTypeTS + } + return &SegmentSink{ Uploader: u, SegmentConfig: o, @@ -86,93 +98,123 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg callbacks: callbacks, playlist: playlist, livePlaylist: livePlaylist, + outputType: outputType, openSegmentsStartTime: make(map[string]uint64), - endedSegments: make(chan SegmentUpdate, maxPendingUploads), + closedSegments: make(chan SegmentUpdate, maxPendingUploads), + playlistUpdates: make(chan SegmentUpdate, maxPendingUploads), + throttle: core.NewThrottle(time.Second * 2), done: core.NewFuse(), }, nil } func (s *SegmentSink) Start() error { go func() { - var err error - defer func() { - if err != nil { + defer close(s.playlistUpdates) + for update := range s.closedSegments { + s.handleClosedSegment(update) + } + }() + + go func() { + defer s.done.Break() + for update := range s.playlistUpdates { + if err := s.handlePlaylistUpdates(update); err != nil { s.callbacks.OnError(err) + return } - s.done.Break() - }() + } + }() - for update := range s.endedSegments { - segmentLocalPath := path.Join(s.LocalDir, update.filename) - segmentStoragePath := path.Join(s.StorageDir, update.filename) + return nil +} - var size int64 - _, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType(), true) - if err != nil { - return - } +func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { + // keep playlist updates in order + s.playlistUpdates <- update - s.SegmentsInfo.SegmentCount++ - s.SegmentsInfo.Size += size + segmentLocalPath := path.Join(s.LocalDir, update.filename) + segmentStoragePath := path.Join(s.StorageDir, update.filename) - err = s.endSegment(update.filename, update.endTime) - if err != nil { - logger.Errorw("failed to end segment", err, "path", segmentLocalPath) - return - } + // upload in parallel + go func() { + defer close(update.uploadComplete) - count := s.SegmentsInfo.SegmentCount - pending := len(s.endedSegments) - if pending > 50 { - if count%10 == 0 { - logger.Warnw("segment queue growing", nil, "pending", pending, "total", count) - err = s.uploadPlaylist() - } - } else if pending > 25 { - if count%5 > 0 { - err = s.uploadPlaylist() - } - } else if pending > 10 { - if count%2 > 0 { - err = s.uploadPlaylist() - } - } else { - err = s.uploadPlaylist() - } - if err != nil { - return - } + _, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true) + if err != nil { + s.callbacks.OnError(err) + return + } + + // lock segment info updates + s.infoLock.Lock() + s.SegmentsInfo.SegmentCount++ + s.SegmentsInfo.Size += size + s.infoLock.Unlock() + }() +} + +func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error { + s.segmentLock.Lock() + defer s.segmentLock.Unlock() - if s.livePlaylist != nil { - if err = s.uploadLivePlaylist(); err != nil { - return - } + t, ok := s.openSegmentsStartTime[update.filename] + if !ok { + return fmt.Errorf("no open segment with the name %s", update.filename) + } + delete(s.openSegmentsStartTime, update.filename) + + duration := float64(time.Duration(update.endTime-t)) / float64(time.Second) + segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime)) + + // do not update playlist until upload is complete + <-update.uploadComplete + + s.playlistLock.Lock() + if err := s.playlist.Append(segmentStartTime, duration, update.filename); err != nil { + s.playlistLock.Unlock() + return err + } + if s.livePlaylist != nil { + if err := s.livePlaylist.Append(segmentStartTime, duration, update.filename); err != nil { + s.playlistLock.Unlock() + return err + } + } + + // throttle playlist uploads + s.throttle(func() { + s.playlistLock.Lock() + defer s.playlistLock.Unlock() + + if err := s.uploadPlaylist(); err != nil { + s.callbacks.OnError(err) + } + if s.livePlaylist != nil { + if err := s.uploadLivePlaylist(); err != nil { + s.callbacks.OnError(err) } } - }() + }) return nil } -func (s *SegmentSink) getSegmentOutputType() types.OutputType { - switch s.OutputType { - case types.OutputTypeHLS: - // HLS is always mpeg ts for now. We may implement fmp4 in the future - return types.OutputTypeTS - default: - return s.OutputType - } +func (s *SegmentSink) UpdateStartDate(t time.Time) { + s.segmentLock.Lock() + defer s.segmentLock.Unlock() + + s.startTime = t } -func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error { +func (s *SegmentSink) FragmentOpened(filepath string, startTime uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") } filename := filepath[len(s.LocalDir):] - s.openSegmentsLock.Lock() - defer s.openSegmentsLock.Unlock() + s.segmentLock.Lock() + defer s.segmentLock.Unlock() if !s.initialized { s.initialized = true @@ -187,14 +229,7 @@ func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error { return nil } -func (s *SegmentSink) UpdateStartDate(t time.Time) { - s.openSegmentsLock.Lock() - defer s.openSegmentsLock.Unlock() - - s.startTime = t -} - -func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) error { +func (s *SegmentSink) FragmentClosed(filepath string, endTime uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") } @@ -202,7 +237,11 @@ func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) erro filename := filepath[len(s.LocalDir):] select { - case s.endedSegments <- SegmentUpdate{filename: filename, endTime: endTime}: + case s.closedSegments <- SegmentUpdate{ + filename: filename, + endTime: endTime, + uploadComplete: make(chan struct{}), + }: return nil default: @@ -212,54 +251,16 @@ func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) erro } } -func (s *SegmentSink) endSegment(filename string, endTime uint64) error { - s.openSegmentsLock.Lock() - defer s.openSegmentsLock.Unlock() - - t, ok := s.openSegmentsStartTime[filename] - if !ok { - return fmt.Errorf("no open segment with the name %s", filename) - } - delete(s.openSegmentsStartTime, filename) - - duration := float64(time.Duration(endTime-t)) / float64(time.Second) - segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime)) - if err := s.playlist.Append(segmentStartTime, duration, filename); err != nil { - return err - } - - if s.livePlaylist != nil { - if err := s.livePlaylist.Append(segmentStartTime, duration, filename); err != nil { - return err - } - } - - return nil -} - -func (s *SegmentSink) uploadPlaylist() error { - var err error - playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) - playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) - return err -} - -func (s *SegmentSink) uploadLivePlaylist() error { - var err error - liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) - liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) - return err -} - func (s *SegmentSink) Close() error { - // wait for all pending upload jobs to finish - close(s.endedSegments) + // wait for pending jobs to finish + close(s.closedSegments) <-s.done.Watch() + s.playlistLock.Lock() + defer s.playlistLock.Unlock() + if err := s.playlist.Close(); err != nil { - logger.Errorw("failed to send EOS to playlist writer", err) + return err } if err := s.uploadPlaylist(); err != nil { return err @@ -267,7 +268,7 @@ func (s *SegmentSink) Close() error { if s.livePlaylist != nil { if err := s.livePlaylist.Close(); err != nil { - logger.Errorw("failed to send EOS to live playlist writer", err) + return err } if err := s.uploadLivePlaylist(); err != nil { return err @@ -299,3 +300,19 @@ func (s *SegmentSink) Cleanup() { } } } + +func (s *SegmentSink) uploadPlaylist() error { + var err error + playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) + playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) + s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + return err +} + +func (s *SegmentSink) uploadLivePlaylist() error { + var err error + liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) + liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) + s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + return err +} diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 49bdfed9..67f3b08d 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -241,6 +241,15 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { s := msg.GetStructure() if s != nil { switch s.Name() { + case msgFirstSampleMetadata: + startDate, err := getFirstSampleMetadataFromGstStructure(s) + if err != nil { + return err + } + logger.Debugw("received FirstSampleMetadata message", "startDate", startDate) + + c.getSegmentSink().UpdateStartDate(startDate) + case msgFragmentOpened: filepath, t, err := getSegmentParamsFromGstStructure(s) if err != nil { @@ -248,7 +257,7 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { return err } - if err = c.getSegmentSink().StartSegment(filepath, t); err != nil { + if err = c.getSegmentSink().FragmentOpened(filepath, t); err != nil { logger.Errorw("failed to register new segment with playlist writer", err, "location", filepath, "runningTime", t) return err } @@ -265,19 +274,10 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { // We need to dispatch to a queue to: // 1. Avoid concurrent access to the SegmentsInfo structure // 2. Ensure that playlists are uploaded in the same order they are enqueued to avoid an older playlist overwriting a newer one - if err = c.getSegmentSink().EnqueueSegmentUpload(filepath, t); err != nil { + if err = c.getSegmentSink().FragmentClosed(filepath, t); err != nil { logger.Errorw("failed to end segment with playlist writer", err, "runningTime", t) return err } - - case msgFirstSampleMetadata: - startDate, err := getFirstSampleMetadataFromGstStructure(s) - if err != nil { - return err - } - logger.Debugw("received FirstSampleMetadata message", "startDate", startDate) - - c.getSegmentSink().UpdateStartDate(startDate) } } From 0bf994f0cafdd586d7592d0de41c12f4e1bb100c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 2 Oct 2023 12:31:27 -0700 Subject: [PATCH 2/2] tighten locking --- pkg/pipeline/sink/segments.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 902c503e..57413929 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -155,13 +155,13 @@ func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) { func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error { s.segmentLock.Lock() - defer s.segmentLock.Unlock() - t, ok := s.openSegmentsStartTime[update.filename] if !ok { + s.segmentLock.Unlock() return fmt.Errorf("no open segment with the name %s", update.filename) } delete(s.openSegmentsStartTime, update.filename) + s.segmentLock.Unlock() duration := float64(time.Duration(update.endTime-t)) / float64(time.Second) segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime)) @@ -180,6 +180,7 @@ func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error { return err } } + s.playlistLock.Unlock() // throttle playlist uploads s.throttle(func() {