From 6b166095c239398209325032dd68feedd5974851 Mon Sep 17 00:00:00 2001 From: yaruno Date: Mon, 22 Jul 2024 21:58:08 +0200 Subject: [PATCH 1/6] Allow SRT output on egress server (#688) * initial commit, preliminary srt support * use switch/case instead of condition logic --------- Co-authored-by: David Colburn --- pkg/config/output.go | 19 ++++++++++++++++- pkg/config/output_stream.go | 4 ++++ pkg/config/pipeline.go | 37 ++++++++++++++++++++++++++++++++++ pkg/pipeline/builder/stream.go | 17 +++++++++++++++- pkg/pipeline/controller.go | 33 +++++++++++++++++++++++++++++- pkg/types/types.go | 7 +++++++ 6 files changed, 114 insertions(+), 3 deletions(-) diff --git a/pkg/config/output.go b/pkg/config/output.go index 4b059087..36963f71 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -15,6 +15,8 @@ package config import ( + "net/url" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/egress" @@ -85,7 +87,22 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error { return errors.ErrInvalidInput("multiple stream outputs") } if stream != nil { - conf, err := p.getStreamConfig(types.OutputTypeRTMP, stream.Urls) + u, err := url.Parse(stream.Urls[0]) + if err != nil { + return errors.ErrInvalidInput("malformed url") + } + + var outputType types.OutputType + switch u.Scheme { + case "srt": + outputType = types.OutputTypeSRT + case "rtmp": + outputType = types.OutputTypeRTMP + default: + return errors.ErrInvalidInput("invalid stream type") + } + + conf, err := p.getStreamConfig(outputType, stream.Urls) if err != nil { return err } diff --git a/pkg/config/output_stream.go b/pkg/config/output_stream.go index dcd11419..b58aac85 100644 --- a/pkg/config/output_stream.go +++ b/pkg/config/output_stream.go @@ -67,6 +67,10 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str p.AudioOutCodec = types.MimeTypeAAC p.VideoOutCodec = types.MimeTypeH264 + case types.OutputTypeSRT: + p.AudioOutCodec = types.MimeTypeAAC + p.VideoOutCodec = types.MimeTypeH264 + case types.OutputTypeRaw: p.AudioOutCodec = types.MimeTypeRawAudio } diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index ab800ed9..9028a9e1 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -596,6 +596,43 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s return nil } +// func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) { +// parsed, err := url.Parse(rawUrl) +// if err != nil { +// return "", "", errors.ErrInvalidUrl(rawUrl, err.Error()) +// } + +// switch outputType { +// case types.OutputTypeRTMP: +// if parsed.Scheme == "mux" { +// rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host) +// } + +// redacted, ok := utils.RedactStreamKey(rawUrl) +// if !ok { +// return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)") +// } +// return rawUrl, redacted, nil + +// case types.OutputTypeSRT: +// if parsed.Scheme != "srt" { +// return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme") +// } +// // Todo: Optionally, you can redact the SRT stream key or other sensitive parts of the URL +// redacted := rawUrl +// return rawUrl, redacted, nil + +// case types.OutputTypeRaw: +// if parsed.Scheme != "ws" && parsed.Scheme != "wss" { +// return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme") +// } +// return rawUrl, rawUrl, nil + +// default: +// return "", "", errors.ErrInvalidInput("stream output type") +// } +// } + func (p *PipelineConfig) GetEncodedOutputs() []OutputConfig { ret := make([]OutputConfig, 0) diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index 2cd1ae9e..cf2ab578 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -68,6 +68,13 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St return nil, nil, errors.ErrGstPipelineError(err) } + case types.OutputTypeSRT: + // add SRT steam + mux, err = gst.NewElement("mpegtsmux") + if err != nil { + return nil, nil, errors.ErrGstPipelineError(err) + } + default: err = errors.ErrInvalidInput("output type") } @@ -145,6 +152,14 @@ func (sb *StreamBin) AddStream(url string) error { if err = sink.Set("location", url); err != nil { return errors.ErrGstPipelineError(err) } + case types.OutputTypeSRT: + sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", name)) + if err != nil { + return errors.ErrGstPipelineError(err) + } + if err = sink.SetProperty("uri", url); err != nil { + return errors.ErrGstPipelineError(err) + } default: return errors.ErrInvalidInput("output type") @@ -161,7 +176,7 @@ func (sb *StreamBin) AddStream(url string) error { // It is later released in RemoveSink proxy.Ref() - // Intercept flows from rtmp2sink. Anything besides EOS will be ignored + // Intercept flows from the sink. Anything besides EOS will be ignored proxy.SetChainFunction(func(self *gst.Pad, _ *gst.Object, buffer *gst.Buffer) gst.FlowReturn { // Buffer gets automatically unreferenced by go-gst. // Without referencing it here, it will sometimes be garbage collected before being written diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 78e6e261..9e255c76 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -16,6 +16,7 @@ package pipeline import ( "context" + "net/url" "sync" "time" @@ -263,8 +264,23 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream // add stream outputs first for _, rawUrl := range req.AddOutputUrls { + u, err := url.Parse(rawUrl) + if err != nil { + return errors.ErrInvalidInput("malformed url") + } + + var outputType types.OutputType + switch u.Scheme { + case "srt": + outputType = types.OutputTypeSRT + case "rtmp": + outputType = types.OutputTypeRTMP + default: + return errors.ErrInvalidInput("invalid stream type") + } + // validate and redact url - url, redacted, err := config.ValidateUrl(rawUrl, types.OutputTypeRTMP) + url, redacted, err := config.ValidateUrl(rawUrl, outputType) if err != nil { errs.AppendErr(err) continue @@ -298,6 +314,21 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream // remove stream outputs for _, rawUrl := range req.RemoveOutputUrls { + u, err := url.Parse(rawUrl) + if err != nil { + return errors.ErrInvalidInput("malformed url") + } + + var outputType types.OutputType + switch u.Scheme { + case "srt": + outputType = types.OutputTypeSRT + case "rtmp": + outputType = types.OutputTypeRTMP + default: + return errors.ErrInvalidInput("invalid stream type") + } + url, err := o.GetStreamUrl(rawUrl) if err != nil { errs.AppendErr(err) diff --git a/pkg/types/types.go b/pkg/types/types.go index e98764cf..e6a11fb1 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -66,6 +66,7 @@ const ( OutputTypeWebM OutputType = "video/webm" OutputTypeJPEG OutputType = "image/jpeg" OutputTypeRTMP OutputType = "rtmp" + OutputTypeSRT OutputType = "srt" OutputTypeHLS OutputType = "application/x-mpegurl" OutputTypeJSON OutputType = "application/json" OutputTypeBlob OutputType = "application/octet-stream" @@ -89,6 +90,7 @@ var ( OutputTypeTS: MimeTypeAAC, OutputTypeWebM: MimeTypeOpus, OutputTypeRTMP: MimeTypeAAC, + OutputTypeSRT: MimeTypeAAC, OutputTypeHLS: MimeTypeAAC, } @@ -98,6 +100,7 @@ var ( OutputTypeTS: MimeTypeH264, OutputTypeWebM: MimeTypeVP8, OutputTypeRTMP: MimeTypeH264, + OutputTypeSRT: MimeTypeH264, OutputTypeHLS: MimeTypeH264, } @@ -153,6 +156,10 @@ var ( MimeTypeAAC: true, MimeTypeH264: true, }, + OutputTypeSRT: { + MimeTypeAAC: true, + MimeTypeH264: true, + }, OutputTypeHLS: { MimeTypeAAC: true, MimeTypeH264: true, From 85155bb85aa7c5eb67437eeb4a8411c965e35fbe Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 22 Jul 2024 22:24:33 -0400 Subject: [PATCH 2/6] testing and fixes --- go.mod | 2 +- go.sum | 4 +- pkg/config/output.go | 32 ++- pkg/config/pipeline.go | 37 --- pkg/config/urls.go | 9 +- pkg/gstreamer/bin.go | 12 +- pkg/pipeline/builder/stream.go | 72 +++--- pkg/pipeline/controller.go | 55 +---- pkg/pipeline/watch.go | 115 +++++---- pkg/types/types.go | 8 + test/edge.go | 62 ++++- test/ffprobe.go | 9 - test/integration.go | 14 +- test/multi.go | 6 +- test/participant.go | 275 +++++++++++----------- test/room_composite.go | 159 ++++++------- test/runner.go | 7 + test/stream.go | 84 ++++--- test/track.go | 118 +++++----- test/track_composite.go | 411 ++++++++++++++++----------------- test/web.go | 30 ++- 21 files changed, 797 insertions(+), 724 deletions(-) diff --git a/go.mod b/go.mod index b48c88cd..b2dfa532 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/gorilla/websocket v1.5.2 github.com/livekit/livekit-server v1.6.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa + github.com/livekit/protocol v1.19.2-0.20240722200827-2c910325dbfb github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad github.com/pion/rtp v1.8.6 diff --git a/go.sum b/go.sum index 18188c39..94f78982 100644 --- a/go.sum +++ b/go.sum @@ -184,8 +184,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 h1:p60OjeixzXnhGFQL8wmdUwWPxijEDe9ZJFMosq+byec= github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa h1:rkX4blO/giAiqWM/E5T0N7SU0OA9pMHjWekhv+a6byI= -github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/protocol v1.19.2-0.20240722200827-2c910325dbfb h1:2B+mVyYk79MQ1UHcVE5GPGBqzS6NrrkymHzpVxNmLvo= +github.com/livekit/protocol v1.19.2-0.20240722200827-2c910325dbfb/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto= github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad h1:SfX8OBXfUx9WHGEIsJi+rpWMsPhgtlRlQpHk3bnEZrI= diff --git a/pkg/config/output.go b/pkg/config/output.go index 36963f71..86497c5a 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -87,19 +87,29 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error { return errors.ErrInvalidInput("multiple stream outputs") } if stream != nil { - u, err := url.Parse(stream.Urls[0]) - if err != nil { - return errors.ErrInvalidInput("malformed url") - } - var outputType types.OutputType - switch u.Scheme { - case "srt": - outputType = types.OutputTypeSRT - case "rtmp": + switch stream.Protocol { + case livekit.StreamProtocol_DEFAULT_PROTOCOL: + if len(stream.Urls) == 0 { + return errors.ErrInvalidInput("stream protocol") + } + + parsed, err := url.Parse(stream.Urls[0]) + if err != nil { + return errors.ErrInvalidUrl(stream.Urls[0], err.Error()) + } + + var ok bool + outputType, ok = types.StreamOutputTypes[parsed.Scheme] + if !ok { + return errors.ErrInvalidUrl(stream.Urls[0], "invalid protocol") + } + + case livekit.StreamProtocol_RTMP: outputType = types.OutputTypeRTMP - default: - return errors.ErrInvalidInput("invalid stream type") + + case livekit.StreamProtocol_SRT: + outputType = types.OutputTypeSRT } conf, err := p.getStreamConfig(outputType, stream.Urls) diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 9028a9e1..ab800ed9 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -596,43 +596,6 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s return nil } -// func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) { -// parsed, err := url.Parse(rawUrl) -// if err != nil { -// return "", "", errors.ErrInvalidUrl(rawUrl, err.Error()) -// } - -// switch outputType { -// case types.OutputTypeRTMP: -// if parsed.Scheme == "mux" { -// rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host) -// } - -// redacted, ok := utils.RedactStreamKey(rawUrl) -// if !ok { -// return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)") -// } -// return rawUrl, redacted, nil - -// case types.OutputTypeSRT: -// if parsed.Scheme != "srt" { -// return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme") -// } -// // Todo: Optionally, you can redact the SRT stream key or other sensitive parts of the URL -// redacted := rawUrl -// return rawUrl, redacted, nil - -// case types.OutputTypeRaw: -// if parsed.Scheme != "ws" && parsed.Scheme != "wss" { -// return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme") -// } -// return rawUrl, rawUrl, nil - -// default: -// return "", "", errors.ErrInvalidInput("stream output type") -// } -// } - func (p *PipelineConfig) GetEncodedOutputs() []OutputConfig { ret := make([]OutputConfig, 0) diff --git a/pkg/config/urls.go b/pkg/config/urls.go index 30aa9ae9..39e6dc17 100644 --- a/pkg/config/urls.go +++ b/pkg/config/urls.go @@ -35,6 +35,9 @@ func ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, er if err != nil { return "", "", errors.ErrInvalidUrl(rawUrl, err.Error()) } + if types.StreamOutputTypes[parsed.Scheme] != outputType { + return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme") + } switch outputType { case types.OutputTypeRTMP: @@ -58,10 +61,10 @@ func ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, er } return rawUrl, redacted, nil + case types.OutputTypeSRT: + return rawUrl, rawUrl, nil + case types.OutputTypeRaw: - if parsed.Scheme != "ws" && parsed.Scheme != "wss" { - return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme") - } return rawUrl, rawUrl, nil default: diff --git a/pkg/gstreamer/bin.go b/pkg/gstreamer/bin.go index 20018158..b8f8023a 100644 --- a/pkg/gstreamer/bin.go +++ b/pkg/gstreamer/bin.go @@ -262,26 +262,34 @@ func (b *Bin) probeRemoveSink(sink *Bin) { return } - srcGhostPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { + logger.Debugw("adding probe") + srcGhostPad.AddProbe(gst.PadProbeTypeAllBoth, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { + logger.Debugw("unlinking") srcGhostPad.Unlink(sinkGhostPad.Pad) + logger.Debugw("sending EOS to sinkGhostPad") sinkGhostPad.Pad.SendEvent(gst.NewEOSEvent()) b.mu.Lock() + logger.Debugw("removing sink bin") err := b.pipeline.Remove(sink.bin.Element) b.mu.Unlock() if err != nil { + logger.Debugw("failed to remove sink bin", "error", err) b.OnError(errors.ErrGstPipelineError(err)) return gst.PadProbeRemove } + logger.Debugw("setting state to null") if err = sink.SetState(gst.StateNull); err != nil { logger.Warnw(fmt.Sprintf("failed to change %s state", sink.bin.GetName()), err) } + logger.Debugw("releasing tee request pad") b.elements[len(b.elements)-1].ReleaseRequestPad(srcGhostPad.GetTarget()) + logger.Debugw("removing tee pad") b.bin.RemovePad(srcGhostPad.Pad) - return gst.PadProbeRemove + return gst.PadProbeOK }) } diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index cf2ab578..2cea7727 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -35,6 +35,7 @@ type StreamBin struct { b *gstreamer.Bin outputType types.OutputType sinks map[string]*StreamSink + removals map[string]struct{} } type StreamSink struct { @@ -68,8 +69,11 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St return nil, nil, errors.ErrGstPipelineError(err) } + b.SetGetSrcPad(func(name string) *gst.Pad { + return mux.GetRequestPad(name) + }) + case types.OutputTypeSRT: - // add SRT steam mux, err = gst.NewElement("mpegtsmux") if err != nil { return nil, nil, errors.ErrGstPipelineError(err) @@ -98,6 +102,7 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St b: b, outputType: o.OutputType, sinks: make(map[string]*StreamSink), + removals: make(map[string]struct{}), } for _, url := range o.Urls { @@ -106,32 +111,17 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St } } - b.SetGetSrcPad(func(name string) *gst.Pad { - return mux.GetRequestPad(name) - }) - return sb, b, nil } -func (sb *StreamBin) GetStreamUrl(name string) (string, error) { - sb.mu.RLock() - sink, ok := sb.sinks[name] - sb.mu.RUnlock() - if !ok { - return "", errors.ErrStreamNotFound(name) - } - return sink.url, nil -} - func (sb *StreamBin) AddStream(url string) error { name := utils.NewGuid("") b := sb.b.NewBin(name) - queue, err := gst.NewElementWithName("queue", fmt.Sprintf("queue_%s", name)) + queue, err := gstreamer.BuildQueue(fmt.Sprintf("queue_%s", name), config.Latency, true) if err != nil { return errors.ErrGstPipelineError(err) } - queue.SetArg("leaky", "downstream") var sink *gst.Element switch sb.outputType { @@ -152,6 +142,7 @@ func (sb *StreamBin) AddStream(url string) error { if err = sink.Set("location", url); err != nil { return errors.ErrGstPipelineError(err) } + case types.OutputTypeSRT: sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", name)) if err != nil { @@ -160,6 +151,9 @@ func (sb *StreamBin) AddStream(url string) error { if err = sink.SetProperty("uri", url); err != nil { return errors.ErrGstPipelineError(err) } + if err = sink.SetProperty("wait-for-connection", false); err != nil { + return errors.ErrGstPipelineError(err) + } default: return errors.ErrInvalidInput("output type") @@ -169,34 +163,29 @@ func (sb *StreamBin) AddStream(url string) error { return err } + // add a proxy pad between the queue and sink to intercept errors b.SetLinkFunc(func() error { proxy := gst.NewGhostPad("proxy", sink.GetStaticPad("sink")) - - // Proxy isn't saved/stored anywhere, so we need to call ref. - // It is later released in RemoveSink + // proxy isn't saved/stored anywhere, so we need to reference it proxy.Ref() - - // Intercept flows from the sink. Anything besides EOS will be ignored proxy.SetChainFunction(func(self *gst.Pad, _ *gst.Object, buffer *gst.Buffer) gst.FlowReturn { - // Buffer gets automatically unreferenced by go-gst. - // Without referencing it here, it will sometimes be garbage collected before being written + // buffer needs to be referenced or it might get freed buffer.Ref() - - internal, _ := self.GetInternalLinks() - if len(internal) != 1 { - return gst.FlowNotLinked - } - - if internal[0].Push(buffer) == gst.FlowEOS { - return gst.FlowEOS + links, _ := self.GetInternalLinks() + for _, link := range links { + if link.Push(buffer) == gst.FlowEOS { + return gst.FlowEOS + } } return gst.FlowOK }) proxy.ActivateMode(gst.PadModePush, true) + // link queue to sink if padReturn := queue.GetStaticPad("src").Link(proxy.Pad); padReturn != gst.PadLinkOK { return errors.ErrPadLinkFailed(queue.GetName(), "proxy", padReturn.String()) } + return nil }) @@ -211,6 +200,16 @@ func (sb *StreamBin) AddStream(url string) error { return sb.b.AddSinkBin(b) } +func (sb *StreamBin) GetStreamUrl(name string) (string, error) { + sb.mu.RLock() + sink, ok := sb.sinks[name] + sb.mu.RUnlock() + if !ok { + return "", errors.ErrStreamNotFound(name) + } + return sink.url, nil +} + func (sb *StreamBin) MaybeResetStream(name string, streamErr error) (bool, error) { sb.mu.Lock() sink := sb.sinks[name] @@ -262,6 +261,8 @@ func (sb *StreamBin) RemoveStream(url string) error { return errors.ErrStreamNotFound(url) } delete(sb.sinks, name) + + sb.removals[name] = struct{}{} sb.mu.Unlock() _, err := sb.b.RemoveSinkBin(name) @@ -276,3 +277,10 @@ func (sb *StreamBin) getStreamNameLocked(url string) string { } return "" } + +func (sb *StreamBin) Removed(name string) bool { + sb.mu.Lock() + _, ok := sb.removals[name] + sb.mu.Unlock() + return ok +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 9e255c76..0f285329 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -16,7 +16,6 @@ package pipeline import ( "context" - "net/url" "sync" "time" @@ -46,18 +45,18 @@ const ( type Controller struct { *config.PipelineConfig + ipcServiceClient ipc.EgressServiceClient // gstreamer - src source.Source - p *gstreamer.Pipeline - sinks map[types.EgressType][]sink.Sink - streamBin *builder.StreamBin - callbacks *gstreamer.Callbacks - ipcServiceClient ipc.EgressServiceClient + gstLogger *zap.SugaredLogger + src source.Source + p *gstreamer.Pipeline + sinks map[types.EgressType][]sink.Sink + streamBin *builder.StreamBin + callbacks *gstreamer.Callbacks // internal mu sync.Mutex - gstLogger *zap.SugaredLogger monitor *stats.HandlerMonitor limitTimer *time.Timer playing core.Fuse @@ -72,14 +71,14 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc. var err error c := &Controller{ - PipelineConfig: conf, + PipelineConfig: conf, + ipcServiceClient: ipcServiceClient, + gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), callbacks: &gstreamer.Callbacks{ GstReady: make(chan struct{}), BuildReady: make(chan struct{}), }, - ipcServiceClient: ipcServiceClient, - gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), - monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId), + monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId), } c.callbacks.SetOnError(c.OnError) @@ -264,23 +263,8 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream // add stream outputs first for _, rawUrl := range req.AddOutputUrls { - u, err := url.Parse(rawUrl) - if err != nil { - return errors.ErrInvalidInput("malformed url") - } - - var outputType types.OutputType - switch u.Scheme { - case "srt": - outputType = types.OutputTypeSRT - case "rtmp": - outputType = types.OutputTypeRTMP - default: - return errors.ErrInvalidInput("invalid stream type") - } - // validate and redact url - url, redacted, err := config.ValidateUrl(rawUrl, outputType) + url, redacted, err := config.ValidateUrl(rawUrl, o.OutputType) if err != nil { errs.AppendErr(err) continue @@ -314,21 +298,6 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream // remove stream outputs for _, rawUrl := range req.RemoveOutputUrls { - u, err := url.Parse(rawUrl) - if err != nil { - return errors.ErrInvalidInput("malformed url") - } - - var outputType types.OutputType - switch u.Scheme { - case "srt": - outputType = types.OutputTypeSRT - case "rtmp": - outputType = types.OutputTypeRTMP - default: - return errors.ErrInvalidInput("invalid stream type") - } - url, err := o.GetStreamUrl(rawUrl) if err != nil { errs.AppendErr(err) diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index b72637b2..f87f6c31 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -33,38 +33,17 @@ import ( ) const ( - // watch errors - msgClockProblem = "GStreamer error: clock problem." - msgStreamingNotNegotiated = "streaming stopped, reason not-negotiated (-4)" - msgMuxer = ":muxer" - - elementGstRtmp2Sink = "GstRtmp2Sink" - elementGstAppSrc = "GstAppSrc" - elementSplitMuxSink = "GstSplitMuxSink" - - // watch elements - msgFirstSampleMetadata = "FirstSampleMetadata" - msgFragmentOpened = "splitmuxsink-fragment-opened" - msgFragmentClosed = "splitmuxsink-fragment-closed" - msgGstMultiFileSink = "GstMultiFileSink" - - fragmentLocation = "location" - fragmentRunningTime = "running-time" - - gstMultiFileSinkFilename = "filename" - gstMultiFileSinkTimestamp = "timestamp" - - // common gst errors + // gst error logs msgWrongThread = "Called from wrong thread" - // common gst warnings + // gst warning logs msgKeyframe = "Could not request a keyframe. Files may not split at the exact location they should" msgLatencyQuery = "Latency query failed" msgTaps = "can't find exact taps" msgInputDisappeared = "Can't copy metadata because input buffer disappeared" fnGstAudioResampleCheckDiscont = "gst_audio_resample_check_discont" - // common gst fixmes + // gst fix me logs msgStreamStart = "stream-start event without group-id. Consider implementing group-id handling in the upstream elements" msgCreatingStream = "Creating random stream-id, consider implementing a deterministic way of creating a stream-id" msgAggregateSubclass = "Subclass should call gst_aggregator_selected_samples() from its aggregate implementation." @@ -85,6 +64,7 @@ func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line in } case gst.LevelWarning: if function == fnGstAudioResampleCheckDiscont { + // ignore return } switch message { @@ -147,6 +127,10 @@ func (c *Controller) messageWatch(msg *gst.Message) bool { return true } +const ( + msgClockProblem = "GStreamer error: clock problem." +) + func (c *Controller) handleMessageWarning(gErr *gst.GError) error { element, _, message := parseDebugInfo(gErr) @@ -160,17 +144,27 @@ func (c *Controller) handleMessageWarning(gErr *gst.GError) error { return nil } +const ( + elementGstAppSrc = "GstAppSrc" + elementGstRtmp2Sink = "GstRtmp2Sink" + elementGstSplitMuxSink = "GstSplitMuxSink" + elementGstSrtSink = "GstSRTSink" + elementGstQueue = "GstQueue" + + msgStreamingNotNegotiated = "streaming stopped, reason not-negotiated (-4)" + msgMuxer = ":muxer" +) + // handleMessageError returns true if the error has been handled, false if the pipeline should quit func (c *Controller) handleMessageError(gErr *gst.GError) error { element, name, message := parseDebugInfo(gErr) switch { case element == elementGstRtmp2Sink: - name = strings.Split(name, "_")[1] - + sinkName := strings.Split(name, "_")[1] if !c.eos.IsBroken() { // try reconnecting - ok, err := c.streamBin.MaybeResetStream(name, gErr) + ok, err := c.streamBin.MaybeResetStream(sinkName, gErr) if err != nil { logger.Errorw("failed to reset stream", err) } else if ok { @@ -179,7 +173,7 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { } // remove sink - url, err := c.streamBin.GetStreamUrl(name) + url, err := c.streamBin.GetStreamUrl(sinkName) if err != nil { logger.Warnw("rtmp output not found", err, "url", url) return err @@ -187,6 +181,22 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { return c.removeSink(context.Background(), url, gErr) + case element == elementGstSrtSink: + sinkName := strings.Split(name, "_")[1] + url, err := c.streamBin.GetStreamUrl(sinkName) + if err != nil { + logger.Warnw("srt output not found", err, "url", url) + return err + } + + return c.removeSink(context.Background(), url, gErr) + + case element == elementGstQueue: + sinkName := strings.Split(name, "_")[1] + if c.streamBin.Removed(sinkName) { + return nil + } + case element == elementGstAppSrc: if message == msgStreamingNotNegotiated { // send eos to app src @@ -195,8 +205,8 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { return nil } - case element == elementSplitMuxSink: - // We sometimes get GstSplitMuxSink errors if send EOS before the first media was sent to the mux + case element == elementGstSplitMuxSink: + // We sometimes get GstSplitMuxSink errors if EOS was received before any data if message == msgMuxer { if c.eos.IsBroken() { logger.Debugw("GstSplitMuxSink failure after sending EOS") @@ -207,23 +217,10 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { // input failure or file write failure. Fatal err := errors.ErrGstPipelineError(gErr) - logger.Errorw(gErr.Error(), errors.New(message), "element", name) + logger.Errorw(gErr.Error(), errors.New(message), "element", element, "name", name) return err } -// Debug info comes in the following format: -// file.c(line): method_name (): /GstPipeline:pipeline/GstBin:bin_name/GstElement:element_name:\nError message -var regExp = regexp.MustCompile("(?s)(.*?)GstPipeline:pipeline/GstBin:(.*?)/(.*?):([^:]*)(:\n)?(.*)") - -func parseDebugInfo(gErr *gst.GError) (element, name, message string) { - match := regExp.FindStringSubmatch(gErr.DebugString()) - - element = match[3] - name = match[4] - message = match[6] - return -} - func (c *Controller) handleMessageStateChanged(msg *gst.Message) { _, newState := msg.ParseStateChanged() if newState != gst.StatePlaying { @@ -245,6 +242,13 @@ func (c *Controller) handleMessageStateChanged(msg *gst.Message) { return } +const ( + msgFirstSampleMetadata = "FirstSampleMetadata" + msgFragmentOpened = "splitmuxsink-fragment-opened" + msgFragmentClosed = "splitmuxsink-fragment-closed" + msgGstMultiFileSink = "GstMultiFileSink" +) + func (c *Controller) handleMessageElement(msg *gst.Message) error { s := msg.GetStructure() if s != nil { @@ -307,6 +311,24 @@ func (c *Controller) handleMessageElement(msg *gst.Message) error { return nil } +// Debug info comes in the following format: +// file.c(line): method_name (): /GstPipeline:pipeline/GstBin:bin_name/GstElement:element_name:\nError message +var gstDebug = regexp.MustCompile("(?s)(.*?)GstPipeline:pipeline/GstBin:(.*?)/(.*?):([^:]*)(:\n)?(.*)") + +func parseDebugInfo(gErr *gst.GError) (element, name, message string) { + match := gstDebug.FindStringSubmatch(gErr.DebugString()) + + element = match[3] + name = match[4] + message = match[6] + return +} + +const ( + fragmentLocation = "location" + fragmentRunningTime = "running-time" +) + func getSegmentParamsFromGstStructure(s *gst.Structure) (filepath string, time uint64, err error) { loc, err := s.GetValue(fragmentLocation) if err != nil { @@ -339,6 +361,11 @@ func getFirstSampleMetadataFromGstStructure(s *gst.Structure) (startDate time.Ti return time.Unix(0, firstSampleMetadata.StartDate), nil } +const ( + gstMultiFileSinkFilename = "filename" + gstMultiFileSinkTimestamp = "timestamp" +) + func getImageInformationFromGstStructure(s *gst.Structure) (string, uint64, error) { loc, err := s.GetValue(gstMultiFileSinkFilename) if err != nil { diff --git a/pkg/types/types.go b/pkg/types/types.go index e6a11fb1..f3da4c91 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -200,6 +200,14 @@ var ( MimeTypeVP8: OutputTypeWebM, MimeTypeVP9: OutputTypeWebM, } + + StreamOutputTypes = map[string]OutputType{ + "rtmp": OutputTypeRTMP, + "rtmps": OutputTypeRTMP, + "mux": OutputTypeRTMP, + "twitch": OutputTypeRTMP, + "srt": OutputTypeSRT, + } ) func GetOutputTypeCompatibleWithCodecs(types []OutputType, audioCodecs map[MimeType]bool, videoCodecs map[MimeType]bool) OutputType { diff --git a/test/edge.go b/test/edge.go index bac5e150..a2ed1255 100644 --- a/test/edge.go +++ b/test/edge.go @@ -34,8 +34,17 @@ func (r *Runner) testEdgeCases(t *testing.T) { return } - // ParticipantComposite where the participant does not publish a track - r.runParticipantTest(t, "6A/Edge/ParticipantNoPublish", &testCase{}, + t.Run("EdgeCases", func(t *testing.T) { + r.testNoPublish(t) + r.testRtmpFailure(t) + r.testSrtFailure(t) + r.testTrackDisconnection(t) + }) +} + +// ParticipantComposite where the participant never publishes +func (r *Runner) testNoPublish(t *testing.T) { + r.runParticipantTest(t, "ParticipantNoPublish", &testCase{}, func(t *testing.T, identity string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), @@ -45,7 +54,6 @@ func (r *Runner) testEdgeCases(t *testing.T) { Identity: identity, FileOutputs: []*livekit.EncodedFileOutput{{ FileType: livekit.EncodedFileType_MP4, - Filepath: "there won't be a file", }}, }, }, @@ -70,9 +78,11 @@ func (r *Runner) testEdgeCases(t *testing.T) { r.room = room }, ) +} - // Stream output with a bad rtmp url or stream key - r.runRoomTest(t, "6B/Edge/RtmpFailure", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { +// RTMP output with no valid urls +func (r *Runner) testRtmpFailure(t *testing.T) { + r.runRoomTest(t, "RtmpFailure", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_RoomComposite{ @@ -81,7 +91,7 @@ func (r *Runner) testEdgeCases(t *testing.T) { Layout: "speaker-light", StreamOutputs: []*livekit.StreamOutput{{ Protocol: livekit.StreamProtocol_RTMP, - Urls: []string{badStreamUrl1}, + Urls: []string{badRtmpUrl1}, }}, }, }, @@ -103,9 +113,45 @@ func (r *Runner) testEdgeCases(t *testing.T) { require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) } }) +} + +// SRT output with a no valid urls +func (r *Runner) testSrtFailure(t *testing.T) { + r.runWebTest(t, "SrtFailure", func(t *testing.T) { + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Web{ + Web: &livekit.WebEgressRequest{ + Url: webUrl, + StreamOutputs: []*livekit.StreamOutput{{ + Protocol: livekit.StreamProtocol_SRT, + Urls: []string{badSrtUrl1}, + }}, + }, + }, + } + + info, err := r.StartEgress(context.Background(), req) + require.NoError(t, err) + require.Empty(t, info.Error) + require.NotEmpty(t, info.EgressId) + require.Equal(t, livekit.EgressStatus_EGRESS_STARTING, info.Status) + + // check update + time.Sleep(time.Second * 5) + info = r.getUpdate(t, info.EgressId) + if info.Status == livekit.EgressStatus_EGRESS_ACTIVE { + r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_FAILED) + } else { + require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) + } + }) + +} - // Track composite with data loss due to a disconnection - t.Run("6C/Edge/TrackDisconnection", func(t *testing.T) { +// Track composite with data loss due to a disconnection +func (r *Runner) testTrackDisconnection(t *testing.T) { + run(t, "TrackDisconnection", func(t *testing.T) { r.awaitIdle(t) test := &testCase{ diff --git a/test/ffprobe.go b/test/ffprobe.go index ae9abb4b..f5541242 100644 --- a/test/ffprobe.go +++ b/test/ffprobe.go @@ -121,15 +121,6 @@ func verify(t *testing.T, in string, p *config.PipelineConfig, res *livekit.Egre require.NoError(t, err, "input %s does not exist", in) } - switch p.Outputs[egressType][0].GetOutputType() { - case types.OutputTypeRaw: - require.Equal(t, 0, info.Format.ProbeScore) - case types.OutputTypeIVF: - require.Equal(t, 98, info.Format.ProbeScore) - default: - require.Equal(t, 100, info.Format.ProbeScore) - } - switch egressType { case types.EgressTypeFile: // size diff --git a/test/integration.go b/test/integration.go index 7b40c9e7..17cae744 100644 --- a/test/integration.go +++ b/test/integration.go @@ -36,15 +36,7 @@ import ( const ( muteDuration = time.Second * 10 - streamUrl1 = "rtmp://localhost:1935/live/stream" - redactedUrl1 = "rtmp://localhost:1935/live/{st...am}" - streamUrl2 = "rtmp://localhost:1935/live/stream_key" - redactedUrl2 = "rtmp://localhost:1935/live/{str...key}" - badStreamUrl1 = "rtmp://xxx.contribute.live-video.net/app/fake1" - redactedBadUrl1 = "rtmp://xxx.contribute.live-video.net/app/{f...1}" - badStreamUrl2 = "rtmp://localhost:1936/live/stream" - redactedBadUrl2 = "rtmp://localhost:1936/live/{st...am}" - webUrl = "https://videoplayer-2k23.vercel.app/videos/eminem" + webUrl = "https://videoplayer-2k23.vercel.app/videos/eminem" ) var ( @@ -94,7 +86,7 @@ type testCase struct { videoUnpublish time.Duration videoRepublish time.Duration - // used by track tests + // used by track and stream tests outputType types.OutputType expectVideoEncoding bool @@ -271,7 +263,7 @@ func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[s require.Equal(t, len(expected), len(info.StreamResults)) for _, s := range info.StreamResults { if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { - require.Equal(t, expected[redactedBadUrl1], s.Status) + require.Equal(t, expected[badRtmpUrl1Redacted], s.Status) } else { require.Equal(t, expected[s.Url], s.Status) } diff --git a/test/multi.go b/test/multi.go index ef12e48e..dc96087d 100644 --- a/test/multi.go +++ b/test/multi.go @@ -44,14 +44,14 @@ func (r *Runner) runMultipleTest( if stream { _, err = r.client.UpdateStream(context.Background(), egressID, &livekit.UpdateStreamRequest{ EgressId: egressID, - AddOutputUrls: []string{streamUrl1}, + AddOutputUrls: []string{rtmpUrl1}, }) require.NoError(t, err) time.Sleep(time.Second * 10) - r.verifyStreams(t, p, streamUrl1) + r.verifyStreams(t, p, rtmpUrl1) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - redactedUrl1: livekit.StreamInfo_ACTIVE, + rtmpUrl1Redacted: livekit.StreamInfo_ACTIVE, }) time.Sleep(time.Second * 10) } else { diff --git a/test/participant.go b/test/participant.go index e5c2fba2..9d4e45c3 100644 --- a/test/participant.go +++ b/test/participant.go @@ -33,17 +33,19 @@ func (r *Runner) testParticipant(t *testing.T) { } r.sourceFramerate = 23.97 - r.testParticipantFile(t) - r.testParticipantStream(t) - r.testParticipantSegments(t) - r.testParticipantMulti(t) + t.Run("Participant", func(t *testing.T) { + r.testParticipantFile(t) + r.testParticipantStream(t) + r.testParticipantSegments(t) + r.testParticipantMulti(t) + }) } func (r *Runner) runParticipantTest( t *testing.T, name string, test *testCase, f func(t *testing.T, identity string), ) { - t.Run(name, func(t *testing.T) { + run(t, name, func(t *testing.T) { r.awaitIdle(t) r.publishSampleOffset(t, test.audioCodec, test.audioDelay, test.audioUnpublish) if test.audioRepublish != 0 { @@ -62,79 +64,77 @@ func (r *Runner) testParticipantFile(t *testing.T) { return } - t.Run("3A/Participant/File", func(t *testing.T) { - for _, test := range []*testCase{ - { - name: "VP8", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - audioDelay: time.Second * 8, - audioUnpublish: time.Second * 14, - audioRepublish: time.Second * 20, - videoCodec: types.MimeTypeVP8, - filename: "participant_{publisher_identity}_vp8_{time}.mp4", - }, - { - name: "H264", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - videoUnpublish: time.Second * 10, - videoRepublish: time.Second * 20, - filename: "participant_{room_name}_h264_{time}.mp4", - }, - { - name: "AudioOnly", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - audioUnpublish: time.Second * 10, - audioRepublish: time.Second * 15, - filename: "participant_{room_name}_{time}.mp4", - }, - } { - r.runParticipantTest(t, test.name, test, func(t *testing.T, identity string) { - var fileOutput *livekit.EncodedFileOutput - if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.AzureUpload != nil { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(uploadPrefix, test.filename), - Output: &livekit.EncodedFileOutput_Azure{ - Azure: r.AzureUpload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(r.FilePrefix, test.filename), - } - } - - participantRequest := &livekit.ParticipantEgressRequest{ - RoomName: r.room.Name(), - Identity: identity, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, + for _, test := range []*testCase{ + { + name: "File/VP8", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 8, + audioUnpublish: time.Second * 14, + audioRepublish: time.Second * 20, + videoCodec: types.MimeTypeVP8, + filename: "participant_{publisher_identity}_vp8_{time}.mp4", + }, + { + name: "File/H264", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + videoUnpublish: time.Second * 10, + videoRepublish: time.Second * 20, + filename: "participant_{room_name}_h264_{time}.mp4", + }, + { + name: "File/AudioOnly", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + audioUnpublish: time.Second * 10, + audioRepublish: time.Second * 15, + filename: "participant_{room_name}_{time}.mp4", + }, + } { + r.runParticipantTest(t, test.name, test, func(t *testing.T, identity string) { + var fileOutput *livekit.EncodedFileOutput + if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.AzureUpload != nil { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(uploadPrefix, test.filename), + Output: &livekit.EncodedFileOutput_Azure{ + Azure: r.AzureUpload, + }, } - if test.options != nil { - participantRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ - Advanced: test.options, - } + } else { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(r.FilePrefix, test.filename), } + } - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Participant{ - Participant: participantRequest, - }, + participantRequest := &livekit.ParticipantEgressRequest{ + RoomName: r.room.Name(), + Identity: identity, + FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, + } + if test.options != nil { + participantRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ + Advanced: test.options, } + } - test.expectVideoEncoding = true - r.runFileTest(t, req, test) - }) - if r.Short { - return + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Participant{ + Participant: participantRequest, + }, } + + test.expectVideoEncoding = true + r.runFileTest(t, req, test) + }) + if r.Short { + return } - }) + } } func (r *Runner) testParticipantStream(t *testing.T) { @@ -148,7 +148,7 @@ func (r *Runner) testParticipantStream(t *testing.T) { videoCodec: types.MimeTypeVP8, } - r.runParticipantTest(t, "3B/Participant/Stream", test, + r.runParticipantTest(t, "Stream", test, func(t *testing.T, identity string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), @@ -157,13 +157,16 @@ func (r *Runner) testParticipantStream(t *testing.T) { RoomName: r.room.Name(), Identity: identity, StreamOutputs: []*livekit.StreamOutput{{ - Urls: []string{streamUrl1, badStreamUrl1}, + Urls: []string{rtmpUrl1, badRtmpUrl1}, }}, }, }, } - r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) + r.runStreamTest(t, req, &testCase{ + expectVideoEncoding: true, + outputType: types.OutputTypeRTMP, + }) }, ) } @@ -173,74 +176,72 @@ func (r *Runner) testParticipantSegments(t *testing.T) { return } - t.Run("3C/Participant/Segments", func(t *testing.T) { - for _, test := range []*testCase{ - { - name: "VP8", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeVP8, - // videoDelay: time.Second * 10, - // videoUnpublish: time.Second * 20, - filename: "participant_{publisher_identity}_vp8_{time}", - playlist: "participant_{publisher_identity}_vp8_{time}.m3u8", - }, - { - name: "H264", - audioCodec: types.MimeTypeOpus, - audioDelay: time.Second * 10, - audioUnpublish: time.Second * 20, - videoCodec: types.MimeTypeH264, - filename: "participant_{room_name}_h264_{time}", - playlist: "participant_{room_name}_h264_{time}.m3u8", - }, - } { - r.runParticipantTest(t, test.name, test, - func(t *testing.T, identity string) { - var segmentOutput *livekit.SegmentedFileOutput - if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(uploadPrefix, test.filename), - PlaylistName: test.playlist, - FilenameSuffix: test.filenameSuffix, - Output: &livekit.SegmentedFileOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - PlaylistName: test.playlist, - FilenameSuffix: test.filenameSuffix, - } - } - - trackRequest := &livekit.ParticipantEgressRequest{ - RoomName: r.room.Name(), - Identity: identity, - SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, + for _, test := range []*testCase{ + { + name: "Segments/VP8", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + // videoDelay: time.Second * 10, + // videoUnpublish: time.Second * 20, + filename: "participant_{publisher_identity}_vp8_{time}", + playlist: "participant_{publisher_identity}_vp8_{time}.m3u8", + }, + { + name: "Segments/H264", + audioCodec: types.MimeTypeOpus, + audioDelay: time.Second * 10, + audioUnpublish: time.Second * 20, + videoCodec: types.MimeTypeH264, + filename: "participant_{room_name}_h264_{time}", + playlist: "participant_{room_name}_h264_{time}.m3u8", + }, + } { + r.runParticipantTest(t, test.name, test, + func(t *testing.T, identity string) { + var segmentOutput *livekit.SegmentedFileOutput + if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(uploadPrefix, test.filename), + PlaylistName: test.playlist, + FilenameSuffix: test.filenameSuffix, + Output: &livekit.SegmentedFileOutput_S3{ + S3: r.S3Upload, + }, } - if test.options != nil { - trackRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ - Advanced: test.options, - } + } else { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(r.FilePrefix, test.filename), + PlaylistName: test.playlist, + FilenameSuffix: test.filenameSuffix, } + } - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Participant{ - Participant: trackRequest, - }, + trackRequest := &livekit.ParticipantEgressRequest{ + RoomName: r.room.Name(), + Identity: identity, + SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, + } + if test.options != nil { + trackRequest.Options = &livekit.ParticipantEgressRequest_Advanced{ + Advanced: test.options, } - test.expectVideoEncoding = true + } - r.runSegmentsTest(t, req, test) - }, - ) - if r.Short { - return - } + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Participant{ + Participant: trackRequest, + }, + } + test.expectVideoEncoding = true + + r.runSegmentsTest(t, req, test) + }, + ) + if r.Short { + return } - }) + } } func (r *Runner) testParticipantMulti(t *testing.T) { @@ -255,7 +256,7 @@ func (r *Runner) testParticipantMulti(t *testing.T) { videoDelay: time.Second * 5, } - r.runParticipantTest(t, "3D/Participant/Multi", test, + r.runParticipantTest(t, "Multi", test, func(t *testing.T, identity string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), diff --git a/test/room_composite.go b/test/room_composite.go index 8eed79c6..2c953efb 100644 --- a/test/room_composite.go +++ b/test/room_composite.go @@ -32,15 +32,17 @@ func (r *Runner) testRoomComposite(t *testing.T) { } r.sourceFramerate = 30 - r.testRoomCompositeFile(t) - r.testRoomCompositeStream(t) - r.testRoomCompositeSegments(t) - r.testRoomCompositeImages(t) - r.testRoomCompositeMulti(t) + t.Run("RoomComposite", func(t *testing.T) { + r.testRoomCompositeFile(t) + r.testRoomCompositeStream(t) + r.testRoomCompositeSegments(t) + r.testRoomCompositeImages(t) + r.testRoomCompositeMulti(t) + }) } func (r *Runner) runRoomTest(t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T)) { - t.Run(name, func(t *testing.T) { + run(t, name, func(t *testing.T) { r.awaitIdle(t) r.publishSamples(t, audioCodec, videoCodec) f(t) @@ -52,81 +54,79 @@ func (r *Runner) testRoomCompositeFile(t *testing.T) { return } - t.Run("1A/RoomComposite/File", func(t *testing.T) { - for _, test := range []*testCase{ - { - name: "Base", - filename: "r_{room_name}_{time}.mp4", - expectVideoEncoding: true, - }, - { - name: "Video-Only", - videoOnly: true, - options: &livekit.EncodingOptions{ - VideoCodec: livekit.VideoCodec_H264_HIGH, - }, - filename: "r_{room_name}_video_{time}.mp4", - expectVideoEncoding: true, + for _, test := range []*testCase{ + { + name: "File/Base", + filename: "r_{room_name}_{time}.mp4", + expectVideoEncoding: true, + }, + { + name: "File/Video-Only", + videoOnly: true, + options: &livekit.EncodingOptions{ + VideoCodec: livekit.VideoCodec_H264_HIGH, }, - { - name: "Audio-Only", - fileType: livekit.EncodedFileType_OGG, - audioOnly: true, - options: &livekit.EncodingOptions{ - AudioCodec: livekit.AudioCodec_OPUS, - }, - filename: "r_{room_name}_audio_{time}", - expectVideoEncoding: false, + filename: "r_{room_name}_video_{time}.mp4", + expectVideoEncoding: true, + }, + { + name: "File/Audio-Only", + fileType: livekit.EncodedFileType_OGG, + audioOnly: true, + options: &livekit.EncodingOptions{ + AudioCodec: livekit.AudioCodec_OPUS, }, - } { - r.runRoomTest(t, test.name, types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { - var fileOutput *livekit.EncodedFileOutput - if r.S3Upload != nil { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(uploadPrefix, test.filename), - Output: &livekit.EncodedFileOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(r.FilePrefix, test.filename), - } - } - - roomRequest := &livekit.RoomCompositeEgressRequest{ - RoomName: r.room.Name(), - Layout: "speaker-dark", - AudioOnly: test.audioOnly, - VideoOnly: test.videoOnly, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, + filename: "r_{room_name}_audio_{time}", + expectVideoEncoding: false, + }, + } { + r.runRoomTest(t, test.name, types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { + var fileOutput *livekit.EncodedFileOutput + if r.S3Upload != nil { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(uploadPrefix, test.filename), + Output: &livekit.EncodedFileOutput_S3{ + S3: r.S3Upload, + }, } - if test.options != nil { - roomRequest.Options = &livekit.RoomCompositeEgressRequest_Advanced{ - Advanced: test.options, - } - } else if test.preset != 0 { - roomRequest.Options = &livekit.RoomCompositeEgressRequest_Preset{ - Preset: test.preset, - } + } else { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(r.FilePrefix, test.filename), } + } - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: roomRequest, - }, + roomRequest := &livekit.RoomCompositeEgressRequest{ + RoomName: r.room.Name(), + Layout: "speaker-dark", + AudioOnly: test.audioOnly, + VideoOnly: test.videoOnly, + FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, + } + if test.options != nil { + roomRequest.Options = &livekit.RoomCompositeEgressRequest_Advanced{ + Advanced: test.options, } + } else if test.preset != 0 { + roomRequest.Options = &livekit.RoomCompositeEgressRequest_Preset{ + Preset: test.preset, + } + } - r.runFileTest(t, req, test) - }) - if r.Short { - return + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_RoomComposite{ + RoomComposite: roomRequest, + }, } + + r.runFileTest(t, req, test) + }) + if r.Short { + return } - }) + } } func (r *Runner) testRoomCompositeStream(t *testing.T) { @@ -134,7 +134,7 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) { return } - r.runRoomTest(t, "1B/RoomComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { + r.runRoomTest(t, "Stream", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_RoomComposite{ @@ -143,13 +143,16 @@ func (r *Runner) testRoomCompositeStream(t *testing.T) { Layout: "grid-light", StreamOutputs: []*livekit.StreamOutput{{ Protocol: livekit.StreamProtocol_RTMP, - Urls: []string{streamUrl1, badStreamUrl1}, + Urls: []string{rtmpUrl1, badRtmpUrl1}, }}, }, }, } - r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) + r.runStreamTest(t, req, &testCase{ + expectVideoEncoding: true, + outputType: types.OutputTypeRTMP, + }) }) } @@ -158,7 +161,7 @@ func (r *Runner) testRoomCompositeSegments(t *testing.T) { return } - r.runRoomTest(t, "1C/RoomComposite/Segments", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { + r.runRoomTest(t, "Segments", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { for _, test := range []*testCase{ { options: &livekit.EncodingOptions{ @@ -233,7 +236,7 @@ func (r *Runner) testRoomCompositeImages(t *testing.T) { return } - r.runRoomTest(t, "1D/RoomComposite/Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { + r.runRoomTest(t, "Images", types.MimeTypeOpus, types.MimeTypeH264, func(t *testing.T) { for _, test := range []*testCase{ { options: &livekit.EncodingOptions{ @@ -278,7 +281,7 @@ func (r *Runner) testRoomCompositeMulti(t *testing.T) { return } - r.runRoomTest(t, "1E/RoomComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { + r.runRoomTest(t, "Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_RoomComposite{ diff --git a/test/runner.go b/test/runner.go index b0e2097a..0efc0eec 100644 --- a/test/runner.go +++ b/test/runner.go @@ -229,3 +229,10 @@ func (r *Runner) RunTests(t *testing.T) { r.testTrack(t) r.testEdgeCases(t) } + +var testNumber int + +func run(t *testing.T, name string, f func(t *testing.T)) { + testNumber++ + t.Run(fmt.Sprintf("%d/%s", testNumber, name), f) +} diff --git a/test/stream.go b/test/stream.go index 9223b6ab..1d498658 100644 --- a/test/stream.go +++ b/test/stream.go @@ -29,12 +29,44 @@ import ( "github.com/livekit/protocol/rpc" ) +const ( + rtmpUrl1 = "rtmp://localhost:1935/live/stream" + rtmpUrl1Redacted = "rtmp://localhost:1935/live/{st...am}" + rtmpUrl2 = "rtmp://localhost:1935/live/stream_key" + rtmpUrl2Redacted = "rtmp://localhost:1935/live/{str...key}" + badRtmpUrl1 = "rtmp://xxx.contribute.live-video.net/app/fake1" + badRtmpUrl1Redacted = "rtmp://xxx.contribute.live-video.net/app/{f...1}" + badRtmpUrl2 = "rtmp://localhost:1936/live/stream" + badRtmpUrl2Redacted = "rtmp://localhost:1936/live/{st...am}" + srtPublishUrl1 = "srt://localhost:8890?streamid=publish:mystream&pkt_size=1316" + srtReadUrl1 = "srt://localhost:8890?streamid=read:mystream" + srtPublishUrl2 = "srt://localhost:8890?streamid=publish:otherstream&pkt_size=1316" + srtReadUrl2 = "srt://localhost:8890?streamid=read:otherstream" + badSrtUrl1 = "srt://localhost:8891?streamid=publish:wrongport&pkt_size=1316" + badSrtUrl2 = "srt://localhost:8891?streamid=publish:badstream&pkt_size=1316" +) + +// [[publish, redacted, verification]] +var streamUrls = map[types.OutputType][][]string{ + types.OutputTypeRTMP: { + {rtmpUrl1, rtmpUrl1Redacted, rtmpUrl1}, + {badRtmpUrl1, badRtmpUrl1Redacted, ""}, + {rtmpUrl2, rtmpUrl2Redacted, rtmpUrl2}, + {badRtmpUrl2, badRtmpUrl2Redacted, ""}, + }, + types.OutputTypeSRT: { + {srtPublishUrl1, srtPublishUrl1, srtReadUrl1}, + {badSrtUrl1, badSrtUrl1, ""}, + {srtPublishUrl2, srtPublishUrl2, srtReadUrl2}, + {badSrtUrl2, badSrtUrl2, ""}, + }, +} + func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test *testCase) { ctx := context.Background() - + urls := streamUrls[test.outputType] egressID := r.startEgress(t, req) - // get params p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) require.NoError(t, err) require.Equal(t, test.expectVideoEncoding, p.VideoEncoding) @@ -44,40 +76,40 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * // verify and check update time.Sleep(time.Second * 5) - r.verifyStreams(t, p, streamUrl1) + + r.verifyStreams(t, p, urls[0][2]) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - redactedUrl1: livekit.StreamInfo_ACTIVE, - redactedBadUrl1: livekit.StreamInfo_FAILED, + urls[0][1]: livekit.StreamInfo_ACTIVE, + urls[1][1]: livekit.StreamInfo_FAILED, }) // add one good stream url and one bad _, err = r.client.UpdateStream(ctx, egressID, &livekit.UpdateStreamRequest{ EgressId: egressID, - AddOutputUrls: []string{badStreamUrl2, streamUrl2}, + AddOutputUrls: []string{urls[2][0], urls[3][0]}, }) require.NoError(t, err) - - // verify and check updates time.Sleep(time.Second * 5) - r.verifyStreams(t, p, streamUrl1, streamUrl2) + // verify and check updates + r.verifyStreams(t, p, urls[0][2], urls[2][2]) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - redactedUrl1: livekit.StreamInfo_ACTIVE, - redactedUrl2: livekit.StreamInfo_ACTIVE, - redactedBadUrl1: livekit.StreamInfo_FAILED, - redactedBadUrl2: livekit.StreamInfo_ACTIVE, + urls[0][1]: livekit.StreamInfo_ACTIVE, + urls[1][1]: livekit.StreamInfo_FAILED, + urls[2][1]: livekit.StreamInfo_ACTIVE, + urls[3][1]: livekit.StreamInfo_ACTIVE, }) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - redactedUrl1: livekit.StreamInfo_ACTIVE, - redactedUrl2: livekit.StreamInfo_ACTIVE, - redactedBadUrl1: livekit.StreamInfo_FAILED, - redactedBadUrl2: livekit.StreamInfo_FAILED, + urls[0][1]: livekit.StreamInfo_ACTIVE, + urls[1][1]: livekit.StreamInfo_FAILED, + urls[2][1]: livekit.StreamInfo_ACTIVE, + urls[3][1]: livekit.StreamInfo_FAILED, }) // remove one of the stream urls _, err = r.client.UpdateStream(ctx, egressID, &livekit.UpdateStreamRequest{ EgressId: egressID, - RemoveOutputUrls: []string{streamUrl1}, + RemoveOutputUrls: []string{urls[0][0]}, }) require.NoError(t, err) @@ -87,12 +119,12 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * } // verify the remaining stream - r.verifyStreams(t, p, streamUrl2) + r.verifyStreams(t, p, urls[2][2]) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - redactedUrl1: livekit.StreamInfo_FINISHED, - redactedUrl2: livekit.StreamInfo_ACTIVE, - redactedBadUrl1: livekit.StreamInfo_FAILED, - redactedBadUrl2: livekit.StreamInfo_FAILED, + urls[0][1]: livekit.StreamInfo_FINISHED, + urls[1][1]: livekit.StreamInfo_FAILED, + urls[2][1]: livekit.StreamInfo_ACTIVE, + urls[3][1]: livekit.StreamInfo_FAILED, }) // stop @@ -105,17 +137,17 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * require.NotZero(t, res.EndedAt) // check stream info - require.Len(t, res.StreamResults, 4) + require.Len(t, res.StreamResults, 2) for _, info := range res.StreamResults { require.NotZero(t, info.StartedAt) require.NotZero(t, info.EndedAt) switch info.Url { - case redactedUrl1: + case urls[0][0]: require.Equal(t, livekit.StreamInfo_FINISHED.String(), info.Status.String()) require.Greater(t, float64(info.Duration)/1e9, 15.0) - case redactedUrl2: + case urls[2][0]: require.Equal(t, livekit.StreamInfo_FINISHED.String(), info.Status.String()) require.Greater(t, float64(info.Duration)/1e9, 10.0) diff --git a/test/track.go b/test/track.go index e22470d6..a049c2b7 100644 --- a/test/track.go +++ b/test/track.go @@ -43,8 +43,10 @@ func (r *Runner) testTrack(t *testing.T) { } r.sourceFramerate = 23.97 - r.testTrackFile(t) - r.testTrackStream(t) + t.Run("Track", func(t *testing.T) { + r.testTrackFile(t) + r.testTrackStream(t) + }) } func (r *Runner) testTrackFile(t *testing.T) { @@ -52,67 +54,65 @@ func (r *Runner) testTrackFile(t *testing.T) { return } - t.Run("5A/Track/File", func(t *testing.T) { - for _, test := range []*testCase{ - { - name: "OPUS", - audioOnly: true, - audioCodec: types.MimeTypeOpus, - outputType: types.OutputTypeOGG, - filename: "t_{track_source}_{time}.ogg", - }, - { - name: "H264", - videoOnly: true, - videoCodec: types.MimeTypeH264, - outputType: types.OutputTypeMP4, - filename: "t_{track_id}_{time}.mp4", - }, - { - name: "VP8", - videoOnly: true, - videoCodec: types.MimeTypeVP8, - outputType: types.OutputTypeWebM, - filename: "t_{track_type}_{time}.webm", - }, - // { - // name: "VP9", - // videoOnly: true, - // videoCodec: types.MimeTypeVP9, - // outputType: types.OutputTypeWebM, - // filename: "t_{track_type}_{time}.webm", - // }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { - trackID := audioTrackID - if trackID == "" { - trackID = videoTrackID - } - - trackRequest := &livekit.TrackEgressRequest{ - RoomName: r.room.Name(), - TrackId: trackID, - Output: &livekit.TrackEgressRequest_File{ - File: &livekit.DirectFileOutput{ - Filepath: path.Join(r.FilePrefix, test.filename), - }, - }, - } + for _, test := range []*testCase{ + { + name: "File/OPUS", + audioOnly: true, + audioCodec: types.MimeTypeOpus, + outputType: types.OutputTypeOGG, + filename: "t_{track_source}_{time}.ogg", + }, + { + name: "File/H264", + videoOnly: true, + videoCodec: types.MimeTypeH264, + outputType: types.OutputTypeMP4, + filename: "t_{track_id}_{time}.mp4", + }, + { + name: "File/VP8", + videoOnly: true, + videoCodec: types.MimeTypeVP8, + outputType: types.OutputTypeWebM, + filename: "t_{track_type}_{time}.webm", + }, + // { + // name: "VP9", + // videoOnly: true, + // videoCodec: types.MimeTypeVP9, + // outputType: types.OutputTypeWebM, + // filename: "t_{track_type}_{time}.webm", + // }, + } { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { + trackID := audioTrackID + if trackID == "" { + trackID = videoTrackID + } - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Track{ - Track: trackRequest, + trackRequest := &livekit.TrackEgressRequest{ + RoomName: r.room.Name(), + TrackId: trackID, + Output: &livekit.TrackEgressRequest_File{ + File: &livekit.DirectFileOutput{ + Filepath: path.Join(r.FilePrefix, test.filename), }, - } + }, + } - r.runFileTest(t, req, test) - }) - if r.Short { - return + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Track{ + Track: trackRequest, + }, } + + r.runFileTest(t, req, test) + }) + if r.Short { + return } - }) + } } func (r *Runner) testTrackStream(t *testing.T) { @@ -120,7 +120,7 @@ func (r *Runner) testTrackStream(t *testing.T) { return } - t.Run("5B/Track/Stream", func(t *testing.T) { + run(t, "Stream", func(t *testing.T) { now := time.Now().Unix() for _, test := range []*testCase{ { diff --git a/test/track_composite.go b/test/track_composite.go index 553460ed..aa563f2d 100644 --- a/test/track_composite.go +++ b/test/track_composite.go @@ -32,18 +32,20 @@ func (r *Runner) testTrackComposite(t *testing.T) { } r.sourceFramerate = 23.97 - r.testTrackCompositeFile(t) - r.testTrackCompositeStream(t) - r.testTrackCompositeSegments(t) - r.testTrackCompositeImages(t) - r.testTrackCompositeMulti(t) + t.Run("TrackComposite", func(t *testing.T) { + r.testTrackCompositeFile(t) + r.testTrackCompositeStream(t) + r.testTrackCompositeSegments(t) + r.testTrackCompositeImages(t) + r.testTrackCompositeMulti(t) + }) } func (r *Runner) runTrackTest( t *testing.T, name string, audioCodec, videoCodec types.MimeType, f func(t *testing.T, audioTrackID, videoTrackID string), ) { - t.Run(name, func(t *testing.T) { + run(t, name, func(t *testing.T) { r.awaitIdle(t) audioTrackID, videoTrackID := r.publishSamples(t, audioCodec, videoCodec) f(t, audioTrackID, videoTrackID) @@ -55,75 +57,73 @@ func (r *Runner) testTrackCompositeFile(t *testing.T) { return } - t.Run("4A/TrackComposite/File", func(t *testing.T) { - for _, test := range []*testCase{ - { - name: "VP8", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeVP8, - filename: "tc_{publisher_identity}_vp8_{time}.mp4", - }, - { - name: "H264", - fileType: livekit.EncodedFileType_MP4, - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - filename: "tc_{room_name}_h264_{time}.mp4", - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { - var aID, vID string - if !test.audioOnly { - vID = videoTrackID - } - if !test.videoOnly { - aID = audioTrackID - } - - var fileOutput *livekit.EncodedFileOutput - if r.AzureUpload != nil { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(uploadPrefix, test.filename), - Output: &livekit.EncodedFileOutput_Azure{ - Azure: r.AzureUpload, - }, - } - } else { - fileOutput = &livekit.EncodedFileOutput{ - FileType: test.fileType, - Filepath: path.Join(r.FilePrefix, test.filename), - } - } + for _, test := range []*testCase{ + { + name: "File/VP8", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + filename: "tc_{publisher_identity}_vp8_{time}.mp4", + }, + { + name: "File/H264", + fileType: livekit.EncodedFileType_MP4, + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + filename: "tc_{room_name}_h264_{time}.mp4", + }, + } { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { + var aID, vID string + if !test.audioOnly { + vID = videoTrackID + } + if !test.videoOnly { + aID = audioTrackID + } - trackRequest := &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: aID, - VideoTrackId: vID, - FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, + var fileOutput *livekit.EncodedFileOutput + if r.AzureUpload != nil { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(uploadPrefix, test.filename), + Output: &livekit.EncodedFileOutput_Azure{ + Azure: r.AzureUpload, + }, } - if test.options != nil { - trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ - Advanced: test.options, - } + } else { + fileOutput = &livekit.EncodedFileOutput{ + FileType: test.fileType, + Filepath: path.Join(r.FilePrefix, test.filename), } + } - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: trackRequest, - }, + trackRequest := &livekit.TrackCompositeEgressRequest{ + RoomName: r.room.Name(), + AudioTrackId: aID, + VideoTrackId: vID, + FileOutputs: []*livekit.EncodedFileOutput{fileOutput}, + } + if test.options != nil { + trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ + Advanced: test.options, } + } - test.expectVideoEncoding = true - r.runFileTest(t, req, test) - }) - if r.Short { - return + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_TrackComposite{ + TrackComposite: trackRequest, + }, } + + test.expectVideoEncoding = true + r.runFileTest(t, req, test) + }) + if r.Short { + return } - }) + } } func (r *Runner) testTrackCompositeStream(t *testing.T) { @@ -131,7 +131,7 @@ func (r *Runner) testTrackCompositeStream(t *testing.T) { return } - r.runTrackTest(t, "4B/TrackComposite/Stream", types.MimeTypeOpus, types.MimeTypeVP8, + r.runTrackTest(t, "Stream", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T, audioTrackID, videoTrackID string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), @@ -141,13 +141,16 @@ func (r *Runner) testTrackCompositeStream(t *testing.T) { AudioTrackId: audioTrackID, VideoTrackId: videoTrackID, StreamOutputs: []*livekit.StreamOutput{{ - Urls: []string{streamUrl1, badStreamUrl1}, + Urls: []string{rtmpUrl1, badRtmpUrl1}, }}, }, }, } - r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) + r.runStreamTest(t, req, &testCase{ + expectVideoEncoding: true, + outputType: types.OutputTypeRTMP, + }) }, ) } @@ -157,89 +160,87 @@ func (r *Runner) testTrackCompositeSegments(t *testing.T) { return } - t.Run("4C/TrackComposite/Segments", func(t *testing.T) { - for _, test := range []*testCase{ - { - name: "VP8", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeVP8, - filename: "tcs_{publisher_identity}_vp8_{time}", - playlist: "tcs_{publisher_identity}_vp8_{time}.m3u8", - }, - { - name: "H264", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - filename: "tcs_{room_name}_h264_{time}", - playlist: "tcs_{room_name}_h264_{time}.m3u8", - livePlaylist: "tcs_live_{room_name}_h264_{time}.m3u8", - }, - { - name: "Audio Only", - audioCodec: types.MimeTypeOpus, - filename: "tcs_{room_name}_audio_{time}", - playlist: "tcs_{room_name}_audio_{time}.m3u8", - audioOnly: true, - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, - func(t *testing.T, audioTrackID, videoTrackID string) { - var aID, vID string - if !test.audioOnly { - vID = videoTrackID - } - if !test.videoOnly { - aID = audioTrackID - } - - var segmentOutput *livekit.SegmentedFileOutput - if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(uploadPrefix, test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - Output: &livekit.SegmentedFileOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - segmentOutput = &livekit.SegmentedFileOutput{ - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - PlaylistName: test.playlist, - LivePlaylistName: test.livePlaylist, - FilenameSuffix: test.filenameSuffix, - } - } + for _, test := range []*testCase{ + { + name: "Segments/VP8", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeVP8, + filename: "tcs_{publisher_identity}_vp8_{time}", + playlist: "tcs_{publisher_identity}_vp8_{time}.m3u8", + }, + { + name: "Segments/H264", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + filename: "tcs_{room_name}_h264_{time}", + playlist: "tcs_{room_name}_h264_{time}.m3u8", + livePlaylist: "tcs_live_{room_name}_h264_{time}.m3u8", + }, + { + name: "Segments/Audio-Only", + audioCodec: types.MimeTypeOpus, + filename: "tcs_{room_name}_audio_{time}", + playlist: "tcs_{room_name}_audio_{time}.m3u8", + audioOnly: true, + }, + } { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, + func(t *testing.T, audioTrackID, videoTrackID string) { + var aID, vID string + if !test.audioOnly { + vID = videoTrackID + } + if !test.videoOnly { + aID = audioTrackID + } - trackRequest := &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: aID, - VideoTrackId: vID, - SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, + var segmentOutput *livekit.SegmentedFileOutput + if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(uploadPrefix, test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.livePlaylist, + FilenameSuffix: test.filenameSuffix, + Output: &livekit.SegmentedFileOutput_S3{ + S3: r.S3Upload, + }, } - if test.options != nil { - trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ - Advanced: test.options, - } + } else { + segmentOutput = &livekit.SegmentedFileOutput{ + FilenamePrefix: path.Join(r.FilePrefix, test.filename), + PlaylistName: test.playlist, + LivePlaylistName: test.livePlaylist, + FilenameSuffix: test.filenameSuffix, } + } - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: trackRequest, - }, + trackRequest := &livekit.TrackCompositeEgressRequest{ + RoomName: r.room.Name(), + AudioTrackId: aID, + VideoTrackId: vID, + SegmentOutputs: []*livekit.SegmentedFileOutput{segmentOutput}, + } + if test.options != nil { + trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ + Advanced: test.options, } - test.expectVideoEncoding = true + } - r.runSegmentsTest(t, req, test) - }, - ) - if r.Short { - return - } + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_TrackComposite{ + TrackComposite: trackRequest, + }, + } + test.expectVideoEncoding = true + + r.runSegmentsTest(t, req, test) + }, + ) + if r.Short { + return } - }) + } } func (r *Runner) testTrackCompositeImages(t *testing.T) { @@ -247,71 +248,69 @@ func (r *Runner) testTrackCompositeImages(t *testing.T) { return } - t.Run("4D/TrackComposite/Images", func(t *testing.T) { - for _, test := range []*testCase{ - { - name: "H264", - audioCodec: types.MimeTypeOpus, - videoCodec: types.MimeTypeH264, - filename: "tc_{publisher_identity}_h264", - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, - func(t *testing.T, audioTrackID, videoTrackID string) { - var aID, vID string - if !test.audioOnly { - vID = videoTrackID - } - if !test.videoOnly { - aID = audioTrackID - } - - var imageOutput *livekit.ImageOutput - if r.S3Upload != nil { - imageOutput = &livekit.ImageOutput{ - CaptureInterval: 5, - Width: 1280, - Height: 720, - FilenamePrefix: path.Join(uploadPrefix, test.filename), - Output: &livekit.ImageOutput_S3{ - S3: r.S3Upload, - }, - } - } else { - imageOutput = &livekit.ImageOutput{ - CaptureInterval: 5, - Width: 1280, - Height: 720, - FilenamePrefix: path.Join(r.FilePrefix, test.filename), - } - } + for _, test := range []*testCase{ + { + name: "Images/H264", + audioCodec: types.MimeTypeOpus, + videoCodec: types.MimeTypeH264, + filename: "tc_{publisher_identity}_h264", + }, + } { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, + func(t *testing.T, audioTrackID, videoTrackID string) { + var aID, vID string + if !test.audioOnly { + vID = videoTrackID + } + if !test.videoOnly { + aID = audioTrackID + } - trackRequest := &livekit.TrackCompositeEgressRequest{ - RoomName: r.room.Name(), - AudioTrackId: aID, - VideoTrackId: vID, - ImageOutputs: []*livekit.ImageOutput{imageOutput}, + var imageOutput *livekit.ImageOutput + if r.S3Upload != nil { + imageOutput = &livekit.ImageOutput{ + CaptureInterval: 5, + Width: 1280, + Height: 720, + FilenamePrefix: path.Join(uploadPrefix, test.filename), + Output: &livekit.ImageOutput_S3{ + S3: r.S3Upload, + }, } - if test.options != nil { - trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ - Advanced: test.options, - } + } else { + imageOutput = &livekit.ImageOutput{ + CaptureInterval: 5, + Width: 1280, + Height: 720, + FilenamePrefix: path.Join(r.FilePrefix, test.filename), } + } - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_TrackComposite{ - TrackComposite: trackRequest, - }, + trackRequest := &livekit.TrackCompositeEgressRequest{ + RoomName: r.room.Name(), + AudioTrackId: aID, + VideoTrackId: vID, + ImageOutputs: []*livekit.ImageOutput{imageOutput}, + } + if test.options != nil { + trackRequest.Options = &livekit.TrackCompositeEgressRequest_Advanced{ + Advanced: test.options, } - r.runImagesTest(t, req, test) - }, - ) - if r.Short { - return - } + } + + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_TrackComposite{ + TrackComposite: trackRequest, + }, + } + r.runImagesTest(t, req, test) + }, + ) + if r.Short { + return } - }) + } } func (r *Runner) testTrackCompositeMulti(t *testing.T) { @@ -319,7 +318,7 @@ func (r *Runner) testTrackCompositeMulti(t *testing.T) { return } - r.runTrackTest(t, "4E/TrackComposite/Multi", types.MimeTypeOpus, types.MimeTypeVP8, + r.runTrackTest(t, "Multi", types.MimeTypeOpus, types.MimeTypeVP8, func(t *testing.T, audioTrackID, videoTrackID string) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), diff --git a/test/web.go b/test/web.go index aafa6ecf..02aaeb23 100644 --- a/test/web.go +++ b/test/web.go @@ -20,6 +20,7 @@ import ( "path" "testing" + "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" @@ -31,14 +32,16 @@ func (r *Runner) testWeb(t *testing.T) { } r.sourceFramerate = 30 - r.testWebFile(t) - r.testWebStream(t) - r.testWebSegments(t) - r.testWebMulti(t) + t.Run("Web", func(t *testing.T) { + r.testWebFile(t) + r.testWebStream(t) + r.testWebSegments(t) + r.testWebMulti(t) + }) } func (r *Runner) runWebTest(t *testing.T, name string, f func(t *testing.T)) { - t.Run(name, func(t *testing.T) { + run(t, name, func(t *testing.T) { r.awaitIdle(t) f(t) }) @@ -49,7 +52,7 @@ func (r *Runner) testWebFile(t *testing.T) { return } - r.runWebTest(t, "2A/Web/File", func(t *testing.T) { + r.runWebTest(t, "File", func(t *testing.T) { var fileOutput *livekit.EncodedFileOutput if r.GCPUpload != nil { fileOutput = &livekit.EncodedFileOutput{ @@ -86,21 +89,24 @@ func (r *Runner) testWebStream(t *testing.T) { return } - r.runWebTest(t, "2B/Web/Stream", func(t *testing.T) { + r.runWebTest(t, "Stream", func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), Request: &rpc.StartEgressRequest_Web{ Web: &livekit.WebEgressRequest{ Url: webUrl, StreamOutputs: []*livekit.StreamOutput{{ - Protocol: livekit.StreamProtocol_RTMP, - Urls: []string{badStreamUrl1, streamUrl1}, + Protocol: livekit.StreamProtocol_SRT, + Urls: []string{srtPublishUrl1, badSrtUrl1}, }}, }, }, } - r.runStreamTest(t, req, &testCase{expectVideoEncoding: true}) + r.runStreamTest(t, req, &testCase{ + expectVideoEncoding: true, + outputType: types.OutputTypeSRT, + }) }) } @@ -109,7 +115,7 @@ func (r *Runner) testWebSegments(t *testing.T) { return } - r.runWebTest(t, "2C/Web/Segments", func(t *testing.T) { + r.runWebTest(t, "Segments", func(t *testing.T) { var segmentOutput *livekit.SegmentedFileOutput if r.AzureUpload != nil { segmentOutput = &livekit.SegmentedFileOutput{ @@ -147,7 +153,7 @@ func (r *Runner) testWebMulti(t *testing.T) { return } - r.runWebTest(t, "2D/Web/Multi", func(t *testing.T) { + r.runWebTest(t, "Multi", func(t *testing.T) { req := &rpc.StartEgressRequest{ EgressId: utils.NewGuid(utils.EgressPrefix), From b536f3beddeec57b1ce31dbbd8c47b1bd1cf9a6a Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 23 Jul 2024 00:48:39 -0400 Subject: [PATCH 3/6] more fixes --- go.mod | 4 +- go.sum | 10 +-- pkg/gstreamer/bin.go | 24 +++---- pkg/pipeline/builder/audio.go | 10 +-- pkg/pipeline/builder/stream.go | 112 ++++++++++++++++++--------------- pkg/pipeline/builder/video.go | 2 +- pkg/pipeline/watch.go | 91 ++++++++++++--------------- test/integration.go | 8 +-- test/stream.go | 2 +- test/web.go | 2 + 10 files changed, 125 insertions(+), 140 deletions(-) diff --git a/go.mod b/go.mod index 45702655..8b775659 100644 --- a/go.mod +++ b/go.mod @@ -13,17 +13,15 @@ require ( github.com/go-gst/go-glib v1.0.1 github.com/go-gst/go-gst v1.0.0 github.com/go-jose/go-jose/v3 v3.0.3 - github.com/go-jose/go-jose/v4 v4.0.3 github.com/go-logr/logr v1.4.2 github.com/googleapis/gax-go/v2 v2.12.4 github.com/gorilla/websocket v1.5.2 github.com/livekit/livekit-server v1.6.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.19.2-0.20240722200827-2c910325dbfb + github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad github.com/pion/rtp v1.8.6 - github.com/pion/rtp/v2 v2.0.0 github.com/pion/webrtc/v3 v3.2.43 github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_model v0.6.1 diff --git a/go.sum b/go.sum index 64044129..63a8d21d 100644 --- a/go.sum +++ b/go.sum @@ -98,7 +98,6 @@ github.com/go-gst/go-gst v1.0.0 h1:YBzE3JVZvbrnWWb/iGCXuiaOvHQ7HW+xXUBR++EgEtQ= github.com/go-gst/go-gst v1.0.0/go.mod h1:sQMWMnR98s2B4w52e4IXyGvz75rXV8CZ1bejdPT3KIs= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ= -github.com/go-jose/go-jose/v4 v4.0.3/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -185,8 +184,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 h1:p60OjeixzXnhGFQL8wmdUwWPxijEDe9ZJFMosq+byec= github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.2-0.20240722200827-2c910325dbfb h1:2B+mVyYk79MQ1UHcVE5GPGBqzS6NrrkymHzpVxNmLvo= -github.com/livekit/protocol v1.19.2-0.20240722200827-2c910325dbfb/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a h1:KqA12sSIgRm4HvwVmN0FBdoN0COUaOXpIezAiJO+Jgc= +github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto= github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad h1:SfX8OBXfUx9WHGEIsJi+rpWMsPhgtlRlQpHk3bnEZrI= @@ -230,7 +229,6 @@ github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9 github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/rtp v1.8.6 h1:MTmn/b0aWWsAzux2AmP8WGllusBVw4NPYPVFFd7jUPw= github.com/pion/rtp v1.8.6/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= -github.com/pion/rtp/v2 v2.0.0/go.mod h1:Vj+rrFbJCT3yxqE/VSwaOo9DQ2pMKGPxuE7hplGOlOs= github.com/pion/sctp v1.8.13/go.mod h1:YKSgO/bO/6aOMP9LCie1DuD7m+GamiK2yIiPM6vH+GA= github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY= github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE= @@ -335,8 +333,6 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -398,8 +394,6 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/pkg/gstreamer/bin.go b/pkg/gstreamer/bin.go index b8f8023a..f5bd09ab 100644 --- a/pkg/gstreamer/bin.go +++ b/pkg/gstreamer/bin.go @@ -151,23 +151,23 @@ func (b *Bin) AddElements(elements ...*gst.Element) error { return nil } -func (b *Bin) RemoveSourceBin(name string) (bool, error) { +func (b *Bin) RemoveSourceBin(name string) error { logger.Debugw(fmt.Sprintf("removing src %s from %s", name, b.bin.GetName())) return b.removeBin(name, gst.PadDirectionSource) } -func (b *Bin) RemoveSinkBin(name string) (bool, error) { +func (b *Bin) RemoveSinkBin(name string) error { logger.Debugw(fmt.Sprintf("removing sink %s from %s", name, b.bin.GetName())) return b.removeBin(name, gst.PadDirectionSink) } -func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) { +func (b *Bin) removeBin(name string, direction gst.PadDirection) error { b.LockStateShared() defer b.UnlockStateShared() state := b.GetStateLocked() if state > StateRunning { - return true, nil + return nil } b.mu.Lock() @@ -192,14 +192,14 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) { } } if bin == nil { - return false, nil + return nil } if state == StateBuilding { if err := b.pipeline.Remove(bin.bin.Element); err != nil { - return false, errors.ErrGstPipelineError(err) + return errors.ErrGstPipelineError(err) } - return true, nil + return nil } if direction == gst.PadDirectionSource { @@ -208,7 +208,7 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) { b.probeRemoveSink(bin) } - return true, nil + return nil } func (b *Bin) probeRemoveSource(src *Bin) { @@ -262,32 +262,24 @@ func (b *Bin) probeRemoveSink(sink *Bin) { return } - logger.Debugw("adding probe") srcGhostPad.AddProbe(gst.PadProbeTypeAllBoth, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn { - logger.Debugw("unlinking") srcGhostPad.Unlink(sinkGhostPad.Pad) - logger.Debugw("sending EOS to sinkGhostPad") sinkGhostPad.Pad.SendEvent(gst.NewEOSEvent()) b.mu.Lock() - logger.Debugw("removing sink bin") err := b.pipeline.Remove(sink.bin.Element) b.mu.Unlock() if err != nil { - logger.Debugw("failed to remove sink bin", "error", err) b.OnError(errors.ErrGstPipelineError(err)) return gst.PadProbeRemove } - logger.Debugw("setting state to null") if err = sink.SetState(gst.StateNull); err != nil { logger.Warnw(fmt.Sprintf("failed to change %s state", sink.bin.GetName()), err) } - logger.Debugw("releasing tee request pad") b.elements[len(b.elements)-1].ReleaseRequestPad(srcGhostPad.GetTarget()) - logger.Debugw("removing tee pad") b.bin.RemovePad(srcGhostPad.Pad) return gst.PadProbeOK }) diff --git a/pkg/pipeline/builder/audio.go b/pkg/pipeline/builder/audio.go index 7658424b..5517b49e 100644 --- a/pkg/pipeline/builder/audio.go +++ b/pkg/pipeline/builder/audio.go @@ -100,13 +100,15 @@ func (b *AudioBin) onTrackRemoved(trackID string) { b.mu.Lock() name, ok := b.names[trackID] + if !ok { + b.mu.Unlock() + return + } delete(b.names, trackID) b.mu.Unlock() - if ok { - if _, err := b.bin.RemoveSourceBin(name); err != nil { - b.bin.OnError(err) - } + if err := b.bin.RemoveSourceBin(name); err != nil { + b.bin.OnError(err) } } diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index 2cea7727..7179e064 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -35,7 +35,6 @@ type StreamBin struct { b *gstreamer.Bin outputType types.OutputType sinks map[string]*StreamSink - removals map[string]struct{} } type StreamSink struct { @@ -44,6 +43,7 @@ type StreamSink struct { url string reconnections int disconnectedAt time.Time + failed bool } func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*StreamBin, *gstreamer.Bin, error) { @@ -102,7 +102,6 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St b: b, outputType: o.OutputType, sinks: make(map[string]*StreamSink), - removals: make(map[string]struct{}), } for _, url := range o.Urls { @@ -130,18 +129,12 @@ func (sb *StreamBin) AddStream(url string) error { if err != nil { return errors.ErrGstPipelineError(err) } - if err = sink.SetProperty("async", false); err != nil { - return errors.ErrGstPipelineError(err) - } - if err = sink.SetProperty("sync", false); err != nil { + if err = sink.Set("location", url); err != nil { return errors.ErrGstPipelineError(err) } if err = sink.SetProperty("async-connect", false); err != nil { return errors.ErrGstPipelineError(err) } - if err = sink.Set("location", url); err != nil { - return errors.ErrGstPipelineError(err) - } case types.OutputTypeSRT: sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", name)) @@ -159,42 +152,72 @@ func (sb *StreamBin) AddStream(url string) error { return errors.ErrInvalidInput("output type") } + // GstBaseSink properties + if err = sink.SetProperty("async", false); err != nil { + return errors.ErrGstPipelineError(err) + } + if err = sink.SetProperty("sync", false); err != nil { + return errors.ErrGstPipelineError(err) + } if err = b.AddElements(queue, sink); err != nil { return err } - // add a proxy pad between the queue and sink to intercept errors + ss := &StreamSink{ + bin: b, + sink: sink, + url: url, + } + + // add a proxy pad between the queue and sink to prevent errors from propagating upstream b.SetLinkFunc(func() error { - proxy := gst.NewGhostPad("proxy", sink.GetStaticPad("sink")) - // proxy isn't saved/stored anywhere, so we need to reference it + proxy := gst.NewGhostPad(fmt.Sprintf("proxy_%s", name), sink.GetStaticPad("sink")) proxy.Ref() - proxy.SetChainFunction(func(self *gst.Pad, _ *gst.Object, buffer *gst.Buffer) gst.FlowReturn { - // buffer needs to be referenced or it might get freed - buffer.Ref() - links, _ := self.GetInternalLinks() - for _, link := range links { - if link.Push(buffer) == gst.FlowEOS { + proxy.ActivateMode(gst.PadModePush, true) + + switch sb.outputType { + case types.OutputTypeRTMP: + proxy.SetChainFunction(func(self *gst.Pad, _ *gst.Object, buffer *gst.Buffer) gst.FlowReturn { + buffer.Ref() + links, _ := self.GetInternalLinks() + switch { + case len(links) != 1: + return gst.FlowNotLinked + case links[0].Push(buffer) == gst.FlowEOS: return gst.FlowEOS + default: + return gst.FlowOK } - } - return gst.FlowOK - }) - proxy.ActivateMode(gst.PadModePush, true) + }) + case types.OutputTypeSRT: + proxy.SetChainListFunction(func(self *gst.Pad, _ *gst.Object, list *gst.BufferList) gst.FlowReturn { + list.Ref() + if ss.failed { + return gst.FlowOK + } + links, _ := self.GetInternalLinks() + if len(links) != 1 { + return gst.FlowNotLinked + } + switch links[0].PushList(list) { + case gst.FlowEOS: + return gst.FlowEOS + case gst.FlowError: + ss.failed = true + } + return gst.FlowOK + }) + } // link queue to sink if padReturn := queue.GetStaticPad("src").Link(proxy.Pad); padReturn != gst.PadLinkOK { return errors.ErrPadLinkFailed(queue.GetName(), "proxy", padReturn.String()) } - return nil }) sb.mu.Lock() - sb.sinks[name] = &StreamSink{ - bin: b, - sink: sink, - url: url, - } + sb.sinks[name] = ss sb.mu.Unlock() return sb.b.AddSinkBin(b) @@ -255,32 +278,21 @@ func (sb *StreamBin) MaybeResetStream(name string, streamErr error) (bool, error func (sb *StreamBin) RemoveStream(url string) error { sb.mu.Lock() - name := sb.getStreamNameLocked(url) - if name == "" { + var name string + var sink *StreamSink + for n, s := range sb.sinks { + if s.url == url { + name = n + sink = s + break + } + } + if sink == nil { sb.mu.Unlock() return errors.ErrStreamNotFound(url) } delete(sb.sinks, name) - - sb.removals[name] = struct{}{} sb.mu.Unlock() - _, err := sb.b.RemoveSinkBin(name) - return err -} - -func (sb *StreamBin) getStreamNameLocked(url string) string { - for name, sink := range sb.sinks { - if sink.url == url { - return name - } - } - return "" -} - -func (sb *StreamBin) Removed(name string) bool { - sb.mu.Lock() - _, ok := sb.removals[name] - sb.mu.Unlock() - return ok + return sb.b.RemoveSinkBin(name) } diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index 5a7781db..fe8b51cf 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -149,7 +149,7 @@ func (b *VideoBin) onTrackRemoved(trackID string) { } b.mu.Unlock() - if _, err := b.bin.RemoveSourceBin(name); err != nil { + if err := b.bin.RemoveSourceBin(name); err != nil { b.bin.OnError(err) } } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index f87f6c31..05234155 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -33,61 +33,60 @@ import ( ) const ( - // gst error logs + // noisy gst errors msgWrongThread = "Called from wrong thread" - // gst warning logs + // noisy gst warnings msgKeyframe = "Could not request a keyframe. Files may not split at the exact location they should" msgLatencyQuery = "Latency query failed" msgTaps = "can't find exact taps" msgInputDisappeared = "Can't copy metadata because input buffer disappeared" + msgSkippingSegment = "error reading data -1 (reason: Success), skipping segment" fnGstAudioResampleCheckDiscont = "gst_audio_resample_check_discont" + callerEPollUpdateEvents = "./srtcore/epoll.cpp:905" - // gst fix me logs + // noisy gst fixmes msgStreamStart = "stream-start event without group-id. Consider implementing group-id handling in the upstream elements" msgCreatingStream = "Creating random stream-id, consider implementing a deterministic way of creating a stream-id" msgAggregateSubclass = "Subclass should call gst_aggregator_selected_samples() from its aggregate implementation." ) +var ( + logLevels = map[gst.DebugLevel]string{ + gst.LevelError: "error", + gst.LevelWarning: "warning", + gst.LevelFixMe: "fixme", + gst.LevelInfo: "info", + gst.LevelDebug: "debug", + gst.LevelLog: "log", + gst.LevelTrace: "trace", + gst.LevelMemDump: "memdump", + } + + ignore = map[string]bool{ + msgWrongThread: true, + msgKeyframe: true, + msgLatencyQuery: true, + msgTaps: true, + msgInputDisappeared: true, + msgSkippingSegment: true, + fnGstAudioResampleCheckDiscont: true, + callerEPollUpdateEvents: true, + msgStreamStart: true, + msgCreatingStream: true, + msgAggregateSubclass: true, + } +) + func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line int, _ *glib.Object, message string) { - var lvl string - switch level { - case gst.LevelNone: - lvl = "none" - case gst.LevelError: - switch message { - case msgWrongThread: - // ignore - return - default: - lvl = "error" - } - case gst.LevelWarning: - if function == fnGstAudioResampleCheckDiscont { - // ignore - return - } - switch message { - case msgKeyframe, msgLatencyQuery, msgTaps, msgInputDisappeared: - // ignore - return - default: - lvl = "warning" - } - case gst.LevelFixMe: - switch message { - case msgStreamStart, msgCreatingStream, msgAggregateSubclass: - // ignore - return - default: - lvl = "fixme" - } - case gst.LevelInfo: - lvl = "info" - case gst.LevelDebug: - lvl = "debug" - default: - lvl = "log" + lvl, ok := logLevels[level] + if !ok || ignore[message] || ignore[function] { + return + } + + caller := fmt.Sprintf("%s:%d", file, line) + if ignore[caller] { + return } var msg string @@ -96,8 +95,7 @@ func (c *Controller) gstLog(level gst.DebugLevel, file, function string, line in } else { msg = fmt.Sprintf("[gst %s] %s", lvl, message) } - args := []interface{}{"caller", fmt.Sprintf("%s:%d", file, line)} - c.gstLogger.Debugw(msg, args...) + c.gstLogger.Debugw(msg, "caller", caller) } func (c *Controller) messageWatch(msg *gst.Message) bool { @@ -149,7 +147,6 @@ const ( elementGstRtmp2Sink = "GstRtmp2Sink" elementGstSplitMuxSink = "GstSplitMuxSink" elementGstSrtSink = "GstSRTSink" - elementGstQueue = "GstQueue" msgStreamingNotNegotiated = "streaming stopped, reason not-negotiated (-4)" msgMuxer = ":muxer" @@ -191,12 +188,6 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { return c.removeSink(context.Background(), url, gErr) - case element == elementGstQueue: - sinkName := strings.Split(name, "_")[1] - if c.streamBin.Removed(sinkName) { - return nil - } - case element == elementGstAppSrc: if message == msgStreamingNotNegotiated { // send eos to app src diff --git a/test/integration.go b/test/integration.go index 17cae744..385b5bab 100644 --- a/test/integration.go +++ b/test/integration.go @@ -33,12 +33,6 @@ import ( lksdk "github.com/livekit/server-sdk-go/v2" ) -const ( - muteDuration = time.Second * 10 - - webUrl = "https://videoplayer-2k23.vercel.app/videos/eminem" -) - var ( samples = map[types.MimeType]string{ types.MimeTypeOpus: "/workspace/test/sample/matrix-trailer.ogg", @@ -138,7 +132,7 @@ func (r *Runner) publishSample(t *testing.T, codec types.MimeType, withMuting bo default: pub.SetMuted(!muted) muted = !muted - time.Sleep(muteDuration) + time.Sleep(time.Second * 10) } } }() diff --git a/test/stream.go b/test/stream.go index 1d498658..3873c68e 100644 --- a/test/stream.go +++ b/test/stream.go @@ -137,7 +137,7 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * require.NotZero(t, res.EndedAt) // check stream info - require.Len(t, res.StreamResults, 2) + require.Len(t, res.StreamResults, 4) for _, info := range res.StreamResults { require.NotZero(t, info.StartedAt) require.NotZero(t, info.EndedAt) diff --git a/test/web.go b/test/web.go index 02aaeb23..46077755 100644 --- a/test/web.go +++ b/test/web.go @@ -26,6 +26,8 @@ import ( "github.com/livekit/protocol/utils" ) +const webUrl = "https://videoplayer-2k23.vercel.app/videos/eminem" + func (r *Runner) testWeb(t *testing.T) { if !r.should(runWeb) { return From 0d61559b311059df74acd7310510dca08a2b9764 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 23 Jul 2024 01:06:45 -0400 Subject: [PATCH 4/6] double run call on track test --- test/track.go | 88 +++++++++++++++++++++++++-------------------------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/test/track.go b/test/track.go index a049c2b7..f6f9b672 100644 --- a/test/track.go +++ b/test/track.go @@ -77,7 +77,7 @@ func (r *Runner) testTrackFile(t *testing.T) { filename: "t_{track_type}_{time}.webm", }, // { - // name: "VP9", + // name: "File/VP9", // videoOnly: true, // videoCodec: types.MimeTypeVP9, // outputType: types.OutputTypeWebM, @@ -120,58 +120,56 @@ func (r *Runner) testTrackStream(t *testing.T) { return } - run(t, "Stream", func(t *testing.T) { - now := time.Now().Unix() - for _, test := range []*testCase{ - { - name: "Websocket", - audioOnly: true, - audioCodec: types.MimeTypeOpus, - filename: fmt.Sprintf("track-ws-%v.raw", now), - }, - } { - r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { - trackID := audioTrackID - if trackID == "" { - trackID = videoTrackID - } + now := time.Now().Unix() + for _, test := range []*testCase{ + { + name: "Websocket", + audioOnly: true, + audioCodec: types.MimeTypeOpus, + filename: fmt.Sprintf("track-ws-%v.raw", now), + }, + } { + r.runTrackTest(t, test.name, test.audioCodec, test.videoCodec, func(t *testing.T, audioTrackID, videoTrackID string) { + trackID := audioTrackID + if trackID == "" { + trackID = videoTrackID + } + + filepath := path.Join(r.FilePrefix, test.filename) + wss := newTestWebsocketServer(filepath) + s := httptest.NewServer(http.HandlerFunc(wss.handleWebsocket)) + defer func() { + wss.close() + s.Close() + }() - filepath := path.Join(r.FilePrefix, test.filename) - wss := newTestWebsocketServer(filepath) - s := httptest.NewServer(http.HandlerFunc(wss.handleWebsocket)) - defer func() { - wss.close() - s.Close() - }() - - req := &rpc.StartEgressRequest{ - EgressId: utils.NewGuid(utils.EgressPrefix), - Request: &rpc.StartEgressRequest_Track{ - Track: &livekit.TrackEgressRequest{ - RoomName: r.room.Name(), - TrackId: trackID, - Output: &livekit.TrackEgressRequest_WebsocketUrl{ - WebsocketUrl: "ws" + strings.TrimPrefix(s.URL, "http"), - }, + req := &rpc.StartEgressRequest{ + EgressId: utils.NewGuid(utils.EgressPrefix), + Request: &rpc.StartEgressRequest_Track{ + Track: &livekit.TrackEgressRequest{ + RoomName: r.room.Name(), + TrackId: trackID, + Output: &livekit.TrackEgressRequest_WebsocketUrl{ + WebsocketUrl: "ws" + strings.TrimPrefix(s.URL, "http"), }, }, - } + }, + } - egressID := r.startEgress(t, req) + egressID := r.startEgress(t, req) - p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) - require.NoError(t, err) + p, err := config.GetValidatedPipelineConfig(r.ServiceConfig, req) + require.NoError(t, err) - time.Sleep(time.Second * 30) + time.Sleep(time.Second * 30) - res := r.stopEgress(t, egressID) - verify(t, filepath, p, res, types.EgressTypeWebsocket, r.Muting, r.sourceFramerate, false) - }) - if r.Short { - return - } + res := r.stopEgress(t, egressID) + verify(t, filepath, p, res, types.EgressTypeWebsocket, r.Muting, r.sourceFramerate, false) + }) + if r.Short { + return } - }) + } } type websocketTestServer struct { From ceac6adc9bebacf87fe5f36afc6d62fac80adb73 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 23 Jul 2024 01:23:59 -0400 Subject: [PATCH 5/6] add ws schemes to output type map --- pkg/types/types.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/types/types.go b/pkg/types/types.go index f3da4c91..21a777aa 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -207,6 +207,8 @@ var ( "mux": OutputTypeRTMP, "twitch": OutputTypeRTMP, "srt": OutputTypeSRT, + "ws": OutputTypeRaw, + "wss": OutputTypeRaw, } ) From 31f90a08685d795b90d2588e62402896768a6a40 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 23 Jul 2024 01:43:09 -0400 Subject: [PATCH 6/6] remove debug logs --- pkg/pipeline/builder/video.go | 4 ---- test/stream.go | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/pipeline/builder/video.go b/pkg/pipeline/builder/video.go index fe8b51cf..78c5690a 100644 --- a/pkg/pipeline/builder/video.go +++ b/pkg/pipeline/builder/video.go @@ -719,12 +719,10 @@ func (b *VideoBin) createSrcPad(trackID, name string) { b.mu.Lock() if pts < b.lastPTS || (b.selectedPad != videoTestSrcName && b.selectedPad != name) { b.mu.Unlock() - logger.Debugw(fmt.Sprintf("%s dropping %v", name, time.Duration(pts))) return gst.PadProbeDrop } b.lastPTS = pts b.mu.Unlock() - logger.Debugw(fmt.Sprintf("%s pushing %v", name, time.Duration(pts))) return gst.PadProbeOK }) @@ -741,12 +739,10 @@ func (b *VideoBin) createTestSrcPad() { b.mu.Lock() if pts < b.lastPTS || (b.selectedPad != videoTestSrcName) { b.mu.Unlock() - logger.Debugw(fmt.Sprintf("%s dropping %v", videoTestSrcName, time.Duration(pts))) return gst.PadProbeDrop } b.lastPTS = pts b.mu.Unlock() - logger.Debugw(fmt.Sprintf("%s pushing %v", videoTestSrcName, time.Duration(pts))) return gst.PadProbeOK }) diff --git a/test/stream.go b/test/stream.go index 3873c68e..c036cf75 100644 --- a/test/stream.go +++ b/test/stream.go @@ -143,11 +143,11 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * require.NotZero(t, info.EndedAt) switch info.Url { - case urls[0][0]: + case urls[0][1]: require.Equal(t, livekit.StreamInfo_FINISHED.String(), info.Status.String()) require.Greater(t, float64(info.Duration)/1e9, 15.0) - case urls[2][0]: + case urls[2][1]: require.Equal(t, livekit.StreamInfo_FINISHED.String(), info.Status.String()) require.Greater(t, float64(info.Duration)/1e9, 10.0)