diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index cd9c1825..246820bd 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -134,62 +134,66 @@ func TestSegmentNaming(t *testing.T) { }) for _, test := range []struct { - filenamePrefix string - playlistName string - expectedStorageDir string - expectedPlaylistFilename string - expectedSegmentPrefix string + filenamePrefix string + playlistName string + livePlaylistName string + expectedStorageDir string + expectedPlaylistFilename string + expectedLivePlaylistFilename string + expectedSegmentPrefix string }{ { - filenamePrefix: "", playlistName: "playlist", - expectedStorageDir: "", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "playlist", + filenamePrefix: "", playlistName: "playlist", livePlaylistName: "", + expectedStorageDir: "", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "playlist", }, { - filenamePrefix: "", playlistName: "conf_test/playlist", - expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "playlist", + filenamePrefix: "", playlistName: "conf_test/playlist", livePlaylistName: "conf_test/live_playlist", + expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "live_playlist.m3u8", expectedSegmentPrefix: "playlist", }, { - filenamePrefix: "filename", playlistName: "", - expectedStorageDir: "", expectedPlaylistFilename: "filename.m3u8", expectedSegmentPrefix: "filename", + filenamePrefix: "filename", playlistName: "", livePlaylistName: "live_playlist2.m3u8", + expectedStorageDir: "", expectedPlaylistFilename: "filename.m3u8", expectedLivePlaylistFilename: "live_playlist2.m3u8", expectedSegmentPrefix: "filename", }, { - filenamePrefix: "filename", playlistName: "playlist", - expectedStorageDir: "", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "filename", + filenamePrefix: "filename", playlistName: "playlist", livePlaylistName: "", + expectedStorageDir: "", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "filename", }, { - filenamePrefix: "filename", playlistName: "conf_test/", - expectedStorageDir: "conf_test/", expectedPlaylistFilename: "filename.m3u8", expectedSegmentPrefix: "filename", + filenamePrefix: "filename", playlistName: "conf_test/", livePlaylistName: "", + expectedStorageDir: "conf_test/", expectedPlaylistFilename: "filename.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "filename", }, { - filenamePrefix: "filename", playlistName: "conf_test/playlist", - expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "filename", + filenamePrefix: "filename", playlistName: "conf_test/playlist", livePlaylistName: "", + expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "filename", }, { - filenamePrefix: "conf_test/", playlistName: "playlist", - expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "playlist", + filenamePrefix: "conf_test/", playlistName: "playlist", livePlaylistName: "", + expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "playlist", }, { - filenamePrefix: "conf_test/filename", playlistName: "playlist", - expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "filename", + filenamePrefix: "conf_test/filename", playlistName: "playlist", livePlaylistName: "", + expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "filename", }, { - filenamePrefix: "conf_test/filename", playlistName: "conf_test/playlist", - expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "filename", + filenamePrefix: "conf_test/filename", playlistName: "conf_test/playlist", livePlaylistName: "", + expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "filename", }, { - filenamePrefix: "conf_test_2/filename", playlistName: "conf_test/playlist", - expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedSegmentPrefix: "conf_test_2/filename", + filenamePrefix: "conf_test_2/filename", playlistName: "conf_test/playlist", livePlaylistName: "", + expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "conf_test_2/filename", }, } { p := &PipelineConfig{Info: &livekit.EgressInfo{EgressId: "egress_ID"}} o, err := p.getSegmentConfig(&livekit.SegmentedFileOutput{ - FilenamePrefix: test.filenamePrefix, - PlaylistName: test.playlistName, + FilenamePrefix: test.filenamePrefix, + PlaylistName: test.playlistName, + LivePlaylistName: test.livePlaylistName, }) require.NoError(t, err) require.Equal(t, test.expectedStorageDir, o.StorageDir) require.Equal(t, test.expectedPlaylistFilename, o.PlaylistFilename) + require.Equal(t, test.expectedLivePlaylistFilename, o.LivePlaylistFilename) require.Equal(t, test.expectedSegmentPrefix, o.SegmentPrefix) } } diff --git a/pkg/config/output_segment.go b/pkg/config/output_segment.go index 99dfcf53..fb07b52d 100644 --- a/pkg/config/output_segment.go +++ b/pkg/config/output_segment.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" ) @@ -28,13 +29,14 @@ import ( type SegmentConfig struct { outputConfig - SegmentsInfo *livekit.SegmentsInfo - LocalDir string - StorageDir string - PlaylistFilename string - SegmentPrefix string - SegmentSuffix livekit.SegmentedFileSuffix - SegmentDuration int + SegmentsInfo *livekit.SegmentsInfo + LocalDir string + StorageDir string + PlaylistFilename string + LivePlaylistFilename string + SegmentPrefix string + SegmentSuffix livekit.SegmentedFileSuffix + SegmentDuration int DisableManifest bool UploadConfig UploadConfig @@ -51,13 +53,14 @@ func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig { // segments should always be added last, so we can check keyframe interval from file/stream func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) (*SegmentConfig, error) { conf := &SegmentConfig{ - SegmentsInfo: &livekit.SegmentsInfo{}, - SegmentPrefix: clean(segments.FilenamePrefix), - SegmentSuffix: segments.FilenameSuffix, - PlaylistFilename: clean(segments.PlaylistName), - SegmentDuration: int(segments.SegmentDuration), - DisableManifest: segments.DisableManifest, - UploadConfig: p.getUploadConfig(segments), + SegmentsInfo: &livekit.SegmentsInfo{}, + SegmentPrefix: clean(segments.FilenamePrefix), + SegmentSuffix: segments.FilenameSuffix, + PlaylistFilename: clean(segments.PlaylistName), + LivePlaylistFilename: clean(segments.LivePlaylistName), + SegmentDuration: int(segments.SegmentDuration), + DisableManifest: segments.DisableManifest, + UploadConfig: p.getUploadConfig(segments), } if conf.SegmentDuration == 0 { @@ -91,26 +94,40 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput) return conf, nil } +func removeKnownExtension(filename string) string { + if extIdx := strings.LastIndex(filename, "."); extIdx > -1 { + existingExt := types.FileExtension(filename[extIdx:]) + if _, ok := types.FileExtensions[existingExt]; ok { + filename = filename[:extIdx] + } + filename = filename[:extIdx] + } + + return filename +} + func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { identifier, replacements := p.getFilenameInfo() o.SegmentPrefix = stringReplace(o.SegmentPrefix, replacements) o.PlaylistFilename = stringReplace(o.PlaylistFilename, replacements) + o.LivePlaylistFilename = stringReplace(o.LivePlaylistFilename, replacements) ext := types.FileExtensionForOutputType[o.OutputType] playlistDir, playlistName := path.Split(o.PlaylistFilename) + livePlaylistDir, livePlaylistName := path.Split(o.LivePlaylistFilename) fileDir, filePrefix := path.Split(o.SegmentPrefix) - // remove extension from playlist name - if extIdx := strings.LastIndex(playlistName, "."); extIdx > -1 { - existingExt := types.FileExtension(playlistName[extIdx:]) - if _, ok := types.FileExtensions[existingExt]; ok { - playlistName = playlistName[:extIdx] - } - playlistName = playlistName[:extIdx] + // force live playlist to be in the same directory as the main playlist + if livePlaylistDir != "" && livePlaylistDir != playlistDir { + return errors.ErrInvalidInput("live_playlist_name must be in same directory as playlist_name") } + // remove extension from playlist name + playlistName = removeKnownExtension(playlistName) + livePlaylistName = removeKnownExtension(livePlaylistName) + // only keep fileDir if it is a subdirectory of playlistDir if fileDir != "" { if playlistDir == fileDir { @@ -130,6 +147,7 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { playlistName = fmt.Sprintf("%s-%s", identifier, time.Now().Format("2006-01-02T150405")) } } + // live playlist disabled by default // ensure filePrefix if filePrefix == "" { @@ -139,8 +157,15 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { // update config o.StorageDir = playlistDir o.PlaylistFilename = fmt.Sprintf("%s%s", playlistName, ext) + if livePlaylistName != "" { + o.LivePlaylistFilename = fmt.Sprintf("%s%s", livePlaylistName, ext) + } o.SegmentPrefix = fmt.Sprintf("%s%s", fileDir, filePrefix) + if o.PlaylistFilename == o.LivePlaylistFilename { + return errors.ErrInvalidInput("live_playlist_name cannot be identical to playlist_name") + } + if o.UploadConfig == nil { o.LocalDir = playlistDir } else { @@ -162,5 +187,8 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error { } o.SegmentsInfo.PlaylistName = path.Join(o.StorageDir, o.PlaylistFilename) + if o.LivePlaylistFilename != "" { + o.SegmentsInfo.LivePlaylistName = path.Join(o.StorageDir, o.LivePlaylistFilename) + } return nil } diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 31bceb81..99abd757 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -572,8 +572,10 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s o.LocalDir = stringReplace(o.LocalDir, replacements) o.StorageDir = stringReplace(o.StorageDir, replacements) o.PlaylistFilename = stringReplace(o.PlaylistFilename, replacements) + o.LivePlaylistFilename = stringReplace(o.LivePlaylistFilename, replacements) o.SegmentPrefix = stringReplace(o.SegmentPrefix, replacements) o.SegmentsInfo.PlaylistName = stringReplace(o.SegmentsInfo.PlaylistName, replacements) + o.SegmentsInfo.LivePlaylistName = stringReplace(o.SegmentsInfo.LivePlaylistName, replacements) } } diff --git a/pkg/pipeline/sink/m3u8/writer.go b/pkg/pipeline/sink/m3u8/writer.go index 93416ecc..30ee63b9 100644 --- a/pkg/pipeline/sink/m3u8/writer.go +++ b/pkg/pipeline/sink/m3u8/writer.go @@ -15,6 +15,7 @@ package m3u8 import ( + "container/list" "fmt" "io/fs" "os" @@ -23,32 +24,82 @@ import ( "time" ) -type PlaylistWriter struct { +type PlaylistType string + +const ( + PlaylistTypeLive PlaylistType = "" + PlaylistTypeEvent PlaylistType = "EVENT" +) + +type PlaylistWriter interface { + Append(dateTime time.Time, duration float64, filename string) error + Close() error +} + +type basePlaylistWriter struct { filename string targetDuration int } -func NewPlaylistWriter(filename string, targetDuration int) (*PlaylistWriter, error) { - p := &PlaylistWriter{ - filename: filename, - targetDuration: targetDuration, - } +type eventPlaylistWriter struct { + basePlaylistWriter +} - f, err := os.Create(p.filename) - if err != nil { - return nil, err - } - defer f.Close() +type livePlaylistWriter struct { + basePlaylistWriter + + windowSize int + mediaSeq int + + livePlaylistHeader string + livePlaylistSegments *list.List +} +func (p *basePlaylistWriter) createHeader(plType PlaylistType) string { var sb strings.Builder sb.WriteString("#EXTM3U\n") sb.WriteString("#EXT-X-VERSION:4\n") - sb.WriteString("#EXT-X-PLAYLIST-TYPE:EVENT\n") + if plType != PlaylistTypeLive { + sb.WriteString(fmt.Sprintf("#EXT-X-PLAYLIST-TYPE:%s\n", plType)) + } sb.WriteString("#EXT-X-ALLOW-CACHE:NO\n") - sb.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n") sb.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", p.targetDuration)) + if plType != PlaylistTypeLive { + sb.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n") + } - _, err = f.WriteString(sb.String()) + return sb.String() +} + +func (p *basePlaylistWriter) createSegmentEntry(dateTime time.Time, duration float64, filename string) string { + var sb strings.Builder + + sb.WriteString("#EXT-X-PROGRAM-DATE-TIME:") + sb.WriteString(dateTime.UTC().Format("2006-01-02T15:04:05.999Z07:00")) + sb.WriteString("\n#EXTINF:") + sb.WriteString(strconv.FormatFloat(duration, 'f', 3, 32)) + sb.WriteString(",\n") + sb.WriteString(filename) + sb.WriteString("\n") + + return sb.String() +} + +func NewEventPlaylistWriter(filename string, targetDuration int) (PlaylistWriter, error) { + p := &eventPlaylistWriter{ + basePlaylistWriter: basePlaylistWriter{ + filename: filename, + targetDuration: targetDuration, + }, + } + + f, err := os.Create(p.filename) + if err != nil { + return nil, err + } + defer f.Close() + + _, err = f.WriteString(p.createHeader(PlaylistTypeEvent)) if err != nil { return nil, err } @@ -56,28 +107,19 @@ func NewPlaylistWriter(filename string, targetDuration int) (*PlaylistWriter, er return p, nil } -func (p *PlaylistWriter) Append(dateTime time.Time, duration float64, filename string) error { +func (p *eventPlaylistWriter) Append(dateTime time.Time, duration float64, filename string) error { f, err := os.OpenFile(p.filename, os.O_WRONLY|os.O_APPEND, fs.ModeAppend) if err != nil { return err } defer f.Close() - var sb strings.Builder - sb.WriteString("#EXT-X-PROGRAM-DATE-TIME:") - sb.WriteString(dateTime.UTC().Format("2006-01-02T15:04:05.999Z07:00")) - sb.WriteString("\n#EXTINF:") - sb.WriteString(strconv.FormatFloat(duration, 'f', 3, 32)) - sb.WriteString(",\n") - sb.WriteString(filename) - sb.WriteString("\n") - - _, err = f.WriteString(sb.String()) + _, err = f.WriteString(p.createSegmentEntry(dateTime, duration, filename)) return err } // Close sliding playlist and make them fixed. -func (p *PlaylistWriter) Close() error { +func (p *eventPlaylistWriter) Close() error { f, err := os.OpenFile(p.filename, os.O_WRONLY|os.O_APPEND, fs.ModeAppend) if err != nil { return err @@ -87,3 +129,65 @@ func (p *PlaylistWriter) Close() error { _, err = f.WriteString("#EXT-X-ENDLIST\n") return err } + +func NewLivePlaylistWriter(filename string, targetDuration int, windowSize int) (PlaylistWriter, error) { + p := &livePlaylistWriter{ + basePlaylistWriter: basePlaylistWriter{ + filename: filename, + targetDuration: targetDuration, + }, + windowSize: windowSize, + livePlaylistSegments: list.New(), + } + + p.livePlaylistHeader = p.createHeader(PlaylistTypeLive) + + return p, nil +} + +func (p *livePlaylistWriter) Append(dateTime time.Time, duration float64, filename string) error { + f, err := os.Create(p.filename) + if err != nil { + return err + } + defer f.Close() + + segmentStr := p.createSegmentEntry(dateTime, duration, filename) + p.livePlaylistSegments.PushBack(segmentStr) + + for p.livePlaylistSegments.Len() > p.windowSize { + p.livePlaylistSegments.Remove(p.livePlaylistSegments.Front()) + p.mediaSeq++ + } + + _, err = f.WriteString(p.generatePlaylist()) + return err +} + +func (p *livePlaylistWriter) Close() error { + f, err := os.Create(p.filename) + if err != nil { + return err + } + defer f.Close() + + _, err = f.WriteString(p.generatePlaylist()) + if err != nil { + return err + } + + _, err = f.WriteString("#EXT-X-ENDLIST\n") + return err +} + +func (p *livePlaylistWriter) generatePlaylist() string { + var sb strings.Builder + sb.WriteString(p.livePlaylistHeader) + sb.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n", p.mediaSeq)) + for elem := p.livePlaylistSegments.Front(); elem != nil; elem = elem.Next() { + segmentStr := elem.Value.(string) + sb.WriteString(segmentStr) + } + + return sb.String() +} diff --git a/pkg/pipeline/sink/m3u8/writer_test.go b/pkg/pipeline/sink/m3u8/writer_test.go index 385b5689..8423fd30 100644 --- a/pkg/pipeline/sink/m3u8/writer_test.go +++ b/pkg/pipeline/sink/m3u8/writer_test.go @@ -23,10 +23,10 @@ import ( "github.com/stretchr/testify/require" ) -func TestPlaylistWriter(t *testing.T) { +func TestEventPlaylistWriter(t *testing.T) { playlistName := "playlist.m3u8" - w, err := NewPlaylistWriter(playlistName, 6) + w, err := NewEventPlaylistWriter(playlistName, 6) require.NoError(t, err) t.Cleanup(func() { _ = os.Remove(playlistName) }) @@ -44,6 +44,42 @@ func TestPlaylistWriter(t *testing.T) { b, err := os.ReadFile(playlistName) require.NoError(t, err) - expected := "#EXTM3U\n#EXT-X-VERSION:4\n#EXT-X-PLAYLIST-TYPE:EVENT\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-MEDIA-SEQUENCE:0\n#EXT-X-TARGETDURATION:6\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:04.814Z\n#EXTINF:5.994,\nplaylist_00000.ts\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:10.808Z\n#EXTINF:5.994,\nplaylist_00001.ts\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:16.802Z\n#EXTINF:5.994,\nplaylist_00002.ts\n#EXT-X-ENDLIST\n" + expected := "#EXTM3U\n#EXT-X-VERSION:4\n#EXT-X-PLAYLIST-TYPE:EVENT\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:6\n#EXT-X-MEDIA-SEQUENCE:0\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:04.814Z\n#EXTINF:5.994,\nplaylist_00000.ts\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:10.808Z\n#EXTINF:5.994,\nplaylist_00001.ts\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:16.802Z\n#EXTINF:5.994,\nplaylist_00002.ts\n#EXT-X-ENDLIST\n" + require.Equal(t, expected, string(b)) +} + +func TestLivePlaylistWriter(t *testing.T) { + playlistName := "playlist.m3u8" + + w, err := NewLivePlaylistWriter(playlistName, 6, 3) + require.NoError(t, err) + + t.Cleanup(func() { _ = os.Remove(playlistName) }) + + now := time.Unix(0, 1683154504814142000) + duration := 5.994 + + for i := 0; i < 2; i++ { + require.NoError(t, w.Append(now, duration, fmt.Sprintf("playlist_0000%d.ts", i))) + now = now.Add(time.Millisecond * 5994) + } + + b, err := os.ReadFile(playlistName) + require.NoError(t, err) + + expected := "#EXTM3U\n#EXT-X-VERSION:4\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:6\n#EXT-X-MEDIA-SEQUENCE:0\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:04.814Z\n#EXTINF:5.994,\nplaylist_00000.ts\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:10.808Z\n#EXTINF:5.994,\nplaylist_00001.ts\n" + require.Equal(t, expected, string(b)) + + for i := 2; i < 4; i++ { + require.NoError(t, w.Append(now, duration, fmt.Sprintf("playlist_0000%d.ts", i))) + now = now.Add(time.Millisecond * 5994) + } + + require.NoError(t, w.Close()) + + b, err = os.ReadFile(playlistName) + require.NoError(t, err) + + expected = "#EXTM3U\n#EXT-X-VERSION:4\n#EXT-X-ALLOW-CACHE:NO\n#EXT-X-TARGETDURATION:6\n#EXT-X-MEDIA-SEQUENCE:1\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:04.814Z\n#EXTINF:5.994,\nplaylist_00001.ts\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:16.802Z\n#EXTINF:5.994,\nplaylist_00002.ts\n#EXT-X-PROGRAM-DATE-TIME:2023-05-03T22:55:22.796Z\n#EXTINF:5.994,\nplaylist_00003.ts\n#EXT-X-ENDLIST\n" require.Equal(t, expected, string(b)) } diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index d8e2f99b..dce2d8ff 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -33,7 +33,10 @@ import ( "github.com/livekit/protocol/logger" ) -const maxPendingUploads = 100 +const ( + maxPendingUploads = 100 + defaultLivePlaylistWindow = 5 +) type SegmentSink struct { uploader.Uploader @@ -42,7 +45,8 @@ type SegmentSink struct { conf *config.PipelineConfig callbacks *gstreamer.Callbacks - playlist *m3u8.PlaylistWriter + playlist m3u8.PlaylistWriter + livePlaylist m3u8.PlaylistWriter currentItemStartTimestamp int64 currentItemFilename string startDate time.Time @@ -62,17 +66,27 @@ type SegmentUpdate struct { func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.SegmentConfig, callbacks *gstreamer.Callbacks) (*SegmentSink, error) { playlistName := path.Join(o.LocalDir, o.PlaylistFilename) - playlist, err := m3u8.NewPlaylistWriter(playlistName, o.SegmentDuration) + playlist, err := m3u8.NewEventPlaylistWriter(playlistName, o.SegmentDuration) if err != nil { return nil, err } + var livePlaylist m3u8.PlaylistWriter + if o.LivePlaylistFilename != "" { + playlistName = path.Join(o.LocalDir, o.LivePlaylistFilename) + livePlaylist, err = m3u8.NewLivePlaylistWriter(playlistName, o.SegmentDuration, defaultLivePlaylistWindow) + if err != nil { + return nil, err + } + } + return &SegmentSink{ Uploader: u, SegmentConfig: o, conf: p, callbacks: callbacks, playlist: playlist, + livePlaylist: livePlaylist, openSegmentsStartTime: make(map[string]int64), endedSegments: make(chan SegmentUpdate, maxPendingUploads), done: core.NewFuse(), @@ -115,6 +129,15 @@ func (s *SegmentSink) Start() error { if err != nil { return } + + if s.LivePlaylistFilename != "" { + playlistLocalPath = path.Join(s.LocalDir, s.LivePlaylistFilename) + playlistStoragePath = path.Join(s.StorageDir, s.LivePlaylistFilename) + s.SegmentsInfo.LivePlaylistLocation, _, err = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + if err != nil { + return + } + } } }() @@ -203,6 +226,11 @@ func (s *SegmentSink) endSegment(filename string, endTime int64) error { if err := s.playlist.Append(segmentStartDate, duration, filename); err != nil { return err } + if s.livePlaylist != nil { + if err := s.livePlaylist.Append(segmentStartDate, duration, filename); err != nil { + return err + } + } return nil } @@ -221,6 +249,17 @@ func (s *SegmentSink) Close() error { playlistStoragePath := path.Join(s.StorageDir, s.PlaylistFilename) s.SegmentsInfo.PlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + if s.livePlaylist != nil { + if err := s.livePlaylist.Close(); err != nil { + logger.Errorw("failed to send EOS to live playlist writer", err) + } + + // upload the finalized live playlist + playlistLocalPath := path.Join(s.LocalDir, s.LivePlaylistFilename) + playlistStoragePath := path.Join(s.StorageDir, s.LivePlaylistFilename) + s.SegmentsInfo.LivePlaylistLocation, _, _ = s.Upload(playlistLocalPath, playlistStoragePath, s.OutputType, false) + } + if !s.DisableManifest { manifestLocalPath := fmt.Sprintf("%s.json", playlistLocalPath) manifestStoragePath := fmt.Sprintf("%s.json", playlistStoragePath) diff --git a/test/ffprobe.go b/test/ffprobe.go index c20fc780..0be62149 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -104,7 +104,7 @@ func ffprobe(input string) (*FFProbeInfo, error) { return info, err } -func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.EgressInfo, egressType types.EgressType, withMuting bool, sourceFramerate float64) { +func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.EgressInfo, egressType types.EgressType, withMuting bool, sourceFramerate float64, live bool) { var info *FFProbeInfo var err error @@ -165,8 +165,13 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.Len(t, res.GetSegmentResults(), 1) segments := res.GetSegmentResults()[0] - expected := int64(math.Ceil(actual / float64(p.GetSegmentConfig().SegmentDuration))) - require.InDelta(t, expected, segments.SegmentCount, 1) + + if live { + require.InDelta(t, float64(5*p.GetSegmentConfig().SegmentDuration), actual, float64(p.GetSegmentConfig().SegmentDuration)) + } else { + expected := int64(math.Ceil(actual / float64(p.GetSegmentConfig().SegmentDuration))) + require.InDelta(t, expected, segments.SegmentCount, 1) + } case types.EgressTypeWebsocket: size, err := strconv.Atoi(info.Format.Size) diff --git a/test/file.go b/test/file.go index cde38e25..3ab8844b 100644 --- a/test/file.go +++ b/test/file.go @@ -86,5 +86,5 @@ func (r *Runner) verifyFile(t *testing.T, p *config.PipelineConfig, res *livekit } // verify - verify(t, localPath, p, res, types.EgressTypeFile, r.Muting, r.sourceFramerate) + verify(t, localPath, p, res, types.EgressTypeFile, r.Muting, r.sourceFramerate, false) } diff --git a/test/integration.go b/test/integration.go index 20273b9d..08bfe37c 100644 --- a/test/integration.go +++ b/test/integration.go @@ -76,6 +76,7 @@ type testCase struct { // used by segmented file tests playlist string + live_playlist string filenameSuffix livekit.SegmentedFileSuffix // used by sdk tests diff --git a/test/multi.go b/test/multi.go index b6b914e3..ade90e27 100644 --- a/test/multi.go +++ b/test/multi.go @@ -63,6 +63,6 @@ func (r *Runner) runMultipleTest( r.verifyFile(t, p, res) } if segments { - r.verifySegments(t, p, filenameSuffix, res) + r.verifySegments(t, p, filenameSuffix, res, false) } } diff --git a/test/room_composite.go b/test/room_composite.go index 35e6b937..245e7e16 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -204,6 +204,7 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { }, filename: "r_{room_name}_{time}", playlist: "r_{room_name}_{time}.m3u8", + live_playlist: "r_live_{room_name}_{time}.m3u8", filenameSuffix: livekit.SegmentedFileSuffix_TIMESTAMP, expectVideoTranscoding: true, }, @@ -218,9 +219,10 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { }, } { segmentOutput := &livekit.SegmentedFileOutput{ - FilenamePrefix: r.getFilePath(test.filename), - PlaylistName: test.playlist, - FilenameSuffix: test.filenameSuffix, + FilenamePrefix: r.getFilePath(test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.live_playlist, + FilenameSuffix: test.filenameSuffix, } if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.GCPUpload != nil { segmentOutput.FilenamePrefix = test.filename diff --git a/test/segments.go b/test/segments.go index 754c8271..747b8114 100644 --- a/test/segments.go +++ b/test/segments.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/pipeline/sink/m3u8" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" @@ -48,13 +49,13 @@ func (r *Runner) runSegmentsTest(t *testing.T, req *rpc.StartEgressRequest, test p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) + r.verifySegments(t, p, test.filenameSuffix, res, test.live_playlist != "") if !test.audioOnly { require.Equal(t, test.expectVideoTranscoding, p.VideoTranscoding) } - r.verifySegments(t, p, test.filenameSuffix, res) } -func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, res *livekit.EgressInfo) { +func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, res *livekit.EgressInfo, enableLivePlaylist bool) { // egress info require.Equal(t, res.Error == "", res.Status != livekit.EgressStatus_EGRESS_FAILED) require.NotZero(t, res.StartedAt) @@ -64,37 +65,56 @@ func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filename require.Len(t, res.GetSegmentResults(), 1) segments := res.GetSegmentResults()[0] - require.NotEmpty(t, segments.PlaylistName) - require.NotEmpty(t, segments.PlaylistLocation) require.Greater(t, segments.Size, int64(0)) require.Greater(t, segments.Duration, int64(0)) - storedPlaylistPath := segments.PlaylistName - localPlaylistPath := segments.PlaylistName + r.verifySegmentOutput(t, p, filenameSuffix, segments.PlaylistName, segments.PlaylistLocation, int(segments.SegmentCount), res, m3u8.PlaylistTypeEvent) + r.verifyManifest(t, p, segments.PlaylistName) + if enableLivePlaylist { + r.verifySegmentOutput(t, p, filenameSuffix, segments.LivePlaylistName, segments.LivePlaylistLocation, 5, res, m3u8.PlaylistTypeLive) + } +} + +func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { + localPlaylistPath := fmt.Sprintf("%s/%s", r.FilePrefix, plName) + + if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { + download(t, uploadConfig, localPlaylistPath+".json", plName+".json") + } +} + +func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { + require.NotEmpty(t, plName) + require.NotEmpty(t, plLocation) + + storedPlaylistPath := plName + localPlaylistPath := plName // download from cloud storage if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { - base := storedPlaylistPath[:len(storedPlaylistPath)-5] localPlaylistPath = fmt.Sprintf("%s/%s", r.FilePrefix, storedPlaylistPath) download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) - download(t, uploadConfig, localPlaylistPath+".json", storedPlaylistPath+".json") - for i := 0; i < int(segments.SegmentCount); i++ { - cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) - localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath) - download(t, uploadConfig, localPath, cloudPath) + if plType == m3u8.PlaylistTypeEvent { + // Only download segments once + base := storedPlaylistPath[:len(storedPlaylistPath)-5] + for i := 0; i < int(segmentCount); i++ { + cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) + localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath) + download(t, uploadConfig, localPath, cloudPath) + } } } - verifyPlaylistProgramDateTime(t, filenameSuffix, localPlaylistPath) + verifyPlaylistProgramDateTime(t, filenameSuffix, localPlaylistPath, plType) // verify - verify(t, localPlaylistPath, p, res, types.EgressTypeSegments, r.Muting, r.sourceFramerate) + verify(t, localPlaylistPath, p, res, types.EgressTypeSegments, r.Muting, r.sourceFramerate, plType == m3u8.PlaylistTypeLive) } -func verifyPlaylistProgramDateTime(t *testing.T, filenameSuffix livekit.SegmentedFileSuffix, localPlaylistPath string) { +func verifyPlaylistProgramDateTime(t *testing.T, filenameSuffix livekit.SegmentedFileSuffix, localPlaylistPath string, plType m3u8.PlaylistType) { p, err := readPlaylist(localPlaylistPath) require.NoError(t, err) - require.Equal(t, "EVENT", p.MediaType) + require.Equal(t, string(plType), p.MediaType) require.True(t, p.Closed) now := time.Now() @@ -149,10 +169,20 @@ func readPlaylist(filename string) (*Playlist, error) { return nil, err } + var segmentLineStart = 5 + var i = 1 + lines := strings.Split(string(b), "\n") - version, _ := strconv.Atoi(strings.Split(lines[1], ":")[1]) - mediaType := strings.Split(lines[2], ":")[1] - targetDuration, _ := strconv.Atoi(strings.Split(lines[5], ":")[1]) + version, _ := strconv.Atoi(strings.Split(lines[i], ":")[1]) + i++ + var mediaType string + if strings.Contains(string(b), "#EXT-X-PLAYLIST-TYPE") { + mediaType = strings.Split(lines[i], ":")[1] + segmentLineStart++ + i++ + } + i++ // #EXT-X-ALLOW-CACHE:NO hardcoded + targetDuration, _ := strconv.Atoi(strings.Split(lines[i], ":")[1]) p := &Playlist{ Version: version, @@ -161,7 +191,7 @@ func readPlaylist(filename string) (*Playlist, error) { Segments: make([]*Segment, 0), } - for i := 6; i < len(lines)-3; i += 3 { + for i := segmentLineStart; i < len(lines)-3; i += 3 { startTime, _ := time.Parse("2006-01-02T15:04:05.999Z07:00", strings.SplitN(lines[i], ":", 2)[1]) durStr := strings.Split(lines[i+1], ":")[1] durStr = durStr[:len(durStr)-1] // remove trailing comma diff --git a/test/stream.go b/test/stream.go index 3476f8b1..db7b3839 100644 --- a/test/stream.go +++ b/test/stream.go @@ -127,6 +127,6 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * func (r *Runner) verifyStreams(t *testing.T, p *config.PipelineConfig, urls ...string) { for _, url := range urls { - verify(t, url, p, nil, types.EgressTypeStream, false, r.sourceFramerate) + verify(t, url, p, nil, types.EgressTypeStream, false, r.sourceFramerate, false) } } diff --git a/test/track.go b/test/track.go index af66e6f7..3f116cb7 100644 --- a/test/track.go +++ b/test/track.go @@ -165,7 +165,7 @@ func (r *Runner) testTrackStream(t *testing.T) { time.Sleep(time.Second * 30) res := r.stopEgress(t, egressID) - verify(t, filepath, p, res, types.EgressTypeWebsocket, r.Muting, r.sourceFramerate) + verify(t, filepath, p, res, types.EgressTypeWebsocket, r.Muting, r.sourceFramerate, false) }) if r.Short { return diff --git a/test/track_composite.go b/test/track_composite.go index 8843bbbe..7bda4ab9 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -160,11 +160,12 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { playlist: "tcs_{publisher_identity}_vp8_{time}.m3u8", }, { - name: "H264", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - filename: "tcs_{room_name}_h264_{time}", - playlist: "tcs_{room_name}_h264_{time}.m3u8", + name: "H264", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + filename: "tcs_{room_name}_h264_{time}", + playlist: "tcs_{room_name}_h264_{time}.m3u8", + live_playlist: "tcs_live_{room_name}_h264_{time}.m3u8", }, { name: "Audio Only", @@ -185,9 +186,10 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { } segmentOutput := &livekit.SegmentedFileOutput{ - FilenamePrefix: r.getFilePath(test.filename), - PlaylistName: test.playlist, - FilenameSuffix: test.filenameSuffix, + FilenamePrefix: r.getFilePath(test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.live_playlist, + FilenameSuffix: test.filenameSuffix, } if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { segmentOutput.FilenamePrefix = test.filename