Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

segments rework #509

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 140 additions & 122 deletions pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,29 @@ type SegmentSink struct {
conf *config.PipelineConfig
callbacks *gstreamer.Callbacks

playlist m3u8.PlaylistWriter
livePlaylist m3u8.PlaylistWriter
initialized bool
startTime time.Time
startRunningTime uint64
playlist m3u8.PlaylistWriter
livePlaylist m3u8.PlaylistWriter

segmentLock sync.Mutex
infoLock sync.Mutex
playlistLock sync.Mutex

initialized bool
startTime time.Time
outputType types.OutputType
startRunningTime uint64
openSegmentsStartTime map[string]uint64
openSegmentsLock sync.Mutex

endedSegments chan SegmentUpdate
done core.Fuse
closedSegments chan SegmentUpdate
playlistUpdates chan SegmentUpdate
throttle core.Throttle
done core.Fuse
}

type SegmentUpdate struct {
endTime uint64
filename string
endTime uint64
filename string
uploadComplete chan struct{}
}

func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks) (*SegmentSink, error) {
Expand All @@ -79,100 +86,136 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg
}
}

outputType := o.OutputType
if outputType == types.OutputTypeHLS {
outputType = types.OutputTypeTS
}

return &SegmentSink{
Uploader: u,
SegmentConfig: o,
conf: p,
callbacks: callbacks,
playlist: playlist,
livePlaylist: livePlaylist,
outputType: outputType,
openSegmentsStartTime: make(map[string]uint64),
endedSegments: make(chan SegmentUpdate, maxPendingUploads),
closedSegments: make(chan SegmentUpdate, maxPendingUploads),
playlistUpdates: make(chan SegmentUpdate, maxPendingUploads),
throttle: core.NewThrottle(time.Second * 2),
done: core.NewFuse(),
}, nil
}

func (s *SegmentSink) Start() error {
go func() {
var err error
defer func() {
if err != nil {
defer close(s.playlistUpdates)
for update := range s.closedSegments {
s.handleClosedSegment(update)
}
}()

go func() {
defer s.done.Break()
for update := range s.playlistUpdates {
if err := s.handlePlaylistUpdates(update); err != nil {
s.callbacks.OnError(err)
return
}
s.done.Break()
}()
}
}()

for update := range s.endedSegments {
segmentLocalPath := path.Join(s.LocalDir, update.filename)
segmentStoragePath := path.Join(s.StorageDir, update.filename)
return nil
}

var size int64
_, size, err = s.Upload(segmentLocalPath, segmentStoragePath, s.getSegmentOutputType(), true)
if err != nil {
return
}
func (s *SegmentSink) handleClosedSegment(update SegmentUpdate) {
// keep playlist updates in order
s.playlistUpdates <- update

s.SegmentsInfo.SegmentCount++
s.SegmentsInfo.Size += size
segmentLocalPath := path.Join(s.LocalDir, update.filename)
segmentStoragePath := path.Join(s.StorageDir, update.filename)

err = s.endSegment(update.filename, update.endTime)
if err != nil {
logger.Errorw("failed to end segment", err, "path", segmentLocalPath)
return
}
// upload in parallel
go func() {
defer close(update.uploadComplete)

count := s.SegmentsInfo.SegmentCount
pending := len(s.endedSegments)
if pending > 50 {
if count%10 == 0 {
logger.Warnw("segment queue growing", nil, "pending", pending, "total", count)
err = s.uploadPlaylist()
}
} else if pending > 25 {
if count%5 > 0 {
err = s.uploadPlaylist()
}
} else if pending > 10 {
if count%2 > 0 {
err = s.uploadPlaylist()
}
} else {
err = s.uploadPlaylist()
}
if err != nil {
return
}
_, size, err := s.Upload(segmentLocalPath, segmentStoragePath, s.outputType, true)
if err != nil {
s.callbacks.OnError(err)
return
}

// lock segment info updates
s.infoLock.Lock()
s.SegmentsInfo.SegmentCount++
s.SegmentsInfo.Size += size
s.infoLock.Unlock()
}()
}

func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error {
s.segmentLock.Lock()
t, ok := s.openSegmentsStartTime[update.filename]
if !ok {
s.segmentLock.Unlock()
return fmt.Errorf("no open segment with the name %s", update.filename)
}
delete(s.openSegmentsStartTime, update.filename)
s.segmentLock.Unlock()

duration := float64(time.Duration(update.endTime-t)) / float64(time.Second)
segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime))

// do not update playlist until upload is complete
<-update.uploadComplete

s.playlistLock.Lock()
if err := s.playlist.Append(segmentStartTime, duration, update.filename); err != nil {
s.playlistLock.Unlock()
return err
}
if s.livePlaylist != nil {
if err := s.livePlaylist.Append(segmentStartTime, duration, update.filename); err != nil {
s.playlistLock.Unlock()
return err
}
}
s.playlistLock.Unlock()

// throttle playlist uploads
s.throttle(func() {
s.playlistLock.Lock()
defer s.playlistLock.Unlock()

if s.livePlaylist != nil {
if err = s.uploadLivePlaylist(); err != nil {
return
}
if err := s.uploadPlaylist(); err != nil {
s.callbacks.OnError(err)
}
if s.livePlaylist != nil {
if err := s.uploadLivePlaylist(); err != nil {
s.callbacks.OnError(err)
}
}
}()
})

return nil
}

func (s *SegmentSink) getSegmentOutputType() types.OutputType {
switch s.OutputType {
case types.OutputTypeHLS:
// HLS is always mpeg ts for now. We may implement fmp4 in the future
return types.OutputTypeTS
default:
return s.OutputType
}
func (s *SegmentSink) UpdateStartDate(t time.Time) {
s.segmentLock.Lock()
defer s.segmentLock.Unlock()

s.startTime = t
}

func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error {
func (s *SegmentSink) FragmentOpened(filepath string, startTime uint64) error {
if !strings.HasPrefix(filepath, s.LocalDir) {
return fmt.Errorf("invalid filepath")
}

filename := filepath[len(s.LocalDir):]

s.openSegmentsLock.Lock()
defer s.openSegmentsLock.Unlock()
s.segmentLock.Lock()
defer s.segmentLock.Unlock()

if !s.initialized {
s.initialized = true
Expand All @@ -187,22 +230,19 @@ func (s *SegmentSink) StartSegment(filepath string, startTime uint64) error {
return nil
}

func (s *SegmentSink) UpdateStartDate(t time.Time) {
s.openSegmentsLock.Lock()
defer s.openSegmentsLock.Unlock()

s.startTime = t
}

func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) error {
func (s *SegmentSink) FragmentClosed(filepath string, endTime uint64) error {
if !strings.HasPrefix(filepath, s.LocalDir) {
return fmt.Errorf("invalid filepath")
}

filename := filepath[len(s.LocalDir):]

select {
case s.endedSegments <- SegmentUpdate{filename: filename, endTime: endTime}:
case s.closedSegments <- SegmentUpdate{
filename: filename,
endTime: endTime,
uploadComplete: make(chan struct{}),
}:
return nil

default:
Expand All @@ -212,62 +252,24 @@ func (s *SegmentSink) EnqueueSegmentUpload(filepath string, endTime uint64) erro
}
}

func (s *SegmentSink) endSegment(filename string, endTime uint64) error {
s.openSegmentsLock.Lock()
defer s.openSegmentsLock.Unlock()

t, ok := s.openSegmentsStartTime[filename]
if !ok {
return fmt.Errorf("no open segment with the name %s", filename)
}
delete(s.openSegmentsStartTime, filename)

duration := float64(time.Duration(endTime-t)) / float64(time.Second)
segmentStartTime := s.startTime.Add(time.Duration(t - s.startRunningTime))
if err := s.playlist.Append(segmentStartTime, duration, filename); err != nil {
return err
}

if s.livePlaylist != nil {
if err := s.livePlaylist.Append(segmentStartTime, duration, filename); err != nil {
return err
}
}

return nil
}

func (s *SegmentSink) uploadPlaylist() error {
var err error
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
return err
}

func (s *SegmentSink) uploadLivePlaylist() error {
var err error
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
return err
}

func (s *SegmentSink) Close() error {
// wait for all pending upload jobs to finish
close(s.endedSegments)
// wait for pending jobs to finish
close(s.closedSegments)
<-s.done.Watch()

s.playlistLock.Lock()
defer s.playlistLock.Unlock()

if err := s.playlist.Close(); err != nil {
logger.Errorw("failed to send EOS to playlist writer", err)
return err
}
if err := s.uploadPlaylist(); err != nil {
return err
}

if s.livePlaylist != nil {
if err := s.livePlaylist.Close(); err != nil {
logger.Errorw("failed to send EOS to live playlist writer", err)
return err
}
if err := s.uploadLivePlaylist(); err != nil {
return err
Expand Down Expand Up @@ -299,3 +301,19 @@ func (s *SegmentSink) Cleanup() {
}
}
}

func (s *SegmentSink) uploadPlaylist() error {
var err error
playlistLocalPath := path.Join(s.LocalDir, s.PlaylistFilename)
playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename)
s.SegmentsInfo.PlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false)
return err
}

func (s *SegmentSink) uploadLivePlaylist() error {
var err error
liveLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename)
liveStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename)
s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(liveLocalPath, liveStoragePath, s.OutputType, false)
return err
}
22 changes: 11 additions & 11 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,23 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
s := msg.GetStructure()
if s != nil {
switch s.Name() {
case msgFirstSampleMetadata:
startDate, err := getFirstSampleMetadataFromGstStructure(s)
if err != nil {
return err
}
logger.Debugw("received FirstSampleMetadata message", "startDate", startDate)

c.getSegmentSink().UpdateStartDate(startDate)

case msgFragmentOpened:
filepath, t, err := getSegmentParamsFromGstStructure(s)
if err != nil {
logger.Errorw("failed to retrieve segment parameters from event", err)
return err
}

if err = c.getSegmentSink().StartSegment(filepath, t); err != nil {
if err = c.getSegmentSink().FragmentOpened(filepath, t); err != nil {
logger.Errorw("failed to register new segment with playlist writer", err, "location", filepath, "runningTime", t)
return err
}
Expand All @@ -265,19 +274,10 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error {
// We need to dispatch to a queue to:
// 1. Avoid concurrent access to the SegmentsInfo structure
// 2. Ensure that playlists are uploaded in the same order they are enqueued to avoid an older playlist overwriting a newer one
if err = c.getSegmentSink().EnqueueSegmentUpload(filepath, t); err != nil {
if err = c.getSegmentSink().FragmentClosed(filepath, t); err != nil {
logger.Errorw("failed to end segment with playlist writer", err, "runningTime", t)
return err
}

case msgFirstSampleMetadata:
startDate, err := getFirstSampleMetadataFromGstStructure(s)
if err != nil {
return err
}
logger.Debugw("received FirstSampleMetadata message", "startDate", startDate)

c.getSegmentSink().UpdateStartDate(startDate)
}
}

Expand Down