diff --git a/go.mod b/go.mod index bc9fa31b..ff05205f 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/livekit/livekit-server v1.8.1-0.20241129023712-3372e6e28532 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a + github.com/livekit/protocol v1.29.5-0.20241210172052-933b7ff01414 github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 github.com/livekit/server-sdk-go/v2 v2.4.0 github.com/pion/rtp v1.8.9 diff --git a/go.sum b/go.sum index dc39bf48..b5a9d5bf 100644 --- a/go.sum +++ b/go.sum @@ -214,8 +214,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ= github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo= -github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a h1:dI26u+l1pBBwojkTCuY8VVbNbwoWBaXM9nRY3uDg3p8= -github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a/go.mod h1:mqXSWNHbENjxM0/HG25wZ7wgja/K9fA0PeQxi+MPmWw= +github.com/livekit/protocol v1.29.5-0.20241210172052-933b7ff01414 h1:NFaEYMhY6Z9l7dknRONRRofjg1I4HqykTKioWP6bAbM= +github.com/livekit/protocol v1.29.5-0.20241210172052-933b7ff01414/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.4.0 h1:ide41hppBf7btHLz/nj6rLIQSkaIOxP5tVSki74ZDhg= diff --git a/pkg/info/io.go b/pkg/info/io.go index 5cf6d4e9..f945ea38 100644 --- a/pkg/info/io.go +++ b/pkg/info/io.go @@ -188,12 +188,6 @@ func (c *ioClient) sendUpdate(u *update) { } func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error { - _, err := c.IOInfoClient.UpdateMetrics(ctx, req) - if err != nil { - logger.Errorw("failed to update metrics", err, "egressID", req.Info.EgressId) - return err - } - return nil } diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index 60f6bfc8..40b2054a 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -157,13 +157,14 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI logger.Errorw("process failed", err) } - avgCPU, maxCPU := s.monitor.EgressEnded(req) + avgCPU, maxCPU, maxMemory := s.monitor.EgressEnded(req) if maxCPU > 0 { - _ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{ - Info: info, - AvgCpuUsage: float32(avgCPU), - MaxCpuUsage: float32(maxCPU), - }) + logger.Debugw("egress metrics", + "egressID", info.EgressId, + "avgCPU", avgCPU, + "maxCPU", maxCPU, + "maxMemory", maxMemory, + ) } s.ProcessFinished(info.EgressId) diff --git a/pkg/service/process.go b/pkg/service/process.go index 5f0e4476..9563f99f 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -239,7 +239,9 @@ func (p *Process) Gather() ([]*dto.MetricFamily, error) { // Get the ms from the handler via IPC metricsResponse, err := p.ipcHandlerClient.GetMetrics(context.Background(), &ipc.MetricsRequest{}) if err != nil { - logger.Warnw("failed to obtain ms from handler", err, "egressID", p.req.EgressId) + if !p.closed.IsBroken() { + logger.Warnw("failed to obtain ms from handler", err, "egressID", p.req.EgressId) + } return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler } diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 3ae33819..5ad81ac3 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -66,13 +66,14 @@ type Monitor struct { type processStats struct { egressID string - pendingUsage float64 - lastUsage float64 - allowedUsage float64 + pendingCPU float64 + lastCPU float64 + allowedCPU float64 totalCPU float64 cpuCounter int maxCPU float64 + maxMemory int } func NewMonitor(conf *config.ServiceConfig, svc Service) (*Monitor, error) { @@ -85,7 +86,7 @@ func NewMonitor(conf *config.ServiceConfig, svc Service) (*Monitor, error) { procStats: make(map[int]*processStats), } - procStats, err := hwstats.NewProcCPUStats(m.updateEgressStats) + procStats, err := hwstats.NewProcMonitor(m.updateEgressStats) if err != nil { return nil, err } @@ -248,11 +249,11 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { } ps := &processStats{ - egressID: req.EgressId, - pendingUsage: cpuHold, - allowedUsage: cpuHold, + egressID: req.EgressId, + pendingCPU: cpuHold, + allowedCPU: cpuHold, } - time.AfterFunc(cpuHoldDuration, func() { ps.pendingUsage = 0 }) + time.AfterFunc(cpuHoldDuration, func() { ps.pendingCPU = 0 }) m.pending[req.EgressId] = ps return nil @@ -268,8 +269,8 @@ func (m *Monitor) UpdatePID(egressID string, pid int) { if ps == nil { logger.Warnw("missing pending procStats", nil, "egressID", egressID) ps = &processStats{ - egressID: egressID, - allowedUsage: m.cpuCostConfig.WebCpuCost, + egressID: egressID, + allowedCPU: m.cpuCostConfig.WebCpuCost, } } @@ -308,7 +309,7 @@ func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) { } } -func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) { +func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64, int) { m.mu.Lock() defer m.mu.Unlock() @@ -333,11 +334,11 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) { for pid, ps := range m.procStats { if ps.egressID == req.EgressId { delete(m.procStats, pid) - return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU + return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU, ps.maxMemory } } - return 0, 0 + return 0, 0, 0 } func (m *Monitor) GetAvailableCPU() float64 { @@ -357,17 +358,17 @@ func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64) } for _, ps := range m.pending { - if ps.pendingUsage > ps.lastUsage { - pending += ps.pendingUsage + if ps.pendingCPU > ps.lastCPU { + pending += ps.pendingCPU } else { - pending += ps.lastUsage + pending += ps.lastCPU } } for _, ps := range m.procStats { - if ps.pendingUsage > ps.lastUsage { - used += ps.pendingUsage + if ps.pendingCPU > ps.lastCPU { + used += ps.pendingCPU } else { - used += ps.lastUsage + used += ps.lastCPU } } @@ -376,8 +377,8 @@ func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64) return } -func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) { - load := 1 - idle/m.cpuStats.NumCPU() +func (m *Monitor) updateEgressStats(stats *hwstats.ProcStats) { + load := 1 - stats.CpuIdle/m.cpuStats.NumCPU() m.promCPULoad.Set(load) m.mu.Lock() @@ -385,25 +386,36 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) { maxUsage := 0.0 var maxEgress string - for pid, cpuUsage := range usage { + for pid, cpuUsage := range stats.Cpu { procStats := m.procStats[pid] if procStats == nil { continue } - procStats.lastUsage = cpuUsage + procStats.lastCPU = cpuUsage procStats.totalCPU += cpuUsage procStats.cpuCounter++ if cpuUsage > procStats.maxCPU { procStats.maxCPU = cpuUsage } - if cpuUsage > procStats.allowedUsage && cpuUsage > maxUsage { + if cpuUsage > procStats.allowedCPU && cpuUsage > maxUsage { maxUsage = cpuUsage maxEgress = procStats.egressID } } + for pid, memUsage := range stats.Memory { + procStats := m.procStats[pid] + if procStats == nil { + continue + } + + if memUsage > procStats.maxMemory { + procStats.maxMemory = memUsage + } + } + killThreshold := defaultKillThreshold if killThreshold <= m.cpuCostConfig.MaxCpuUtilization { killThreshold = (1 + m.cpuCostConfig.MaxCpuUtilization) / 2 diff --git a/test/ioserver.go b/test/ioserver.go index 5c568943..1a78bdd5 100644 --- a/test/ioserver.go +++ b/test/ioserver.go @@ -56,7 +56,6 @@ func (s *ioTestServer) UpdateEgress(_ context.Context, info *livekit.EgressInfo) return &emptypb.Empty{}, nil } -func (s *ioTestServer) UpdateMetrics(_ context.Context, req *rpc.UpdateMetricsRequest) (*emptypb.Empty, error) { - logger.Infow("egress metrics", "egressID", req.Info.EgressId, "avgCPU", req.AvgCpuUsage, "maxCPU", req.MaxCpuUsage) +func (s *ioTestServer) UpdateMetrics(_ context.Context, _ *rpc.UpdateMetricsRequest) (*emptypb.Empty, error) { return &emptypb.Empty{}, nil } diff --git a/test/runner.go b/test/runner.go index 922d5ddd..789ca09c 100644 --- a/test/runner.go +++ b/test/runner.go @@ -192,7 +192,6 @@ func (r *Runner) StartServer(t *testing.T, svc Server, bus psrpc.MessageBus, tem r.room.Disconnect() } r.svc.Shutdown(false, true) - r.svc.Drain() }) // connect to room