Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow SRT output on egress server #688

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package config

import (
"net/url"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
Expand Down Expand Up @@ -84,7 +86,22 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
return errors.ErrInvalidInput("multiple stream outputs")
}
if stream != nil {
conf, err := p.getStreamConfig(types.OutputTypeRTMP, stream.Urls)
u, err := url.Parse(stream.Urls[0])
if err != nil {
return errors.ErrInvalidInput("malformed url")
}

var outputType types.OutputType
switch u.Scheme {
case "srt":
outputType = types.OutputTypeSRT
case "rtmp":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to test for "rtmps"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that default to same output type as rtmp or should there be a new output type for rtmps ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All current output URL schemes: rtmp://, rtmps:// and mux:// should use OutputTypeRTMP.

outputType = types.OutputTypeRTMP
default:
return errors.ErrInvalidInput("invalid stream type")
}

conf, err := p.getStreamConfig(outputType, stream.Urls)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeSRT:
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeRaw:
p.AudioOutCodec = types.MimeTypeRawAudio
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,14 @@ func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType)
}
return rawUrl, redacted, nil

case types.OutputTypeSRT:
if parsed.Scheme != "srt" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}
// Todo: Optionally, you can redact the SRT stream key or other sensitive parts of the URL
redacted := rawUrl
return rawUrl, redacted, nil

case types.OutputTypeRaw:
if parsed.Scheme != "ws" && parsed.Scheme != "wss" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
Expand Down
17 changes: 16 additions & 1 deletion pkg/pipeline/builder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St
return nil, nil, errors.ErrGstPipelineError(err)
}

case types.OutputTypeSRT:
// add SRT steam
mux, err = gst.NewElement("mpegtsmux")
if err != nil {
return nil, nil, errors.ErrGstPipelineError(err)
}

default:
err = errors.ErrInvalidInput("output type")
}
Expand Down Expand Up @@ -145,6 +152,14 @@ func (sb *StreamBin) AddStream(url string) error {
if err = sink.Set("location", url); err != nil {
return errors.ErrGstPipelineError(err)
}
case types.OutputTypeSRT:
sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", name))
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = sink.SetProperty("uri", url); err != nil {
return errors.ErrGstPipelineError(err)
}

default:
return errors.ErrInvalidInput("output type")
Expand All @@ -161,7 +176,7 @@ func (sb *StreamBin) AddStream(url string) error {
// It is later released in RemoveSink
proxy.Ref()

// Intercept flows from rtmp2sink. Anything besides EOS will be ignored
// Intercept flows from the sink. Anything besides EOS will be ignored
proxy.SetChainFunction(func(self *gst.Pad, _ *gst.Object, buffer *gst.Buffer) gst.FlowReturn {
// Buffer gets automatically unreferenced by go-gst.
// Without referencing it here, it will sometimes be garbage collected before being written
Expand Down
38 changes: 35 additions & 3 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pipeline

import (
"context"
"net/url"
"sync"
"time"

Expand Down Expand Up @@ -263,8 +264,23 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream

// add stream outputs first
for _, rawUrl := range req.AddOutputUrls {
// validate and redact url
url, redacted, err := c.ValidateUrl(rawUrl, types.OutputTypeRTMP)

u, err := url.Parse(rawUrl)
if err != nil {
return errors.ErrInvalidInput("malformed url")
}

var outputType types.OutputType
switch u.Scheme {
case "srt":
outputType = types.OutputTypeSRT
case "rtmp":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to support rtmps

outputType = types.OutputTypeRTMP
default:
return errors.ErrInvalidInput("invalid stream type")
}

url, redacted, err := c.ValidateUrl(rawUrl, outputType)
if err != nil {
errs.AppendErr(err)
continue
Expand Down Expand Up @@ -297,7 +313,23 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream

// remove stream outputs
for _, rawUrl := range req.RemoveOutputUrls {
url, _, err := c.ValidateUrl(rawUrl, types.OutputTypeRTMP)
u, err := url.Parse(rawUrl)
if err != nil {
return errors.ErrInvalidInput("malformed url")
}

var outputType types.OutputType
switch u.Scheme {
case "srt":
outputType = types.OutputTypeSRT
case "rtmp":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to support rtmps

outputType = types.OutputTypeRTMP
default:
return errors.ErrInvalidInput("invalid stream type")
}

url, _, err := c.ValidateUrl(rawUrl, outputType)

if err != nil {
errs.AppendErr(err)
continue
Expand Down
7 changes: 7 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
OutputTypeWebM OutputType = "video/webm"
OutputTypeJPEG OutputType = "image/jpeg"
OutputTypeRTMP OutputType = "rtmp"
OutputTypeSRT OutputType = "srt"
OutputTypeHLS OutputType = "application/x-mpegurl"
OutputTypeJSON OutputType = "application/json"
OutputTypeBlob OutputType = "application/octet-stream"
Expand All @@ -89,6 +90,7 @@ var (
OutputTypeTS: MimeTypeAAC,
OutputTypeWebM: MimeTypeOpus,
OutputTypeRTMP: MimeTypeAAC,
OutputTypeSRT: MimeTypeAAC,
OutputTypeHLS: MimeTypeAAC,
}

Expand All @@ -98,6 +100,7 @@ var (
OutputTypeTS: MimeTypeH264,
OutputTypeWebM: MimeTypeVP8,
OutputTypeRTMP: MimeTypeH264,
OutputTypeSRT: MimeTypeH264,
OutputTypeHLS: MimeTypeH264,
}

Expand Down Expand Up @@ -153,6 +156,10 @@ var (
MimeTypeAAC: true,
MimeTypeH264: true,
},
OutputTypeSRT: {
MimeTypeAAC: true,
MimeTypeH264: true,
},
OutputTypeHLS: {
MimeTypeAAC: true,
MimeTypeH264: true,
Expand Down
Loading