Skip to content

Commit

Permalink
Update prometheus process usage (#556)
Browse files Browse the repository at this point in the history
* update prometheus process usage

* update protocol
  • Loading branch information
frostbyte73 authored Dec 7, 2023
1 parent 8a54fdb commit 3cd6379
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 20 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/livekit/livekit-server v1.5.1-0.20231026153736-8b16db227070
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876
github.com/livekit/protocol v1.9.4-0.20231207062609-297e4990e7d3
github.com/livekit/psrpc v0.5.2
github.com/livekit/server-sdk-go v1.1.1
github.com/pion/rtp v1.8.3
Expand All @@ -32,7 +32,7 @@ require (
github.com/urfave/cli/v2 v2.25.7
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231127185646-65229373498e
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
google.golang.org/api v0.130.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876 h1:NnbpPgxDHOcSdgW0JzBkc4QzzLVAe4sOaiYqUUH0/K4=
github.com/livekit/protocol v1.9.4-0.20231206174612-7bba17ea7876/go.mod h1:SzrmeWw8sbf99laJJNMwp+5izlvh/ynlMbVOX0JUoes=
github.com/livekit/protocol v1.9.4-0.20231207062609-297e4990e7d3 h1:1Au/odnRyEe70dZd8y61LYnmGn/nN7cMT/Mh4LiL/XY=
github.com/livekit/protocol v1.9.4-0.20231207062609-297e4990e7d3/go.mod h1:EKF+U1kf2Fh7wG2xLET2dNDVxi7Q+6nHN6uYs5NsIVY=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/server-sdk-go v1.1.1 h1:TkDD/Ecyh7XNuxgxhpsDQ1uzbTlDWwwJrbkyUjQmcbY=
Expand Down Expand Up @@ -333,8 +333,8 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb h1:c0vyKkb6yr3KR7jEfJaOSv4lG7xPkbN6r52aJz1d8a8=
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
18 changes: 18 additions & 0 deletions pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Process struct {
cmd *exec.Cmd
ipcHandlerClient ipc.EgressHandlerClient
ready chan struct{}
totalCPU float64
cpuCounter int
maxCPU float64
closed core.Fuse
}

Expand Down Expand Up @@ -68,6 +71,21 @@ func NewProcess(
return p, nil
}

func (p *Process) updateCPU(cpu float64) {
p.totalCPU += cpu
p.cpuCounter++
if cpu > p.maxCPU {
p.maxCPU = cpu
}
}

func (p *Process) getUsageStats() (float64, float64) {
if p.cpuCounter == 0 {
return 0, 0
}
return p.totalCPU / float64(p.cpuCounter), p.maxCPU
}

// Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics
func (p *Process) Gather() ([]*dto.MetricFamily, error) {
// Get the metrics from the handler via IPC
Expand Down
2 changes: 1 addition & 1 deletion pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service
if err := s.Start(s.conf,
s.promIsIdle,
s.promCanAcceptRequest,
s.promGetEgressIDs,
s.promProcUpdate,
); err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/service/service_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func (s *Service) promCanAcceptRequest() float64 {
return 0
}

func (s *Service) promGetEgressIDs(pUsage map[int]float64) map[string]float64 {
func (s *Service) promProcUpdate(pUsage map[int]float64) map[string]float64 {
s.mu.RLock()
pids := make(map[int]string)
for _, h := range s.activeHandlers {
pids[h.cmd.Process.Pid] = h.req.EgressId
}
s.mu.RUnlock()
defer s.mu.RUnlock()

eUsage := make(map[string]float64)
for pid, egressID := range pids {
eUsage[egressID] = pUsage[pid]
for _, h := range s.activeHandlers {
if usage, ok := pUsage[h.cmd.Process.Pid]; ok {
eUsage[h.req.EgressId] = usage
h.updateCPU(usage)
}
}

return eUsage
}
3 changes: 3 additions & 0 deletions pkg/service/service_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ func (s *Service) processEnded(p *Process, err error) {
s.Stop(false)
}

avgCPU, maxCPU := p.getUsageStats()
logger.Infow("egress stats", "egressID", p.req.EgressId, "avgCPU", avgCPU, "maxCPU", maxCPU)

s.EgressEnded(p.req)
p.closed.Break()

Expand Down
17 changes: 12 additions & 5 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type Monitor struct {

pendingCPUs atomic.Float64

mu sync.Mutex
requests atomic.Int32
mu sync.Mutex
requests atomic.Int32
prevEgressUsage map[string]float64
}

const cpuHoldDuration = time.Second * 5
Expand All @@ -60,7 +61,7 @@ func (m *Monitor) Start(
conf *config.ServiceConfig,
isIdle func() float64,
canAcceptRequest func() float64,
getEgressIDs func(map[int]float64) map[string]float64,
procUpdate func(map[int]float64) map[string]float64,
) error {
cpuStats, err := utils.NewCPUStats(func(idle float64) {
m.promCPULoad.Set(1 - idle/m.cpuStats.NumCPU())
Expand All @@ -71,10 +72,16 @@ func (m *Monitor) Start(

procStats, err := utils.NewProcCPUStats(func(idle float64, usage map[int]float64) {
m.promCPULoadV2.Set(1 - idle/m.cpuStats.NumCPU())
egressIDs := getEgressIDs(usage)
for egressID, cpuUsage := range egressIDs {
egressUsage := procUpdate(usage)
for egressID, cpuUsage := range egressUsage {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(cpuUsage)
}
for egressID := range m.prevEgressUsage {
if _, ok := egressUsage[egressID]; !ok {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(0)
}
}
m.prevEgressUsage = egressUsage
})
if err != nil {
return err
Expand Down

0 comments on commit 3cd6379

Please sign in to comment.