Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow SRT output on egress server #688

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package config

import (
"strings"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
Expand Down Expand Up @@ -84,7 +86,12 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
return errors.ErrInvalidInput("multiple stream outputs")
}
if stream != nil {
conf, err := p.getStreamConfig(types.OutputTypeRTMP, stream.Urls)
outputType := types.OutputTypeRTMP
if len(stream.Urls) > 0 && strings.HasPrefix(stream.Urls[0], "srt://") {
yaruno marked this conversation as resolved.
Show resolved Hide resolved
outputType = types.OutputTypeSRT
}

conf, err := p.getStreamConfig(outputType, stream.Urls)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeSRT:
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeRaw:
p.AudioOutCodec = types.MimeTypeRawAudio
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,14 @@ func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType)
}
return rawUrl, redacted, nil

case types.OutputTypeSRT:
if parsed.Scheme != "srt" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}
// Todo: Optionally, you can redact the SRT stream key or other sensitive parts of the URL
redacted := rawUrl
return rawUrl, redacted, nil

case types.OutputTypeRaw:
if parsed.Scheme != "ws" && parsed.Scheme != "wss" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
Expand Down
17 changes: 16 additions & 1 deletion pkg/pipeline/builder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func BuildStreamBin(pipeline *gstreamer.Pipeline, p *config.PipelineConfig) (*St
return nil, nil, errors.ErrGstPipelineError(err)
}

case types.OutputTypeSRT:
// add SRT steam
mux, err = gst.NewElement("mpegtsmux")
if err != nil {
return nil, nil, errors.ErrGstPipelineError(err)
}

default:
err = errors.ErrInvalidInput("output type")
}
Expand Down Expand Up @@ -145,6 +152,14 @@ func (sb *StreamBin) AddStream(url string) error {
if err = sink.Set("location", url); err != nil {
return errors.ErrGstPipelineError(err)
}
case types.OutputTypeSRT:
sink, err = gst.NewElementWithName("srtsink", fmt.Sprintf("srtsink_%s", name))
if err != nil {
return errors.ErrGstPipelineError(err)
}
if err = sink.SetProperty("uri", url); err != nil {
return errors.ErrGstPipelineError(err)
}

default:
return errors.ErrInvalidInput("output type")
Expand All @@ -161,7 +176,7 @@ func (sb *StreamBin) AddStream(url string) error {
// It is later released in RemoveSink
proxy.Ref()

// Intercept flows from rtmp2sink. Anything besides EOS will be ignored
// Intercept flows from the sink. Anything besides EOS will be ignored
proxy.SetChainFunction(func(self *gst.Pad, _ *gst.Object, buffer *gst.Buffer) gst.FlowReturn {
// Buffer gets automatically unreferenced by go-gst.
// Without referencing it here, it will sometimes be garbage collected before being written
Expand Down
15 changes: 13 additions & 2 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pipeline

import (
"context"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -263,8 +264,13 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream

// add stream outputs first
for _, rawUrl := range req.AddOutputUrls {
outputType := types.OutputTypeRTMP
yaruno marked this conversation as resolved.
Show resolved Hide resolved
if strings.HasPrefix(rawUrl, "srt://") {
outputType = types.OutputTypeSRT
}
// validate and redact url
url, redacted, err := c.ValidateUrl(rawUrl, types.OutputTypeRTMP)
//url, redacted, err := c.ValidateUrl(rawUrl, types.OutputTypeRTMP)
url, redacted, err := c.ValidateUrl(rawUrl, outputType)
if err != nil {
errs.AppendErr(err)
continue
Expand Down Expand Up @@ -297,7 +303,12 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream

// remove stream outputs
for _, rawUrl := range req.RemoveOutputUrls {
url, _, err := c.ValidateUrl(rawUrl, types.OutputTypeRTMP)
outputType := types.OutputTypeRTMP
if strings.HasPrefix(rawUrl, "srt://") {
yaruno marked this conversation as resolved.
Show resolved Hide resolved
outputType = types.OutputTypeSRT
}
url, _, err := c.ValidateUrl(rawUrl, outputType)
//url, _, err := c.ValidateUrl(rawUrl, types.OutputTypeRTMP)
yaruno marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
errs.AppendErr(err)
continue
Expand Down
7 changes: 7 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
OutputTypeWebM OutputType = "video/webm"
OutputTypeJPEG OutputType = "image/jpeg"
OutputTypeRTMP OutputType = "rtmp"
OutputTypeSRT OutputType = "srt"
OutputTypeHLS OutputType = "application/x-mpegurl"
OutputTypeJSON OutputType = "application/json"
OutputTypeBlob OutputType = "application/octet-stream"
Expand All @@ -89,6 +90,7 @@ var (
OutputTypeTS: MimeTypeAAC,
OutputTypeWebM: MimeTypeOpus,
OutputTypeRTMP: MimeTypeAAC,
OutputTypeSRT: MimeTypeAAC,
OutputTypeHLS: MimeTypeAAC,
}

Expand All @@ -98,6 +100,7 @@ var (
OutputTypeTS: MimeTypeH264,
OutputTypeWebM: MimeTypeVP8,
OutputTypeRTMP: MimeTypeH264,
OutputTypeSRT: MimeTypeH264,
OutputTypeHLS: MimeTypeH264,
}

Expand Down Expand Up @@ -153,6 +156,10 @@ var (
MimeTypeAAC: true,
MimeTypeH264: true,
},
OutputTypeSRT: {
MimeTypeAAC: true,
MimeTypeH264: true,
},
OutputTypeHLS: {
MimeTypeAAC: true,
MimeTypeH264: true,
Expand Down
Loading