From 8602b268127a6e6f6818023e0303eb0c724bb5f2 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 9 Aug 2024 14:12:17 -0400 Subject: [PATCH] Clean up stream logic (#754) * clean up stream logic * more updates * update urls_test * fix SRT test * fix test races for stream updates * update rtmp failure test --- go.mod | 2 +- pkg/config/output.go | 34 ++++--- pkg/config/output_stream.go | 37 ++++--- pkg/config/pipeline.go | 3 +- pkg/config/urls.go | 113 ++++++++++++++------- pkg/config/urls_test.go | 40 ++++---- pkg/pipeline/builder/stream.go | 81 +++++++-------- pkg/pipeline/controller.go | 174 ++++++++++++++++----------------- pkg/pipeline/sink/websocket.go | 8 +- pkg/pipeline/watch.go | 37 +++---- pkg/service/io.go | 71 ++++++++------ test/edge.go | 21 +++- test/integration.go | 32 ++++-- test/stream.go | 11 +-- 14 files changed, 370 insertions(+), 294 deletions(-) diff --git a/go.mod b/go.mod index ee2d0d40..6260cde9 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Azure/azure-storage-blob-go v0.15.0 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aws/aws-sdk-go v1.51.28 + github.com/bep/debounce v1.2.1 github.com/chromedp/cdproto v0.0.0-20240421230201-ab917191657d github.com/chromedp/chromedp v0.9.5 github.com/frostbyte73/core v0.0.10 @@ -49,7 +50,6 @@ require ( github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/bep/debounce v1.2.1 // indirect github.com/bufbuild/protovalidate-go v0.6.1 // indirect github.com/bufbuild/protoyaml-go v0.1.9 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/pkg/config/output.go b/pkg/config/output.go index 86497c5a..814f3c47 100644 --- a/pkg/config/output.go +++ b/pkg/config/output.go @@ -62,7 +62,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error { } p.Outputs[types.EgressTypeFile] = []OutputConfig{conf} - p.OutputCount++ + p.OutputCount.Inc() p.FinalizationRequired = true if p.VideoEnabled { p.VideoEncoding = true @@ -118,16 +118,18 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error { } p.Outputs[types.EgressTypeStream] = []OutputConfig{conf} - p.OutputCount += len(stream.Urls) + p.OutputCount.Add(int32(len(stream.Urls))) if p.VideoEnabled { p.VideoEncoding = true } - streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo)) - for _, info := range conf.StreamInfo { - streamInfoList = append(streamInfoList, info) - } + streamInfoList := make([]*livekit.StreamInfo, 0, len(stream.Urls)) + conf.Streams.Range(func(_, stream any) bool { + streamInfoList = append(streamInfoList, stream.(*Stream).StreamInfo) + return true + }) p.Info.StreamResults = streamInfoList + if len(files)+len(segments)+len(images) == 0 { // empty stream output only valid in combination with other outputs if len(stream.Urls) == 0 { @@ -157,7 +159,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error { } p.Outputs[types.EgressTypeSegments] = []OutputConfig{conf} - p.OutputCount++ + p.OutputCount.Inc() p.FinalizationRequired = true if p.VideoEnabled { p.VideoEncoding = true @@ -186,7 +188,7 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error { return err } - if p.OutputCount == 0 { + if p.OutputCount.Load() == 0 { return errors.ErrInvalidInput("output") } @@ -205,7 +207,7 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err p.Info.Result = &livekit.EgressInfo_File{File: conf.FileInfo} p.Outputs[types.EgressTypeFile] = []OutputConfig{conf} - p.OutputCount = 1 + p.OutputCount.Inc() p.FinalizationRequired = true case *livekit.TrackEgressRequest_WebsocketUrl: @@ -214,15 +216,17 @@ func (p *PipelineConfig) updateDirectOutput(req *livekit.TrackEgressRequest) err return err } - streamInfoList := make([]*livekit.StreamInfo, 0, len(conf.StreamInfo)) - for _, info := range conf.StreamInfo { - streamInfoList = append(streamInfoList, info) - } + streamInfoList := make([]*livekit.StreamInfo, 0, 1) + conf.Streams.Range(func(_, stream any) bool { + streamInfoList = append(streamInfoList, stream.(*Stream).StreamInfo) + return true + }) + p.Info.StreamResults = streamInfoList p.Info.Result = &livekit.EgressInfo_Stream{Stream: &livekit.StreamInfoList{Info: streamInfoList}} p.Outputs[types.EgressTypeWebsocket] = []OutputConfig{conf} - p.OutputCount = 1 + p.OutputCount.Inc() default: return errors.ErrInvalidInput("output") @@ -243,7 +247,7 @@ func (p *PipelineConfig) updateImageOutputs(images []*livekit.ImageOutput) error } p.Outputs[types.EgressTypeImages] = append(p.Outputs[types.EgressTypeImages], conf) - p.OutputCount++ + p.OutputCount.Inc() p.FinalizationRequired = true p.Info.ImageResults = append(p.Info.ImageResults, conf.ImagesInfo) diff --git a/pkg/config/output_stream.go b/pkg/config/output_stream.go index 413314aa..03793f00 100644 --- a/pkg/config/output_stream.go +++ b/pkg/config/output_stream.go @@ -15,20 +15,30 @@ package config import ( + "sync" + "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" ) type StreamConfig struct { outputConfig - Urls []string - StreamIDs map[string]string - StreamInfo map[string]*livekit.StreamInfo + // url -> Stream + Streams sync.Map twitchTemplate string } +type Stream struct { + Name string // gstreamer stream ID + ParsedUrl string // parsed/validated url + RedactedUrl string // url with stream key removed + StreamID string // stream ID used by rtmpconnection + StreamInfo *livekit.StreamInfo +} + func (p *PipelineConfig) GetStreamConfig() *StreamConfig { o, ok := p.Outputs[types.EgressTypeStream] if !ok || len(o) == 0 { @@ -48,22 +58,13 @@ func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig { func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) { conf := &StreamConfig{ outputConfig: outputConfig{OutputType: outputType}, - StreamIDs: make(map[string]string), } - conf.StreamInfo = make(map[string]*livekit.StreamInfo) - var streamInfoList []*livekit.StreamInfo for _, rawUrl := range urls { - url, redacted, err := conf.ValidateUrl(rawUrl, outputType) + _, err := conf.AddStream(rawUrl, outputType) if err != nil { return nil, err } - - conf.Urls = append(conf.Urls, url) - - info := &livekit.StreamInfo{Url: redacted} - conf.StreamInfo[url] = info - streamInfoList = append(streamInfoList, info) } switch outputType { @@ -81,3 +82,13 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str return conf, nil } + +func (s *Stream) UpdateEndTime(endedAt int64) { + s.StreamInfo.EndedAt = endedAt + if s.StreamInfo.StartedAt == 0 { + logger.Warnw("stream missing start time", nil, "url", s.RedactedUrl) + s.StreamInfo.StartedAt = endedAt + } else { + s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt + } +} diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index ab800ed9..cb7cb980 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -22,6 +22,7 @@ import ( "github.com/go-gst/go-gst/gst/app" "github.com/pion/webrtc/v3" + "go.uber.org/atomic" "google.golang.org/protobuf/proto" "gopkg.in/yaml.v3" @@ -50,7 +51,7 @@ type PipelineConfig struct { VideoConfig `yaml:"-"` Outputs map[types.EgressType][]OutputConfig `yaml:"-"` - OutputCount int `yaml:"-"` + OutputCount atomic.Int32 `yaml:"-"` FinalizationRequired bool `yaml:"-"` Info *info.EgressInfo `yaml:"-"` diff --git a/pkg/config/urls.go b/pkg/config/urls.go index 5e96ddb6..291f25b6 100644 --- a/pkg/config/urls.go +++ b/pkg/config/urls.go @@ -20,11 +20,13 @@ import ( "net/url" "regexp" "strings" + "time" "github.com/go-jose/go-jose/v3/json" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" ) @@ -34,75 +36,110 @@ var ( twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$") ) -func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) { - parsed, err := url.Parse(rawUrl) +func (o *StreamConfig) AddStream(rawUrl string, outputType types.OutputType) (*Stream, error) { + parsed, redacted, streamID, err := o.ValidateUrl(rawUrl, outputType) if err != nil { - return "", "", errors.ErrInvalidUrl(rawUrl, err.Error()) + return nil, err } - if types.StreamOutputTypes[parsed.Scheme] != outputType { - return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme") + + stream := &Stream{ + ParsedUrl: parsed, + RedactedUrl: redacted, + StreamID: streamID, + StreamInfo: &livekit.StreamInfo{ + Url: redacted, + Status: livekit.StreamInfo_ACTIVE, + }, + } + if outputType != types.OutputTypeRTMP { + stream.StreamInfo.StartedAt = time.Now().UnixNano() + } + o.Streams.Store(parsed, stream) + + return stream, nil +} + +func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) ( + parsed string, redacted string, streamID string, err error, +) { + parsedUrl, err := url.Parse(rawUrl) + if err != nil { + err = errors.ErrInvalidUrl(rawUrl, err.Error()) + return + } + if types.StreamOutputTypes[parsedUrl.Scheme] != outputType { + err = errors.ErrInvalidUrl(rawUrl, "invalid scheme") + return } switch outputType { case types.OutputTypeRTMP: - if parsed.Scheme == "mux" { - rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host) - } else if parsed.Scheme == "twitch" { - rawUrl, err = o.updateTwitchURL(parsed.Host) + if parsedUrl.Scheme == "mux" { + parsed = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsedUrl.Host) + } else if parsedUrl.Scheme == "twitch" { + parsed, err = o.updateTwitchURL(parsedUrl.Host) if err != nil { - return "", "", errors.ErrInvalidUrl(rawUrl, err.Error()) + return } } else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 { - updated, err := o.updateTwitchURL(match[1]) - if err == nil { - rawUrl = updated + if updated, err := o.updateTwitchURL(match[1]); err == nil { + parsed = updated } + } else { + parsed = rawUrl } - redacted, streamID, ok := redactStreamKey(rawUrl) + var ok bool + redacted, streamID, ok = redactStreamKey(parsed) if !ok { - return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)") + err = errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)") } - o.StreamIDs[rawUrl] = streamID - - return rawUrl, redacted, nil + return case types.OutputTypeSRT: - return rawUrl, rawUrl, nil + parsed = rawUrl + redacted = rawUrl + return case types.OutputTypeRaw: - return rawUrl, rawUrl, nil + parsed = rawUrl + redacted = rawUrl + return default: - return "", "", errors.ErrInvalidInput("stream output type") + err = errors.ErrInvalidInput("stream output type") + return } } -func (o *StreamConfig) GetStreamUrl(rawUrl string) (string, error) { - parsed, err := url.Parse(rawUrl) +func (o *StreamConfig) GetStream(rawUrl string) (*Stream, error) { + parsedUrl, err := url.Parse(rawUrl) if err != nil { - return "", errors.ErrInvalidUrl(rawUrl, err.Error()) + return nil, errors.ErrInvalidUrl(rawUrl, err.Error()) } - var twitchKey string - if parsed.Scheme == "mux" { - return fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host), nil - } else if parsed.Scheme == "twitch" { - twitchKey = parsed.Host + var parsed string + if parsedUrl.Scheme == "mux" { + parsed = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsedUrl.Host) + } else if parsedUrl.Scheme == "twitch" { + parsed, err = o.updateTwitchURL(parsedUrl.Host) + if err != nil { + return nil, err + } } else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 { - twitchKey = match[1] + parsed, err = o.updateTwitchURL(match[1]) + if err != nil { + return nil, err + } } else { - return rawUrl, nil + parsed = rawUrl } - // find twitch url by stream key because we can't rely on the ingest endpoint returning consistent results - for u := range o.StreamInfo { - if match := twitchEndpoint.FindStringSubmatch(u); len(match) > 0 && match[1] == twitchKey { - return u, nil - } + stream, ok := o.Streams.Load(parsed) + if !ok { + return nil, errors.ErrStreamNotFound(rawUrl) } - - return "", errors.ErrStreamNotFound(rawUrl) + return stream.(*Stream), nil } func (o *StreamConfig) updateTwitchURL(key string) (string, error) { diff --git a/pkg/config/urls_test.go b/pkg/config/urls_test.go index a3501579..9a9fa0e3 100644 --- a/pkg/config/urls_test.go +++ b/pkg/config/urls_test.go @@ -16,12 +16,12 @@ package config import ( "regexp" + "strings" "testing" "github.com/stretchr/testify/require" "github.com/livekit/egress/pkg/types" - "github.com/livekit/protocol/livekit" ) func TestValidateUrl(t *testing.T) { @@ -33,12 +33,12 @@ func TestValidateUrl(t *testing.T) { for _, test := range []struct { url string twitch bool - updated string + parsed string redacted string }{ { url: "mux://streamkey", - updated: "rtmps://global-live.mux.com:443/app/streamkey", + parsed: "rtmps://global-live.mux.com:443/app/streamkey", redacted: "rtmps://global-live.mux.com:443/app/{str...key}", }, { @@ -51,52 +51,54 @@ func TestValidateUrl(t *testing.T) { }, { url: "rtmp://localhost:1935/live/streamkey", - updated: "rtmp://localhost:1935/live/streamkey", + parsed: "rtmp://localhost:1935/live/streamkey", redacted: "rtmp://localhost:1935/live/{str...key}", }, { url: "rtmps://localhost:1935/live/streamkey", - updated: "rtmps://localhost:1935/live/streamkey", + parsed: "rtmps://localhost:1935/live/streamkey", redacted: "rtmps://localhost:1935/live/{str...key}", }, } { - updated, redacted, err := o.ValidateUrl(test.url, types.OutputTypeRTMP) + parsed, redacted, streamID, err := o.ValidateUrl(test.url, types.OutputTypeRTMP) require.NoError(t, err) + require.NotEmpty(t, streamID) if test.twitch { - require.NotEmpty(t, twitchUpdated.FindString(updated), updated) + require.NotEmpty(t, twitchUpdated.FindString(parsed), parsed) require.NotEmpty(t, twitchRedacted.FindString(redacted), redacted) } else { - require.Equal(t, test.updated, updated) + require.Equal(t, test.parsed, parsed) require.Equal(t, test.redacted, redacted) } } } func TestGetUrl(t *testing.T) { + o := &StreamConfig{} + require.NoError(t, o.updateTwitchTemplate()) + + parsedTwitchUrl := strings.ReplaceAll(o.twitchTemplate, "{stream_key}", "streamkey") urls := []string{ "rtmps://global-live.mux.com:443/app/streamkey", - "rtmp://sfo.contribute.live-video.net/app/streamkey", - "rtmp://sfo.contribute.live-video.net/app/streamkey", + parsedTwitchUrl, + parsedTwitchUrl, "rtmp://localhost:1935/live/streamkey", } - o := &StreamConfig{ - StreamInfo: map[string]*livekit.StreamInfo{ - urls[0]: {Url: urls[0]}, - urls[1]: {Url: urls[1]}, - urls[3]: {Url: urls[3]}, - }, + for _, url := range []string{urls[0], urls[1], urls[3]} { + _, err := o.AddStream(url, types.OutputTypeRTMP) + require.NoError(t, err) } for i, rawUrl := range []string{ "mux://streamkey", "twitch://streamkey", - "rtmp://jfk.contribute.live-video.net/app/streamkey", + "rtmp://any.contribute.live-video.net/app/streamkey", "rtmp://localhost:1935/live/streamkey", } { - url, err := o.GetStreamUrl(rawUrl) + stream, err := o.GetStream(rawUrl) require.NoError(t, err) - require.Equal(t, urls[i], url) + require.Equal(t, urls[i], stream.ParsedUrl) } } diff --git a/pkg/pipeline/builder/stream.go b/pkg/pipeline/builder/stream.go index 4fe891d5..7cb95d3d 100644 --- a/pkg/pipeline/builder/stream.go +++ b/pkg/pipeline/builder/stream.go @@ -38,9 +38,9 @@ type StreamBin struct { } type StreamSink struct { + stream *config.Stream bin *gstreamer.Bin sink *gst.Element - url string reconnections int disconnectedAt time.Time failed bool @@ -104,20 +104,22 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St sinks: make(map[string]*StreamSink), } - for _, url := range o.Urls { - if err = sb.AddStream(url); err != nil { - return nil, nil, err - } + o.Streams.Range(func(_, stream any) bool { + err = sb.AddStream(stream.(*config.Stream)) + return err == nil + }) + if err != nil { + return nil, nil, err } return sb, b, nil } -func (sb *StreamBin) AddStream(url string) error { - name := utils.NewGuid("") - b := sb.b.NewBin(name) +func (sb *StreamBin) AddStream(stream *config.Stream) error { + stream.Name = utils.NewGuid("") + b := sb.b.NewBin(stream.Name) - queue, err := gstreamer.BuildQueue(fmt.Sprintf("queue_%s", name), config.Latency, true) + queue, err := gstreamer.BuildQueue(fmt.Sprintf("queue_%s", stream.Name), config.Latency, true) if err != nil { return errors.ErrGstPipelineError(err) } @@ -125,20 +127,20 @@ func (sb *StreamBin) AddStream(url string) error { var sink *gst.Element switch sb.outputType { case types.OutputTypeRTMP: - sink, err = gst.NewElementWithName("rtmp2sink", fmt.Sprintf("rtmp2sink_%s", name)) + sink, err = gst.NewElementWithName("rtmp2sink", fmt.Sprintf("rtmp2sink_%s", stream.Name)) if err != nil { return errors.ErrGstPipelineError(err) } - if err = sink.Set("location", url); err != nil { + if err = sink.Set("location", stream.ParsedUrl); err != nil { return errors.ErrGstPipelineError(err) } case types.OutputTypeSRT: - sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", name)) + sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", stream.Name)) if err != nil { return errors.ErrGstPipelineError(err) } - if err = sink.SetProperty("uri", url); err != nil { + if err = sink.SetProperty("uri", stream.ParsedUrl); err != nil { return errors.ErrGstPipelineError(err) } if err = sink.SetProperty("wait-for-connection", false); err != nil { @@ -161,14 +163,14 @@ func (sb *StreamBin) AddStream(url string) error { } ss := &StreamSink{ - bin: b, - sink: sink, - url: url, + stream: stream, + bin: b, + sink: sink, } // add a proxy pad between the queue and sink to prevent errors from propagating upstream b.SetLinkFunc(func() error { - proxy := gst.NewGhostPad(fmt.Sprintf("proxy_%s", name), sink.GetStaticPad("sink")) + proxy := gst.NewGhostPad(fmt.Sprintf("proxy_%s", stream.Name), sink.GetStaticPad("sink")) proxy.Ref() proxy.ActivateMode(gst.PadModePush, true) @@ -214,29 +216,29 @@ func (sb *StreamBin) AddStream(url string) error { }) sb.mu.Lock() - sb.sinks[name] = ss + sb.sinks[stream.Name] = ss sb.mu.Unlock() return sb.b.AddSinkBin(b) } -func (sb *StreamBin) GetStreamUrl(name string) (string, error) { - sb.mu.RLock() +func (sb *StreamBin) GetStream(name string) (*config.Stream, error) { + sb.mu.Lock() + defer sb.mu.Unlock() + sink, ok := sb.sinks[name] - sb.mu.RUnlock() if !ok { - return "", errors.ErrStreamNotFound(name) + return nil, errors.ErrStreamNotFound(name) } - return sink.url, nil + return sink.stream, nil } -func (sb *StreamBin) MaybeResetStream(name string, streamErr error) (bool, error) { +func (sb *StreamBin) MaybeResetStream(stream *config.Stream, streamErr error) (bool, error) { sb.mu.Lock() - sink := sb.sinks[name] + sink, ok := sb.sinks[stream.Name] sb.mu.Unlock() - - if sink == nil { - return false, errors.ErrStreamNotFound(name) + if !ok { + return false, errors.ErrStreamNotFound(stream.Name) } s, err := sink.sink.GetProperty("stats") @@ -260,8 +262,7 @@ func (sb *StreamBin) MaybeResetStream(name string, streamErr error) (bool, error } sink.reconnections++ - redacted, _ := utils.RedactStreamKey(sink.url) - logger.Warnw("resetting stream", streamErr, "url", redacted) + logger.Warnw("resetting stream", streamErr, "url", sink.stream.RedactedUrl) if err = sink.bin.SetState(gst.StateNull); err != nil { return false, err @@ -273,23 +274,15 @@ func (sb *StreamBin) MaybeResetStream(name string, streamErr error) (bool, error return true, nil } -func (sb *StreamBin) RemoveStream(url string) error { +func (sb *StreamBin) RemoveStream(stream *config.Stream) error { sb.mu.Lock() - var name string - var sink *StreamSink - for n, s := range sb.sinks { - if s.url == url { - name = n - sink = s - break - } - } - if sink == nil { + _, ok := sb.sinks[stream.Name] + if !ok { sb.mu.Unlock() - return errors.ErrStreamNotFound(url) + return errors.ErrStreamNotFound(stream.RedactedUrl) } - delete(sb.sinks, name) + delete(sb.sinks, stream.Name) sb.mu.Unlock() - return sb.b.RemoveSinkBin(name) + return sb.b.RemoveSinkBin(stream.Name) } diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 2a73ebb4..68db8de2 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/bep/debounce" "github.com/frostbyte73/core" "github.com/go-gst/go-gst/gst" "go.uber.org/zap" @@ -37,7 +38,6 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" - "github.com/livekit/protocol/utils" ) const ( @@ -47,6 +47,7 @@ const ( type Controller struct { *config.PipelineConfig ipcServiceClient ipc.EgressServiceClient + streamUpdates func(func()) // debounce stream updates since they can come in quick succession // gstreamer gstLogger *zap.SugaredLogger @@ -74,6 +75,7 @@ func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc. c := &Controller{ PipelineConfig: conf, ipcServiceClient: ipcServiceClient, + streamUpdates: debounce.New(time.Millisecond * 500), gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), callbacks: &gstreamer.Callbacks{ GstReady: make(chan struct{}), @@ -264,14 +266,13 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream return errors.ErrNonStreamingPipeline } - sendUpdate := false errs := errors.ErrArray{} - now := time.Now().UnixNano() + sendUpdate := false // add stream outputs first for _, rawUrl := range req.AddOutputUrls { // validate and redact url - url, redacted, err := o.ValidateUrl(rawUrl, o.OutputType) + stream, err := o.AddStream(rawUrl, o.OutputType) if err != nil { errs.AppendErr(err) continue @@ -279,113 +280,97 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream // add stream info to results c.mu.Lock() - streamInfo := &livekit.StreamInfo{ - Url: redacted, - Status: livekit.StreamInfo_ACTIVE, - } - o.StreamInfo[url] = streamInfo - - c.Info.StreamResults = append(c.Info.StreamResults, streamInfo) + c.Info.StreamResults = append(c.Info.StreamResults, stream.StreamInfo) if list := (*livekit.EgressInfo)(c.Info).GetStream(); list != nil { - list.Info = append(list.Info, streamInfo) + list.Info = append(list.Info, stream.StreamInfo) } c.mu.Unlock() // add stream - if err = c.streamBin.AddStream(url); err != nil { - streamInfo.Status = livekit.StreamInfo_FAILED + sendUpdate = true + if err = c.streamBin.AddStream(stream); err != nil { + stream.StreamInfo.Status = livekit.StreamInfo_FAILED errs.AppendErr(err) continue } - if o.OutputType != types.OutputTypeRTMP { - streamInfo.StartedAt = now - } - c.OutputCount++ - sendUpdate = true + c.OutputCount.Inc() } // remove stream outputs for _, rawUrl := range req.RemoveOutputUrls { - url, err := o.GetStreamUrl(rawUrl) + stream, err := o.GetStream(rawUrl) if err != nil { errs.AppendErr(err) continue } - if err = c.removeSink(ctx, url, nil); err != nil { + sendUpdate = true + if err = c.streamFinished(ctx, stream); err != nil { errs.AppendErr(err) - } else { - sendUpdate = true } } if sendUpdate { c.Info.UpdatedAt = time.Now().UnixNano() - _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) + c.streamUpdates(func() { + _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) + }) } return errs.ToError() } -func (c *Controller) removeSink(ctx context.Context, url string, streamErr error) error { - now := time.Now().UnixNano() +func (c *Controller) streamFinished(ctx context.Context, stream *config.Stream) error { + stream.StreamInfo.Status = livekit.StreamInfo_FINISHED + stream.UpdateEndTime(time.Now().UnixNano()) - c.mu.Lock() + // remove output o := c.GetStreamConfig() + o.Streams.Delete(stream.ParsedUrl) + c.OutputCount.Dec() - streamInfo := o.StreamInfo[url] - if streamInfo == nil { - c.mu.Unlock() - return errors.ErrStreamNotFound(url) + // end egress if no outputs remaining + if c.OutputCount.Load() == 0 { + c.SendEOS(ctx, "all streams removed") + return nil } - // set error if exists - if streamErr != nil { - streamInfo.Status = livekit.StreamInfo_FAILED - streamInfo.Error = streamErr.Error() - } else { - streamInfo.Status = livekit.StreamInfo_FINISHED - } + logger.Infow("stream finished", + "url", stream.RedactedUrl, + "status", stream.StreamInfo.Status, + "duration", stream.StreamInfo.Duration, + ) - // update end time and duration - streamInfo.EndedAt = now - if streamInfo.StartedAt == 0 { - streamInfo.StartedAt = now - } else { - streamInfo.Duration = now - streamInfo.StartedAt - } + return c.streamBin.RemoveStream(stream) +} + +func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, streamErr error) error { + stream.StreamInfo.Status = livekit.StreamInfo_FAILED + stream.StreamInfo.Error = streamErr.Error() + stream.UpdateEndTime(time.Now().UnixNano()) // remove output - delete(o.StreamInfo, url) - c.OutputCount-- - c.mu.Unlock() - - // log removal - redacted, _ := utils.RedactStreamKey(url) - logger.Infow("removing stream sink", - "url", redacted, - "status", streamInfo.Status, - "duration", streamInfo.Duration, - "error", streamErr) + o := c.GetStreamConfig() + o.Streams.Delete(stream.ParsedUrl) + c.OutputCount.Dec() - // shut down if no outputs remaining - if c.OutputCount == 0 { - if streamErr != nil { - return streamErr - } else { - c.SendEOS(ctx, "all streams removed") - return nil - } + // fail egress if no outputs remaining + if c.OutputCount.Load() == 0 { + return streamErr } - // only send updates if the egress will continue, otherwise it's handled by UpdateStream RPC - if streamErr != nil { - c.Info.UpdatedAt = time.Now().UnixNano() + logger.Infow("stream failed", + "url", stream.RedactedUrl, + "status", stream.StreamInfo.Status, + "duration", stream.StreamInfo.Duration, + "error", streamErr) + + c.streamUpdates(func() { _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) - } + }) - return c.streamBin.RemoveStream(url) + return c.streamBin.RemoveStream(stream) } func (c *Controller) SendEOS(ctx context.Context, reason string) { @@ -456,9 +441,10 @@ func (c *Controller) Close() { // update status if c.Info.Status == livekit.EgressStatus_EGRESS_FAILED { if o := c.GetStreamConfig(); o != nil { - for _, streamInfo := range o.StreamInfo { - streamInfo.Status = livekit.StreamInfo_FAILED - } + o.Streams.Range(func(_, stream any) bool { + stream.(*config.Stream).StreamInfo.Status = livekit.StreamInfo_FAILED + return true + }) } } @@ -526,15 +512,13 @@ func (c *Controller) updateStartTime(startedAt int64) { case types.EgressTypeStream, types.EgressTypeWebsocket: streamConfig := o[0].(*config.StreamConfig) if streamConfig.OutputType == types.OutputTypeRTMP { + // rtmp has special start time handling continue } - - c.mu.Lock() - for _, streamInfo := range streamConfig.StreamInfo { - streamInfo.Status = livekit.StreamInfo_ACTIVE - streamInfo.StartedAt = startedAt - } - c.mu.Unlock() + streamConfig.Streams.Range(func(_, stream any) bool { + stream.(*config.Stream).StreamInfo.StartedAt = startedAt + return true + }) case types.EgressTypeFile: o[0].(*config.FileConfig).FileInfo.StartedAt = startedAt @@ -555,6 +539,23 @@ func (c *Controller) updateStartTime(startedAt int64) { } } +func (c *Controller) updateStreamStartTime(streamID string) { + if o := c.GetStreamConfig(); o != nil { + o.Streams.Range(func(_, s any) bool { + if stream := s.(*config.Stream); stream.StreamID == streamID && stream.StreamInfo.StartedAt == 0 { + logger.Debugw("stream started", "url", stream.RedactedUrl) + stream.StreamInfo.StartedAt = time.Now().UnixNano() + c.Info.UpdatedAt = time.Now().UnixNano() + c.streamUpdates(func() { + _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) + }) + return false + } + return true + }) + } +} + func (c *Controller) updateDuration(endedAt int64) { for egressType, o := range c.Outputs { if len(o) == 0 { @@ -562,14 +563,13 @@ func (c *Controller) updateDuration(endedAt int64) { } switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: - for _, streamInfo := range o[0].(*config.StreamConfig).StreamInfo { - streamInfo.Status = livekit.StreamInfo_FINISHED - if streamInfo.StartedAt == 0 { - streamInfo.StartedAt = endedAt - } - streamInfo.EndedAt = endedAt - streamInfo.Duration = endedAt - streamInfo.StartedAt - } + streamConfig := o[0].(*config.StreamConfig) + streamConfig.Streams.Range(func(_, s any) bool { + stream := s.(*config.Stream) + stream.StreamInfo.Status = livekit.StreamInfo_FINISHED + stream.UpdateEndTime(endedAt) + return true + }) case types.EgressTypeFile: fileInfo := o[0].(*config.FileConfig).FileInfo diff --git a/pkg/pipeline/sink/websocket.go b/pkg/pipeline/sink/websocket.go index 486191a7..4c9a12d5 100644 --- a/pkg/pipeline/sink/websocket.go +++ b/pkg/pipeline/sink/websocket.go @@ -48,7 +48,13 @@ func newWebsocketSink(o *config.StreamConfig, mimeType types.MimeType, callbacks header := http.Header{} header.Set("Content-Type", string(mimeType)) - conn, _, err := websocket.DefaultDialer.Dial(o.Urls[0], header) + var wsUrl string + o.Streams.Range(func(url, _ any) bool { + wsUrl = url.(string) + return false + }) + + conn, _, err := websocket.DefaultDialer.Dial(wsUrl, header) if err != nil { return nil, err } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index a15dd03b..1c57769e 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -96,18 +96,7 @@ func (c *Controller) gstLog( if category == catRtmpClient { if function == fnSendCreateStream { streamID := strings.Split(message, "'")[1] - if o := c.GetStreamConfig(); o != nil { - c.mu.Lock() - for url, sID := range o.StreamIDs { - if streamID == sID { - if streamInfo := o.StreamInfo[url]; streamInfo != nil && streamInfo.StartedAt == 0 { - streamInfo.StartedAt = time.Now().UnixNano() - break - } - } - } - c.mu.Unlock() - } + c.updateStreamStartTime(streamID) } return } @@ -181,10 +170,15 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { switch { case element == elementGstRtmp2Sink: - sinkName := strings.Split(name, "_")[1] + streamName := strings.Split(name, "_")[1] + stream, err := c.streamBin.GetStream(streamName) + if err != nil { + return err + } + if !c.eos.IsBroken() { // try reconnecting - ok, err := c.streamBin.MaybeResetStream(sinkName, gErr) + ok, err := c.streamBin.MaybeResetStream(stream, gErr) if err != nil { logger.Errorw("failed to reset stream", err) } else if ok { @@ -193,23 +187,16 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error { } // remove sink - url, err := c.streamBin.GetStreamUrl(sinkName) - if err != nil { - logger.Warnw("rtmp output not found", err, "url", url) - return err - } - - return c.removeSink(context.Background(), url, gErr) + return c.streamFailed(context.Background(), stream, gErr) case element == elementGstSrtSink: - sinkName := strings.Split(name, "_")[1] - url, err := c.streamBin.GetStreamUrl(sinkName) + streamName := strings.Split(name, "_")[1] + stream, err := c.streamBin.GetStream(streamName) if err != nil { - logger.Warnw("srt output not found", err, "url", url) return err } - return c.removeSink(context.Background(), url, gErr) + return c.streamFailed(context.Background(), stream, gErr) case element == elementGstAppSrc: if message == msgStreamingNotNegotiated { diff --git a/pkg/service/io.go b/pkg/service/io.go index 26dbd3f3..9cba79b2 100644 --- a/pkg/service/io.go +++ b/pkg/service/io.go @@ -16,6 +16,7 @@ package service import ( "context" + "sync" "google.golang.org/protobuf/types/known/emptypb" @@ -29,6 +30,9 @@ import ( type IOClient struct { rpc.IOInfoClient + + mu sync.Mutex + updates chan *livekit.EgressInfo } func NewIOClient(bus psrpc.MessageBus) (rpc.IOInfoClient, error) { @@ -38,6 +42,7 @@ func NewIOClient(bus psrpc.MessageBus) (rpc.IOInfoClient, error) { } return &IOClient{ IOInfoClient: client, + updates: make(chan *livekit.EgressInfo, 10), }, nil } @@ -51,36 +56,46 @@ func (c *IOClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo, o } func (c *IOClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { - _, err := c.IOInfoClient.UpdateEgress(ctx, info, opts...) - if err != nil { - logger.Errorw("failed to update egress", err) - return nil, err - } + c.updates <- info - requestType, outputType := egress.GetTypes(info.Request) - switch info.Status { - case livekit.EgressStatus_EGRESS_FAILED: - logger.Warnw("egress failed", errors.New(info.Error), - "egressID", info.EgressId, - "requestType", requestType, - "outputType", outputType, - ) - case livekit.EgressStatus_EGRESS_COMPLETE: - logger.Infow("egress completed", - "egressID", info.EgressId, - "requestType", requestType, - "outputType", outputType, - ) - default: - logger.Infow("egress updated", - "egressID", info.EgressId, - "requestType", requestType, - "outputType", outputType, - "status", info.Status, - ) - } + // ensure updates are sent sequentially + c.mu.Lock() + defer c.mu.Unlock() + for { + select { + case update := <-c.updates: + _, err := c.IOInfoClient.UpdateEgress(ctx, update, opts...) + if err != nil { + logger.Errorw("failed to update egress", err) + return nil, err + } - return &emptypb.Empty{}, nil + requestType, outputType := egress.GetTypes(update.Request) + switch update.Status { + case livekit.EgressStatus_EGRESS_FAILED: + logger.Warnw("egress failed", errors.New(update.Error), + "egressID", update.EgressId, + "requestType", requestType, + "outputType", outputType, + ) + case livekit.EgressStatus_EGRESS_COMPLETE: + logger.Infow("egress completed", + "egressID", update.EgressId, + "requestType", requestType, + "outputType", outputType, + ) + default: + logger.Infow("egress updated", + "egressID", update.EgressId, + "requestType", requestType, + "outputType", outputType, + "status", update.Status, + ) + } + default: + return &emptypb.Empty{}, nil + } + } } func (c *IOClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { diff --git a/test/edge.go b/test/edge.go index 4677d252..38f171b4 100644 --- a/test/edge.go +++ b/test/edge.go @@ -146,14 +146,25 @@ func (r *Runner) testRtmpFailure(t *testing.T) { require.Equal(t, r.RoomName, info.RoomName) require.Equal(t, livekit.EgressStatus_EGRESS_STARTING, info.Status) - // check update + // check updates time.Sleep(time.Second * 5) info = r.getUpdate(t, info.EgressId) - if info.Status == livekit.EgressStatus_EGRESS_ACTIVE { - r.checkUpdate(t, info.EgressId, livekit.EgressStatus_EGRESS_FAILED) - } else { - require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) + streamFailed := false + for info.Status == livekit.EgressStatus_EGRESS_ACTIVE { + if !streamFailed && info.StreamResults[0].Status == livekit.StreamInfo_FAILED { + streamFailed = true + } + if streamFailed { + // make sure this never reverts in subsequent updates + require.Equal(t, livekit.StreamInfo_FAILED, info.StreamResults[0].Status) + } + info = r.getUpdate(t, info.EgressId) } + + require.Equal(t, livekit.EgressStatus_EGRESS_FAILED, info.Status) + require.NotEmpty(t, info.Error) + require.Equal(t, livekit.StreamInfo_FAILED, info.StreamResults[0].Status) + require.NotEmpty(t, info.StreamResults[0].Error) }) } diff --git a/test/integration.go b/test/integration.go index 0285b8f3..ae99fc01 100644 --- a/test/integration.go +++ b/test/integration.go @@ -146,16 +146,32 @@ func (r *Runner) checkUpdate(t *testing.T, egressID string, status livekit.Egres } func (r *Runner) checkStreamUpdate(t *testing.T, egressID string, expected map[string]livekit.StreamInfo_Status) { - info := r.getUpdate(t, egressID) + for { + info := r.getUpdate(t, egressID) + require.Equal(t, len(expected), len(info.StreamResults)) + + failureStillActive := false + for _, s := range info.StreamResults { + require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") + + var e livekit.StreamInfo_Status + if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { + e = expected[badRtmpUrl1Redacted] + } else { + e = expected[s.Url] + } + if e == livekit.StreamInfo_FAILED && s.Status == livekit.StreamInfo_ACTIVE { + // expecting another update + failureStillActive = true + continue + } - require.Equal(t, len(expected), len(info.StreamResults)) - for _, s := range info.StreamResults { - if strings.HasSuffix(s.Url, ".contribute.live-video.net/app/{f...1}") { - require.Equal(t, expected[badRtmpUrl1Redacted], s.Status) - } else { - require.Equal(t, expected[s.Url], s.Status) + require.Equal(t, e, s.Status) + } + + if !failureStillActive { + return } - require.Equal(t, s.Status == livekit.StreamInfo_FAILED, s.Error != "") } } diff --git a/test/stream.go b/test/stream.go index c036cf75..da09e640 100644 --- a/test/stream.go +++ b/test/stream.go @@ -74,9 +74,8 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * require.Equal(t, config.StreamKeyframeInterval, p.KeyFrameInterval) } - // verify and check update + // verify time.Sleep(time.Second * 5) - r.verifyStreams(t, p, urls[0][2]) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ urls[0][1]: livekit.StreamInfo_ACTIVE, @@ -91,14 +90,8 @@ func (r *Runner) runStreamTest(t *testing.T, req *rpc.StartEgressRequest, test * require.NoError(t, err) time.Sleep(time.Second * 5) - // verify and check updates + // verify r.verifyStreams(t, p, urls[0][2], urls[2][2]) - r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ - urls[0][1]: livekit.StreamInfo_ACTIVE, - urls[1][1]: livekit.StreamInfo_FAILED, - urls[2][1]: livekit.StreamInfo_ACTIVE, - urls[3][1]: livekit.StreamInfo_ACTIVE, - }) r.checkStreamUpdate(t, egressID, map[string]livekit.StreamInfo_Status{ urls[0][1]: livekit.StreamInfo_ACTIVE, urls[1][1]: livekit.StreamInfo_FAILED,