diff --git a/pkg/service/service.go b/pkg/service/service.go index 7602a765..8a8ca678 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -194,7 +194,6 @@ func (s *Service) ListActiveEgress(ctx context.Context, _ *rpc.ListActiveEgressR func (s *Service) Status() ([]byte, error) { info := map[string]interface{}{ "CpuLoad": s.GetCPULoad(), - "CpuHold": s.GetCPUHold(), } s.mu.RLock() diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 3160b6f4..a6298b34 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -42,8 +42,7 @@ type Monitor struct { pendingCPUs atomic.Float64 mu sync.Mutex - counts map[string]int - reserved float64 + requests atomic.Int32 } const cpuHoldDuration = time.Second * 5 @@ -51,7 +50,6 @@ const cpuHoldDuration = time.Second * 5 func NewMonitor(conf *config.ServiceConfig) *Monitor { return &Monitor{ cpuCostConfig: conf.CPUCostConfig, - counts: make(map[string]int), } } @@ -147,13 +145,6 @@ func (m *Monitor) GetCPULoad() float64 { return (m.cpuStats.NumCPU() - m.cpuStats.GetCPUIdle()) / m.cpuStats.NumCPU() * 100 } -func (m *Monitor) GetCPUHold() float64 { - m.mu.Lock() - defer m.mu.Unlock() - - return m.reserved -} - func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool { m.mu.Lock() defer m.mu.Unlock() @@ -170,13 +161,15 @@ func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool { logger.Debugw("cpu check", "total", total, "available", available, - "reserved", m.reserved, + "active_requests", m.requests, ) - if m.reserved == 0 { + if m.requests.Load() == 0 { + // if no requests, use total available = total - } else if total-m.reserved < available { - available = total - m.reserved + } else { + // if already running requests, cap usage at 90% + available -= 0.9 * total } switch req.Request.(type) { @@ -217,7 +210,7 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { cpuHold = m.cpuCostConfig.TrackCpuCost } - m.reserved += cpuHold + m.requests.Inc() m.pendingCPUs.Add(cpuHold) time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) }) return nil @@ -244,37 +237,20 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) { switch req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: - m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeRoomComposite}).Sub(1) case *rpc.StartEgressRequest_Web: - m.reserved -= m.cpuCostConfig.WebCpuCost m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Sub(1) case *rpc.StartEgressRequest_Participant: - m.reserved -= m.cpuCostConfig.ParticipantCpuCost m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Sub(1) case *rpc.StartEgressRequest_TrackComposite: - m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrackComposite}).Sub(1) case *rpc.StartEgressRequest_Track: - m.reserved -= m.cpuCostConfig.TrackCpuCost m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrack}).Sub(1) } -} -func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) { - m.mu.Lock() - defer m.mu.Unlock() + m.requests.Dec() +} - switch req.Request.(type) { - case *rpc.StartEgressRequest_RoomComposite: - m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost - case *rpc.StartEgressRequest_Web: - m.reserved -= m.cpuCostConfig.WebCpuCost - case *rpc.StartEgressRequest_Participant: - m.reserved -= m.cpuCostConfig.ParticipantCpuCost - case *rpc.StartEgressRequest_TrackComposite: - m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost - case *rpc.StartEgressRequest_Track: - m.reserved -= m.cpuCostConfig.TrackCpuCost - } +func (m *Monitor) EgressAborted(_ *rpc.StartEgressRequest) { + m.requests.Dec() }