Skip to content

Commit

Permalink
add test for egress staying open until room closes
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Jul 24, 2024
1 parent 5b6bd31 commit 087a8bd
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 165 deletions.
52 changes: 46 additions & 6 deletions test/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ func (r *Runner) testEdgeCases(t *testing.T) {
}

t.Run("EdgeCases", func(t *testing.T) {
r.testNoPublish(t)
r.testParticipantNoPublish(t)
r.testRoomCompositeStaysOpen(t)
r.testRtmpFailure(t)
r.testSrtFailure(t)
r.testTrackDisconnection(t)
})
}

// ParticipantComposite where the participant never publishes
func (r *Runner) testNoPublish(t *testing.T) {
func (r *Runner) testParticipantNoPublish(t *testing.T) {
r.runParticipantTest(t, "ParticipantNoPublish", &testCase{},
func(t *testing.T, identity string) {
req := &rpc.StartEgressRequest{
Expand Down Expand Up @@ -80,6 +81,47 @@ func (r *Runner) testNoPublish(t *testing.T) {
)
}

// Test that the egress continues if a user leaves
func (r *Runner) testRoomCompositeStaysOpen(t *testing.T) {
r.run(t, "RoomCompositeStaysOpen", func(t *testing.T) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: &livekit.RoomCompositeEgressRequest{
RoomName: r.RoomName,
Layout: "speaker",
FileOutputs: []*livekit.EncodedFileOutput{{
Filepath: path.Join(r.FilePrefix, "room_composite_duration_{time}.mp4"),
}},
},
},
}

info := r.sendRequest(t, req)
time.Sleep(time.Second * 10)
identity := r.room.LocalParticipant.Identity()
r.room.Disconnect()
time.Sleep(time.Second * 10)

// reconnect the publisher to the room
room, err := lksdk.ConnectToRoom(r.WsUrl, lksdk.ConnectInfo{
APIKey: r.ApiKey,
APISecret: r.ApiSecret,
RoomName: r.RoomName,
ParticipantName: "egress-sample",
ParticipantIdentity: identity,
}, lksdk.NewRoomCallback())
require.NoError(t, err)
r.room = room

r.publishSamples(t, types.MimeTypeOpus, types.MimeTypeVP8)
time.Sleep(time.Second * 10)

r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_ACTIVE)
r.stopEgress(t, info.EgressId)
})
}

// RTMP output with no valid urls
func (r *Runner) testRtmpFailure(t *testing.T) {
r.runRoomTest(t, "RtmpFailure", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) {
Expand Down Expand Up @@ -117,7 +159,7 @@ func (r *Runner) testRtmpFailure(t *testing.T) {

// SRT output with a no valid urls
func (r *Runner) testSrtFailure(t *testing.T) {
r.runWebTest(t, "SrtFailure", func(t *testing.T) {
r.run(t, "SrtFailure", func(t *testing.T) {
req := &rpc.StartEgressRequest{
EgressId: utils.NewGuid(utils.EgressPrefix),
Request: &rpc.StartEgressRequest_Web{
Expand Down Expand Up @@ -151,9 +193,7 @@ func (r *Runner) testSrtFailure(t *testing.T) {

// Track composite with data loss due to a disconnection
func (r *Runner) testTrackDisconnection(t *testing.T) {
run(t, "TrackDisconnection", func(t *testing.T) {
r.awaitIdle(t)

r.run(t, "TrackDisconnection", func(t *testing.T) {
test := &testCase{
fileType: livekit.EncodedFileType_MP4,
audioCodec: types.MimeTypeOpus,
Expand Down
144 changes: 19 additions & 125 deletions test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,9 @@ import (
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
lksdk "github.com/livekit/server-sdk-go/v2"
)

var (
samples = map[types.MimeType]string{
types.MimeTypeOpus: "/workspace/test/sample/matrix-trailer.ogg",
types.MimeTypeH264: "/workspace/test/sample/matrix-trailer.h264",
types.MimeTypeVP8: "/workspace/test/sample/matrix-trailer-vp8.ivf",
types.MimeTypeVP9: "/workspace/test/sample/matrix-trailer-vp9.ivf",
}

frameDurations = map[types.MimeType]time.Duration{
types.MimeTypeH264: time.Microsecond * 41708,
types.MimeTypeVP8: time.Microsecond * 41708,
types.MimeTypeVP9: time.Microsecond * 41708,
}

uploadPrefix = fmt.Sprintf("integration/%s", time.Now().Format("2006-01-02"))
)
var uploadPrefix = fmt.Sprintf("integration/%s", time.Now().Format("2006-01-02"))

type testCase struct {
name string
Expand Down Expand Up @@ -86,6 +70,24 @@ type testCase struct {
expectVideoEncoding bool
}

func (r *Runner) RunTests(t *testing.T) {
// run tests
r.testRoomComposite(t)
r.testWeb(t)
r.testParticipant(t)
r.testTrackComposite(t)
r.testTrack(t)
r.testEdgeCases(t)
}

var testNumber int

func (r *Runner) run(t *testing.T, name string, f func(t *testing.T)) {
r.awaitIdle(t)
testNumber++
t.Run(fmt.Sprintf("%d/%s", testNumber, name), f)
}

func (r *Runner) awaitIdle(t *testing.T) {
r.svc.KillAll()
for i := 0; i < 30; i++ {
Expand All @@ -97,114 +99,6 @@ func (r *Runner) awaitIdle(t *testing.T) {
t.Fatal("service not idle after 30s")
}

func (r *Runner) publishSamples(t *testing.T, audioCodec, videoCodec types.MimeType) (audioTrackID, videoTrackID string) {
withAudioMuting := false
if videoCodec != "" {
videoTrackID = r.publishSample(t, videoCodec, r.Muting)
} else {
withAudioMuting = r.Muting
}
if audioCodec != "" {
audioTrackID = r.publishSample(t, audioCodec, withAudioMuting)
}

time.Sleep(time.Second)
return
}

func (r *Runner) publishSample(t *testing.T, codec types.MimeType, withMuting bool) string {
done := make(chan struct{})
pub := r.publish(t, codec, done)
trackID := pub.SID()

t.Cleanup(func() {
_ = r.room.LocalParticipant.UnpublishTrack(trackID)
})

if withMuting {
go func() {
muted := false
time.Sleep(time.Second * 15)
for {
select {
case <-done:
return
default:
pub.SetMuted(!muted)
muted = !muted
time.Sleep(time.Second * 10)
}
}
}()
}

return trackID
}

func (r *Runner) publishSampleOffset(t *testing.T, codec types.MimeType, publishAfter, unpublishAfter time.Duration) {
if codec == "" {
return
}

time.AfterFunc(publishAfter, func() {
done := make(chan struct{})
pub := r.publish(t, codec, done)
if unpublishAfter != 0 {
time.AfterFunc(unpublishAfter-publishAfter, func() {
select {
case <-done:
return
default:
_ = r.room.LocalParticipant.UnpublishTrack(pub.SID())
}
})
} else {
t.Cleanup(func() {
_ = r.room.LocalParticipant.UnpublishTrack(pub.SID())
})
}
})
}

func (r *Runner) publishSampleWithDisconnection(t *testing.T, codec types.MimeType) string {
done := make(chan struct{})
pub := r.publish(t, codec, done)
trackID := pub.SID()

time.AfterFunc(time.Second*10, func() {
pub.SimulateDisconnection(time.Second * 10)
})

return trackID
}

func (r *Runner) publish(t *testing.T, codec types.MimeType, done chan struct{}) *lksdk.LocalTrackPublication {
filename := samples[codec]
frameDuration := frameDurations[codec]

var pub *lksdk.LocalTrackPublication
opts := []lksdk.ReaderSampleProviderOption{
lksdk.ReaderTrackWithOnWriteComplete(func() {
close(done)
if pub != nil {
_ = r.room.LocalParticipant.UnpublishTrack(pub.SID())
}
}),
}

if frameDuration != 0 {
opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration))
}

track, err := lksdk.NewLocalFileTrack(filename, opts...)
require.NoError(t, err)

pub, err = r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: filename})
require.NoError(t, err)

return pub
}

func (r *Runner) startEgress(t *testing.T, req *rpc.StartEgressRequest) string {
info := r.sendRequest(t, req)

Expand Down
3 changes: 1 addition & 2 deletions test/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ func (r *Runner) runParticipantTest(
t *testing.T, name string, test *testCase,
f func(t *testing.T, identity string),
) {
run(t, name, func(t *testing.T) {
r.awaitIdle(t)
r.run(t, name, func(t *testing.T) {
r.publishSampleOffset(t, test.audioCodec, test.audioDelay, test.audioUnpublish)
if test.audioRepublish != 0 {
r.publishSampleOffset(t, test.audioCodec, test.audioRepublish, 0)
Expand Down
Loading

0 comments on commit 087a8bd

Please sign in to comment.