diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index e27fc723..e6dcde9b 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -103,16 +103,16 @@ func (s *SegmentSink) Start() error { }() for update := range s.endedSegments { - var size int64 - s.SegmentsInfo.SegmentCount++ - segmentLocalPath := path.Join(s.LocalDir, update.filename) segmentStoragePath := path.Join(s.StorageDir, update.filename) + + var size int64 _, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType(), true) if err != nil { return } + s.SegmentsInfo.SegmentCount++ s.SegmentsInfo.Size += size err = s.endSegment(update.filename, update.endTime) @@ -121,18 +121,28 @@ func (s *SegmentSink) Start() error { return } - playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) - playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + count := s.SegmentsInfo.SegmentCount + pending := len(s.endedSegments) + if pending > 50 { + if count%10 == 0 { + logger.Warnw("segment queue growing", nil, "pending", pending, "total", count) + err = s.uploadPlaylist() + } + } else if pending > 25 { + if count%5 > 0 { + err = s.uploadPlaylist() + } + } else if pending > 10 { + if count%2 > 0 { + err = s.uploadPlaylist() + } + } if err != nil { return } - if s.LivePlaylistFilename != "" { - playlistLocalPath = path.Join(s.LocalDir, s.LivePlaylistFilename) - playlistStoragePath = path.Join(s.StorageDir, s.LivePlaylistFilename) - s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) - if err != nil { + if s.livePlaylist != nil { + if err = s.uploadLivePlaylist(); err != nil { return } } @@ -225,6 +235,22 @@ func (s *SegmentSink) endSegment(filename string, endTime uint64) error { return nil } +func (s *SegmentSink) uploadPlaylist() error { + var err error + playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) + playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) + s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + return err +} + +func (s *SegmentSink) uploadLivePlaylist() error { + var err error + liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) + liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) + s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + return err +} + func (s *SegmentSink) Close() error { // wait for all pending upload jobs to finish close(s.endedSegments) @@ -233,24 +259,22 @@ func (s *SegmentSink) Close() error { if err := s.playlist.Close(); err != nil { logger.Errorw("failed to send EOS to playlist writer", err) } - - // upload the finalized playlist - playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) - playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) - s.SegmentsInfo.PlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + if err := s.uploadPlaylist(); err != nil { + return err + } if s.livePlaylist != nil { if err := s.livePlaylist.Close(); err != nil { logger.Errorw("failed to send EOS to live playlist writer", err) } - - // upload the finalized live playlist - liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) - liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - s.SegmentsInfo.LivePlaylistLocation, _, _ = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) + if err := s.uploadLivePlaylist(); err != nil { + return err + } } if !s.DisableManifest { + playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename) + playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) manifestLocalPath := fmt.Sprintf("%s.json", playlistLocalPath) manifestStoragePath := fmt.Sprintf("%s.json", playlistStoragePath) if err := uploadManifest(s.conf, s.Uploader, manifestLocalPath, manifestStoragePath); err != nil {