per-process cpu reservations
frostbyte73 committed Dec 20, 2023
1 parent 50db799 commit 9835fc7
Showing 5 changed files with 101 additions and 67 deletions.
18 changes: 0 additions & 18 deletions pkg/service/process.go
@@ -38,9 +38,6 @@ type Process struct {
cmd *exec.Cmd
ipcHandlerClient ipc.EgressHandlerClient
ready chan struct{}
totalCPU float64
cpuCounter int
maxCPU float64
closed core.Fuse
}

@@ -71,21 +68,6 @@ func NewProcess(
return p, nil
}

func (p *Process) updateCPU(cpu float64) {
p.totalCPU += cpu
p.cpuCounter++
if cpu > p.maxCPU {
p.maxCPU = cpu
}
}

func (p *Process) getUsageStats() (float64, float64) {
if p.cpuCounter == 0 {
return 0, 0
}
return p.totalCPU / float64(p.cpuCounter), p.maxCPU
}

// Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics
func (p *Process) Gather() ([]*dto.MetricFamily, error) {
// Get the metrics from the handler via IPC
1 change: 0 additions & 1 deletion pkg/service/service.go
@@ -86,7 +86,6 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service
if err := s.Start(s.conf,
s.promIsIdle,
s.promCanAcceptRequest,
s.promProcUpdate,
); err != nil {
return nil, err
}
19 changes: 0 additions & 19 deletions pkg/service/service_prom.go
@@ -72,22 +72,3 @@ func (s *Service) promCanAcceptRequest() float64 {
}
return 0
}

func (s *Service) promProcUpdate(pUsage map[int]float64) map[string]float64 {
s.mu.RLock()
defer s.mu.RUnlock()

eUsage := make(map[string]float64)
for _, h := range s.activeHandlers {
if cmd := h.cmd; cmd != nil {
if process := cmd.Process; process != nil {
if usage, ok := pUsage[process.Pid]; ok {
eUsage[h.req.EgressId] = usage
h.updateCPU(usage)
}
}
}
}

return eUsage
}
3 changes: 2 additions & 1 deletion pkg/service/service_rpc.go
@@ -154,6 +154,7 @@ func (s *Service) AddHandler(egressID string, p *Process) error {

select {
case <-p.ready:
s.UpdatePID(egressID, p.cmd.Process.Pid)
go func() {
err := p.cmd.Wait()
s.processEnded(p, err)
@@ -178,7 +179,7 @@ func (s *Service) processEnded(p *Process, err error) {
s.Stop(false)
}

avgCPU, maxCPU := p.getUsageStats()
avgCPU, maxCPU := s.GetUsageStats(p.info.EgressId)
logger.Infow("egress stats", "egressID", p.req.EgressId, "avgCPU", avgCPU, "maxCPU", maxCPU)

s.EgressEnded(p.req)
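Taken together with the monitor changes below, the handler flow becomes: once the child process signals ready, the service registers its PID with the stats monitor (UpdatePID), and when the process exits it reads the aggregated usage back by egress ID (GetUsageStats). Below is a minimal, self-contained sketch of that bookkeeping with simplified types and illustrative names; only the UpdatePID/GetUsageStats semantics are taken from this diff.

```go
package main

import "fmt"

// Simplified stand-in for the monitor's per-process accounting.
type procStats struct {
	egressID   string
	totalCPU   float64
	cpuCounter int
	maxCPU     float64
}

type monitor struct {
	stats map[int]*procStats // keyed by PID
}

// sample records one CPU reading for a PID, creating the entry if needed.
func (m *monitor) sample(pid int, cpu float64) {
	ps := m.stats[pid]
	if ps == nil {
		ps = &procStats{}
		m.stats[pid] = ps
	}
	ps.totalCPU += cpu
	ps.cpuCounter++
	if cpu > ps.maxCPU {
		ps.maxCPU = cpu
	}
}

// updatePID attaches an egress ID to a PID; samples collected before the
// handler reported ready are kept.
func (m *monitor) updatePID(egressID string, pid int) {
	ps := m.stats[pid]
	if ps == nil {
		ps = &procStats{}
		m.stats[pid] = ps
	}
	ps.egressID = egressID
}

// usageStats drains the entry for an egress and returns (average, max) CPU.
func (m *monitor) usageStats(egressID string) (float64, float64) {
	for pid, ps := range m.stats {
		if ps.egressID == egressID && ps.cpuCounter > 0 {
			delete(m.stats, pid)
			return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
		}
	}
	return 0, 0
}

func main() {
	m := &monitor{stats: map[int]*procStats{}}
	m.sample(4242, 1.2)             // a sample can arrive before the egress is associated
	m.updatePID("EG_example", 4242) // called from AddHandler once p.ready fires
	m.sample(4242, 2.0)
	avg, max := m.usageStats("EG_example")       // called from processEnded
	fmt.Printf("avg %.1f, max %.1f\n", avg, max) // avg 1.6, max 2.0
}
```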
127 changes: 99 additions & 28 deletions pkg/stats/monitor.go
@@ -39,15 +39,25 @@ type Monitor struct {
requestGauge *prometheus.GaugeVec

cpuStats *utils.CPUStats
requests atomic.Int32

pendingCPUs atomic.Float64
mu sync.Mutex
pending map[string]*processStats
procStats map[int]*processStats
}

type processStats struct {
egressID string

pendingUsage float64
lastUsage float64

mu sync.Mutex
requests atomic.Int32
prevEgressUsage map[string]float64
totalCPU float64
cpuCounter int
maxCPU float64
}

const cpuHoldDuration = time.Second * 5
const cpuHoldDuration = time.Second * 30

func NewMonitor(conf *config.ServiceConfig) *Monitor {
return &Monitor{
@@ -59,20 +69,30 @@ func (m *Monitor) Start(
conf *config.ServiceConfig,
isIdle func() float64,
canAcceptRequest func() float64,
procUpdate func(map[int]float64) map[string]float64,
) error {
procStats, err := utils.NewProcCPUStats(func(idle float64, usage map[int]float64) {
m.promCPULoad.Set(1 - idle/m.cpuStats.NumCPU())
egressUsage := procUpdate(usage)
for egressID, cpuUsage := range egressUsage {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(cpuUsage)
}
for egressID := range m.prevEgressUsage {
if _, ok := egressUsage[egressID]; !ok {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": egressID}).Set(0)

m.mu.Lock()
defer m.mu.Unlock()

for pid, cpuUsage := range usage {
if m.procStats[pid] == nil {
m.procStats[pid] = &processStats{}
}
procStats := m.procStats[pid]

procStats.lastUsage = cpuUsage
procStats.totalCPU += cpuUsage
procStats.cpuCounter++
if cpuUsage > procStats.maxCPU {
procStats.maxCPU = cpuUsage
}

if procStats.egressID != "" {
m.promProcCPULoad.With(prometheus.Labels{"egress_id": procStats.egressID}).Set(cpuUsage)
}
}
m.prevEgressUsage = egressUsage
})
if err != nil {
return err
@@ -168,33 +188,78 @@ func (m *Monitor) GetRequestCount() int {
return int(m.requests.Load())
}

func (m *Monitor) UpdatePID(egressID string, pid int) {
m.mu.Lock()
defer m.mu.Unlock()

ps := m.pending[egressID]
delete(m.pending, egressID)

if existing := m.procStats[pid]; existing != nil {
ps.maxCPU = existing.maxCPU
ps.totalCPU = existing.totalCPU
ps.cpuCounter = existing.cpuCounter
}
m.procStats[pid] = ps
}

func (m *Monitor) GetUsageStats(egressID string) (float64, float64) {
m.mu.Lock()
defer m.mu.Unlock()

for pid, ps := range m.procStats {
if ps.egressID == egressID {
delete(m.procStats, pid)
return ps.totalCPU / float64(ps.cpuCounter), ps.maxCPU
}
}

return 0, 0
}

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
m.mu.Lock()
defer m.mu.Unlock()

return m.canAcceptRequest(req)
return m.canAcceptRequestLocked(req)
}

func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool {
accept := false

total := m.cpuStats.NumCPU()
available := m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load()

logger.Debugw("cpu check",
"total", total,
"available", available,
"active_requests", m.requests,
)

var available float64
if m.requests.Load() == 0 {
// if no requests, use total
available = total
} else {
var used float64
for _, ps := range m.pending {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
} else {
used += ps.lastUsage
}
}
for _, ps := range m.procStats {
if ps.pendingUsage > ps.lastUsage {
used += ps.pendingUsage
} else {
used += ps.lastUsage
}
}

// if already running requests, cap usage at MaxCpuUtilization
available -= (1 - m.cpuCostConfig.MaxCpuUtilization) * total
available = total - used - (total * (1 - m.cpuCostConfig.MaxCpuUtilization))
}

logger.Debugw("cpu check",
"total", total,
"available", available,
"active_requests", m.requests,
)

switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
accept = available >= m.cpuCostConfig.RoomCompositeCpuCost
@@ -215,10 +280,12 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
m.mu.Lock()
defer m.mu.Unlock()

if !m.canAcceptRequest(req) {
if !m.canAcceptRequestLocked(req) {
return errors.ErrResourceExhausted
}

m.requests.Inc()

var cpuHold float64
switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
@@ -233,9 +300,13 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
cpuHold = m.cpuCostConfig.TrackCpuCost
}

m.requests.Inc()
m.pendingCPUs.Add(cpuHold)
time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
ps := &processStats{
egressID: req.EgressId,
pendingUsage: cpuHold,
}
time.AfterFunc(cpuHoldDuration, func() { ps.pendingUsage = 0 })
m.pending[req.EgressId] = ps

return nil
}

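For reference, the new capacity check can be read as: sum, over every pending and running egress, the larger of its reserved CPU (held for cpuHoldDuration after acceptance) and its last measured usage, then subtract that and the headroom implied by MaxCpuUtilization from the total. A small standalone example with assumed numbers:

```go
package main

import "fmt"

func main() {
	// Assumed values, for illustration only.
	total := 8.0             // NumCPU
	maxCPUUtilization := 0.8 // cpuCostConfig.MaxCpuUtilization
	roomCompositeCost := 3.0 // cpuCostConfig.RoomCompositeCpuCost

	// One running egress measured at 2.5 cores, plus one recently accepted
	// egress whose 3.0-core reservation (pendingUsage) still exceeds its
	// measured usage: the larger of the two is counted for each process.
	used := 2.5 + 3.0

	// available = total - used - headroom reserved by MaxCpuUtilization
	available := total - used - total*(1-maxCPUUtilization)
	fmt.Printf("available: %.1f cores\n", available) // 8 - 5.5 - 1.6 = 0.9

	// A new room composite request would be rejected until usage or
	// reservations drop.
	fmt.Println("accept room composite:", available >= roomCompositeCost) // false
}
```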
