From e71c86df59334eef0d0efe9dca411d0552c281d9 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 12:59:35 -0700 Subject: [PATCH 1/8] allow StartEgress override in integration tests --- test/integration.go | 2 +- test/integration_test.go | 3 ++- test/room_composite.go | 2 +- test/runner.go | 10 +++++++++- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/test/integration.go b/test/integration.go index fd96230e..670b5bbf 100644 --- a/test/integration.go +++ b/test/integration.go @@ -225,7 +225,7 @@ func (r *Runner) startEgress(t *testing.T, req *rpc.StartEgressRequest) string { func (r *Runner) sendRequest(t *testing.T, req *rpc.StartEgressRequest) *livekit.EgressInfo { // send start request - info, err := r.client.StartEgress(context.Background(), "", req) + info, err := r.start(context.Background(), req) // check returned egress info require.NoError(t, err) diff --git a/test/integration_test.go b/test/integration_test.go index 1f96c9a4..9430a0a0 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -51,5 +51,6 @@ func TestEgress(t *testing.T) { svc, err := server.NewServer(r.ServiceConfig, bus, ioClient) require.NoError(t, err) - r.Run(t, svc, bus, rfs) + r.StartServer(t, svc, bus, rfs) + r.RunTests(t) } diff --git a/test/room_composite.go b/test/room_composite.go index 07c5f77a..3ae8c149 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -169,7 +169,7 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) { }, } - info, err := r.client.StartEgress(context.Background(), "", req) + info, err := r.start(context.Background(), req) require.NoError(t, err) require.Empty(t, info.Error) require.NotEmpty(t, info.EgressId) diff --git a/test/runner.go b/test/runner.go index 9b6a1452..091a4c77 100644 --- a/test/runner.go +++ b/test/runner.go @@ -17,6 +17,7 @@ package test import ( + "context" "encoding/json" "fmt" "io/fs" @@ -38,6 +39,8 @@ import ( ) type Runner struct { + StartEgress func(ctx context.Context, request *rpc.StartEgressRequest) (*livekit.EgressInfo, error) `yaml:"-"` + svc Server `yaml:"-"` client rpc.EgressClient `yaml:"-"` room *lksdk.Room `yaml:"-"` @@ -154,7 +157,7 @@ func NewRunner(t *testing.T) *Runner { return r } -func (r *Runner) Run(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs fs.FS) { +func (r *Runner) StartServer(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs fs.FS) { lksdk.SetLogger(logger.LogRLogger(logr.Discard())) r.svc = svc t.Cleanup(func() { @@ -177,6 +180,9 @@ func (r *Runner) Run(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs psrpcClient, err := rpc.NewEgressClient(rpc.ClientParams{Bus: bus}) require.NoError(t, err) + r.StartEgress = func(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { + return psrpcClient.StartEgress(ctx, "", req) + } // start templates handler err = r.svc.StartTemplatesServer(templateFs) @@ -201,7 +207,9 @@ func (r *Runner) Run(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs require.Len(t, status, 1) require.Contains(t, status, "CpuLoad") } +} +func (r *Runner) RunTests(t *testing.T) { // run tests r.testRoomComposite(t) r.testWeb(t) From 3735d0f1ccd552dc7ddbc573c9f53e40845bfb3f Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 13:05:49 -0700 Subject: [PATCH 2/8] fix StartEgress calls --- test/integration.go | 2 +- test/room_composite.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration.go b/test/integration.go index 670b5bbf..3549525f 100644 --- a/test/integration.go +++ b/test/integration.go @@ -225,7 +225,7 @@ func (r *Runner) startEgress(t *testing.T, req *rpc.StartEgressRequest) string { func (r *Runner) sendRequest(t *testing.T, req *rpc.StartEgressRequest) *livekit.EgressInfo { // send start request - info, err := r.start(context.Background(), req) + info, err := r.StartEgress(context.Background(), req) // check returned egress info require.NoError(t, err) diff --git a/test/room_composite.go b/test/room_composite.go index 3ae8c149..5a434e8c 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -169,7 +169,7 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) { }, } - info, err := r.start(context.Background(), req) + info, err := r.StartEgress(context.Background(), req) require.NoError(t, err) require.Empty(t, info.Error) require.NotEmpty(t, info.EgressId) From d7a9a0785682efceed83d5715d7563853acc518f Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 13:56:04 -0700 Subject: [PATCH 3/8] update upload locations --- test/integration.go | 11 ++------- test/room_composite.go | 53 +++++++++++++++++++++++++---------------- test/track.go | 2 +- test/track_composite.go | 51 +++++++++++++++++++++++---------------- test/web.go | 40 +++++++++++++++++++------------ 5 files changed, 91 insertions(+), 66 deletions(-) diff --git a/test/integration.go b/test/integration.go index 3549525f..0407eea1 100644 --- a/test/integration.go +++ b/test/integration.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "os" - "path" "strings" "testing" "time" @@ -46,6 +45,8 @@ const ( badStreamUrl2 = "rtmp://localhost:1936/live/stream" redactedBadUrl2 = "rtmp://localhost:1936/live/{st...am}" webUrl = "https://videoplayer-2k23.vercel.app/videos/eminem" + + uploadPrefix = "integration" ) var ( @@ -327,11 +328,3 @@ func (r *Runner) stopEgress(t *testing.T, egressID string) *livekit.EgressInfo { return res } - -func (r *Runner) getFilePath(filename string) string { - if r.S3 != nil || r.Azure != nil || r.GCP != nil || r.AliOSS != nil { - return filename - } - - return path.Join(r.FilePrefix, filename) -} diff --git a/test/room_composite.go b/test/room_composite.go index 5a434e8c..2507b726 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -18,6 +18,7 @@ package test import ( "context" + "path" "testing" "time" @@ -83,14 +84,19 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { }, } { r.runRoomTest(t, test.name, types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { - fileOutput := &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: r.getFilePath(test.filename), - } + var fileOutput *livekit.EncodedFileOutput if r.S3Upload != nil { - fileOutput.Filepath = test.filename - fileOutput.Output = &livekit.EncodedFileOutput_S3{ - S3: r.S3Upload, + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(uploadPrefix, test.filename), + Output: &livekit.EncodedFileOutput_S3{ + S3: r.S3Upload, + }, + } + } else { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(r.FilePrefix, test.filename), } } @@ -219,16 +225,23 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { audioOnly: true, }, } { - segmentOutput := &livekit.SegmentedFileOutput{ - FilenamePrefix: r.getFilePath(test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - } + var segmentOutput *livekit.SegmentedFileOutput if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.GCPUpload != nil { - segmentOutput.FilenamePrefix = test.filename - segmentOutput.Output = &livekit.SegmentedFileOutput_Gcp{ - Gcp: r.GCPUpload, + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(uploadPrefix, test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.livePlaylist, + FilenameSuffix: test.filenameSuffix, + Output: &livekit.SegmentedFileOutput_Gcp{ + Gcp: r.GCPUpload, + }, + } + } else { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(r.FilePrefix, test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.livePlaylist, + FilenameSuffix: test.filenameSuffix, } } @@ -273,12 +286,10 @@ func (r *Runner) testRoomCompositeImages(t *testing.T) { }, } { imageOutput := &livekit.ImageOutput{ - FilenamePrefix: r.getFilePath(test.filename), + FilenamePrefix: path.Join(r.FilePrefix, test.filename), FilenameSuffix: test.imageFilenameSuffix, } - // TODO upload - room := &livekit.RoomCompositeEgressRequest{ RoomName: r.RoomName, Layout: "grid-dark", @@ -317,13 +328,13 @@ func (r *Runner) testRoomCompositeMulti(t *testing.T) { Layout: "grid-light", FileOutputs: []*livekit.EncodedFileOutput{{ FileType: livekit.EncodedFileType_MP4, - Filepath: r.getFilePath("rc_multiple_{time}"), + Filepath: path.Join(r.FilePrefix, "rc_multiple_{time}"), }}, ImageOutputs: []*livekit.ImageOutput{{ CaptureInterval: 10, Width: 1280, Height: 720, - FilenamePrefix: r.getFilePath("rc_image"), + FilenamePrefix: path.Join(r.FilePrefix, "rc_image"), }}, }, }, diff --git a/test/track.go b/test/track.go index b20f7119..4a082650 100644 --- a/test/track.go +++ b/test/track.go @@ -94,7 +94,7 @@ func (r *Runner) testTrackFile(t *testing.T) { TrackId: trackID, Output: &livekit.TrackEgressRequest_File{ File: &livekit.DirectFileOutput{ - Filepath: r.getFilePath(test.filename), + Filepath: path.Join(r.FilePrefix, test.filename), }, }, } diff --git a/test/track_composite.go b/test/track_composite.go index 6bb8aa64..1bc1ab09 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -17,6 +17,7 @@ package test import ( + "path" "testing" "github.com/livekit/egress/pkg/types" @@ -80,14 +81,19 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) { aID = audioTrackID } - fileOutput := &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: r.getFilePath(test.filename), - } + var fileOutput *livekit.EncodedFileOutput if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.AzureUpload != nil { - fileOutput.Filepath = test.filename - fileOutput.Output = &livekit.EncodedFileOutput_Azure{ - Azure: r.AzureUpload, + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(uploadPrefix, test.filename), + Output: &livekit.EncodedFileOutput_Azure{ + Azure: r.AzureUpload, + }, + } + } else { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(r.FilePrefix, test.filename), } } @@ -186,16 +192,23 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { aID = audioTrackID } - segmentOutput := &livekit.SegmentedFileOutput{ - FilenamePrefix: r.getFilePath(test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - } + var segmentOutput *livekit.SegmentedFileOutput if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { - segmentOutput.FilenamePrefix = test.filename - segmentOutput.Output = &livekit.SegmentedFileOutput_S3{ - S3: r.S3Upload, + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(uploadPrefix, test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.livePlaylist, + FilenameSuffix: test.filenameSuffix, + Output: &livekit.SegmentedFileOutput_S3{ + S3: r.S3Upload, + }, + } + } else { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(r.FilePrefix, test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.livePlaylist, + FilenameSuffix: test.filenameSuffix, } } @@ -257,11 +270,9 @@ func (r *Runner) testTrackCompositeImages(t *testing.T) { CaptureInterval: 5, Width: 1280, Height: 720, - FilenamePrefix: r.getFilePath(test.filename), + FilenamePrefix: path.Join(r.FilePrefix, test.filename), } - // TODO Upload - trackRequest := &livekit.TrackCompositeEgressRequest{ RoomName: r.room.Name(), AudioTrackId: aID, @@ -308,7 +319,7 @@ func (r *Runner) testTrackCompositeMulti(t *testing.T) { Protocol: livekit.StreamProtocol_RTMP, }}, SegmentOutputs: []*livekit.SegmentedFileOutput{{ - FilenamePrefix: r.getFilePath("tc_multiple_{time}"), + FilenamePrefix: path.Join(r.FilePrefix, "tc_multiple_{time}"), PlaylistName: "tc_multiple_{time}", }}, }, diff --git a/test/web.go b/test/web.go index 8fae9a13..ae154990 100644 --- a/test/web.go +++ b/test/web.go @@ -17,6 +17,7 @@ package test import ( + "path" "testing" "github.com/livekit/protocol/livekit" @@ -49,13 +50,17 @@ func (r *Runner) testWebFile(t *testing.T) { } r.runWebTest(t, "2A/Web/File", func(t *testing.T) { - fileOutput := &livekit.EncodedFileOutput{ - Filepath: r.getFilePath("web_{time}"), - } + var fileOutput *livekit.EncodedFileOutput if r.GCPUpload != nil { - fileOutput.Filepath = "web_{time}" - fileOutput.Output = &livekit.EncodedFileOutput_Gcp{ - Gcp: r.GCPUpload, + fileOutput = &livekit.EncodedFileOutput{ + Filepath: path.Join(uploadPrefix, "web_{time}"), + Output: &livekit.EncodedFileOutput_Gcp{ + Gcp: r.GCPUpload, + }, + } + } else { + fileOutput = &livekit.EncodedFileOutput{ + Filepath: path.Join(r.FilePrefix, "web_{time}"), } } @@ -105,14 +110,19 @@ func (r *Runner) testWebSegments(t *testing.T) { } r.runWebTest(t, "2C/Web/Segments", func(t *testing.T) { - segmentOutput := &livekit.SegmentedFileOutput{ - FilenamePrefix: r.getFilePath("web_{time}"), - PlaylistName: "web_{time}.m3u8", - } + var segmentOutput *livekit.SegmentedFileOutput if r.AzureUpload != nil { - segmentOutput.FilenamePrefix = "web_{time}" - segmentOutput.Output = &livekit.SegmentedFileOutput_Azure{ - Azure: r.AzureUpload, + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(uploadPrefix, "web_{time}"), + PlaylistName: "web_{time}.m3u8", + Output: &livekit.SegmentedFileOutput_Azure{ + Azure: r.AzureUpload, + }, + } + } else { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(r.FilePrefix, "web_{time}"), + PlaylistName: "web_{time}.m3u8", } } @@ -146,10 +156,10 @@ func (r *Runner) testWebMulti(t *testing.T) { Url: webUrl, FileOutputs: []*livekit.EncodedFileOutput{{ FileType: livekit.EncodedFileType_MP4, - Filepath: r.getFilePath("web_multiple_{time}"), + Filepath: path.Join(r.FilePrefix, "web_multiple_{time}"), }}, SegmentOutputs: []*livekit.SegmentedFileOutput{{ - FilenamePrefix: r.getFilePath("web_multiple_{time}"), + FilenamePrefix: path.Join(r.FilePrefix, "web_multiple_{time}"), PlaylistName: "web_multiple_{time}", }}, }, From 0bb0bbd075d82087e09855dbdf7d1d8baa0e89b2 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 14:03:28 -0700 Subject: [PATCH 4/8] forgot about participant --- test/participant.go | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/test/participant.go b/test/participant.go index 1677a9c0..1d60b762 100644 --- a/test/participant.go +++ b/test/participant.go @@ -17,6 +17,7 @@ package test import ( + "path" "testing" "time" @@ -95,14 +96,19 @@ func (r *Runner) testParticipantFile(t *testing.T) { }, } { r.runParticipantTest(t, test.name, test, func(t *testing.T, identity string) { - fileOutput := &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: r.getFilePath(test.filename), - } + var fileOutput *livekit.EncodedFileOutput if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.AzureUpload != nil { - fileOutput.Filepath = test.filename - fileOutput.Output = &livekit.EncodedFileOutput_Azure{ - Azure: r.AzureUpload, + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(uploadPrefix, test.filename), + Output: &livekit.EncodedFileOutput_Azure{ + Azure: r.AzureUpload, + }, + } + } else { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(r.FilePrefix, test.filename), } } @@ -193,15 +199,21 @@ func (r *Runner) testParticipantSegments(t *testing.T) { } { r.runParticipantTest(t, test.name, test, func(t *testing.T, identity string) { - segmentOutput := &livekit.SegmentedFileOutput{ - FilenamePrefix: r.getFilePath(test.filename), - PlaylistName: test.playlist, - FilenameSuffix: test.filenameSuffix, - } + var segmentOutput *livekit.SegmentedFileOutput if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { - segmentOutput.FilenamePrefix = test.filename - segmentOutput.Output = &livekit.SegmentedFileOutput_S3{ - S3: r.S3Upload, + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(uploadPrefix, test.filename), + PlaylistName: test.playlist, + FilenameSuffix: test.filenameSuffix, + Output: &livekit.SegmentedFileOutput_S3{ + S3: r.S3Upload, + }, + } + } else { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(r.FilePrefix, test.filename), + PlaylistName: test.playlist, + FilenameSuffix: test.filenameSuffix, } } @@ -256,7 +268,7 @@ func (r *Runner) testParticipantMulti(t *testing.T) { Identity: identity, FileOutputs: []*livekit.EncodedFileOutput{{ FileType: livekit.EncodedFileType_MP4, - Filepath: r.getFilePath("participant_multiple_{time}"), + Filepath: path.Join(r.FilePrefix, "participant_multiple_{time}"), }}, StreamOutputs: []*livekit.StreamOutput{{ Protocol: livekit.StreamProtocol_RTMP, From 501a8c3574ff55719c48d2b0cf4f14bb2c4e002c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 14:22:14 -0700 Subject: [PATCH 5/8] more cleaning, updates --- .github/workflows/publish-egress.yaml | 2 +- pkg/pipeline/sink/uploader/azure.go | 3 ++- test/file.go | 5 +++-- test/images.go | 14 +++++++------- test/integration.go | 4 ++-- test/segments.go | 7 ++++--- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/.github/workflows/publish-egress.yaml b/.github/workflows/publish-egress.yaml index 0b87f447..b19e6fd5 100644 --- a/.github/workflows/publish-egress.yaml +++ b/.github/workflows/publish-egress.yaml @@ -33,7 +33,7 @@ jobs: ~/go/pkg/mod ~/go/bin ~/.cache - key: ${{ runner.os }}-egress-${{ hashFiles('**/go.sum') }} + key: "${{ runner.os }}-egress-${{ hashFiles('**/go.sum') }}" restore-keys: ${{ runner.os }}-egress - name: Docker metadata diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index 130abd3a..a84e178f 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -19,6 +19,7 @@ import ( "fmt" "net/url" "os" + "path" "github.com/Azure/azure-storage-blob-go/azblob" @@ -88,5 +89,5 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType return "", 0, errors.ErrUploadFailed("Azure", err) } - return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil + return path.Join(u.container, storageFilepath), stat.Size(), nil } diff --git a/test/file.go b/test/file.go index 2257a317..b77dc496 100644 --- a/test/file.go +++ b/test/file.go @@ -17,7 +17,7 @@ package test import ( - "fmt" + "path" "strings" "testing" "time" @@ -77,10 +77,11 @@ func (r *Runner) verifyFile(t *testing.T, p *config.PipelineConfig, res *livekit localPath := fileRes.Filename require.NotEmpty(t, storagePath) require.False(t, strings.Contains(storagePath, "{")) + storageFilename := path.Base(storagePath) // download from cloud storage if uploadConfig := p.GetFileConfig().UploadConfig; uploadConfig != nil { - localPath = fmt.Sprintf("%s/%s", r.FilePrefix, storagePath) + localPath = path.Join(r.FilePrefix, storageFilename) download(t, uploadConfig, localPath, storagePath) download(t, uploadConfig, localPath+".json", storagePath+".json") } diff --git a/test/images.go b/test/images.go index f0165956..645aeaec 100644 --- a/test/images.go +++ b/test/images.go @@ -62,15 +62,15 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSu // r.verifyManifest(t, p, segments.PlaylistName) } -//func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { -// localPlaylistPath := fmt.Sprintf("%s/%s", r.FilePrefix, plName) +// func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { +// localPlaylistPath := path.Join(r.FilePrefix, plName) // // if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { // download(t, uploadConfig, localPlaylistPath+".json", plName+".json") // } -//} +// } -//func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { +// func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) { // require.NotEmpty(t, plName) // require.NotEmpty(t, plLocation) @@ -79,14 +79,14 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSu // // download from cloud storage // if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { -// localPlaylistPath = fmt.Sprintf("%s/%s", r.FilePrefix, storedPlaylistPath) +// localPlaylistPath = path.Join(r.FilePrefix, storedPlaylistPath) // download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) // if plType == m3u8.PlaylistTypeEvent { // // Only download segments once // base := storedPlaylistPath[:len(storedPlaylistPath)-5] // for i := 0; i < int(segmentCount); i++ { // cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) -// localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath) +// localPath := path.Join(r.FilePrefix, cloudPath) // download(t, uploadConfig, localPath, cloudPath) // } // } @@ -94,4 +94,4 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSu // verify // verify(t, localPlaylistPath, p, res, types.EgressTypeSegments, r.Muting, r.sourceFramerate, plType == m3u8.PlaylistTypeLive) -//} +// } diff --git a/test/integration.go b/test/integration.go index 0407eea1..f1b6100b 100644 --- a/test/integration.go +++ b/test/integration.go @@ -45,8 +45,6 @@ const ( badStreamUrl2 = "rtmp://localhost:1936/live/stream" redactedBadUrl2 = "rtmp://localhost:1936/live/{st...am}" webUrl = "https://videoplayer-2k23.vercel.app/videos/eminem" - - uploadPrefix = "integration" ) var ( @@ -62,6 +60,8 @@ var ( types.MimeTypeVP8: time.Microsecond * 41708, types.MimeTypeVP9: time.Microsecond * 41708, } + + uploadPrefix = fmt.Sprintf("integration/%s", time.Now().Format("2006-01-02")) ) type testCase struct { diff --git a/test/segments.go b/test/segments.go index 048e097a..78e9c858 100644 --- a/test/segments.go +++ b/test/segments.go @@ -19,6 +19,7 @@ package test import ( "fmt" "os" + "path" "strconv" "strings" "testing" @@ -76,7 +77,7 @@ func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filename } func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { - localPlaylistPath := fmt.Sprintf("%s/%s", r.FilePrefix, plName) + localPlaylistPath := path.Join(r.FilePrefix, plName) if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { download(t, uploadConfig, localPlaylistPath+".json", plName+".json") @@ -92,14 +93,14 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil // download from cloud storage if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { - localPlaylistPath = fmt.Sprintf("%s/%s", r.FilePrefix, storedPlaylistPath) + localPlaylistPath = path.Join(r.FilePrefix, storedPlaylistPath) download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) if plType == m3u8.PlaylistTypeEvent { // Only download segments once base := storedPlaylistPath[:len(storedPlaylistPath)-5] for i := 0; i < segmentCount; i++ { cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) - localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath) + localPath := path.Join(r.FilePrefix, cloudPath) download(t, uploadConfig, localPath, cloudPath) } } From 055dae3a7f7b31a6777b014daf2bcaf549c0c32c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 14:45:55 -0700 Subject: [PATCH 6/8] organize debug files --- pkg/config/base.go | 1 - pkg/pipeline/builder/video.go | 24 ------------------------ pkg/pipeline/debug.go | 15 +++++++++++---- pkg/pipeline/source/sdk.go | 4 ++-- test/segments.go | 3 ++- 5 files changed, 15 insertions(+), 32 deletions(-) diff --git a/pkg/config/base.go b/pkg/config/base.go index 325cadb1..4138f1ed 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -52,7 +52,6 @@ type BaseConfig struct { type DebugConfig struct { EnableProfiling bool `yaml:"enable_profiling"` // create dot file and pprof on internal error PathPrefix string `yaml:"path_prefix"` // filepath prefix for uploads - LogKeyFrames bool `yaml:"log_keyframes"` // log first 15s of keyframes when streaming StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS) } diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 6af5e752..c492ea8a 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -542,9 +542,6 @@ func (b *VideoBin) addEncoder() error { x264Enc.SetArg("speed-preset", "veryfast") if b.conf.KeyFrameInterval != 0 { keyframeInterval := uint(b.conf.KeyFrameInterval * float64(b.conf.Framerate)) - if b.conf.Debug.LogKeyFrames { - logger.Debugw("setting key frame interval", "interval", keyframeInterval) - } if err = x264Enc.SetProperty("key-int-max", keyframeInterval); err != nil { return errors.ErrGstPipelineError(err) } @@ -585,27 +582,6 @@ func (b *VideoBin) addEncoder() error { if err = b.bin.AddElements(x264Enc, caps); err != nil { return err } - - if b.conf.GetStreamConfig() != nil && b.conf.Debug.LogKeyFrames { - var firstKeyFrame *time.Duration - x264Enc.GetStaticPad("src").AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn { - buffer := info.GetBuffer() - if !buffer.HasFlags(gst.BufferFlagDeltaUnit) { - clockTime := buffer.PresentationTimestamp().AsDuration() - if firstKeyFrame == nil { - firstKeyFrame = clockTime - } - pts := *clockTime - *firstKeyFrame - logger.Debugw("keyframe generated", "pts", pts) - if pts > time.Second*15 { - return gst.PadProbeRemove - } - } - - return gst.PadProbeOK - }) - } - return nil case types.MimeTypeVP9: diff --git a/pkg/pipeline/debug.go b/pkg/pipeline/debug.go index 64b291d9..e423b7c3 100644 --- a/pkg/pipeline/debug.go +++ b/pkg/pipeline/debug.go @@ -25,7 +25,6 @@ import ( "github.com/go-gst/go-gst/gst" - "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/pipeline/sink/uploader" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/logger" @@ -44,6 +43,7 @@ func (c *Controller) uploadDebugFiles() { } done := make(chan struct{}) + var dotUploaded, pprofUploaded, trackUploaded bool var wg sync.WaitGroup wg.Add(3) @@ -67,9 +67,16 @@ func (c *Controller) uploadDebugFiles() { select { case <-done: logger.Infow("debug files uploaded") - return case <-time.After(time.Second * 3): - logger.Errorw("failed to upload debug files", errors.New("timed out")) + if !dotUploaded { + logger.Warnw("failed to upload dotfile", nil) + } + if !pprofUploaded { + logger.Warnw("failed to upload pprof file", nil) + } + if !trackUploaded { + logger.Warnw("failed to upload track debug files", nil) + } } } @@ -123,7 +130,7 @@ func (c *Controller) uploadDebugFile(u uploader.Uploader, data []byte, fileExten filename := fmt.Sprintf("%s%s", c.Info.EgressId, fileExtension) local := path.Join(dir, filename) - storage := path.Join(c.Debug.PathPrefix, filename) + storage := path.Join(c.Debug.PathPrefix, c.Info.EgressId, filename) f, err := os.Create(local) if err != nil { diff --git a/pkg/pipeline/source/sdk.go b/pkg/pipeline/source/sdk.go index 748f8f5d..191ebb73 100644 --- a/pkg/pipeline/source/sdk.go +++ b/pkg/pipeline/source/sdk.go @@ -477,9 +477,9 @@ func (s *SDKSource) createWriter( var logFilename string if s.Debug.EnableProfiling { if s.Debug.ToUploadConfig() == nil { - logFilename = path.Join(s.Debug.PathPrefix, fmt.Sprintf("%s.csv", track.ID())) + logFilename = path.Join(s.Debug.PathPrefix, fmt.Sprintf("%s/%s.csv", s.Info.EgressId, track.ID())) } else { - logFilename = path.Join(s.TmpDir, fmt.Sprintf("%s.csv", track.ID())) + logFilename = path.Join(s.TmpDir, fmt.Sprintf("%s/%s.csv", s.Info.EgressId, track.ID())) } } diff --git a/test/segments.go b/test/segments.go index 78e9c858..88104d8e 100644 --- a/test/segments.go +++ b/test/segments.go @@ -90,10 +90,11 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil storedPlaylistPath := plName localPlaylistPath := plName + storedPlaylistName := path.Base(plName) // download from cloud storage if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { - localPlaylistPath = path.Join(r.FilePrefix, storedPlaylistPath) + localPlaylistPath = path.Join(r.FilePrefix, storedPlaylistName) download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) if plType == m3u8.PlaylistTypeEvent { // Only download segments once From 4315da202da211589f53a1136c28a7eccafa67a1 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 15:09:11 -0700 Subject: [PATCH 7/8] fix segment downloads --- test/segments.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/segments.go b/test/segments.go index 88104d8e..dd87bf88 100644 --- a/test/segments.go +++ b/test/segments.go @@ -90,18 +90,17 @@ func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, fil storedPlaylistPath := plName localPlaylistPath := plName - storedPlaylistName := path.Base(plName) // download from cloud storage if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { - localPlaylistPath = path.Join(r.FilePrefix, storedPlaylistName) + localPlaylistPath = path.Join(r.FilePrefix, path.Base(storedPlaylistPath)) download(t, uploadConfig, localPlaylistPath, storedPlaylistPath) if plType == m3u8.PlaylistTypeEvent { // Only download segments once base := storedPlaylistPath[:len(storedPlaylistPath)-5] for i := 0; i < segmentCount; i++ { cloudPath := fmt.Sprintf("%s_%05d.ts", base, i) - localPath := path.Join(r.FilePrefix, cloudPath) + localPath := path.Join(r.FilePrefix, path.Base(cloudPath)) download(t, uploadConfig, localPath, cloudPath) } } From 273147e0af4daff5d0b542766699cb55928c381d Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 25 Jun 2024 15:35:33 -0700 Subject: [PATCH 8/8] fix manifest download --- test/segments.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/segments.go b/test/segments.go index dd87bf88..bdb2943e 100644 --- a/test/segments.go +++ b/test/segments.go @@ -77,7 +77,7 @@ func (r *Runner) verifySegments(t *testing.T, p *config.PipelineConfig, filename } func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) { - localPlaylistPath := path.Join(r.FilePrefix, plName) + localPlaylistPath := path.Join(r.FilePrefix, path.Base(plName)) if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil { download(t, uploadConfig, localPlaylistPath+".json", plName+".json")