Skip to content

Commit

Permalink
Support for EnableTranscoding (#256)
Browse files Browse the repository at this point in the history
BypassTranscoding is still supported for backward compatibility
  • Loading branch information
biglittlebigben authored Apr 26, 2024
1 parent ca8834a commit f608c48
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 22 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ require (
github.com/livekit/go-rtmp v0.0.0-20230829211117-1c4f5a5c81ed
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e
github.com/livekit/protocol v1.12.1-0.20240422173605-bad3fc9684c3
github.com/livekit/protocol v1.14.0
github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9
github.com/livekit/server-sdk-go/v2 v2.1.1
github.com/livekit/server-sdk-go/v2 v2.1.2
github.com/pion/dtls/v2 v2.2.10
github.com/pion/interceptor v0.1.28
github.com/pion/rtcp v1.2.14
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e h1:ss4VwrouYiDpuNJ9BUTH+WsW+GDdJS70iZp8ii3/0Lc=
github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.12.1-0.20240422173605-bad3fc9684c3 h1:2RaOmN2xMN+bzsEYnAKI4XJaO3TjI6mLIL10+I46V9k=
github.com/livekit/protocol v1.12.1-0.20240422173605-bad3fc9684c3/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/protocol v1.14.0 h1:W0i5HR2Efoy2j9NhmONolU3FIsUDVl6MAT5zWfALev0=
github.com/livekit/protocol v1.14.0/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9 h1:4CngtPIJ58WcQ1sUDGdxJDkTndQpN6M/T8jXvRAd7Oc=
github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.1.1 h1:TGMF3BHAyx+/A0iVZTGG+4mZMXrAY1WRbOMqbwGnrHI=
github.com/livekit/server-sdk-go/v2 v2.1.1/go.mod h1:4d3kLn4qLMwKnKGpivW29YUwcBQKXFXzwH+x/nua94E=
github.com/livekit/server-sdk-go/v2 v2.1.2 h1:3MhFqptHjzpsNAcisHYDtn77qrJ9szsAy4zJkeI3Mic=
github.com/livekit/server-sdk-go/v2 v2.1.2/go.mod h1:ZjOQBtoFcU+oq1c2xaMLHhcQ3DNcfkwC3FHG8DAvkZ8=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
Expand Down
2 changes: 1 addition & 1 deletion pkg/lksdk_output/lksdk_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewLKSDKOutput(ctx context.Context, p *params.Params) (*LKSDKOutput, error)
lksdk.WithAutoSubscribe(false),
}

if p.BypassTranscoding {
if !*p.EnableTranscoding {
opts = append(opts, lksdk.WithInterceptors([]interceptor.Factory{}))
} else {
var br uint32
Expand Down
19 changes: 19 additions & 0 deletions pkg/params/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func GetParams(ctx context.Context, psrpcClient rpc.IOInfoClient, conf *config.C
return nil, err
}

UpdateTranscodingEnabled(infoCopy)

if token == "" {
token, err = ingress.BuildIngressToken(conf.ApiKey, conf.ApiSecret, info.RoomName, info.ParticipantIdentity, info.ParticipantName, info.ParticipantMetadata)
if err != nil {
Expand All @@ -161,6 +163,23 @@ func GetParams(ctx context.Context, psrpcClient rpc.IOInfoClient, conf *config.C
return p, nil
}

func UpdateTranscodingEnabled(info *livekit.IngressInfo) {
if info.EnableTranscoding != nil {
return
}

// Backward compatibility. This is an ingress created before the EnableTranscoding field was added.
// Default to enabling transcoding for WHIP
switch info.InputType {
case livekit.IngressInput_WHIP_INPUT:
b := !info.BypassTranscoding
info.EnableTranscoding = &b
default:
t := true
info.EnableTranscoding = &t
}
}

func getLoggerFields(info *livekit.IngressInfo, loggingFields map[string]string) []interface{} {
fields := []interface{}{"ingressID", info.IngressId, "resourceID", info.State.ResourceId, "roomName", info.RoomName, "participantIdentity", info.ParticipantIdentity}
for k, v := range loggingFields {
Expand Down
8 changes: 4 additions & 4 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc
}

var rpcServer rpc.IngressHandlerServer
if p.BypassTranscoding {
if !*p.EnableTranscoding {
// RPC is handled in the handler process when transcoding

rpcServer, err = rpc.NewIngressHandlerServer(ihs, s.bus)
Expand All @@ -162,14 +162,14 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc
p.SetStatus(livekit.IngressState_ENDPOINT_ERROR, err)
p.SendStateUpdate(ctx)

if p.BypassTranscoding {
if !*p.EnableTranscoding {
DeregisterIngressRpcHandlers(rpcServer, p.IngressInfo)
}
span.RecordError(err)
return nil
}

if p.BypassTranscoding {
if !*p.EnableTranscoding {
p.SetStatus(livekit.IngressState_ENDPOINT_PUBLISHING, nil)
p.SendStateUpdate(ctx)

Expand All @@ -196,7 +196,7 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc
return stats
}

if p.BypassTranscoding {
if !*p.EnableTranscoding {
ended = func(err error) {
ctx, span := tracer.Start(context.Background(), "Service.HandleWHIPPublishRequest.ended")
defer span.End()
Expand Down
17 changes: 10 additions & 7 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/livekit/ingress/pkg/config"
"github.com/livekit/ingress/pkg/errors"
"github.com/livekit/ingress/pkg/params"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/hwstats"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo, alreadyCommitted b
accept = available > m.cpuCostConfig.RTMPCpuCost
cpuHold = m.cpuCostConfig.RTMPCpuCost
case livekit.IngressInput_WHIP_INPUT:
if info.BypassTranscoding {
if !*info.EnableTranscoding {
accept = available > m.cpuCostConfig.WHIPBypassTranscodingCpuCost
cpuHold = m.cpuCostConfig.WHIPBypassTranscodingCpuCost
} else {
Expand All @@ -274,6 +275,8 @@ func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo, alreadyCommitted b
}

func (m *Monitor) CanAcceptIngress(info *livekit.IngressInfo) bool {
params.UpdateTranscodingEnabled(info)

accept, _, _ := m.canAcceptIngress(info, false)

return accept
Expand All @@ -294,23 +297,23 @@ func (m *Monitor) AcceptIngress(info *livekit.IngressInfo) bool {
func (m *Monitor) IngressStarted(info *livekit.IngressInfo) {
switch info.InputType {
case livekit.IngressInput_RTMP_INPUT:
m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Add(1)
m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Add(1)
case livekit.IngressInput_WHIP_INPUT:
m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Add(1)
m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Add(1)
case livekit.IngressInput_URL_INPUT:
m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Add(1)
m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Add(1)

}
}

func (m *Monitor) IngressEnded(info *livekit.IngressInfo) {
switch info.InputType {
case livekit.IngressInput_RTMP_INPUT:
m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Sub(1)
m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Sub(1)
case livekit.IngressInput_WHIP_INPUT:
m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Sub(1)
m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Sub(1)
case livekit.IngressInput_URL_INPUT:
m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Sub(1)
m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Sub(1)
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/whip/whip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (h *whipHandler) Init(ctx context.Context, p *params.Params, sdpOffer strin
return "", err
}

if !p.BypassTranscoding && len(h.simulcastLayers) != 0 {
if *p.EnableTranscoding && len(h.simulcastLayers) != 0 {
return "", errors.ErrSimulcastTranscode
}

Expand All @@ -129,7 +129,7 @@ func (h *whipHandler) Init(ctx context.Context, p *params.Params, sdpOffer strin
// for each PeerConnection.
i := &interceptor.Registry{}

if !p.BypassTranscoding {
if *p.EnableTranscoding {
// Use the default set of Interceptors
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
return "", err
Expand Down Expand Up @@ -410,7 +410,7 @@ func (h *whipHandler) addTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPRe

var th WhipTrackHandler
var err error
if h.params.BypassTranscoding {
if !*h.params.EnableTranscoding {
th, err = NewSDKWhipTrackHandler(logger, track, trackQuality, receiver, h.writePLI, h.writeRTCPUpstream)
if err != nil {
logger.Warnw("failed creating SDK whip track handler", err)
Expand Down Expand Up @@ -483,7 +483,7 @@ func (h *whipHandler) runSession(ctx context.Context) error {
var err error
var sdkOutput *lksdk_output.LKSDKOutput

if h.params.BypassTranscoding {
if !*h.params.EnableTranscoding {
sdkOutput, err = lksdk_output.NewLKSDKOutput(ctx, h.params)
if err != nil {
return err
Expand Down

0 comments on commit f608c48

Please sign in to comment.