Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update started_at for rtmp output #751

Merged
merged 4 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config

import (
"os"
"strings"
"time"

"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -111,16 +112,20 @@ func (c *BaseConfig) initLogger(values ...interface{}) error {
c.Logging.Level = c.LogLevel
}

var gstDebug string
var gstDebug []string
switch c.Logging.Level {
case "debug":
gstDebug = "3"
gstDebug = []string{"3"}
case "info", "warn":
gstDebug = "2"
gstDebug = []string{"2"}
case "error":
gstDebug = "1"
gstDebug = []string{"1"}
}
if err := os.Setenv("GST_DEBUG", gstDebug); err != nil {
gstDebug = append(gstDebug,
"rtmpclient:4",
"srtlib:1",
)
if err := os.Setenv("GST_DEBUG", strings.Join(gstDebug, ",")); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type StreamConfig struct {
outputConfig

Urls []string
StreamIDs map[string]string
StreamInfo map[string]*livekit.StreamInfo

twitchTemplate string
Expand All @@ -47,6 +48,7 @@ func (p *PipelineConfig) GetWebsocketConfig() *StreamConfig {
func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []string) (*StreamConfig, error) {
conf := &StreamConfig{
outputConfig: outputConfig{OutputType: outputType},
StreamIDs: make(map[string]string),
}

conf.StreamInfo = make(map[string]*livekit.StreamInfo)
Expand Down
21 changes: 19 additions & 2 deletions pkg/config/urls.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import (
"github.com/livekit/protocol/utils"
)

var twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$")
// rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)
var (
rtmpRegexp = regexp.MustCompile("^(rtmps?:\\/\\/)(.*\\/)(.*\\/)(\\S*)( live=1)?$")
twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$")
)

func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) {
parsed, err := url.Parse(rawUrl)
Expand All @@ -55,10 +59,12 @@ func (o *StreamConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (
}
}

redacted, ok := utils.RedactStreamKey(rawUrl)
redacted, streamID, ok := redactStreamKey(rawUrl)
if !ok {
return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
}
o.StreamIDs[rawUrl] = streamID

return rawUrl, redacted, nil

case types.OutputTypeSRT:
Expand Down Expand Up @@ -142,3 +148,14 @@ func (o *StreamConfig) updateTwitchTemplate() error {

return errors.New("no ingest found")
}

func redactStreamKey(url string) (string, string, bool) {
match := rtmpRegexp.FindStringSubmatch(url)
if len(match) != 6 {
return url, "", false
}

streamID := match[4]
match[4] = utils.RedactIdentifier(match[4])
return strings.Join(match[1:], ""), streamID, true
}
33 changes: 20 additions & 13 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,11 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
continue
}

// add stream
if err = c.streamBin.AddStream(url); err != nil {
errs.AppendErr(err)
continue
}

// add to output count
c.OutputCount++

// add stream info to results
c.mu.Lock()
streamInfo := &livekit.StreamInfo{
Url: redacted,
StartedAt: now,
Status: livekit.StreamInfo_ACTIVE,
Url: redacted,
Status: livekit.StreamInfo_ACTIVE,
}
o.StreamInfo[url] = streamInfo

Expand All @@ -299,6 +289,18 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
list.Info = append(list.Info, streamInfo)
}
c.mu.Unlock()

// add stream
if err = c.streamBin.AddStream(url); err != nil {
streamInfo.Status = livekit.StreamInfo_FAILED
errs.AppendErr(err)
continue
}

if o.OutputType != types.OutputTypeRTMP {
streamInfo.StartedAt = now
}
c.OutputCount++
sendUpdate = true
}

Expand Down Expand Up @@ -520,8 +522,13 @@ func (c *Controller) updateStartTime(startedAt int64) {
}
switch egressType {
case types.EgressTypeStream, types.EgressTypeWebsocket:
streamConfig := o[0].(*config.StreamConfig)
if streamConfig.OutputType == types.OutputTypeRTMP {
continue
}

c.mu.Lock()
for _, streamInfo := range o[0].(*config.StreamConfig).StreamInfo {
for _, streamInfo := range streamConfig.StreamInfo {
streamInfo.Status = livekit.StreamInfo_ACTIVE
streamInfo.StartedAt = startedAt
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/pipeline/sink/uploader/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"net/url"
"os"
"path"

"github.com/Azure/azure-storage-blob-go/azblob"

Expand Down Expand Up @@ -89,5 +88,5 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
return "", 0, errors.ErrUploadFailed("Azure", err)
}

return path.Join(u.container, storageFilepath), stat.Size(), nil
return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil
}
33 changes: 25 additions & 8 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ const (
msgInputDisappeared = "Can't copy metadata because input buffer disappeared"
msgSkippingSegment = "error reading data -1 (reason: Success), skipping segment"
fnGstAudioResampleCheckDiscont = "gst_audio_resample_check_discont"
callerEPollUpdateEvents = "./srtcore/epoll.cpp:905"

// noisy gst fixmes
msgStreamStart = "stream-start event without group-id. Consider implementing group-id handling in the upstream elements"
msgCreatingStream = "Creating random stream-id, consider implementing a deterministic way of creating a stream-id"
msgAggregateSubclass = "Subclass should call gst_aggregator_selected_samples() from its aggregate implementation."

// rtmp client
catRtmpClient = "rtmpclient"
fnSendCreateStream = "send_create_stream"
)

var (
Expand All @@ -70,38 +73,52 @@ var (
msgInputDisappeared: true,
msgSkippingSegment: true,
fnGstAudioResampleCheckDiscont: true,
callerEPollUpdateEvents: true,
msgStreamStart: true,
msgCreatingStream: true,
msgAggregateSubclass: true,
}
)

func (c *Controller) gstLog(
_ *gst.DebugCategory,
cat *gst.DebugCategory,
level gst.DebugLevel,
file, function string, line int,
_ *gst.LoggedObject,
debugMsg *gst.DebugMessage,
) {
category := cat.GetName()
message := debugMsg.Get()
lvl, ok := logLevels[level]
if !ok || ignore[message] || ignore[function] {
return
}

caller := fmt.Sprintf("%s:%d", file, line)
if ignore[caller] {
if category == catRtmpClient {
if function == fnSendCreateStream {
streamID := strings.Split(message, "'")[1]
if o := c.GetStreamConfig(); o != nil {
c.mu.Lock()
for url, sID := range o.StreamIDs {
if streamID == sID {
if streamInfo := o.StreamInfo[url]; streamInfo != nil && streamInfo.StartedAt == 0 {
streamInfo.StartedAt = time.Now().UnixNano()
break
}
}
}
c.mu.Unlock()
}
}
return
}

var msg string
if function != "" {
msg = fmt.Sprintf("[gst %s] %s: %s", lvl, function, message)
msg = fmt.Sprintf("[%s %s] %s: %s", category, lvl, function, message)
} else {
msg = fmt.Sprintf("[gst %s] %s", lvl, message)
msg = fmt.Sprintf("[%s %s] %s", category, lvl, message)
}
c.gstLogger.Debugw(msg, "caller", caller)
c.gstLogger.Debugw(msg, "caller", fmt.Sprintf("%s:%d", file, line))
}

func (c *Controller) messageWatch(msg *gst.Message) bool {
Expand Down
Loading