diff --git a/test/edge.go b/test/edge.go index a2ed1255..4677d252 100644 --- a/test/edge.go +++ b/test/edge.go @@ -35,7 +35,8 @@ func (r *Runner) testEdgeCases(t *testing.T) { } t.Run("EdgeCases", func(t *testing.T) { - r.testNoPublish(t) + r.testParticipantNoPublish(t) + r.testRoomCompositeStaysOpen(t) r.testRtmpFailure(t) r.testSrtFailure(t) r.testTrackDisconnection(t) @@ -43,7 +44,7 @@ func (r *Runner) testEdgeCases(t *testing.T) { } // ParticipantComposite where the participant never publishes -func (r *Runner) testNoPublish(t *testing.T) { +func (r *Runner) testParticipantNoPublish(t *testing.T) { r.runParticipantTest(t, "ParticipantNoPublish", &testCase{}, func(t *testing.T, identity string) { req := &rpc.StartEgressRequest{ @@ -80,6 +81,47 @@ func (r *Runner) testNoPublish(t *testing.T) { ) } +// Test that the egress continues if a user leaves +func (r *Runner) testRoomCompositeStaysOpen(t *testing.T) { + r.run(t, "RoomCompositeStaysOpen", func(t *testing.T) { + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{ + RoomName: r.RoomName, + Layout: "speaker", + FileOutputs: []*livekit.EncodedFileOutput{{ + Filepath: path.Join(r.FilePrefix, "room_composite_duration_{time}.mp4"), + }}, + }, + }, + } + + info := r.sendRequest(t, req) + time.Sleep(time.Second * 10) + identity := r.room.LocalParticipant.Identity() + r.room.Disconnect() + time.Sleep(time.Second * 10) + + // reconnect the publisher to the room + room, err := lksdk.ConnectToRoom(r.WsUrl, lksdk.ConnectInfo{ + APIKey: r.ApiKey, + APISecret: r.ApiSecret, + RoomName: r.RoomName, + ParticipantName: "egress-sample", + ParticipantIdentity: identity, + }, lksdk.NewRoomCallback()) + require.NoError(t, err) + r.room = room + + r.publishSamples(t, types.MimeTypeOpus, types.MimeTypeVP8) + time.Sleep(time.Second * 10) + + r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_ACTIVE) + r.stopEgress(t, info.EgressId) + }) +} + // RTMP output with no valid urls func (r *Runner) testRtmpFailure(t *testing.T) { r.runRoomTest(t, "RtmpFailure", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { @@ -117,7 +159,7 @@ func (r *Runner) testRtmpFailure(t *testing.T) { // SRT output with a no valid urls func (r *Runner) testSrtFailure(t *testing.T) { - r.runWebTest(t, "SrtFailure", func(t *testing.T) { + r.run(t, "SrtFailure", func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_Web{ @@ -151,9 +193,7 @@ func (r *Runner) testSrtFailure(t *testing.T) { // Track composite with data loss due to a disconnection func (r *Runner) testTrackDisconnection(t *testing.T) { - run(t, "TrackDisconnection", func(t *testing.T) { - r.awaitIdle(t) - + r.run(t, "TrackDisconnection", func(t *testing.T) { test := &testCase{ fileType: livekit.EncodedFileType_MP4, audioCodec: types.MimeTypeOpus, diff --git a/test/integration.go b/test/integration.go index 385b5bab..0285b8f3 100644 --- a/test/integration.go +++ b/test/integration.go @@ -30,25 +30,9 @@ import ( "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" - lksdk "github.com/livekit/server-sdk-go/v2" ) -var ( - samples = map[types.MimeType]string{ - types.MimeTypeOpus: "/workspace/test/sample/matrix-trailer.ogg", - types.MimeTypeH264: "/workspace/test/sample/matrix-trailer.h264", - types.MimeTypeVP8: "/workspace/test/sample/matrix-trailer-vp8.ivf", - types.MimeTypeVP9: "/workspace/test/sample/matrix-trailer-vp9.ivf", - } - - frameDurations = map[types.MimeType]time.Duration{ - types.MimeTypeH264: time.Microsecond * 41708, - types.MimeTypeVP8: time.Microsecond * 41708, - types.MimeTypeVP9: time.Microsecond * 41708, - } - - uploadPrefix = fmt.Sprintf("integration/%s", time.Now().Format("2006-01-02")) -) +var uploadPrefix = fmt.Sprintf("integration/%s", time.Now().Format("2006-01-02")) type testCase struct { name string @@ -86,6 +70,24 @@ type testCase struct { expectVideoEncoding bool } +func (r *Runner) RunTests(t *testing.T) { + // run tests + r.testRoomComposite(t) + r.testWeb(t) + r.testParticipant(t) + r.testTrackComposite(t) + r.testTrack(t) + r.testEdgeCases(t) +} + +var testNumber int + +func (r *Runner) run(t *testing.T, name string, f func(t *testing.T)) { + r.awaitIdle(t) + testNumber++ + t.Run(fmt.Sprintf("%d/%s", testNumber, name), f) +} + func (r *Runner) awaitIdle(t *testing.T) { r.svc.KillAll() for i := 0; i < 30; i++ { @@ -97,114 +99,6 @@ func (r *Runner) awaitIdle(t *testing.T) { t.Fatal("service not idle after 30s") } -func (r *Runner) publishSamples(t *testing.T, audioCodec, videoCodec types.MimeType) (audioTrackID, videoTrackID string) { - withAudioMuting := false - if videoCodec != "" { - videoTrackID = r.publishSample(t, videoCodec, r.Muting) - } else { - withAudioMuting = r.Muting - } - if audioCodec != "" { - audioTrackID = r.publishSample(t, audioCodec, withAudioMuting) - } - - time.Sleep(time.Second) - return -} - -func (r *Runner) publishSample(t *testing.T, codec types.MimeType, withMuting bool) string { - done := make(chan struct{}) - pub := r.publish(t, codec, done) - trackID := pub.SID() - - t.Cleanup(func() { - _ = r.room.LocalParticipant.UnpublishTrack(trackID) - }) - - if withMuting { - go func() { - muted := false - time.Sleep(time.Second * 15) - for { - select { - case <-done: - return - default: - pub.SetMuted(!muted) - muted = !muted - time.Sleep(time.Second * 10) - } - } - }() - } - - return trackID -} - -func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publishAfter, unpublishAfter time.Duration) { - if codec == "" { - return - } - - time.AfterFunc(publishAfter, func() { - done := make(chan struct{}) - pub := r.publish(t, codec, done) - if unpublishAfter != 0 { - time.AfterFunc(unpublishAfter-publishAfter, func() { - select { - case <-done: - return - default: - _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) - } - }) - } else { - t.Cleanup(func() { - _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) - }) - } - }) -} - -func (r *Runner) publishSampleWithDisconnection(t *testing.T, codec types.MimeType) string { - done := make(chan struct{}) - pub := r.publish(t, codec, done) - trackID := pub.SID() - - time.AfterFunc(time.Second*10, func() { - pub.SimulateDisconnection(time.Second * 10) - }) - - return trackID -} - -func (r *Runner) publish(t *testing.T, codec types.MimeType, done chan struct{}) *lksdk.LocalTrackPublication { - filename := samples[codec] - frameDuration := frameDurations[codec] - - var pub *lksdk.LocalTrackPublication - opts := []lksdk.ReaderSampleProviderOption{ - lksdk.ReaderTrackWithOnWriteComplete(func() { - close(done) - if pub != nil { - _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) - } - }), - } - - if frameDuration != 0 { - opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration)) - } - - track, err := lksdk.NewLocalFileTrack(filename, opts...) - require.NoError(t, err) - - pub, err = r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: filename}) - require.NoError(t, err) - - return pub -} - func (r *Runner) startEgress(t *testing.T, req *rpc.StartEgressRequest) string { info := r.sendRequest(t, req) diff --git a/test/participant.go b/test/participant.go index 9d4e45c3..0b17c8e7 100644 --- a/test/participant.go +++ b/test/participant.go @@ -45,8 +45,7 @@ func (r *Runner) runParticipantTest( t *testing.T, name string, test *testCase, f func(t *testing.T, identity string), ) { - run(t, name, func(t *testing.T) { - r.awaitIdle(t) + r.run(t, name, func(t *testing.T) { r.publishSampleOffset(t, test.audioCodec, test.audioDelay, test.audioUnpublish) if test.audioRepublish != 0 { r.publishSampleOffset(t, test.audioCodec, test.audioRepublish, 0) diff --git a/test/publish.go b/test/publish.go new file mode 100644 index 00000000..a6039da6 --- /dev/null +++ b/test/publish.go @@ -0,0 +1,150 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build integration + +package test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/livekit/egress/pkg/types" + lksdk "github.com/livekit/server-sdk-go/v2" +) + +var ( + samples = map[types.MimeType]string{ + types.MimeTypeOpus: "/workspace/test/sample/matrix-trailer.ogg", + types.MimeTypeH264: "/workspace/test/sample/matrix-trailer.h264", + types.MimeTypeVP8: "/workspace/test/sample/matrix-trailer-vp8.ivf", + types.MimeTypeVP9: "/workspace/test/sample/matrix-trailer-vp9.ivf", + } + + frameDurations = map[types.MimeType]time.Duration{ + types.MimeTypeH264: time.Microsecond * 41708, + types.MimeTypeVP8: time.Microsecond * 41708, + types.MimeTypeVP9: time.Microsecond * 41708, + } +) + +func (r *Runner) publishSamples(t *testing.T, audioCodec, videoCodec types.MimeType) (audioTrackID, videoTrackID string) { + withAudioMuting := false + if videoCodec != "" { + videoTrackID = r.publishSample(t, videoCodec, r.Muting) + } else { + withAudioMuting = r.Muting + } + if audioCodec != "" { + audioTrackID = r.publishSample(t, audioCodec, withAudioMuting) + } + + time.Sleep(time.Second) + return +} + +func (r *Runner) publishSample(t *testing.T, codec types.MimeType, withMuting bool) string { + done := make(chan struct{}) + pub := r.publish(t, codec, done) + trackID := pub.SID() + + t.Cleanup(func() { + _ = r.room.LocalParticipant.UnpublishTrack(trackID) + }) + + if withMuting { + go func() { + muted := false + time.Sleep(time.Second * 15) + for { + select { + case <-done: + return + default: + pub.SetMuted(!muted) + muted = !muted + time.Sleep(time.Second * 10) + } + } + }() + } + + return trackID +} + +func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publishAfter, unpublishAfter time.Duration) { + if codec == "" { + return + } + + time.AfterFunc(publishAfter, func() { + done := make(chan struct{}) + pub := r.publish(t, codec, done) + if unpublishAfter != 0 { + time.AfterFunc(unpublishAfter-publishAfter, func() { + select { + case <-done: + return + default: + _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + }) + } else { + t.Cleanup(func() { + _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) + }) + } + }) +} + +func (r *Runner) publishSampleWithDisconnection(t *testing.T, codec types.MimeType) string { + done := make(chan struct{}) + pub := r.publish(t, codec, done) + trackID := pub.SID() + + time.AfterFunc(time.Second*10, func() { + pub.SimulateDisconnection(time.Second * 10) + }) + + return trackID +} + +func (r *Runner) publish(t *testing.T, codec types.MimeType, done chan struct{}) *lksdk.LocalTrackPublication { + filename := samples[codec] + frameDuration := frameDurations[codec] + + var pub *lksdk.LocalTrackPublication + opts := []lksdk.ReaderSampleProviderOption{ + lksdk.ReaderTrackWithOnWriteComplete(func() { + close(done) + if pub != nil { + _ = r.room.LocalParticipant.UnpublishTrack(pub.SID()) + } + }), + } + + if frameDuration != 0 { + opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration)) + } + + track, err := lksdk.NewLocalFileTrack(filename, opts...) + require.NoError(t, err) + + pub, err = r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: filename}) + require.NoError(t, err) + + return pub +} diff --git a/test/room_composite.go b/test/room_composite.go index 2c953efb..1bc63e70 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -42,8 +42,7 @@ func (r *Runner) testRoomComposite(t *testing.T) { } func (r *Runner) runRoomTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T)) { - run(t, name, func(t *testing.T) { - r.awaitIdle(t) + r.run(t, name, func(t *testing.T) { r.publishSamples(t, audioCodec, videoCodec) f(t) }) diff --git a/test/runner.go b/test/runner.go index 0efc0eec..5022adc3 100644 --- a/test/runner.go +++ b/test/runner.go @@ -219,20 +219,3 @@ func (r *Runner) StartServer(t *testing.T, svc Server, bus psrpc.MessageBus, tem require.Contains(t, status, "CpuLoad") } } - -func (r *Runner) RunTests(t *testing.T) { - // run tests - r.testRoomComposite(t) - r.testWeb(t) - r.testParticipant(t) - r.testTrackComposite(t) - r.testTrack(t) - r.testEdgeCases(t) -} - -var testNumber int - -func run(t *testing.T, name string, f func(t *testing.T)) { - testNumber++ - t.Run(fmt.Sprintf("%d/%s", testNumber, name), f) -} diff --git a/test/track_composite.go b/test/track_composite.go index aa563f2d..835f19f3 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -45,8 +45,7 @@ func (r *Runner) runTrackTest( t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T, audioTrackID, videoTrackID string), ) { - run(t, name, func(t *testing.T) { - r.awaitIdle(t) + r.run(t, name, func(t *testing.T) { audioTrackID, videoTrackID := r.publishSamples(t, audioCodec, videoCodec) f(t, audioTrackID, videoTrackID) }) diff --git a/test/web.go b/test/web.go index 46077755..e1677682 100644 --- a/test/web.go +++ b/test/web.go @@ -42,19 +42,12 @@ func (r *Runner) testWeb(t *testing.T) { }) } -func (r *Runner) runWebTest(t *testing.T, name string, f func(t *testing.T)) { - run(t, name, func(t *testing.T) { - r.awaitIdle(t) - f(t) - }) -} - func (r *Runner) testWebFile(t *testing.T) { if !r.should(runFile) { return } - r.runWebTest(t, "File", func(t *testing.T) { + r.run(t, "File", func(t *testing.T) { var fileOutput *livekit.EncodedFileOutput if r.GCPUpload != nil { fileOutput = &livekit.EncodedFileOutput{ @@ -91,7 +84,7 @@ func (r *Runner) testWebStream(t *testing.T) { return } - r.runWebTest(t, "Stream", func(t *testing.T) { + r.run(t, "Stream", func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_Web{ @@ -117,7 +110,7 @@ func (r *Runner) testWebSegments(t *testing.T) { return } - r.runWebTest(t, "Segments", func(t *testing.T) { + r.run(t, "Segments", func(t *testing.T) { var segmentOutput *livekit.SegmentedFileOutput if r.AzureUpload != nil { segmentOutput = &livekit.SegmentedFileOutput{ @@ -155,7 +148,7 @@ func (r *Runner) testWebMulti(t *testing.T) { return } - r.runWebTest(t, "Multi", func(t *testing.T) { + r.run(t, "Multi", func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix),