Skip to content

Commit

Permalink
Memory logging (#821)
Browse files (browse the repository at this point in the history)
* log memory usage

* remove double drain

* update protocol
  • Loading branch information
frostbyte73 authored Dec 10, 2024
1 parent 59e1ace commit 8821f85
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.8.1-0.20241129023712-3372e6e28532
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a
github.com/livekit/protocol v1.29.5-0.20241210172052-933b7ff01414
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8
github.com/livekit/server-sdk-go/v2 v2.4.0
github.com/pion/rtp v1.8.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo=
github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a h1:dI26u+l1pBBwojkTCuY8VVbNbwoWBaXM9nRY3uDg3p8=
github.com/livekit/protocol v1.28.2-0.20241128110641-c9f20a60249a/go.mod h1:mqXSWNHbENjxM0/HG25wZ7wgja/K9fA0PeQxi+MPmWw=
github.com/livekit/protocol v1.29.5-0.20241210172052-933b7ff01414 h1:NFaEYMhY6Z9l7dknRONRRofjg1I4HqykTKioWP6bAbM=
github.com/livekit/protocol v1.29.5-0.20241210172052-933b7ff01414/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.4.0 h1:ide41hppBf7btHLz/nj6rLIQSkaIOxP5tVSki74ZDhg=
Expand Down
6 changes: 0 additions & 6 deletions pkg/info/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,6 @@ func (c *ioClient) sendUpdate(u *update) {
}

func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error {
_, err := c.IOInfoClient.UpdateMetrics(ctx, req)
if err != nil {
logger.Errorw("failed to update metrics", err, "egressID", req.Info.EgressId)
return err
}

return nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,14 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
logger.Errorw("process failed", err)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
avgCPU, maxCPU, maxMemory := s.monitor.EgressEnded(req)
if maxCPU > 0 {
_ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{
Info: info,
AvgCpuUsage: float32(avgCPU),
MaxCpuUsage: float32(maxCPU),
})
logger.Debugw("egress metrics",
"egressID", info.EgressId,
"avgCPU", avgCPU,
"maxCPU", maxCPU,
"maxMemory", maxMemory,
)
}

s.ProcessFinished(info.EgressId)
Expand Down
4 changes: 3 additions & 1 deletion pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func (p *Process) Gather() ([]*dto.MetricFamily, error) {
// Get the ms from the handler via IPC
metricsResponse, err := p.ipcHandlerClient.GetMetrics(context.Background(), &ipc.MetricsRequest{})
if err != nil {
logger.Warnw("failed to obtain ms from handler", err, "egressID", p.req.EgressId)
if !p.closed.IsBroken() {
logger.Warnw("failed to obtain ms from handler", err, "egressID", p.req.EgressId)
}
return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler
}

Expand Down
60 changes: 36 additions & 24 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,14 @@ type Monitor struct {
type processStats struct {
egressID string

pendingUsage float64
lastUsage float64
allowedUsage float64
pendingCPU float64
lastCPU float64
allowedCPU float64

totalCPU float64
cpuCounter int
maxCPU float64
maxMemory int
}

func NewMonitor(conf *config.ServiceConfig, svc Service) (*Monitor, error) {
Expand All @@ -85,7 +86,7 @@ func NewMonitor(conf *config.ServiceConfig, svc Service) (*Monitor, error) {
procStats: make(map[int]*processStats),
}

procStats, err := hwstats.NewProcCPUStats(m.updateEgressStats)
procStats, err := hwstats.NewProcMonitor(m.updateEgressStats)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,11 +249,11 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
}

ps := &processStats{
egressID: req.EgressId,
pendingUsage: cpuHold,
allowedUsage: cpuHold,
egressID: req.EgressId,
pendingCPU: cpuHold,
allowedCPU: cpuHold,
}
time.AfterFunc(cpuHoldDuration, func() { ps.pendingUsage = 0 })
time.AfterFunc(cpuHoldDuration, func() { ps.pendingCPU = 0 })
m.pending[req.EgressId] = ps

return nil
Expand All @@ -268,8 +269,8 @@ func (m *Monitor) UpdatePID(egressID string, pid int) {
if ps == nil {
logger.Warnw("missing pending procStats", nil, "egressID", egressID)
ps = &processStats{
egressID: egressID,
allowedUsage: m.cpuCostConfig.WebCpuCost,
egressID: egressID,
allowedCPU: m.cpuCostConfig.WebCpuCost,
}
}

Expand Down Expand Up @@ -308,7 +309,7 @@ func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) {
}
}

func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) {
func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64, int) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -333,11 +334,11 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) {
for pid, ps := range m.procStats {
if ps.egressID == req.EgressId {
delete(m.procStats, pid)
return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU, ps.maxMemory
}
}

return 0, 0
return 0, 0, 0
}

func (m *Monitor) GetAvailableCPU() float64 {
Expand All @@ -357,17 +358,17 @@ func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64)
}

for _, ps := range m.pending {
if ps.pendingUsage > ps.lastUsage {
pending += ps.pendingUsage
if ps.pendingCPU > ps.lastCPU {
pending += ps.pendingCPU
} else {
pending += ps.lastUsage
pending += ps.lastCPU
}
}
for _, ps := range m.procStats {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
if ps.pendingCPU > ps.lastCPU {
used += ps.pendingCPU
} else {
used += ps.lastUsage
used += ps.lastCPU
}
}

Expand All @@ -376,34 +377,45 @@ func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64)
return
}

func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) {
load := 1 - idle/m.cpuStats.NumCPU()
func (m *Monitor) updateEgressStats(stats *hwstats.ProcStats) {
load := 1 - stats.CpuIdle/m.cpuStats.NumCPU()
m.promCPULoad.Set(load)

m.mu.Lock()
defer m.mu.Unlock()

maxUsage := 0.0
var maxEgress string
for pid, cpuUsage := range usage {
for pid, cpuUsage := range stats.Cpu {
procStats := m.procStats[pid]
if procStats == nil {
continue
}

procStats.lastUsage = cpuUsage
procStats.lastCPU = cpuUsage
procStats.totalCPU += cpuUsage
procStats.cpuCounter++
if cpuUsage > procStats.maxCPU {
procStats.maxCPU = cpuUsage
}

if cpuUsage > procStats.allowedUsage && cpuUsage > maxUsage {
if cpuUsage > procStats.allowedCPU && cpuUsage > maxUsage {
maxUsage = cpuUsage
maxEgress = procStats.egressID
}
}

for pid, memUsage := range stats.Memory {
procStats := m.procStats[pid]
if procStats == nil {
continue
}

if memUsage > procStats.maxMemory {
procStats.maxMemory = memUsage
}
}

killThreshold := defaultKillThreshold
if killThreshold <= m.cpuCostConfig.MaxCpuUtilization {
killThreshold = (1 + m.cpuCostConfig.MaxCpuUtilization) / 2
Expand Down
3 changes: 1 addition & 2 deletions test/ioserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func (s *ioTestServer) UpdateEgress(_ context.Context, info *livekit.EgressInfo)
return &emptypb.Empty{}, nil
}

func (s *ioTestServer) UpdateMetrics(_ context.Context, req *rpc.UpdateMetricsRequest) (*emptypb.Empty, error) {
logger.Infow("egress metrics", "egressID", req.Info.EgressId, "avgCPU", req.AvgCpuUsage, "maxCPU", req.MaxCpuUsage)
func (s *ioTestServer) UpdateMetrics(_ context.Context, _ *rpc.UpdateMetricsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, nil
}
1 change: 0 additions & 1 deletion test/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func (r *Runner) StartServer(t *testing.T, svc Server, bus psrpc.MessageBus, tem
r.room.Disconnect()
}
r.svc.Shutdown(false, true)
r.svc.Drain()
})

// connect to room
Expand Down

0 comments on commit 8821f85

Please sign in to comment.