diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index dce2d8ff..e27fc723 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -45,14 +45,13 @@ type SegmentSink struct { conf *config.PipelineConfig callbacks *gstreamer.Callbacks - playlist m3u8.PlaylistWriter - livePlaylist m3u8.PlaylistWriter - currentItemStartTimestamp int64 - currentItemFilename string - startDate time.Time - startDateTimestamp time.Duration - - openSegmentsStartTime map[string]int64 + playlist m3u8.PlaylistWriter + livePlaylist m3u8.PlaylistWriter + initialized bool + startTime time.Time + startRunningTime uint64 + + openSegmentsStartTime map[string]uint64 openSegmentsLock sync.Mutex endedSegments chan SegmentUpdate @@ -60,7 +59,7 @@ type SegmentSink struct { } type SegmentUpdate struct { - endTime int64 + endTime uint64 filename string } @@ -87,10 +86,9 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg callbacks: callbacks, playlist: playlist, livePlaylist: livePlaylist, - openSegmentsStartTime: make(map[string]int64), + openSegmentsStartTime: make(map[string]uint64), endedSegments: make(chan SegmentUpdate, maxPendingUploads), done: core.NewFuse(), - startDateTimestamp: -1, }, nil } @@ -154,22 +152,19 @@ func (s *SegmentSink) getSegmentOutputType() types.OutputType { } } -func (s *SegmentSink) StartSegment(filepath string, startTime int64) error { +func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") } filename := filepath[len(s.LocalDir):] - if startTime < 0 { - return fmt.Errorf("invalid start timestamp") - } - s.openSegmentsLock.Lock() defer s.openSegmentsLock.Unlock() - if s.startDateTimestamp < 0 { - s.startDateTimestamp = time.Duration(startTime) + if !s.initialized { + s.initialized = true + s.startRunningTime = startTime } if _, ok := s.openSegmentsStartTime[filename]; ok { @@ -177,7 +172,6 @@ func (s *SegmentSink) StartSegment(filepath string, startTime int64) error { } s.openSegmentsStartTime[filename] = startTime - return nil } @@ -185,10 +179,10 @@ func (s *SegmentSink) UpdateStartDate(t time.Time) { s.openSegmentsLock.Lock() defer s.openSegmentsLock.Unlock() - s.startDate = t + s.startTime = t } -func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime int64) error { +func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") } @@ -206,11 +200,7 @@ func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime int64) error } } -func (s *SegmentSink) endSegment(filename string, endTime int64) error { - if endTime <= s.currentItemStartTimestamp { - return fmt.Errorf("segment end time before start time") - } - +func (s *SegmentSink) endSegment(filename string, endTime uint64) error { s.openSegmentsLock.Lock() defer s.openSegmentsLock.Unlock() @@ -220,14 +210,14 @@ func (s *SegmentSink) endSegment(filename string, endTime int64) error { } delete(s.openSegmentsStartTime, filename) - duration := float64(endTime-t) / float64(time.Second) - - segmentStartDate := s.startDate.Add(-s.startDateTimestamp).Add(time.Duration(t)) - if err := s.playlist.Append(segmentStartDate, duration, filename); err != nil { + duration := float64(time.Duration(endTime-t)) / float64(time.Second) + segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime)) + if err := s.playlist.Append(segmentStartTime, duration, filename); err != nil { return err } + if s.livePlaylist != nil { - if err := s.livePlaylist.Append(segmentStartDate, duration, filename); err != nil { + if err := s.livePlaylist.Append(segmentStartTime, duration, filename); err != nil { return err } } @@ -255,9 +245,9 @@ func (s *SegmentSink) Close() error { } // upload the finalized live playlist - playlistLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) - playlistStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) - s.SegmentsInfo.LivePlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) + liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) + s.SegmentsInfo.LivePlaylistLocation, _, _ = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false) } if !s.DisableManifest { diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 50c031e9..93f587ee 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -290,7 +290,7 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { return nil } -func getSegmentParamsFromGstStructure(s *gst.Structure) (filepath string, time int64, err error) { +func getSegmentParamsFromGstStructure(s *gst.Structure) (filepath string, time uint64, err error) { loc, err := s.GetValue(fragmentLocation) if err != nil { return "", 0, err @@ -309,7 +309,7 @@ func getSegmentParamsFromGstStructure(s *gst.Structure) (filepath string, time i return "", 0, errors.ErrGstPipelineError(errors.New("invalid type for time")) } - return filepath, int64(ti), nil + return filepath, ti, nil } func getFirstSampleMetadataFromGstStructure(s *gst.Structure) (startDate time.Time, err error) { diff --git a/test/segments.go b/test/segments.go index 747b8114..d7c250e7 100644 --- a/test/segments.go +++ b/test/segments.go @@ -140,7 +140,7 @@ func verifyPlaylistProgramDateTime(t *testing.T, filenameSuffix livekit.Segmente require.InDelta(t, s.ProgramDateTime.UnixNano(), tm.UnixNano(), float64(time.Millisecond)) } - if i < len(p.Segments)-1 { + if i < len(p.Segments)-2 { nextSegmentStartDate := p.Segments[i+1].ProgramDateTime dateDuration := nextSegmentStartDate.Sub(s.ProgramDateTime)