From 1cd3a3ab20a7c57e8b40ccaa7bb412282c903ae5 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 5 Aug 2024 17:49:34 -0400 Subject: [PATCH 1/3] update started_at for rtmp output --- pkg/config/base.go | 15 ++++++++++----- pkg/config/output_stream.go | 2 ++ pkg/config/urls.go | 21 +++++++++++++++++++-- pkg/pipeline/controller.go | 32 ++++++++++++++++++-------------- pkg/pipeline/watch.go | 31 +++++++++++++++++++++++-------- 5 files changed, 72 insertions(+), 29 deletions(-) diff --git a/pkg/config/base.go b/pkg/config/base.go index 4138f1ed..4049eae4 100644 --- a/pkg/config/base.go +++ b/pkg/config/base.go @@ -16,6 +16,7 @@ package config import ( "os" + "strings" "time" "github.com/livekit/protocol/logger" @@ -111,16 +112,20 @@ func (c *BaseConfig) initLogger(values ...interface{}) error { c.Logging.Level = c.LogLevel } - var gstDebug string + var gstDebug []string switch c.Logging.Level { case "debug": - gstDebug = "3" + gstDebug = []string{"3"} case "info", "warn": - gstDebug = "2" + gstDebug = []string{"2"} case "error": - gstDebug = "1" + gstDebug = []string{"1"} } - if err := os.Setenv("GST_DEBUG", gstDebug); err != nil { + gstDebug = append(gstDebug, + "rtmpclient:4", + "srtlib:1", + ) + if err := os.Setenv("GST_DEBUG", strings.Join(gstDebug, ",")); err != nil { return err } diff --git a/pkg/config/output_stream.go b/pkg/config/output_stream.go index 17f162a6..413314aa 100644 --- a/pkg/config/output_stream.go +++ b/pkg/config/output_stream.go @@ -23,6 +23,7 @@ type StreamConfig struct { outputConfig Urls []string + StreamIDs map[string]string StreamInfo map[string]*livekit.StreamInfo twitchTemplate string @@ -47,6 +48,7 @@ func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig { func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) { conf := &StreamConfig{ outputConfig: outputConfig{OutputType: outputType}, + StreamIDs: make(map[string]string), } conf.StreamInfo = make(map[string]*livekit.StreamInfo) diff --git a/pkg/config/urls.go b/pkg/config/urls.go index 45daba26..5e96ddb6 100644 --- a/pkg/config/urls.go +++ b/pkg/config/urls.go @@ -28,7 +28,11 @@ import ( "github.com/livekit/protocol/utils" ) -var twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$") +// rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1) +var ( + rtmpRegexp = regexp.MustCompile("^(rtmps?:\\/\\/)(.*\\/)(.*\\/)(\\S*)( live=1)?$") + twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$") +) func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) { parsed, err := url.Parse(rawUrl) @@ -55,10 +59,12 @@ func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) ( } } - redacted, ok := utils.RedactStreamKey(rawUrl) + redacted, streamID, ok := redactStreamKey(rawUrl) if !ok { return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)") } + o.StreamIDs[rawUrl] = streamID + return rawUrl, redacted, nil case types.OutputTypeSRT: @@ -142,3 +148,14 @@ func (o *StreamConfig) updateTwitchTemplate() error { return errors.New("no ingest found") } + +func redactStreamKey(url string) (string, string, bool) { + match := rtmpRegexp.FindStringSubmatch(url) + if len(match) != 6 { + return url, "", false + } + + streamID := match[4] + match[4] = utils.RedactIdentifier(match[4]) + return strings.Join(match[1:], ""), streamID, true +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 30fa9ce9..25d894d1 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -265,7 +265,6 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream sendUpdate := false errs := errors.ErrArray{} - now := time.Now().UnixNano() // add stream outputs first for _, rawUrl := range req.AddOutputUrls { @@ -276,21 +275,11 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream continue } - // add stream - if err = c.streamBin.AddStream(url); err != nil { - errs.AppendErr(err) - continue - } - - // add to output count - c.OutputCount++ - // add stream info to results c.mu.Lock() streamInfo := &livekit.StreamInfo{ - Url: redacted, - StartedAt: now, - Status: livekit.StreamInfo_ACTIVE, + Url: redacted, + Status: livekit.StreamInfo_ACTIVE, } o.StreamInfo[url] = streamInfo @@ -299,6 +288,16 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream list.Info = append(list.Info, streamInfo) } c.mu.Unlock() + + // add stream + if err = c.streamBin.AddStream(url); err != nil { + streamInfo.Status = livekit.StreamInfo_FAILED + errs.AppendErr(err) + continue + } + + // add to output count + c.OutputCount++ sendUpdate = true } @@ -520,8 +519,13 @@ func (c *Controller) updateStartTime(startedAt int64) { } switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: + streamConfig := o[0].(*config.StreamConfig) + if streamConfig.OutputType == types.OutputTypeRTMP { + continue + } + c.mu.Lock() - for _, streamInfo := range o[0].(*config.StreamConfig).StreamInfo { + for _, streamInfo := range streamConfig.StreamInfo { streamInfo.Status = livekit.StreamInfo_ACTIVE streamInfo.StartedAt = startedAt } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 8b12c249..60f05b40 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -42,12 +42,15 @@ const ( msgInputDisappeared = "Can't copy metadata because input buffer disappeared" msgSkippingSegment = "error reading data -1 (reason: Success), skipping segment" fnGstAudioResampleCheckDiscont = "gst_audio_resample_check_discont" - callerEPollUpdateEvents = "./srtcore/epoll.cpp:905" // noisy gst fixmes msgStreamStart = "stream-start event without group-id. Consider implementing group-id handling in the upstream elements" msgCreatingStream = "Creating random stream-id, consider implementing a deterministic way of creating a stream-id" msgAggregateSubclass = "Subclass should call gst_aggregator_selected_samples() from its aggregate implementation." + + // rtmp client + catRtmpClient = "rtmpclient" + fnSendCreateStream = "send_create_stream" ) var ( @@ -70,7 +73,6 @@ var ( msgInputDisappeared: true, msgSkippingSegment: true, fnGstAudioResampleCheckDiscont: true, - callerEPollUpdateEvents: true, msgStreamStart: true, msgCreatingStream: true, msgAggregateSubclass: true, @@ -78,30 +80,43 @@ var ( ) func (c *Controller) gstLog( - _ *gst.DebugCategory, + cat *gst.DebugCategory, level gst.DebugLevel, file, function string, line int, _ *gst.LoggedObject, debugMsg *gst.DebugMessage, ) { + category := cat.GetName() message := debugMsg.Get() lvl, ok := logLevels[level] if !ok || ignore[message] || ignore[function] { return } - caller := fmt.Sprintf("%s:%d", file, line) - if ignore[caller] { + if category == catRtmpClient { + if function == fnSendCreateStream { + streamID := strings.Split(message, "'")[1] + if o := c.GetStreamConfig(); o != nil { + for url, sID := range o.StreamIDs { + if streamID == sID { + if streamInfo := o.StreamInfo[url]; streamInfo != nil && streamInfo.StartedAt == 0 { + streamInfo.StartedAt = time.Now().UnixNano() + break + } + } + } + } + } return } var msg string if function != "" { - msg = fmt.Sprintf("[gst %s] %s: %s", lvl, function, message) + msg = fmt.Sprintf("[%s %s] %s: %s", category, lvl, function, message) } else { - msg = fmt.Sprintf("[gst %s] %s", lvl, message) + msg = fmt.Sprintf("[%s %s] %s", category, lvl, message) } - c.gstLogger.Debugw(msg, "caller", caller) + c.gstLogger.Debugw(msg, "caller", fmt.Sprintf("%s:%d", file, line)) } func (c *Controller) messageWatch(msg *gst.Message) bool { From c526bd976587671f58b610eba661310899a8c4c0 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 5 Aug 2024 18:07:21 -0400 Subject: [PATCH 2/3] add lock, fix srt added through update --- pkg/pipeline/controller.go | 5 ++++- pkg/pipeline/watch.go | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 25d894d1..916e9c52 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -265,6 +265,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream sendUpdate := false errs := errors.ErrArray{} + now := time.Now().UnixNano() // add stream outputs first for _, rawUrl := range req.AddOutputUrls { @@ -296,7 +297,9 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream continue } - // add to output count + if o.OutputType != types.OutputTypeRTMP { + streamInfo.StartedAt = now + } c.OutputCount++ sendUpdate = true } diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 60f05b40..a15dd03b 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -97,6 +97,7 @@ func (c *Controller) gstLog( if function == fnSendCreateStream { streamID := strings.Split(message, "'")[1] if o := c.GetStreamConfig(); o != nil { + c.mu.Lock() for url, sID := range o.StreamIDs { if streamID == sID { if streamInfo := o.StreamInfo[url]; streamInfo != nil && streamInfo.StartedAt == 0 { @@ -105,6 +106,7 @@ func (c *Controller) gstLog( } } } + c.mu.Unlock() } } return From a4930ce1630124e9bae2601b5c1eac40acf1ebaa Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 5 Aug 2024 18:23:39 -0400 Subject: [PATCH 3/3] also fix azure location --- pkg/pipeline/sink/uploader/azure.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/pipeline/sink/uploader/azure.go b/pkg/pipeline/sink/uploader/azure.go index a84e178f..130abd3a 100644 --- a/pkg/pipeline/sink/uploader/azure.go +++ b/pkg/pipeline/sink/uploader/azure.go @@ -19,7 +19,6 @@ import ( "fmt" "net/url" "os" - "path" "github.com/Azure/azure-storage-blob-go/azblob" @@ -89,5 +88,5 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType return "", 0, errors.ErrUploadFailed("Azure", err) } - return path.Join(u.container, storageFilepath), stat.Size(), nil + return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil }