From 9835fc78275969473c79d28211ce36ef96349a47 Mon Sep 17 00:00:00 2001
From: David Colburn
Date: Wed, 20 Dec 2023 16:42:20 -0600
Subject: [PATCH] per-process cpu reservations

---
 pkg/service/process.go      |  18 -----
 pkg/service/service.go      |   1 -
 pkg/service/service_prom.go |  19 ------
 pkg/service/service_rpc.go  |   3 +-
 pkg/stats/monitor.go        | 127 ++++++++++++++++++++++++++++--------
 5 files changed, 101 insertions(+), 67 deletions(-)

diff --git a/pkg/service/process.go b/pkg/service/process.go
index 5a12845c..b399266a 100644
--- a/pkg/service/process.go
+++ b/pkg/service/process.go
@@ -38,9 +38,6 @@ type Process struct {
 	cmd              *exec.Cmd
 	ipcHandlerClient ipc.EgressHandlerClient
 	ready            chan struct{}
-	totalCPU         float64
-	cpuCounter       int
-	maxCPU           float64
 	closed           core.Fuse
 }
 
@@ -71,21 +68,6 @@ func NewProcess(
 	return p, nil
 }
 
-func (p *Process) updateCPU(cpu float64) {
-	p.totalCPU += cpu
-	p.cpuCounter++
-	if cpu > p.maxCPU {
-		p.maxCPU = cpu
-	}
-}
-
-func (p *Process) getUsageStats() (float64, float64) {
-	if p.cpuCounter == 0 {
-		return 0, 0
-	}
-	return p.totalCPU / float64(p.cpuCounter), p.maxCPU
-}
-
 // Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics
 func (p *Process) Gather() ([]*dto.MetricFamily, error) {
 	// Get the metrics from the handler via IPC
diff --git a/pkg/service/service.go b/pkg/service/service.go
index aceb8080..b5417b23 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -86,7 +86,6 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service
 	if err := s.Start(s.conf,
 		s.promIsIdle,
 		s.promCanAcceptRequest,
-		s.promProcUpdate,
 	); err != nil {
 		return nil, err
 	}
diff --git a/pkg/service/service_prom.go b/pkg/service/service_prom.go
index 289daaf5..46f35bfe 100644
--- a/pkg/service/service_prom.go
+++ b/pkg/service/service_prom.go
@@ -72,22 +72,3 @@ func (s *Service) promCanAcceptRequest() float64 {
 	}
 	return 0
 }
-
-func (s *Service) promProcUpdate(pUsage map[int]float64) map[string]float64 {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-
-	eUsage := make(map[string]float64)
-	for _, h := range s.activeHandlers {
-		if cmd := h.cmd; cmd != nil {
-			if process := cmd.Process; process != nil {
-				if usage, ok := pUsage[process.Pid]; ok {
-					eUsage[h.req.EgressId] = usage
-					h.updateCPU(usage)
-				}
-			}
-		}
-	}
-
-	return eUsage
-}
diff --git a/pkg/service/service_rpc.go b/pkg/service/service_rpc.go
index 73688856..e4505a20 100644
--- a/pkg/service/service_rpc.go
+++ b/pkg/service/service_rpc.go
@@ -154,6 +154,7 @@ func (s *Service) AddHandler(egressID string, p *Process) error {
 
 	select {
 	case <-p.ready:
+		s.UpdatePID(egressID, p.cmd.Process.Pid)
 		go func() {
 			err := p.cmd.Wait()
 			s.processEnded(p, err)
@@ -178,7 +179,7 @@ func (s *Service) processEnded(p *Process, err error) {
 		s.Stop(false)
 	}
 
-	avgCPU, maxCPU := p.getUsageStats()
+	avgCPU, maxCPU := s.GetUsageStats(p.info.EgressId)
 	logger.Infow("egress stats", "egressID", p.req.EgressId, "avgCPU", avgCPU, "maxCPU", maxCPU)
 
 	s.EgressEnded(p.req)
diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go
index e326de86..9d4fb32c 100644
--- a/pkg/stats/monitor.go
+++ b/pkg/stats/monitor.go
@@ -39,15 +39,25 @@ type Monitor struct {
 	requestGauge    *prometheus.GaugeVec
 
 	cpuStats *utils.CPUStats
+	requests atomic.Int32
 
-	pendingCPUs atomic.Float64
+	mu        sync.Mutex
+	pending   map[string]*processStats
+	procStats map[int]*processStats
+}
+
+type processStats struct {
+	egressID string
+
+	pendingUsage float64
+	lastUsage    float64
 
-	mu              sync.Mutex
-	requests        atomic.Int32
-	prevEgressUsage map[string]float64
+	totalCPU   float64
+	cpuCounter int
+	maxCPU     float64
 }
 
-const cpuHoldDuration = time.Second * 5
+const cpuHoldDuration = time.Second * 30
 
 func NewMonitor(conf *config.ServiceConfig) *Monitor {
 	return &Monitor{
@@ -59,20 +69,30 @@ func (m *Monitor) Start(
 	conf *config.ServiceConfig,
 	isIdle func() float64,
 	canAcceptRequest func() float64,
-	procUpdate func(map[int]float64) map[string]float64,
 ) error {
 	procStats, err := utils.NewProcCPUStats(func(idle float64, usage map[int]float64) {
 		m.promCPULoad.Set(1 - idle/m.cpuStats.NumCPU())
-		egressUsage := procUpdate(usage)
-		for egressID, cpuUsage := range egressUsage {
-			m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(cpuUsage)
-		}
-		for egressID := range m.prevEgressUsage {
-			if _, ok := egressUsage[egressID]; !ok {
-				m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(0)
+
+		m.mu.Lock()
+		defer m.mu.Unlock()
+
+		for pid, cpuUsage := range usage {
+			if m.procStats[pid] == nil {
+				m.procStats[pid] = &processStats{}
+			}
+			procStats := m.procStats[pid]
+
+			procStats.lastUsage = cpuUsage
+			procStats.totalCPU += cpuUsage
+			procStats.cpuCounter++
+			if cpuUsage > procStats.maxCPU {
+				procStats.maxCPU = cpuUsage
+			}
+
+			if procStats.egressID != "" {
+				m.promProcCPULoad.With(prometheus.Labels{"egress_id": procStats.egressID}).Set(cpuUsage)
 			}
 		}
-		m.prevEgressUsage = egressUsage
 	})
 	if err != nil {
 		return err
@@ -168,33 +188,78 @@ func (m *Monitor) GetRequestCount() int {
 	return int(m.requests.Load())
 }
 
+func (m *Monitor) UpdatePID(egressID string, pid int) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	ps := m.pending[egressID]
+	delete(m.pending, egressID)
+
+	if existing := m.procStats[pid]; existing != nil {
+		ps.maxCPU = existing.maxCPU
+		ps.totalCPU = existing.totalCPU
+		ps.cpuCounter = existing.cpuCounter
+	}
+	m.procStats[pid] = ps
+}
+
+func (m *Monitor) GetUsageStats(egressID string) (float64, float64) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for pid, ps := range m.procStats {
+		if ps.egressID == egressID {
+			delete(m.procStats, pid)
+			return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
+		}
+	}
+
+	return 0, 0
+}
+
 func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	return m.canAcceptRequest(req)
+	return m.canAcceptRequestLocked(req)
 }
 
-func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
+func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
 	accept := false
 	total := m.cpuStats.NumCPU()
-	available := m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load()
-
-	logger.Debugw("cpu check",
-		"total", total,
-		"available", available,
-		"active_requests", m.requests,
-	)
 
+	var available float64
 	if m.requests.Load() == 0 {
 		// if no requests, use total
 		available = total
 	} else {
+		var used float64
+		for _, ps := range m.pending {
+			if ps.pendingUsage > ps.lastUsage {
+				used += ps.pendingUsage
+			} else {
+				used += ps.lastUsage
+			}
+		}
+		for _, ps := range m.procStats {
+			if ps.pendingUsage > ps.lastUsage {
+				used += ps.pendingUsage
+			} else {
+				used += ps.lastUsage
+			}
+		}
+
 		// if already running requests, cap usage at MaxCpuUtilization
-		available -= (1 - m.cpuCostConfig.MaxCpuUtilization) * total
+		available = total - used - (total * (1 - m.cpuCostConfig.MaxCpuUtilization))
 	}
 
+	logger.Debugw("cpu check",
+		"total", total,
+		"available", available,
+		"active_requests", m.requests,
+	)
+
 	switch req.Request.(type) {
 	case *rpc.StartEgressRequest_RoomComposite:
 		accept = available >= m.cpuCostConfig.RoomCompositeCpuCost
@@ -215,10 +280,12 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 
-	if !m.canAcceptRequest(req) {
+	if !m.canAcceptRequestLocked(req) {
 		return errors.ErrResourceExhausted
 	}
 
+	m.requests.Inc()
+
 	var cpuHold float64
 	switch req.Request.(type) {
 	case *rpc.StartEgressRequest_RoomComposite:
@@ -233,9 +300,13 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
 		cpuHold = m.cpuCostConfig.TrackCpuCost
 	}
 
-	m.requests.Inc()
-	m.pendingCPUs.Add(cpuHold)
-	time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
+	ps := &processStats{
+		egressID:     req.EgressId,
+		pendingUsage: cpuHold,
+	}
+	time.AfterFunc(cpuHoldDuration, func() { ps.pendingUsage = 0 })
+	m.pending[req.EgressId] = ps
+
 	return nil
}
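
Note on applying this patch: the new Monitor.pending and Monitor.procStats maps are written to in Start, UpdatePID, and AcceptRequest, and assignment to a nil map panics in Go, so both maps must be allocated before the monitor runs. The NewMonitor body falls outside the first monitor.go hunk's context, so that allocation is not visible above. A minimal sketch of the constructor this patch appears to assume (only the two map fields are taken from the patch; the rest of the literal is elided):

func NewMonitor(conf *config.ServiceConfig) *Monitor {
	return &Monitor{
		// existing fields (prometheus gauges, cpuCostConfig, etc.) unchanged
		pending:   make(map[string]*processStats),
		procStats: make(map[int]*processStats),
	}
}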