From f608c4803dce1becf721792bb72dbe369a1d2a9a Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 26 Apr 2024 14:16:59 +0200 Subject: [PATCH] Support for EnableTranscoding (#256) BypassTranscoding is still supported for backward compatibility --- go.mod | 4 ++-- go.sum | 8 ++++---- pkg/lksdk_output/lksdk_output.go | 2 +- pkg/params/params.go | 19 +++++++++++++++++++ pkg/service/service.go | 8 ++++---- pkg/stats/monitor.go | 17 ++++++++++------- pkg/whip/whip_handler.go | 8 ++++---- 7 files changed, 44 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 95305a4c..670fcce5 100644 --- a/go.mod +++ b/go.mod @@ -14,9 +14,9 @@ require ( github.com/livekit/go-rtmp v0.0.0-20230829211117-1c4f5a5c81ed github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e - github.com/livekit/protocol v1.12.1-0.20240422173605-bad3fc9684c3 + github.com/livekit/protocol v1.14.0 github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9 - github.com/livekit/server-sdk-go/v2 v2.1.1 + github.com/livekit/server-sdk-go/v2 v2.1.2 github.com/pion/dtls/v2 v2.2.10 github.com/pion/interceptor v0.1.28 github.com/pion/rtcp v1.2.14 diff --git a/go.sum b/go.sum index fcb62903..7b005468 100644 --- a/go.sum +++ b/go.sum @@ -83,12 +83,12 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e h1:ss4VwrouYiDpuNJ9BUTH+WsW+GDdJS70iZp8ii3/0Lc= github.com/livekit/mediatransportutil v0.0.0-20240416023643-881d3dc5423e/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.12.1-0.20240422173605-bad3fc9684c3 h1:2RaOmN2xMN+bzsEYnAKI4XJaO3TjI6mLIL10+I46V9k= -github.com/livekit/protocol v1.12.1-0.20240422173605-bad3fc9684c3/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= +github.com/livekit/protocol v1.14.0 h1:W0i5HR2Efoy2j9NhmONolU3FIsUDVl6MAT5zWfALev0= +github.com/livekit/protocol v1.14.0/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9 h1:4CngtPIJ58WcQ1sUDGdxJDkTndQpN6M/T8jXvRAd7Oc= github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= -github.com/livekit/server-sdk-go/v2 v2.1.1 h1:TGMF3BHAyx+/A0iVZTGG+4mZMXrAY1WRbOMqbwGnrHI= -github.com/livekit/server-sdk-go/v2 v2.1.1/go.mod h1:4d3kLn4qLMwKnKGpivW29YUwcBQKXFXzwH+x/nua94E= +github.com/livekit/server-sdk-go/v2 v2.1.2 h1:3MhFqptHjzpsNAcisHYDtn77qrJ9szsAy4zJkeI3Mic= +github.com/livekit/server-sdk-go/v2 v2.1.2/go.mod h1:ZjOQBtoFcU+oq1c2xaMLHhcQ3DNcfkwC3FHG8DAvkZ8= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= diff --git a/pkg/lksdk_output/lksdk_output.go b/pkg/lksdk_output/lksdk_output.go index 0f773098..0757dcd7 100644 --- a/pkg/lksdk_output/lksdk_output.go +++ b/pkg/lksdk_output/lksdk_output.go @@ -137,7 +137,7 @@ func NewLKSDKOutput(ctx context.Context, p *params.Params) (*LKSDKOutput, error) lksdk.WithAutoSubscribe(false), } - if p.BypassTranscoding { + if !*p.EnableTranscoding { opts = append(opts, lksdk.WithInterceptors([]interceptor.Factory{})) } else { var br uint32 diff --git a/pkg/params/params.go b/pkg/params/params.go index 8b19affe..fc79be7a 100644 --- a/pkg/params/params.go +++ b/pkg/params/params.go @@ -135,6 +135,8 @@ func GetParams(ctx context.Context, psrpcClient rpc.IOInfoClient, conf *config.C return nil, err } + UpdateTranscodingEnabled(infoCopy) + if token == "" { token, err = ingress.BuildIngressToken(conf.ApiKey, conf.ApiSecret, info.RoomName, info.ParticipantIdentity, info.ParticipantName, info.ParticipantMetadata) if err != nil { @@ -161,6 +163,23 @@ func GetParams(ctx context.Context, psrpcClient rpc.IOInfoClient, conf *config.C return p, nil } +func UpdateTranscodingEnabled(info *livekit.IngressInfo) { + if info.EnableTranscoding != nil { + return + } + + // Backward compatibility. This is an ingress created before the EnableTranscoding field was added. + // Default to enabling transcoding for WHIP + switch info.InputType { + case livekit.IngressInput_WHIP_INPUT: + b := !info.BypassTranscoding + info.EnableTranscoding = &b + default: + t := true + info.EnableTranscoding = &t + } +} + func getLoggerFields(info *livekit.IngressInfo, loggingFields map[string]string) []interface{} { fields := []interface{}{"ingressID", info.IngressId, "resourceID", info.State.ResourceId, "roomName", info.RoomName, "participantIdentity", info.ParticipantIdentity} for k, v := range loggingFields { diff --git a/pkg/service/service.go b/pkg/service/service.go index 1d626368..176b118a 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -139,7 +139,7 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc } var rpcServer rpc.IngressHandlerServer - if p.BypassTranscoding { + if !*p.EnableTranscoding { // RPC is handled in the handler process when transcoding rpcServer, err = rpc.NewIngressHandlerServer(ihs, s.bus) @@ -162,14 +162,14 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc p.SetStatus(livekit.IngressState_ENDPOINT_ERROR, err) p.SendStateUpdate(ctx) - if p.BypassTranscoding { + if !*p.EnableTranscoding { DeregisterIngressRpcHandlers(rpcServer, p.IngressInfo) } span.RecordError(err) return nil } - if p.BypassTranscoding { + if !*p.EnableTranscoding { p.SetStatus(livekit.IngressState_ENDPOINT_PUBLISHING, nil) p.SendStateUpdate(ctx) @@ -196,7 +196,7 @@ func (s *Service) HandleWHIPPublishRequest(streamKey, resourceId string, ihs rpc return stats } - if p.BypassTranscoding { + if !*p.EnableTranscoding { ended = func(err error) { ctx, span := tracer.Start(context.Background(), "Service.HandleWHIPPublishRequest.ended") defer span.End() diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 4ac97f78..8c0492cb 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/ingress/pkg/config" "github.com/livekit/ingress/pkg/errors" + "github.com/livekit/ingress/pkg/params" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/hwstats" @@ -253,7 +254,7 @@ func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo, alreadyCommitted b accept = available > m.cpuCostConfig.RTMPCpuCost cpuHold = m.cpuCostConfig.RTMPCpuCost case livekit.IngressInput_WHIP_INPUT: - if info.BypassTranscoding { + if !*info.EnableTranscoding { accept = available > m.cpuCostConfig.WHIPBypassTranscodingCpuCost cpuHold = m.cpuCostConfig.WHIPBypassTranscodingCpuCost } else { @@ -274,6 +275,8 @@ func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo, alreadyCommitted b } func (m *Monitor) CanAcceptIngress(info *livekit.IngressInfo) bool { + params.UpdateTranscodingEnabled(info) + accept, _, _ := m.canAcceptIngress(info, false) return accept @@ -294,11 +297,11 @@ func (m *Monitor) AcceptIngress(info *livekit.IngressInfo) bool { func (m *Monitor) IngressStarted(info *livekit.IngressInfo) { switch info.InputType { case livekit.IngressInput_RTMP_INPUT: - m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Add(1) + m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Add(1) case livekit.IngressInput_WHIP_INPUT: - m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Add(1) + m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Add(1) case livekit.IngressInput_URL_INPUT: - m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Add(1) + m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Add(1) } } @@ -306,11 +309,11 @@ func (m *Monitor) IngressStarted(info *livekit.IngressInfo) { func (m *Monitor) IngressEnded(info *livekit.IngressInfo) { switch info.InputType { case livekit.IngressInput_RTMP_INPUT: - m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Sub(1) + m.requestGauge.With(prometheus.Labels{"type": "rtmp", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Sub(1) case livekit.IngressInput_WHIP_INPUT: - m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Sub(1) + m.requestGauge.With(prometheus.Labels{"type": "whip", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Sub(1) case livekit.IngressInput_URL_INPUT: - m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", !info.BypassTranscoding)}).Sub(1) + m.requestGauge.With(prometheus.Labels{"type": "url", "transcoding": fmt.Sprintf("%v", *info.EnableTranscoding)}).Sub(1) } } diff --git a/pkg/whip/whip_handler.go b/pkg/whip/whip_handler.go index 5f6c9baf..50d4cd3f 100644 --- a/pkg/whip/whip_handler.go +++ b/pkg/whip/whip_handler.go @@ -112,7 +112,7 @@ func (h *whipHandler) Init(ctx context.Context, p *params.Params, sdpOffer strin return "", err } - if !p.BypassTranscoding && len(h.simulcastLayers) != 0 { + if *p.EnableTranscoding && len(h.simulcastLayers) != 0 { return "", errors.ErrSimulcastTranscode } @@ -129,7 +129,7 @@ func (h *whipHandler) Init(ctx context.Context, p *params.Params, sdpOffer strin // for each PeerConnection. i := &interceptor.Registry{} - if !p.BypassTranscoding { + if *p.EnableTranscoding { // Use the default set of Interceptors if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil { return "", err @@ -410,7 +410,7 @@ func (h *whipHandler) addTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPRe var th WhipTrackHandler var err error - if h.params.BypassTranscoding { + if !*h.params.EnableTranscoding { th, err = NewSDKWhipTrackHandler(logger, track, trackQuality, receiver, h.writePLI, h.writeRTCPUpstream) if err != nil { logger.Warnw("failed creating SDK whip track handler", err) @@ -483,7 +483,7 @@ func (h *whipHandler) runSession(ctx context.Context) error { var err error var sdkOutput *lksdk_output.LKSDKOutput - if h.params.BypassTranscoding { + if !*h.params.EnableTranscoding { sdkOutput, err = lksdk_output.NewLKSDKOutput(ctx, h.params) if err != nil { return err