Skip to content

Commit

Permalink
max concurrent web requests (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Jul 2, 2024
1 parent 885002e commit ac3bea0
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
7 changes: 6 additions & 1 deletion pkg/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
trackCompositeCpuCost = 1
trackCpuCost = 0.5
maxCpuUtilization = 0.8
maxConcurrentWeb = 18

defaultTemplatePort = 7980
defaultTemplateBaseTemplate = "http://localhost:%d/"
Expand All @@ -53,7 +54,8 @@ type ServiceConfig struct {
}

type CPUCostConfig struct {
MaxCpuUtilization float64 `yaml:"max_cpu_utilization"` // Maximum allowed CPU utilization when deciding to accept a request. Default to 80%.
MaxCpuUtilization float64 `yaml:"max_cpu_utilization"` // maximum allowed CPU utilization when deciding to accept a request. Default to 80%.
MaxConcurrentWeb int32 `yaml:"max_concurrent_web"` // maximum allowed chrome/x/pulse instances
RoomCompositeCpuCost float64 `yaml:"room_composite_cpu_cost"`
AudioRoomCompositeCpuCost float64 `yaml:"audio_room_composite_cpu_cost"`
WebCpuCost float64 `yaml:"web_cpu_cost"`
Expand Down Expand Up @@ -110,6 +112,9 @@ func NewServiceConfig(confString string) (*ServiceConfig, error) {
if conf.MaxCpuUtilization <= 0 || conf.MaxCpuUtilization > 1 {
conf.MaxCpuUtilization = maxCpuUtilization
}
if conf.MaxConcurrentWeb <= 0 {
conf.MaxConcurrentWeb = maxConcurrentWeb
}

if conf.TemplateBase == "" {
conf.TemplateBase = fmt.Sprintf(defaultTemplateBaseTemplate, conf.TemplatePort)
Expand Down
31 changes: 27 additions & 4 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type Monitor struct {
promCPULoad prometheus.Gauge
requestGauge *prometheus.GaugeVec

svc Service
cpuStats *hwstats.CPUStats
requests atomic.Int32
svc Service
cpuStats *hwstats.CPUStats
requests atomic.Int32
webRequests atomic.Int32

mu sync.Mutex
highCPUDuration int
Expand Down Expand Up @@ -138,6 +139,15 @@ func (m *Monitor) validateCPUConfig() error {
return nil
}

func (m *Monitor) CanAcceptWebRequest() bool {
m.mu.Lock()
defer m.mu.Unlock()

return m.canAcceptRequestLocked(&rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Web{},
})
}

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -152,12 +162,18 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
var required float64
switch r := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return false
}
if r.RoomComposite.AudioOnly {
required = m.cpuCostConfig.AudioRoomCompositeCpuCost
} else {
required = m.cpuCostConfig.RoomCompositeCpuCost
}
case *rpc.StartEgressRequest_Web:
if m.webRequests.Load() >= m.cpuCostConfig.MaxConcurrentWeb {
return false
}
if r.Web.AudioOnly {
required = m.cpuCostConfig.AudioWebCpuCost
} else {
Expand Down Expand Up @@ -197,16 +213,17 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
}

m.requests.Inc()

var cpuHold float64
switch r := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
m.webRequests.Inc()
if r.RoomComposite.AudioOnly {
cpuHold = m.cpuCostConfig.AudioRoomCompositeCpuCost
} else {
cpuHold = m.cpuCostConfig.RoomCompositeCpuCost
}
case *rpc.StartEgressRequest_Web:
m.webRequests.Inc()
if r.Web.AudioOnly {
cpuHold = m.cpuCostConfig.AudioWebCpuCost
} else {
Expand Down Expand Up @@ -260,6 +277,10 @@ func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) {

delete(m.pending, req.EgressId)
m.requests.Dec()
switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite, *rpc.StartEgressRequest_Web:
m.webRequests.Dec()
}
}

func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) {
Expand All @@ -284,8 +305,10 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) {
switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeRoomComposite}).Sub(1)
m.webRequests.Dec()
case *rpc.StartEgressRequest_Web:
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Sub(1)
m.webRequests.Dec()
case *rpc.StartEgressRequest_Participant:
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Sub(1)
case *rpc.StartEgressRequest_TrackComposite:
Expand Down
9 changes: 1 addition & 8 deletions pkg/stats/monitor_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ package stats

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
)

func (m *Monitor) initPrometheus() {
Expand Down Expand Up @@ -75,11 +72,7 @@ func (m *Monitor) promIsIdle() float64 {
}

func (m *Monitor) promCanAcceptRequest() float64 {
if !m.svc.IsDisabled() && m.CanAcceptRequest(&rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: &livekit.RoomCompositeEgressRequest{},
},
}) {
if !m.svc.IsDisabled() && m.CanAcceptWebRequest() {
return 1
}
return 0
Expand Down

0 comments on commit ac3bea0

Please sign in to comment.