Skip to content

Commit

Permalink
fix test collision with media-mtx, downgrade sdk errors (#816)
Browse files Browse the repository at this point in the history
* fix test collision with media-mtx, downgrade sdk errors

* update checkStreamUpdate

* !=

* fix another stream test race
  • Loading branch information
frostbyte73 authored Nov 26, 2024
1 parent 5dee37a commit 490dc83
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 15 deletions.
10 changes: 9 additions & 1 deletion pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ func (c *BaseConfig) initLogger(values ...interface{}) error {
l := zl.WithValues(values...)

logger.SetLogger(l, "egress")
lksdk.SetLogger(l)
lksdk.SetLogger(&downgradeLogger{Logger: l})
return nil
}

type downgradeLogger struct {
logger.Logger
}

func (d *downgradeLogger) Errorw(msg string, err error, keysAndValues ...interface{}) {
d.Logger.Warnw(msg, err, keysAndValues...)
}
8 changes: 4 additions & 4 deletions test/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (r *Runner) testEdgeCases(t *testing.T) {
videoCodec: types.MimeTypeVP8,
},
streamOptions: &streamOptions{
streamUrls: []string{rtmpUrl1, badRtmpUrl1},
streamUrls: []string{rtmpUrl4, badRtmpUrl1},
outputType: types.OutputTypeRTMP,
},
segmentOptions: &segmentOptions{
Expand Down Expand Up @@ -247,16 +247,16 @@ func (r *Runner) testEmptyStreamBin(t *testing.T, test *testCase) {
require.NoError(t, err)

r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{
rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE,
rtmpUrl4Redacted: livekit.StreamInfo_ACTIVE,
badRtmpUrl1Redacted: livekit.StreamInfo_FAILED,
})
_, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{
EgressId: egressID,
RemoveOutputUrls: []string{rtmpUrl1},
RemoveOutputUrls: []string{rtmpUrl4},
})
require.NoError(t, err)
r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{
rtmpUrl1Redacted: livekit.StreamInfo_FINISHED,
rtmpUrl4Redacted: livekit.StreamInfo_FINISHED,
badRtmpUrl1Redacted: livekit.StreamInfo_FAILED,
})

Expand Down
15 changes: 9 additions & 6 deletions test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
)

Expand Down Expand Up @@ -138,21 +139,23 @@ func (r *Runner) checkUpdate(t *testing.T, egressID string, status livekit.Egres
func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[string]livekit.StreamInfo_Status) {
for {
info := r.getUpdate(t, egressID)
if len(expected) != len(info.StreamResults) {
continue
}
require.Equal(t, len(expected), len(info.StreamResults))

failureStillActive := false
checkNext := false
for _, s := range info.StreamResults {
require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "")

if expected[s.Url] == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE {
failureStillActive = true
if expected[s.Url] > s.Status {
logger.Debugw(fmt.Sprintf("stream status %s, expecting %s", s.Status.String(), expected[s.Url].String()))
checkNext = true
continue
}

require.Equal(t, expected[s.Url], s.Status)
}

if !failureStillActive {
if !checkNext {
return
}
}
Expand Down
6 changes: 3 additions & 3 deletions test/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ func (r *Runner) runMultiTest(t *testing.T, test *testCase) {
if test.streamOptions != nil {
_, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{
EgressId: egressID,
AddOutputUrls: []string{rtmpUrl1},
AddOutputUrls: []string{rtmpUrl3},
})
require.NoError(t, err)

time.Sleep(time.Second * 10)
r.verifyStreams(t, p, rtmpUrl1)
r.verifyStreams(t, p, rtmpUrl3)
r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{
rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE,
rtmpUrl3Redacted: livekit.StreamInfo_ACTIVE,
})
time.Sleep(time.Second * 10)
} else {
Expand Down
8 changes: 7 additions & 1 deletion test/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,16 @@ const (
var (
streamKey1 = utils.NewGuid("")
streamKey2 = utils.NewGuid("")
streamKey3 = utils.NewGuid("")
streamKey4 = utils.NewGuid("")
rtmpUrl1 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey1)
rtmpUrl1Redacted, _ = utils.RedactStreamKey(rtmpUrl1)
rtmpUrl2 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey2)
rtmpUrl3 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey3)
rtmpUrl4 = fmt.Sprintf("rtmp://localhost:1935/live/%s", streamKey4)
rtmpUrl1Redacted, _ = utils.RedactStreamKey(rtmpUrl1)
rtmpUrl2Redacted, _ = utils.RedactStreamKey(rtmpUrl2)
rtmpUrl3Redacted, _ = utils.RedactStreamKey(rtmpUrl3)
rtmpUrl4Redacted, _ = utils.RedactStreamKey(rtmpUrl4)
srtPublishUrl1 = fmt.Sprintf("srt://localhost:8890?streamid=publish:%s&pkt_size=1316", streamKey1)
srtReadUrl1 = fmt.Sprintf("srt://localhost:8890?streamid=read:%s", streamKey1)
srtPublishUrl2 = fmt.Sprintf("srt://localhost:8890?streamid=publish:%s&pkt_size=1316", streamKey2)
Expand Down

0 comments on commit 490dc83

Please sign in to comment.