From 490dc834a0d7040c2952d0a3a4171b68bbc6226c Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 26 Nov 2024 10:31:46 -0800 Subject: [PATCH] fix test collision with media-mtx, downgrade sdk errors (#816) * fix test collision with media-mtx, downgrade sdk errors * update checkStreamUpdate * != * fix another stream test race --- pkg/config/base.go | 10 +++++++++- test/edge.go | 8 ++++---- test/integration.go | 15 +++++++++------ test/multi.go | 6 +++--- test/stream.go | 8 +++++++- 5 files changed, 32 insertions(+), 15 deletions(-) diff --git a/pkg/config/base.go b/pkg/config/base.go index bfac0703..7c13e3e3 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -98,6 +98,14 @@ func (c *BaseConfig) initLogger(values ...interface{}) error { l := zl.WithValues(values...) logger.SetLogger(l, "egress") - lksdk.SetLogger(l) + lksdk.SetLogger(&downgradeLogger{Logger: l}) return nil } + +type downgradeLogger struct { + logger.Logger +} + +func (d *downgradeLogger) Errorw(msg string, err error, keysAndValues ...interface{}) { + d.Logger.Warnw(msg, err, keysAndValues...) +} diff --git a/test/edge.go b/test/edge.go index 6f14b8e0..1c3b6001 100644 --- a/test/edge.go +++ b/test/edge.go @@ -110,7 +110,7 @@ func (r *Runner) testEdgeCases(t *testing.T) { videoCodec: types.MimeTypeVP8, }, streamOptions: &streamOptions{ - streamUrls: []string{rtmpUrl1, badRtmpUrl1}, + streamUrls: []string{rtmpUrl4, badRtmpUrl1}, outputType: types.OutputTypeRTMP, }, segmentOptions: &segmentOptions{ @@ -247,16 +247,16 @@ func (r *Runner) testEmptyStreamBin(t *testing.T, test *testCase) { require.NoError(t, err) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE, + rtmpUrl4Redacted: livekit.StreamInfo_ACTIVE, badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, }) _, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{ EgressId: egressID, - RemoveOutputUrls: []string{rtmpUrl1}, + RemoveOutputUrls: []string{rtmpUrl4}, }) require.NoError(t, err) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - rtmpUrl1Redacted: livekit.StreamInfo_FINISHED, + rtmpUrl4Redacted: livekit.StreamInfo_FINISHED, badRtmpUrl1Redacted: livekit.StreamInfo_FAILED, }) diff --git a/test/integration.go b/test/integration.go index b29eaf4d..b5c76a93 100644 --- a/test/integration.go +++ b/test/integration.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" ) @@ -138,21 +139,23 @@ func (r *Runner) checkUpdate(t *testing.T, egressID string, status livekit.Egres func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[string]livekit.StreamInfo_Status) { for { info := r.getUpdate(t, egressID) + if len(expected) != len(info.StreamResults) { + continue + } require.Equal(t, len(expected), len(info.StreamResults)) - failureStillActive := false + checkNext := false for _, s := range info.StreamResults { require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") - - if expected[s.Url] == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE { - failureStillActive = true + if expected[s.Url] > s.Status { + logger.Debugw(fmt.Sprintf("stream status %s, expecting %s", s.Status.String(), expected[s.Url].String())) + checkNext = true continue } - require.Equal(t, expected[s.Url], s.Status) } - if !failureStillActive { + if !checkNext { return } } diff --git a/test/multi.go b/test/multi.go index 5a3b16d6..69bef45c 100644 --- a/test/multi.go +++ b/test/multi.go @@ -125,14 +125,14 @@ func (r *Runner) runMultiTest(t *testing.T, test *testCase) { if test.streamOptions != nil { _, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{ EgressId: egressID, - AddOutputUrls: []string{rtmpUrl1}, + AddOutputUrls: []string{rtmpUrl3}, }) require.NoError(t, err) time.Sleep(time.Second * 10) - r.verifyStreams(t, p, rtmpUrl1) + r.verifyStreams(t, p, rtmpUrl3) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE, + rtmpUrl3Redacted: livekit.StreamInfo_ACTIVE, }) time.Sleep(time.Second * 10) } else { diff --git a/test/stream.go b/test/stream.go index e66772a6..563c94eb 100644 --- a/test/stream.go +++ b/test/stream.go @@ -49,10 +49,16 @@ const ( var ( streamKey1 = utils.NewGuid("") streamKey2 = utils.NewGuid("") + streamKey3 = utils.NewGuid("") + streamKey4 = utils.NewGuid("") rtmpUrl1 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey1) - rtmpUrl1Redacted, _ = utils.RedactStreamKey(rtmpUrl1) rtmpUrl2 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey2) + rtmpUrl3 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey3) + rtmpUrl4 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey4) + rtmpUrl1Redacted, _ = utils.RedactStreamKey(rtmpUrl1) rtmpUrl2Redacted, _ = utils.RedactStreamKey(rtmpUrl2) + rtmpUrl3Redacted, _ = utils.RedactStreamKey(rtmpUrl3) + rtmpUrl4Redacted, _ = utils.RedactStreamKey(rtmpUrl4) srtPublishUrl1 = fmt.Sprintf("srt://localhost:8890?streamid=publish:%s&pkt_size=1316", streamKey1) srtReadUrl1 = fmt.Sprintf("srt://localhost:8890?streamid=read:%s", streamKey1) srtPublishUrl2 = fmt.Sprintf("srt://localhost:8890?streamid=publish:%s&pkt_size=1316", streamKey2)