Skip to content

Commit

Permalink
segments rework
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Oct 2, 2023
1 parent 2885c36 commit de74bd7
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 133 deletions.
261 changes: 139 additions & 122 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,29 @@ type SegmentSink struct {
conf *config.PipelineConfig
callbacks *gstreamer.Callbacks

playlist m3u8.PlaylistWriter
livePlaylist m3u8.PlaylistWriter
initialized bool
startTime time.Time
startRunningTime uint64
playlist m3u8.PlaylistWriter
livePlaylist m3u8.PlaylistWriter

segmentLock sync.Mutex
infoLock sync.Mutex
playlistLock sync.Mutex

initialized bool
startTime time.Time
outputType types.OutputType
startRunningTime uint64
openSegmentsStartTime map[string]uint64
openSegmentsLock sync.Mutex

endedSegments chan SegmentUpdate
done core.Fuse
closedSegments chan SegmentUpdate
playlistUpdates chan SegmentUpdate
throttle core.Throttle
done core.Fuse
}

type SegmentUpdate struct {
endTime uint64
filename string
endTime uint64
filename string
uploadComplete chan struct{}
}

func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks) (*SegmentSink, error) {
Expand All @@ -79,100 +86,135 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg
}
}

outputType := o.OutputType
if outputType == types.OutputTypeHLS {
outputType = types.OutputTypeTS
}

return &SegmentSink{
Uploader: u,
SegmentConfig: o,
conf: p,
callbacks: callbacks,
playlist: playlist,
livePlaylist: livePlaylist,
outputType: outputType,
openSegmentsStartTime: make(map[string]uint64),
endedSegments: make(chan SegmentUpdate, maxPendingUploads),
closedSegments: make(chan SegmentUpdate, maxPendingUploads),
playlistUpdates: make(chan SegmentUpdate, maxPendingUploads),
throttle: core.NewThrottle(time.Second * 2),
done: core.NewFuse(),
}, nil
}

func (s *SegmentSink) Start() error {
go func() {
var err error
defer func() {
if err != nil {
defer close(s.playlistUpdates)
for update := range s.closedSegments {
s.handleClosedSegment(update)
}
}()

go func() {
defer s.done.Break()
for update := range s.playlistUpdates {
if err := s.handlePlaylistUpdates(update); err != nil {
s.callbacks.OnError(err)
return
}
s.done.Break()
}()
}
}()

for update := range s.endedSegments {
segmentLocalPath := path.Join(s.LocalDir, update.filename)
segmentStoragePath := path.Join(s.StorageDir, update.filename)
return nil
}

var size int64
_, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType(), true)
if err != nil {
return
}
func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
// keep playlist updates in order
s.playlistUpdates <- update

s.SegmentsInfo.SegmentCount++
s.SegmentsInfo.Size += size
segmentLocalPath := path.Join(s.LocalDir, update.filename)
segmentStoragePath := path.Join(s.StorageDir, update.filename)

err = s.endSegment(update.filename, update.endTime)
if err != nil {
logger.Errorw("failed to end segment", err, "path", segmentLocalPath)
return
}
// upload in parallel
go func() {
defer close(update.uploadComplete)

count := s.SegmentsInfo.SegmentCount
pending := len(s.endedSegments)
if pending > 50 {
if count%10 == 0 {
logger.Warnw("segment queue growing", nil, "pending", pending, "total", count)
err = s.uploadPlaylist()
}
} else if pending > 25 {
if count%5 > 0 {
err = s.uploadPlaylist()
}
} else if pending > 10 {
if count%2 > 0 {
err = s.uploadPlaylist()
}
} else {
err = s.uploadPlaylist()
}
if err != nil {
return
}
_, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
if err != nil {
s.callbacks.OnError(err)
return
}

// lock segment info updates
s.infoLock.Lock()
s.SegmentsInfo.SegmentCount++
s.SegmentsInfo.Size += size
s.infoLock.Unlock()
}()
}

func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error {
s.segmentLock.Lock()
defer s.segmentLock.Unlock()

if s.livePlaylist != nil {
if err = s.uploadLivePlaylist(); err != nil {
return
}
t, ok := s.openSegmentsStartTime[update.filename]
if !ok {
return fmt.Errorf("no open segment with the name %s", update.filename)
}
delete(s.openSegmentsStartTime, update.filename)

duration := float64(time.Duration(update.endTime-t)) / float64(time.Second)
segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime))

// do not update playlist until upload is complete
<-update.uploadComplete

s.playlistLock.Lock()
if err := s.playlist.Append(segmentStartTime, duration, update.filename); err != nil {
s.playlistLock.Unlock()
return err
}
if s.livePlaylist != nil {
if err := s.livePlaylist.Append(segmentStartTime, duration, update.filename); err != nil {
s.playlistLock.Unlock()
return err
}
}

// throttle playlist uploads
s.throttle(func() {
s.playlistLock.Lock()
defer s.playlistLock.Unlock()

if err := s.uploadPlaylist(); err != nil {
s.callbacks.OnError(err)
}
if s.livePlaylist != nil {
if err := s.uploadLivePlaylist(); err != nil {
s.callbacks.OnError(err)
}
}
}()
})

return nil
}

func (s *SegmentSink) getSegmentOutputType() types.OutputType {
switch s.OutputType {
case types.OutputTypeHLS:
// HLS is always mpeg ts for now. We may implement fmp4 in the future
return types.OutputTypeTS
default:
return s.OutputType
}
func (s *SegmentSink) UpdateStartDate(t time.Time) {
s.segmentLock.Lock()
defer s.segmentLock.Unlock()

s.startTime = t
}

func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error {
func (s *SegmentSink) FragmentOpened(filepath string, startTime uint64) error {
if !strings.HasPrefix(filepath, s.LocalDir) {
return fmt.Errorf("invalid filepath")
}

filename := filepath[len(s.LocalDir):]

s.openSegmentsLock.Lock()
defer s.openSegmentsLock.Unlock()
s.segmentLock.Lock()
defer s.segmentLock.Unlock()

if !s.initialized {
s.initialized = true
Expand All @@ -187,22 +229,19 @@ func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error {
return nil
}

func (s *SegmentSink) UpdateStartDate(t time.Time) {
s.openSegmentsLock.Lock()
defer s.openSegmentsLock.Unlock()

s.startTime = t
}

func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) error {
func (s *SegmentSink) FragmentClosed(filepath string, endTime uint64) error {
if !strings.HasPrefix(filepath, s.LocalDir) {
return fmt.Errorf("invalid filepath")
}

filename := filepath[len(s.LocalDir):]

select {
case s.endedSegments <- SegmentUpdate{filename: filename, endTime: endTime}:
case s.closedSegments <- SegmentUpdate{
filename: filename,
endTime: endTime,
uploadComplete: make(chan struct{}),
}:
return nil

default:
Expand All @@ -212,62 +251,24 @@ func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) erro
}
}

func (s *SegmentSink) endSegment(filename string, endTime uint64) error {
s.openSegmentsLock.Lock()
defer s.openSegmentsLock.Unlock()

t, ok := s.openSegmentsStartTime[filename]
if !ok {
return fmt.Errorf("no open segment with the name %s", filename)
}
delete(s.openSegmentsStartTime, filename)

duration := float64(time.Duration(endTime-t)) / float64(time.Second)
segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime))
if err := s.playlist.Append(segmentStartTime, duration, filename); err != nil {
return err
}

if s.livePlaylist != nil {
if err := s.livePlaylist.Append(segmentStartTime, duration, filename); err != nil {
return err
}
}

return nil
}

func (s *SegmentSink) uploadPlaylist() error {
var err error
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
return err
}

func (s *SegmentSink) uploadLivePlaylist() error {
var err error
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
return err
}

func (s *SegmentSink) Close() error {
// wait for all pending upload jobs to finish
close(s.endedSegments)
// wait for pending jobs to finish
close(s.closedSegments)
<-s.done.Watch()

s.playlistLock.Lock()
defer s.playlistLock.Unlock()

if err := s.playlist.Close(); err != nil {
logger.Errorw("failed to send EOS to playlist writer", err)
return err
}
if err := s.uploadPlaylist(); err != nil {
return err
}

if s.livePlaylist != nil {
if err := s.livePlaylist.Close(); err != nil {
logger.Errorw("failed to send EOS to live playlist writer", err)
return err
}
if err := s.uploadLivePlaylist(); err != nil {
return err
Expand Down Expand Up @@ -299,3 +300,19 @@ func (s *SegmentSink) Cleanup() {
}
}
}

func (s *SegmentSink) uploadPlaylist() error {
var err error
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
return err
}

func (s *SegmentSink) uploadLivePlaylist() error {
var err error
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
return err
}
22 changes: 11 additions & 11 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,23 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
s := msg.GetStructure()
if s != nil {
switch s.Name() {
case msgFirstSampleMetadata:
startDate, err := getFirstSampleMetadataFromGstStructure(s)
if err != nil {
return err
}
logger.Debugw("received FirstSampleMetadata message", "startDate", startDate)

c.getSegmentSink().UpdateStartDate(startDate)

case msgFragmentOpened:
filepath, t, err := getSegmentParamsFromGstStructure(s)
if err != nil {
logger.Errorw("failed to retrieve segment parameters from event", err)
return err
}

if err = c.getSegmentSink().StartSegment(filepath, t); err != nil {
if err = c.getSegmentSink().FragmentOpened(filepath, t); err != nil {
logger.Errorw("failed to register new segment with playlist writer", err, "location", filepath, "runningTime", t)
return err
}
Expand All @@ -265,19 +274,10 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
// We need to dispatch to a queue to:
// 1. Avoid concurrent access to the SegmentsInfo structure
// 2. Ensure that playlists are uploaded in the same order they are enqueued to avoid an older playlist overwriting a newer one
if err = c.getSegmentSink().EnqueueSegmentUpload(filepath, t); err != nil {
if err = c.getSegmentSink().FragmentClosed(filepath, t); err != nil {
logger.Errorw("failed to end segment with playlist writer", err, "runningTime", t)
return err
}

case msgFirstSampleMetadata:
startDate, err := getFirstSampleMetadataFromGstStructure(s)
if err != nil {
return err
}
logger.Debugw("received FirstSampleMetadata message", "startDate", startDate)

c.getSegmentSink().UpdateStartDate(startDate)
}
}

Expand Down

0 comments on commit de74bd7

Please sign in to comment.