Skip to content

Commit

Permalink
Update process stats (#605)
Browse files Browse the repository at this point in the history
* update process stats

* don't store 0s
  • Loading branch information
frostbyte73 authored Feb 6, 2024
1 parent ca0676a commit 5add822
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 49 deletions.
17 changes: 9 additions & 8 deletions pkg/service/service_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,15 @@ func (s *Service) processEnded(p *Process, err error) {
}
}

avgCPU, maxCPU := s.CloseEgressStats(p.info.EgressId)
_, _ = s.ioClient.UpdateMetrics(p.ctx, &rpc.UpdateMetricsRequest{
Info: p.info,
AvgCpuUsage: float32(avgCPU),
MaxCpuUsage: float32(maxCPU),
})

s.EgressEnded(p.req)
avgCPU, maxCPU := s.EgressEnded(p.req)
if maxCPU > 0 {
_, _ = s.ioClient.UpdateMetrics(p.ctx, &rpc.UpdateMetricsRequest{
Info: p.info,
AvgCpuUsage: float32(avgCPU),
MaxCpuUsage: float32(maxCPU),
})
}

p.closed.Break()

s.mu.Lock()
Expand Down
71 changes: 30 additions & 41 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ import (
type Monitor struct {
cpuCostConfig *config.CPUCostConfig

promCPULoad prometheus.Gauge
promProcCPULoad *prometheus.GaugeVec
requestGauge *prometheus.GaugeVec
promCPULoad prometheus.Gauge
requestGauge *prometheus.GaugeVec

cpuStats *utils.CPUStats
requests atomic.Int32
Expand Down Expand Up @@ -113,21 +112,14 @@ func (m *Monitor) Start(
ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "node_type": "EGRESS", "cluster_id": conf.ClusterID},
})

m.promProcCPULoad = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "handler_cpu_load",
ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "cluster_id": conf.ClusterID},
}, []string{"egress_id"})

m.requestGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "livekit",
Subsystem: "egress",
Name: "requests",
ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "cluster_id": conf.ClusterID},
}, []string{"type"})

prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, m.promCPULoad, m.promProcCPULoad, m.requestGauge)
prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, m.promCPULoad, m.requestGauge)

return nil
}
Expand Down Expand Up @@ -204,10 +196,10 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) {
maxUsage := 0.0
var maxEgress string
for pid, cpuUsage := range usage {
if m.procStats[pid] == nil {
m.procStats[pid] = &processStats{}
}
procStats := m.procStats[pid]
if procStats == nil {
continue
}

procStats.lastUsage = cpuUsage
procStats.totalCPU += cpuUsage
Expand All @@ -216,12 +208,9 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) {
procStats.maxCPU = cpuUsage
}

if procStats.egressID != "" {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": procStats.egressID}).Set(cpuUsage)
if cpuUsage > procStats.allowedUsage && cpuUsage > maxUsage {
maxUsage = cpuUsage
maxEgress = procStats.egressID
}
if cpuUsage > procStats.allowedUsage && cpuUsage > maxUsage {
maxUsage = cpuUsage
maxEgress = procStats.egressID
}
}

Expand All @@ -248,21 +237,6 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) {
m.highCPUDuration = 0
}

// CloseEgressStats removes the tracked process stats entry whose egressID
// matches the given ID, resets that egress's handler CPU load gauge to 0, and
// returns the (average, maximum) CPU usage recorded for it.
//
// It returns (0, 0) when no entry matches. When an entry exists but no CPU
// samples were counted for it (cpuCounter is 0), the average is reported as 0
// instead of the NaN/Inf that dividing by a zero sample count would produce.
func (m *Monitor) CloseEgressStats(egressID string) (float64, float64) {
	m.mu.Lock()
	defer m.mu.Unlock()

	for pid, ps := range m.procStats {
		if ps.egressID != egressID {
			continue
		}

		// Deleting the current key while ranging over a map is safe in Go,
		// and we return immediately afterwards anyway.
		delete(m.procStats, pid)
		m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(0)

		// Guard against a zero sample count: float division by zero does not
		// panic in Go, it silently yields NaN or Inf, which would then be
		// reported to callers as the average CPU usage.
		if ps.cpuCounter == 0 {
			return 0, ps.maxCPU
		}
		return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
	}

	return 0, 0
}

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -274,16 +248,16 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
accept := false
total := m.cpuStats.NumCPU()

var available, used float64
var available, pending, used float64
if m.requests.Load() == 0 {
// if no requests, use total
available = total
} else {
for _, ps := range m.pending {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
pending += ps.pendingUsage
} else {
used += ps.lastUsage
pending += ps.lastUsage
}
}
for _, ps := range m.procStats {
Expand All @@ -295,7 +269,7 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
}

// if already running requests, cap usage at MaxCpuUtilization
available = total*m.cpuCostConfig.MaxCpuUtilization - used
available = total*m.cpuCostConfig.MaxCpuUtilization - pending - used
}

var required float64
Expand Down Expand Up @@ -323,6 +297,7 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {

logger.Debugw("cpu check",
"total", total,
"pending", pending,
"used", used,
"required", required,
"available", available,
Expand Down Expand Up @@ -391,7 +366,7 @@ func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) {
}
}

func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) {
func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -408,9 +383,23 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) {
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrack}).Sub(1)
}

delete(m.pending, req.EgressId)
m.requests.Dec()

for pid, ps := range m.procStats {
if ps.egressID == req.EgressId {
delete(m.procStats, pid)
return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
}
}

return 0, 0
}

func (m *Monitor) EgressAborted(_ *rpc.StartEgressRequest) {
func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) {
m.mu.Lock()
defer m.mu.Unlock()

delete(m.pending, req.EgressId)
m.requests.Dec()
}

0 comments on commit 5add822

Please sign in to comment.