diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 08958a7d..1332874c 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -16,6 +16,7 @@ package stats import ( "fmt" + "math" "sort" "sync" "time" @@ -47,7 +48,7 @@ type Monitor struct { prevEgressUsage map[string]float64 } -const cpuHoldDuration = time.Second * 10 +const cpuHoldDuration = time.Second * 30 func NewMonitor(conf *config.ServiceConfig) *Monitor { return &Monitor{ @@ -176,38 +177,41 @@ func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool { } func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool { - accept := false - total := m.cpuStats.NumCPU() - - var available float64 - if m.requests.Load() == 0 { - // if no requests, use total - available = total - m.pendingCPUs.Load() - } else { - // if already running requests, cap usage at MaxCpuUtilization - available = m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load() - (1-m.cpuCostConfig.MaxCpuUtilization)*total - } + available := math.Min(m.getAvailableCPU(), m.cpuStats.NumCPU()-m.pendingCPUs.Load()) logger.Debugw("cpu check", - "total", total, + "total", m.cpuStats.NumCPU(), "available", available, "requests", m.requests, ) switch req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: - accept = available >= m.cpuCostConfig.RoomCompositeCpuCost + return available >= m.cpuCostConfig.RoomCompositeCpuCost case *rpc.StartEgressRequest_Web: - accept = available >= m.cpuCostConfig.WebCpuCost + return available >= m.cpuCostConfig.WebCpuCost case *rpc.StartEgressRequest_Participant: - accept = available >= m.cpuCostConfig.ParticipantCpuCost + return available >= m.cpuCostConfig.ParticipantCpuCost case *rpc.StartEgressRequest_TrackComposite: - accept = available >= m.cpuCostConfig.TrackCompositeCpuCost + return available >= m.cpuCostConfig.TrackCompositeCpuCost case *rpc.StartEgressRequest_Track: - accept = available >= m.cpuCostConfig.TrackCpuCost + return available >= m.cpuCostConfig.TrackCpuCost + default: + logger.Warnw("unrecognized request type", nil) + return false } +} + +func (m *Monitor) getAvailableCPU() float64 { + total := m.cpuStats.NumCPU() - return accept + // if no requests, use total + if m.requests.Load() == 0 { + return total + } + + // if already running requests, cap usage at MaxCpuUtilization + return m.cpuStats.GetCPUIdle() - (1-m.cpuCostConfig.MaxCpuUtilization)*total } func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { @@ -218,23 +222,30 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { return errors.ErrResourceExhausted } - var cpuHold float64 + pending := m.getAvailableCPU() switch req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: - cpuHold = m.cpuCostConfig.RoomCompositeCpuCost + pending = m.cpuCostConfig.RoomCompositeCpuCost case *rpc.StartEgressRequest_Web: - cpuHold = m.cpuCostConfig.WebCpuCost + pending = m.cpuCostConfig.WebCpuCost case *rpc.StartEgressRequest_Participant: - cpuHold = m.cpuCostConfig.ParticipantCpuCost + pending = m.cpuCostConfig.ParticipantCpuCost case *rpc.StartEgressRequest_TrackComposite: - cpuHold = m.cpuCostConfig.TrackCompositeCpuCost + pending = m.cpuCostConfig.TrackCompositeCpuCost case *rpc.StartEgressRequest_Track: - cpuHold = m.cpuCostConfig.TrackCpuCost + pending = m.cpuCostConfig.TrackCpuCost } m.requests.Inc() - m.pendingCPUs.Add(cpuHold) - time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) }) + m.pendingCPUs.Store(pending) + time.AfterFunc(cpuHoldDuration, func() { + m.mu.Lock() + if m.pendingCPUs.Load() == pending { + m.pendingCPUs.Store(0) + } + m.mu.Unlock() + }) + return nil }