Skip to content

Commit

Permalink
SRT support (#730)
Browse files Browse the repository at this point in the history
* Allow SRT output on egress server (#688)

* initial commit, preliminary srt support

* use switch/case instead of condition logic

---------

Co-authored-by: David Colburn <[email protected]>

* testing and fixes

* more fixes

* double run call on track test

* add ws schemes to output type map

* remove debug logs

---------

Co-authored-by: yaruno <[email protected]>
  • Loading branch information
frostbyte73 and yaruno authored Jul 23, 2024
1 parent 240f18f commit 5b6bd31
Show file tree
Hide file tree
Showing 23 changed files with 950 additions and 785 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ require (
github.com/go-gst/go-glib v1.0.1
github.com/go-gst/go-gst v1.0.0
github.com/go-jose/go-jose/v3 v3.0.3
github.com/go-jose/go-jose/v4 v4.0.3
github.com/go-logr/logr v1.4.2
github.com/googleapis/gax-go/v2 v2.12.4
github.com/gorilla/websocket v1.5.2
github.com/livekit/livekit-server v1.6.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa
github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5
github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad
github.com/pion/rtp v1.8.6
github.com/pion/rtp/v2 v2.0.0
github.com/pion/webrtc/v3 v3.2.43
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.1
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ github.com/go-gst/go-gst v1.0.0 h1:YBzE3JVZvbrnWWb/iGCXuiaOvHQ7HW+xXUBR++EgEtQ=
github.com/go-gst/go-gst v1.0.0/go.mod h1:sQMWMnR98s2B4w52e4IXyGvz75rXV8CZ1bejdPT3KIs=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
github.com/go-jose/go-jose/v4 v4.0.3/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down Expand Up @@ -185,8 +184,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 h1:p60OjeixzXnhGFQL8wmdUwWPxijEDe9ZJFMosq+byec=
github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa h1:rkX4blO/giAiqWM/E5T0N7SU0OA9pMHjWekhv+a6byI=
github.com/livekit/protocol v1.19.2-0.20240705155036-b272353929aa/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a h1:KqA12sSIgRm4HvwVmN0FBdoN0COUaOXpIezAiJO+Jgc=
github.com/livekit/protocol v1.19.2-0.20240723043112-bacbd15bfb3a/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.1-0.20240628022514-ad17d3f0adad h1:SfX8OBXfUx9WHGEIsJi+rpWMsPhgtlRlQpHk3bnEZrI=
Expand Down Expand Up @@ -230,7 +229,6 @@ github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.6 h1:MTmn/b0aWWsAzux2AmP8WGllusBVw4NPYPVFFd7jUPw=
github.com/pion/rtp v1.8.6/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp/v2 v2.0.0/go.mod h1:Vj+rrFbJCT3yxqE/VSwaOo9DQ2pMKGPxuE7hplGOlOs=
github.com/pion/sctp v1.8.13/go.mod h1:YKSgO/bO/6aOMP9LCie1DuD7m+GamiK2yIiPM6vH+GA=
github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY=
github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
Expand Down Expand Up @@ -335,8 +333,6 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -398,8 +394,6 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
29 changes: 28 additions & 1 deletion pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package config

import (
"net/url"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
Expand Down Expand Up @@ -85,7 +87,32 @@ func (p *PipelineConfig) updateEncodedOutputs(req egress.EncodedOutput) error {
return errors.ErrInvalidInput("multiple stream outputs")
}
if stream != nil {
conf, err := p.getStreamConfig(types.OutputTypeRTMP, stream.Urls)
var outputType types.OutputType
switch stream.Protocol {
case livekit.StreamProtocol_DEFAULT_PROTOCOL:
if len(stream.Urls) == 0 {
return errors.ErrInvalidInput("stream protocol")
}

parsed, err := url.Parse(stream.Urls[0])
if err != nil {
return errors.ErrInvalidUrl(stream.Urls[0], err.Error())
}

var ok bool
outputType, ok = types.StreamOutputTypes[parsed.Scheme]
if !ok {
return errors.ErrInvalidUrl(stream.Urls[0], "invalid protocol")
}

case livekit.StreamProtocol_RTMP:
outputType = types.OutputTypeRTMP

case livekit.StreamProtocol_SRT:
outputType = types.OutputTypeSRT
}

conf, err := p.getStreamConfig(outputType, stream.Urls)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeSRT:
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264

case types.OutputTypeRaw:
p.AudioOutCodec = types.MimeTypeRawAudio
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/urls.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, er
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
}
if types.StreamOutputTypes[parsed.Scheme] != outputType {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}

switch outputType {
case types.OutputTypeRTMP:
Expand All @@ -58,10 +61,10 @@ func ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, er
}
return rawUrl, redacted, nil

case types.OutputTypeSRT:
return rawUrl, rawUrl, nil

case types.OutputTypeRaw:
if parsed.Scheme != "ws" && parsed.Scheme != "wss" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}
return rawUrl, rawUrl, nil

default:
Expand Down
20 changes: 10 additions & 10 deletions pkg/gstreamer/bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,23 @@ func (b *Bin) AddElements(elements ...*gst.Element) error {
return nil
}

func (b *Bin) RemoveSourceBin(name string) (bool, error) {
func (b *Bin) RemoveSourceBin(name string) error {
logger.Debugw(fmt.Sprintf("removing src %s from %s", name, b.bin.GetName()))
return b.removeBin(name, gst.PadDirectionSource)
}

func (b *Bin) RemoveSinkBin(name string) (bool, error) {
func (b *Bin) RemoveSinkBin(name string) error {
logger.Debugw(fmt.Sprintf("removing sink %s from %s", name, b.bin.GetName()))
return b.removeBin(name, gst.PadDirectionSink)
}

func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {
func (b *Bin) removeBin(name string, direction gst.PadDirection) error {
b.LockStateShared()
defer b.UnlockStateShared()

state := b.GetStateLocked()
if state > StateRunning {
return true, nil
return nil
}

b.mu.Lock()
Expand All @@ -192,14 +192,14 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {
}
}
if bin == nil {
return false, nil
return nil
}

if state == StateBuilding {
if err := b.pipeline.Remove(bin.bin.Element); err != nil {
return false, errors.ErrGstPipelineError(err)
return errors.ErrGstPipelineError(err)
}
return true, nil
return nil
}

if direction == gst.PadDirectionSource {
Expand All @@ -208,7 +208,7 @@ func (b *Bin) removeBin(name string, direction gst.PadDirection) (bool, error) {
b.probeRemoveSink(bin)
}

return true, nil
return nil
}

func (b *Bin) probeRemoveSource(src *Bin) {
Expand Down Expand Up @@ -262,7 +262,7 @@ func (b *Bin) probeRemoveSink(sink *Bin) {
return
}

srcGhostPad.AddProbe(gst.PadProbeTypeBlockDownstream, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
srcGhostPad.AddProbe(gst.PadProbeTypeAllBoth, func(_ *gst.Pad, _ *gst.PadProbeInfo) gst.PadProbeReturn {
srcGhostPad.Unlink(sinkGhostPad.Pad)
sinkGhostPad.Pad.SendEvent(gst.NewEOSEvent())

Expand All @@ -281,7 +281,7 @@ func (b *Bin) probeRemoveSink(sink *Bin) {

b.elements[len(b.elements)-1].ReleaseRequestPad(srcGhostPad.GetTarget())
b.bin.RemovePad(srcGhostPad.Pad)
return gst.PadProbeRemove
return gst.PadProbeOK
})
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/pipeline/builder/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ func (b *AudioBin) onTrackRemoved(trackID string) {

b.mu.Lock()
name, ok := b.names[trackID]
if !ok {
b.mu.Unlock()
return
}
delete(b.names, trackID)
b.mu.Unlock()

if ok {
if _, err := b.bin.RemoveSourceBin(name); err != nil {
b.bin.OnError(err)
}
if err := b.bin.RemoveSourceBin(name); err != nil {
b.bin.OnError(err)
}
}

Expand Down
Loading

0 comments on commit 5b6bd31

Please sign in to comment.