Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SRT support #730

Merged
merged 7 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ require (
github.com/go-gst/go-glib v1.0.1
github.com/go-gst/go-gst v1.0.0
github.com/go-jose/go-jose/v3 v3.0.3
github.com/go-jose/go-jose/v4 v4.0.3
github.com/go-logr/logr v1.4.2
github.com/googleapis/gax-go/v2 v2.12.4
github.com/gorilla/websocket v1.5.2
github.com/livekit/livekit-server v1.6.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa
github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5
github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad
github.com/pion/rtp v1.8.6
github.com/pion/rtp/v2 v2.0.0
github.com/pion/webrtc/v3 v3.2.43
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.1
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ github.com/go-gst/go-gst v1.0.0 h1:YBzE3JVZvbrnWWb/iGCXuiaOvHQ7HW+xXUBR++EgEtQ=
github.com/go-gst/go-gst v1.0.0/go.mod h1:sQMWMnR98s2B4w52e4IXyGvz75rXV8CZ1bejdPT3KIs=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
github.com/go-jose/go-jose/v4 v4.0.3/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down Expand Up @@ -185,8 +184,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 h1:p60OjeixzXnhGFQL8wmdUwWPxijEDe9ZJFMosq+byec=
github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa h1:rkX4blO/giAiqWM/E5T0N7SU0OA9pMHjWekhv+a6byI=
github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a h1:KqA12sSIgRm4HvwVmN0FBdoN0COUaOXpIezAiJO+Jgc=
github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad h1:SfX8OBXfUx9WHGEIsJi+rpWMsPhgtlRlQpHk3bnEZrI=
Expand Down Expand Up @@ -230,7 +229,6 @@ github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.6 h1:MTmn/b0aWWsAzux2AmP8WGllusBVw4NPYPVFFd7jUPw=
github.com/pion/rtp v1.8.6/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp/v2 v2.0.0/go.mod h1:Vj+rrFbJCT3yxqE/VSwaOo9DQ2pMKGPxuE7hplGOlOs=
github.com/pion/sctp v1.8.13/go.mod h1:YKSgO/bO/6aOMP9LCie1DuD7m+GamiK2yIiPM6vH+GA=
github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY=
github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
Expand Down Expand Up @@ -335,8 +333,6 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -398,8 +394,6 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
29 changes: 28 additions & 1 deletion pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package config

import (
"net/url"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
Expand Down Expand Up @@ -85,7 +87,32 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
return errors.ErrInvalidInput("multiple stream outputs")
}
if stream != nil {
conf, err := p.getStreamConfig(types.OutputTypeRTMP, stream.Urls)
var outputType types.OutputType
switch stream.Protocol {
case livekit.StreamProtocol_DEFAULT_PROTOCOL:
if len(stream.Urls) == 0 {
return errors.ErrInvalidInput("stream protocol")
}

parsed, err := url.Parse(stream.Urls[0])
if err != nil {
return errors.ErrInvalidUrl(stream.Urls[0], err.Error())
}

var ok bool
outputType, ok = types.StreamOutputTypes[parsed.Scheme]
if !ok {
return errors.ErrInvalidUrl(stream.Urls[0], "invalid protocol")
}

case livekit.StreamProtocol_RTMP:
outputType = types.OutputTypeRTMP

case livekit.StreamProtocol_SRT:
outputType = types.OutputTypeSRT
}

conf, err := p.getStreamConfig(outputType, stream.Urls)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeSRT:
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeRaw:
p.AudioOutCodec = types.MimeTypeRawAudio
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/urls.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, er
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
}
if types.StreamOutputTypes[parsed.Scheme] != outputType {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}

switch outputType {
case types.OutputTypeRTMP:
Expand All @@ -58,10 +61,10 @@ func ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, er
}
return rawUrl, redacted, nil

case types.OutputTypeSRT:
return rawUrl, rawUrl, nil

case types.OutputTypeRaw:
if parsed.Scheme != "ws" && parsed.Scheme != "wss" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}
return rawUrl, rawUrl, nil

default:
Expand Down
20 changes: 10 additions & 10 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,23 @@ func (b *Bin) AddElements(elements ...*gst.Element) error {
return nil
}

func (b *Bin) RemoveSourceBin(name string) (bool, error) {
func (b *Bin) RemoveSourceBin(name string) error {
logger.Debugw(fmt.Sprintf("removing src %s from %s", name, b.bin.GetName()))
return b.removeBin(name, gst.PadDirectionSource)
}

func (b *Bin) RemoveSinkBin(name string) (bool, error) {
func (b *Bin) RemoveSinkBin(name string) error {
logger.Debugw(fmt.Sprintf("removing sink %s from %s", name, b.bin.GetName()))
return b.removeBin(name, gst.PadDirectionSink)
}

func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {
func (b *Bin) removeBin(name string, direction gst.PadDirection) error {
b.LockStateShared()
defer b.UnlockStateShared()

state := b.GetStateLocked()
if state > StateRunning {
return true, nil
return nil
}

b.mu.Lock()
Expand All @@ -192,14 +192,14 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {
}
}
if bin == nil {
return false, nil
return nil
}

if state == StateBuilding {
if err := b.pipeline.Remove(bin.bin.Element); err != nil {
return false, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
return true, nil
return nil
}

if direction == gst.PadDirectionSource {
Expand All @@ -208,7 +208,7 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {
b.probeRemoveSink(bin)
}

return true, nil
return nil
}

func (b *Bin) probeRemoveSource(src *Bin) {
Expand Down Expand Up @@ -262,7 +262,7 @@ func (b *Bin) probeRemoveSink(sink *Bin) {
return
}

srcGhostPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
srcGhostPad.AddProbe(gst.PadProbeTypeAllBoth, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
srcGhostPad.Unlink(sinkGhostPad.Pad)
sinkGhostPad.Pad.SendEvent(gst.NewEOSEvent())

Expand All @@ -281,7 +281,7 @@ func (b *Bin) probeRemoveSink(sink *Bin) {

b.elements[len(b.elements)-1].ReleaseRequestPad(srcGhostPad.GetTarget())
b.bin.RemovePad(srcGhostPad.Pad)
return gst.PadProbeRemove
return gst.PadProbeOK
})
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ func (b *AudioBin) onTrackRemoved(trackID string) {

b.mu.Lock()
name, ok := b.names[trackID]
if !ok {
b.mu.Unlock()
return
}
delete(b.names, trackID)
b.mu.Unlock()

if ok {
if _, err := b.bin.RemoveSourceBin(name); err != nil {
b.bin.OnError(err)
}
if err := b.bin.RemoveSourceBin(name); err != nil {
b.bin.OnError(err)
}
}

Expand Down
Loading
Loading