diff --git a/pkg/config/service.go b/pkg/config/service.go index 4bb71ccc..4ea60381 100644 --- a/pkg/config/service.go +++ b/pkg/config/service.go @@ -36,6 +36,7 @@ const ( trackCompositeCpuCost = 1 trackCpuCost = 0.5 maxCpuUtilization = 0.8 + maxConcurrentWeb = 18 defaultTemplatePort = 7980 defaultTemplateBaseTemplate = "http://localhost:%d/" @@ -53,7 +54,8 @@ type ServiceConfig struct { } type CPUCostConfig struct { - MaxCpuUtilization float64 `yaml:"max_cpu_utilization"` // Maximum allowed CPU utilization when deciding to accept a request. Default to 80%. + MaxCpuUtilization float64 `yaml:"max_cpu_utilization"` // maximum allowed CPU utilization when deciding to accept a request. Default to 80%. + MaxConcurrentWeb int32 `yaml:"max_concurrent_web"` // maximum allowed chrome/x/pulse instances RoomCompositeCpuCost float64 `yaml:"room_composite_cpu_cost"` AudioRoomCompositeCpuCost float64 `yaml:"audio_room_composite_cpu_cost"` WebCpuCost float64 `yaml:"web_cpu_cost"` @@ -110,6 +112,9 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) { if conf.MaxCpuUtilization <= 0 || conf.MaxCpuUtilization > 1 { conf.MaxCpuUtilization = maxCpuUtilization } + if conf.MaxConcurrentWeb <= 0 { + conf.MaxConcurrentWeb = maxConcurrentWeb + } if conf.TemplateBase == "" { conf.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, conf.TemplatePort) diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 65eb7c5d..5c90b287 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -52,9 +52,10 @@ type Monitor struct { promCPULoad prometheus.Gauge requestGauge *prometheus.GaugeVec - svc Service - cpuStats *hwstats.CPUStats - requests atomic.Int32 + svc Service + cpuStats *hwstats.CPUStats + requests atomic.Int32 + webRequests atomic.Int32 mu sync.Mutex highCPUDuration int @@ -138,6 +139,15 @@ func (m *Monitor) validateCPUConfig() error { return nil } +func (m *Monitor) CanAcceptWebRequest() bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.canAcceptRequestLocked(&rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_Web{}, + }) +} + func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool { m.mu.Lock() defer m.mu.Unlock() @@ -152,12 +162,18 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool { var required float64 switch r := req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: + if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { + return false + } if r.RoomComposite.AudioOnly { required = m.cpuCostConfig.AudioRoomCompositeCpuCost } else { required = m.cpuCostConfig.RoomCompositeCpuCost } case *rpc.StartEgressRequest_Web: + if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb { + return false + } if r.Web.AudioOnly { required = m.cpuCostConfig.AudioWebCpuCost } else { @@ -197,16 +213,17 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { } m.requests.Inc() - var cpuHold float64 switch r := req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: + m.webRequests.Inc() if r.RoomComposite.AudioOnly { cpuHold = m.cpuCostConfig.AudioRoomCompositeCpuCost } else { cpuHold = m.cpuCostConfig.RoomCompositeCpuCost } case *rpc.StartEgressRequest_Web: + m.webRequests.Inc() if r.Web.AudioOnly { cpuHold = m.cpuCostConfig.AudioWebCpuCost } else { @@ -260,6 +277,10 @@ func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) { delete(m.pending, req.EgressId) m.requests.Dec() + switch req.Request.(type) { + case *rpc.StartEgressRequest_RoomComposite, *rpc.StartEgressRequest_Web: + m.webRequests.Dec() + } } func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) { @@ -284,8 +305,10 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) { switch req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeRoomComposite}).Sub(1) + m.webRequests.Dec() case *rpc.StartEgressRequest_Web: m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Sub(1) + m.webRequests.Dec() case *rpc.StartEgressRequest_Participant: m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Sub(1) case *rpc.StartEgressRequest_TrackComposite: diff --git a/pkg/stats/monitor_prom.go b/pkg/stats/monitor_prom.go index 30698e95..bd407839 100644 --- a/pkg/stats/monitor_prom.go +++ b/pkg/stats/monitor_prom.go @@ -16,9 +16,6 @@ package stats import ( "github.com/prometheus/client_golang/prometheus" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/rpc" ) func (m *Monitor) initPrometheus() { @@ -75,11 +72,7 @@ func (m *Monitor) promIsIdle() float64 { } func (m *Monitor) promCanAcceptRequest() float64 { - if !m.svc.IsDisabled() && m.CanAcceptRequest(&rpc.StartEgressRequest{ - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: &livekit.RoomCompositeEgressRequest{}, - }, - }) { + if !m.svc.IsDisabled() && m.CanAcceptWebRequest() { return 1 } return 0