diff --git a/pkg/service/process.go b/pkg/service/process.go index d13da311..49e98b21 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -17,6 +17,7 @@ package service import ( "context" "os/exec" + "syscall" "github.com/frostbyte73/core" dto "github.com/prometheus/client_model/go" @@ -76,7 +77,14 @@ func (p *Process) Gather() ([]*dto.MetricFamily, error) { // Parse the result to match the Gatherer interface return deserializeMetrics(p.info.EgressId, metricsResponse.Metrics) +} +func (p *Process) kill() { + if !p.closed.IsBroken() { + if err := p.cmd.Process.Signal(syscall.SIGINT); err != nil { + logger.Errorw("failed to kill Process", err, "egressID", p.req.EgressId) + } + } } func applyDefaultLabel(egressID string, families map[string]*dto.MetricFamily) { diff --git a/pkg/service/service.go b/pkg/service/service.go index 59781393..dd1ae190 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -23,7 +23,6 @@ import ( "os" "path" "sync" - "syscall" "time" "github.com/frostbyte73/core" @@ -88,6 +87,7 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service if err := s.Start(s.conf, s.promIsIdle, s.promCanAcceptRequest, + s.killProcess, ); err != nil { return nil, err } @@ -179,11 +179,16 @@ func (s *Service) KillAll() { defer s.mu.RUnlock() for _, h := range s.activeHandlers { - if !h.closed.IsBroken() { - if err := h.cmd.Process.Signal(syscall.SIGINT); err != nil { - logger.Errorw("failed to kill Process", err, "egressID", h.req.EgressId) - } - } + h.kill() + } +} + +func (s *Service) killProcess(egressID string) { + s.mu.RLock() + defer s.mu.RUnlock() + + if h, ok := s.activeHandlers[egressID]; ok { + h.kill() } } diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 94a72bcc..9e6ae166 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -41,9 +41,11 @@ type Monitor struct { cpuStats *utils.CPUStats requests atomic.Int32 - mu sync.Mutex - pending map[string]*processStats - procStats map[int]*processStats + mu sync.Mutex + highCPUCount int + killProcess func(string) + pending map[string]*processStats + procStats map[int]*processStats } type processStats struct { @@ -71,7 +73,10 @@ func (m *Monitor) Start( conf *config.ServiceConfig, isIdle func() float64, canAcceptRequest func() float64, + killProcess func(string), ) error { + m.killProcess = killProcess + procStats, err := utils.NewProcCPUStats(m.updateEgressStats) if err != nil { return err @@ -183,11 +188,14 @@ func (m *Monitor) UpdatePID(egressID string, pid int) { } func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) { - m.promCPULoad.Set(1 - idle/m.cpuStats.NumCPU()) + load := 1 - idle/m.cpuStats.NumCPU() + m.promCPULoad.Set(load) m.mu.Lock() defer m.mu.Unlock() + maxUsage := 0.0 + var maxEgress string for pid, cpuUsage := range usage { if m.procStats[pid] == nil { m.procStats[pid] = &processStats{} @@ -203,8 +211,30 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) { if procStats.egressID != "" { m.promProcCPULoad.With(prometheus.Labels{"egress_id": procStats.egressID}).Set(cpuUsage) + if cpuUsage > maxUsage { + maxUsage = cpuUsage + maxEgress = procStats.egressID + } + } + } + + if load > 0.95 { + logger.Warnw("high cpu usage", nil, + "load", load, + "requests", m.requests.Load(), + ) + + if m.requests.Load() > 1 { + if m.highCPUCount < 3 { + m.highCPUCount++ + return + } else { + m.killProcess(maxEgress) + } } } + + m.highCPUCount = 0 } func (m *Monitor) CloseEgressStats(egressID string) (float64, float64) {