Skip to content

Commit

Permalink
new strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Dec 15, 2023
1 parent 9f5f461 commit e0f40d3
Showing 1 changed file with 38 additions and 27 deletions.
65 changes: 38 additions & 27 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package stats

import (
"fmt"
"math"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -47,7 +48,7 @@ type Monitor struct {
prevEgressUsage map[string]float64
}

const cpuHoldDuration = time.Second * 10
const cpuHoldDuration = time.Second * 30

func NewMonitor(conf *config.ServiceConfig) *Monitor {
return &Monitor{
Expand Down Expand Up @@ -176,38 +177,41 @@ func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
}

func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
accept := false
total := m.cpuStats.NumCPU()

var available float64
if m.requests.Load() == 0 {
// if no requests, use total
available = total - m.pendingCPUs.Load()
} else {
// if already running requests, cap usage at MaxCpuUtilization
available = m.cpuStats.GetCPUIdle() - m.pendingCPUs.Load() - (1-m.cpuCostConfig.MaxCpuUtilization)*total
}
available := math.Min(m.getAvailableCPU(), m.cpuStats.NumCPU()-m.pendingCPUs.Load())

logger.Debugw("cpu check",
"total", total,
"total", m.cpuStats.NumCPU(),
"available", available,
"requests", m.requests,
)

switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
accept = available >= m.cpuCostConfig.RoomCompositeCpuCost
return available >= m.cpuCostConfig.RoomCompositeCpuCost
case *rpc.StartEgressRequest_Web:
accept = available >= m.cpuCostConfig.WebCpuCost
return available >= m.cpuCostConfig.WebCpuCost
case *rpc.StartEgressRequest_Participant:
accept = available >= m.cpuCostConfig.ParticipantCpuCost
return available >= m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
accept = available >= m.cpuCostConfig.TrackCompositeCpuCost
return available >= m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
accept = available >= m.cpuCostConfig.TrackCpuCost
return available >= m.cpuCostConfig.TrackCpuCost
default:
logger.Warnw("unrecognized request type", nil)
return false
}
}

func (m *Monitor) getAvailableCPU() float64 {
total := m.cpuStats.NumCPU()

return accept
// if no requests, use total
if m.requests.Load() == 0 {
return total
}

// if already running requests, cap usage at MaxCpuUtilization
return m.cpuStats.GetCPUIdle() - (1-m.cpuCostConfig.MaxCpuUtilization)*total
}

func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
Expand All @@ -218,23 +222,30 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
return errors.ErrResourceExhausted
}

var cpuHold float64
pending := m.getAvailableCPU()
switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
cpuHold = m.cpuCostConfig.RoomCompositeCpuCost
pending = m.cpuCostConfig.RoomCompositeCpuCost
case *rpc.StartEgressRequest_Web:
cpuHold = m.cpuCostConfig.WebCpuCost
pending = m.cpuCostConfig.WebCpuCost
case *rpc.StartEgressRequest_Participant:
cpuHold = m.cpuCostConfig.ParticipantCpuCost
pending = m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
cpuHold = m.cpuCostConfig.TrackCompositeCpuCost
pending = m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
cpuHold = m.cpuCostConfig.TrackCpuCost
pending = m.cpuCostConfig.TrackCpuCost
}

m.requests.Inc()
m.pendingCPUs.Add(cpuHold)
time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
m.pendingCPUs.Store(pending)
time.AfterFunc(cpuHoldDuration, func() {
m.mu.Lock()
if m.pendingCPUs.Load() == pending {
m.pendingCPUs.Store(0)
}
m.mu.Unlock()
})

return nil
}

Expand Down

0 comments on commit e0f40d3

Please sign in to comment.