diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index d8e2f99b..e7176b1e 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -42,13 +42,12 @@ type SegmentSink struct { conf *config.PipelineConfig callbacks *gstreamer.Callbacks - playlist *m3u8.PlaylistWriter - currentItemStartTimestamp int64 - currentItemFilename string - startDate time.Time - startDateTimestamp time.Duration + playlist *m3u8.PlaylistWriter + initialized bool + startTime time.Time + startRunningTime uint64 - openSegmentsStartTime map[string]int64 + openSegmentsStartTime map[string]uint64 openSegmentsLock sync.Mutex endedSegments chan SegmentUpdate @@ -56,7 +55,7 @@ type SegmentSink struct { } type SegmentUpdate struct { - endTime int64 + endTime uint64 filename string } @@ -73,10 +72,9 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg conf: p, callbacks: callbacks, playlist: playlist, - openSegmentsStartTime: make(map[string]int64), + openSegmentsStartTime: make(map[string]uint64), endedSegments: make(chan SegmentUpdate, maxPendingUploads), done: core.NewFuse(), - startDateTimestamp: -1, }, nil } @@ -131,22 +129,19 @@ func (s *SegmentSink) getSegmentOutputType() types.OutputType { } } -func (s *SegmentSink) StartSegment(filepath string, startTime int64) error { +func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") } filename := filepath[len(s.LocalDir):] - if startTime < 0 { - return fmt.Errorf("invalid start timestamp") - } - s.openSegmentsLock.Lock() defer s.openSegmentsLock.Unlock() - if s.startDateTimestamp < 0 { - s.startDateTimestamp = time.Duration(startTime) + if !s.initialized { + s.initialized = true + s.startRunningTime = startTime } if _, ok := s.openSegmentsStartTime[filename]; ok { @@ -154,7 +149,6 @@ func (s *SegmentSink) StartSegment(filepath string, startTime int64) error { } s.openSegmentsStartTime[filename] = startTime - return nil } @@ -162,10 +156,10 @@ func (s *SegmentSink) UpdateStartDate(t time.Time) { s.openSegmentsLock.Lock() defer s.openSegmentsLock.Unlock() - s.startDate = t + s.startTime = t } -func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime int64) error { +func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) error { if !strings.HasPrefix(filepath, s.LocalDir) { return fmt.Errorf("invalid filepath") } @@ -183,11 +177,7 @@ func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime int64) error } } -func (s *SegmentSink) endSegment(filename string, endTime int64) error { - if endTime <= s.currentItemStartTimestamp { - return fmt.Errorf("segment end time before start time") - } - +func (s *SegmentSink) endSegment(filename string, endTime uint64) error { s.openSegmentsLock.Lock() defer s.openSegmentsLock.Unlock() @@ -197,10 +187,8 @@ func (s *SegmentSink) endSegment(filename string, endTime int64) error { } delete(s.openSegmentsStartTime, filename) - duration := float64(endTime-t) / float64(time.Second) - - segmentStartDate := s.startDate.Add(-s.startDateTimestamp).Add(time.Duration(t)) - if err := s.playlist.Append(segmentStartDate, duration, filename); err != nil { + segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime)) + if err := s.playlist.Append(segmentStartTime, float64(endTime-t), filename); err != nil { return err } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 4d7d20a6..1ec9a74d 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -290,7 +290,7 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { return nil } -func getSegmentParamsFromGstStructure(s *gst.Structure) (filepath string, time int64, err error) { +func getSegmentParamsFromGstStructure(s *gst.Structure) (filepath string, time uint64, err error) { loc, err := s.GetValue(fragmentLocation) if err != nil { return "", 0, err @@ -309,7 +309,7 @@ func getSegmentParamsFromGstStructure(s *gst.Structure) (filepath string, time i return "", 0, errors.ErrGstPipelineError(errors.New("invalid type for time")) } - return filepath, int64(ti), nil + return filepath, ti, nil } func getFirstSampleMetadataFromGstStructure(s *gst.Structure) (startDate time.Time, err error) {