diff --git a/pkg/service/process.go b/pkg/service/process.go
index cace017b..10494c63 100644
--- a/pkg/service/process.go
+++ b/pkg/service/process.go
@@ -165,13 +165,6 @@ func (s *Service) awaitCleanup(p *Process) {
 	s.mu.Unlock()
 }
 
-func (s *Service) IsIdle() bool {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-
-	return len(s.activeHandlers) == 0
-}
-
 func (s *Service) getGRPCClient(egressID string) (ipc.EgressHandlerClient, error) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 7602a765..653b3fbd 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -163,7 +163,7 @@ func (s *Service) StartEgressAffinity(ctx context.Context, req *rpc.StartEgressR
 		return -1
 	}
 
-	if s.IsIdle() {
+	if s.GetRequestCount() == 0 {
 		// group multiple track and track composite requests.
 		// if this instance is idle and another is already handling some, the request will go to that server.
 		// this avoids having many instances with one track request each, taking availability from room composite.
@@ -194,7 +194,6 @@ func (s *Service) ListActiveEgress(ctx context.Context, _ *rpc.ListActiveEgressR
 func (s *Service) Status() ([]byte, error) {
 	info := map[string]interface{}{
 		"CpuLoad": s.GetCPULoad(),
-		"CpuHold": s.GetCPUHold(),
 	}
 
 	s.mu.RLock()
@@ -207,7 +206,7 @@ func (s *Service) Status() ([]byte, error) {
 }
 
 func (s *Service) promIsIdle() float64 {
-	if s.IsIdle() {
+	if s.GetRequestCount() == 0 {
 		return 1
 	}
 	return 0
@@ -234,7 +233,7 @@ func (s *Service) Stop(kill bool) {
 }
 
 func (s *Service) Close() {
-	for !s.IsIdle() {
+	for s.GetRequestCount() > 0 {
 		time.Sleep(shutdownTimer)
 	}
 	logger.Infow("closing server")
diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go
index 3160b6f4..ab73aabb 100644
--- a/pkg/stats/monitor.go
+++ b/pkg/stats/monitor.go
@@ -42,8 +42,7 @@ type Monitor struct {
 	pendingCPUs atomic.Float64
 
 	mu       sync.Mutex
-	counts   map[string]int
-	reserved float64
+	requests atomic.Int32
 }
 
 const cpuHoldDuration = time.Second * 5
@@ -51,7 +50,6 @@ const cpuHoldDuration = time.Second * 5
 func NewMonitor(conf *config.ServiceConfig) *Monitor {
 	return &Monitor{
 		cpuCostConfig: conf.CPUCostConfig,
-		counts:        make(map[string]int),
 	}
 }
 
@@ -147,11 +145,8 @@ func (m *Monitor) GetCPULoad() float64 {
 	return (m.cpuStats.NumCPU() - m.cpuStats.GetCPUIdle()) / m.cpuStats.NumCPU() * 100
 }
 
-func (m *Monitor) GetCPUHold() float64 {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-
-	return m.reserved
+func (m *Monitor) GetRequestCount() int {
+	return int(m.requests.Load())
 }
 
 func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
@@ -170,13 +165,15 @@ func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
 	logger.Debugw("cpu check",
 		"total", total,
 		"available", available,
-		"reserved", m.reserved,
+		"active_requests", m.requests,
 	)
 
-	if m.reserved == 0 {
+	if m.requests.Load() == 0 {
+		// if no requests, use total
 		available = total
-	} else if total-m.reserved < available {
-		available = total - m.reserved
+	} else {
+		// if already running requests, cap usage at 90%
+		available -= 0.9 * total
 	}
 
 	switch req.Request.(type) {
@@ -217,7 +214,7 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
 		cpuHold = m.cpuCostConfig.TrackCpuCost
 	}
 
-	m.reserved += cpuHold
+	m.requests.Inc()
 	m.pendingCPUs.Add(cpuHold)
 	time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
 	return nil
@@ -244,37 +241,20 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) {
 
 	switch req.Request.(type) {
 	case *rpc.StartEgressRequest_RoomComposite:
-		m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost
 		m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeRoomComposite}).Sub(1)
 	case *rpc.StartEgressRequest_Web:
-		m.reserved -= m.cpuCostConfig.WebCpuCost
 		m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Sub(1)
 	case *rpc.StartEgressRequest_Participant:
-		m.reserved -= m.cpuCostConfig.ParticipantCpuCost
 		m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Sub(1)
 	case *rpc.StartEgressRequest_TrackComposite:
-		m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost
 		m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrackComposite}).Sub(1)
 	case *rpc.StartEgressRequest_Track:
-		m.reserved -= m.cpuCostConfig.TrackCpuCost
 		m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrack}).Sub(1)
 	}
-}
 
-func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
+	m.requests.Dec()
+}
 
-	switch req.Request.(type) {
-	case *rpc.StartEgressRequest_RoomComposite:
-		m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost
-	case *rpc.StartEgressRequest_Web:
-		m.reserved -= m.cpuCostConfig.WebCpuCost
-	case *rpc.StartEgressRequest_Participant:
-		m.reserved -= m.cpuCostConfig.ParticipantCpuCost
-	case *rpc.StartEgressRequest_TrackComposite:
-		m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost
-	case *rpc.StartEgressRequest_Track:
-		m.reserved -= m.cpuCostConfig.TrackCpuCost
-	}
+func (m *Monitor) EgressAborted(_ *rpc.StartEgressRequest) {
+	m.requests.Dec()
 }
diff --git a/test/integration.go b/test/integration.go
index 7b8856f9..71671204 100644
--- a/test/integration.go
+++ b/test/integration.go
@@ -102,7 +102,7 @@ type testCase struct {
 func (r *Runner) awaitIdle(t *testing.T) {
 	r.svc.KillAll()
 	for i := 0; i < 30; i++ {
-		if r.svc.GetCPUHold() == 0 {
+		if r.svc.GetRequestCount() == 0 {
 			return
 		}
 		time.Sleep(time.Second)
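
For reference, below is a minimal, self-contained sketch of the bookkeeping pattern this diff switches to: a single atomic request counter in place of the mutex-guarded "reserved" CPU float. The monitor type, its method names, and the go.uber.org/atomic import are illustrative assumptions, not the real pkg/stats.Monitor (CPU stats, the pending-CPU hold, and prometheus gauges are omitted); the atomic package is inferred from the Inc/Dec/Load methods used in the diff.

```go
// Illustrative sketch only: an atomic request counter replacing per-request
// "reserved" CPU bookkeeping. monitor is a stand-in for pkg/stats.Monitor.
package main

import (
	"fmt"

	"go.uber.org/atomic" // assumed; matches the Inc/Dec/Load usage in the diff
)

type monitor struct {
	requests atomic.Int32 // number of egress requests currently being handled
}

// acceptRequest records a new request (the real Monitor also adds a
// temporary pending-CPU hold and bumps a per-type prometheus gauge).
func (m *monitor) acceptRequest() { m.requests.Inc() }

// A finished and an aborted egress both reduce to a single decrement,
// so no per-request-type CPU cost has to be tracked for cleanup.
func (m *monitor) egressEnded()   { m.requests.Dec() }
func (m *monitor) egressAborted() { m.requests.Dec() }

// getRequestCount is what the idle checks (promIsIdle, Close, and awaitIdle
// in the integration tests) poll instead of the removed IsIdle/GetCPUHold.
func (m *monitor) getRequestCount() int { return int(m.requests.Load()) }

func main() {
	m := &monitor{}
	m.acceptRequest()
	m.acceptRequest()
	m.egressAborted()
	m.egressEnded()
	fmt.Println("idle:", m.getRequestCount() == 0) // idle: true
}
```

This is also why EgressAborted no longer needs to inspect the request type in the diff above: both end paths collapse into one Dec on the counter.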