Skip to content

Commit

Permalink
remove cpu reservation
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Dec 4, 2023
1 parent d157525 commit 43afe75
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 37 deletions.
1 change: 0 additions & 1 deletion pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ func (s *Service) ListActiveEgress(ctx context.Context, _ *rpc.ListActiveEgressR
func (s *Service) Status() ([]byte, error) {
info := map[string]interface{}{
"CpuLoad": s.GetCPULoad(),
"CpuHold": s.GetCPUHold(),
}

s.mu.RLock()
Expand Down
48 changes: 12 additions & 36 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ type Monitor struct {
pendingCPUs atomic.Float64

mu sync.Mutex
counts map[string]int
reserved float64
requests atomic.Int32
}

const cpuHoldDuration = time.Second * 5

func NewMonitor(conf *config.ServiceConfig) *Monitor {
return &Monitor{
cpuCostConfig: conf.CPUCostConfig,
counts: make(map[string]int),
}
}

Expand Down Expand Up @@ -147,13 +145,6 @@ func (m *Monitor) GetCPULoad() float64 {
return (m.cpuStats.NumCPU() - m.cpuStats.GetCPUIdle()) / m.cpuStats.NumCPU() * 100
}

func (m *Monitor) GetCPUHold() float64 {
m.mu.Lock()
defer m.mu.Unlock()

return m.reserved
}

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -170,13 +161,15 @@ func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
logger.Debugw("cpu check",
"total", total,
"available", available,
"reserved", m.reserved,
"active_requests", m.requests,
)

if m.reserved == 0 {
if m.requests.Load() == 0 {
// if no requests, use total
available = total
} else if total-m.reserved < available {
available = total - m.reserved
} else {
// if already running requests, cap usage at 90%
available -= 0.9 * total
}

switch req.Request.(type) {
Expand Down Expand Up @@ -217,7 +210,7 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
cpuHold = m.cpuCostConfig.TrackCpuCost
}

m.reserved += cpuHold
m.requests.Inc()
m.pendingCPUs.Add(cpuHold)
time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
return nil
Expand All @@ -244,37 +237,20 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) {

switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeRoomComposite}).Sub(1)
case *rpc.StartEgressRequest_Web:
m.reserved -= m.cpuCostConfig.WebCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Sub(1)
case *rpc.StartEgressRequest_Participant:
m.reserved -= m.cpuCostConfig.ParticipantCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Sub(1)
case *rpc.StartEgressRequest_TrackComposite:
m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrackComposite}).Sub(1)
case *rpc.StartEgressRequest_Track:
m.reserved -= m.cpuCostConfig.TrackCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrack}).Sub(1)
}
}

func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) {
m.mu.Lock()
defer m.mu.Unlock()
m.requests.Dec()
}

switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost
case *rpc.StartEgressRequest_Web:
m.reserved -= m.cpuCostConfig.WebCpuCost
case *rpc.StartEgressRequest_Participant:
m.reserved -= m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
m.reserved -= m.cpuCostConfig.TrackCpuCost
}
func (m *Monitor) EgressAborted(_ *rpc.StartEgressRequest) {
m.requests.Dec()
}

0 comments on commit 43afe75

Please sign in to comment.