Skip to content

Commit

Permalink
remove cpu reservation (#552)
Browse files Browse the repository at this point in the history
* remove cpu reservation

* remove IsIdle
  • Loading branch information
frostbyte73 authored Dec 4, 2023
1 parent b7bb753 commit 0c587a5
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 46 deletions.
7 changes: 0 additions & 7 deletions pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ func (s *Service) awaitCleanup(p *Process) {
s.mu.Unlock()
}

// IsIdle reports whether the service has no active handlers.
// The handler map is read under the service's read lock.
func (s *Service) IsIdle() bool {
	s.mu.RLock()
	idle := len(s.activeHandlers) == 0
	s.mu.RUnlock()

	return idle
}

func (s *Service) getGRPCClient(egressID string) (ipc.EgressHandlerClient, error) {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down
7 changes: 3 additions & 4 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *Service) StartEgressAffinity(ctx context.Context, req *rpc.StartEgressR
return -1
}

if s.IsIdle() {
if s.GetRequestCount() == 0 {
// group multiple track and track composite requests.
// if this instance is idle and another is already handling some, the request will go to that server.
// this avoids having many instances with one track request each, taking availability from room composite.
Expand Down Expand Up @@ -194,7 +194,6 @@ func (s *Service) ListActiveEgress(ctx context.Context, _ *rpc.ListActiveEgressR
func (s *Service) Status() ([]byte, error) {
info := map[string]interface{}{
"CpuLoad": s.GetCPULoad(),
"CpuHold": s.GetCPUHold(),
}

s.mu.RLock()
Expand All @@ -207,7 +206,7 @@ func (s *Service) Status() ([]byte, error) {
}

func (s *Service) promIsIdle() float64 {
if s.IsIdle() {
if s.GetRequestCount() == 0 {
return 1
}
return 0
Expand All @@ -234,7 +233,7 @@ func (s *Service) Stop(kill bool) {
}

func (s *Service) Close() {
for !s.IsIdle() {
for s.GetRequestCount() > 0 {
time.Sleep(shutdownTimer)
}
logger.Infow("closing server")
Expand Down
48 changes: 14 additions & 34 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ type Monitor struct {
pendingCPUs atomic.Float64

mu sync.Mutex
counts map[string]int
reserved float64
requests atomic.Int32
}

const cpuHoldDuration = time.Second * 5

func NewMonitor(conf *config.ServiceConfig) *Monitor {
return &Monitor{
cpuCostConfig: conf.CPUCostConfig,
counts: make(map[string]int),
}
}

Expand Down Expand Up @@ -147,11 +145,8 @@ func (m *Monitor) GetCPULoad() float64 {
return (m.cpuStats.NumCPU() - m.cpuStats.GetCPUIdle()) / m.cpuStats.NumCPU() * 100
}

func (m *Monitor) GetCPUHold() float64 {
m.mu.Lock()
defer m.mu.Unlock()

return m.reserved
// GetRequestCount returns the number of egress requests currently
// tracked by the monitor, read atomically from the request counter.
func (m *Monitor) GetRequestCount() int {
	return int(m.requests.Load())
}

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
Expand All @@ -170,13 +165,15 @@ func (m *Monitor) canAcceptRequest(req *rpc.StartEgressRequest) bool {
logger.Debugw("cpu check",
"total", total,
"available", available,
"reserved", m.reserved,
"active_requests", m.requests,
)

if m.reserved == 0 {
if m.requests.Load() == 0 {
// if no requests, use total
available = total
} else if total-m.reserved < available {
available = total - m.reserved
} else {
// if already running requests, cap usage at 90%
available -= 0.9 * total
}

switch req.Request.(type) {
Expand Down Expand Up @@ -217,7 +214,7 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
cpuHold = m.cpuCostConfig.TrackCpuCost
}

m.reserved += cpuHold
m.requests.Inc()
m.pendingCPUs.Add(cpuHold)
time.AfterFunc(cpuHoldDuration, func() { m.pendingCPUs.Sub(cpuHold) })
return nil
Expand All @@ -244,37 +241,20 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) {

switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeRoomComposite}).Sub(1)
case *rpc.StartEgressRequest_Web:
m.reserved -= m.cpuCostConfig.WebCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeWeb}).Sub(1)
case *rpc.StartEgressRequest_Participant:
m.reserved -= m.cpuCostConfig.ParticipantCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeParticipant}).Sub(1)
case *rpc.StartEgressRequest_TrackComposite:
m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrackComposite}).Sub(1)
case *rpc.StartEgressRequest_Track:
m.reserved -= m.cpuCostConfig.TrackCpuCost
m.requestGauge.With(prometheus.Labels{"type": types.RequestTypeTrack}).Sub(1)
}
}

func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) {
m.mu.Lock()
defer m.mu.Unlock()
m.requests.Dec()
}

switch req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
m.reserved -= m.cpuCostConfig.RoomCompositeCpuCost
case *rpc.StartEgressRequest_Web:
m.reserved -= m.cpuCostConfig.WebCpuCost
case *rpc.StartEgressRequest_Participant:
m.reserved -= m.cpuCostConfig.ParticipantCpuCost
case *rpc.StartEgressRequest_TrackComposite:
m.reserved -= m.cpuCostConfig.TrackCompositeCpuCost
case *rpc.StartEgressRequest_Track:
m.reserved -= m.cpuCostConfig.TrackCpuCost
}
// EgressAborted records that a previously accepted egress request was
// aborted before completion, decrementing the active request counter.
// The request argument is unused; only the counter is adjusted.
func (m *Monitor) EgressAborted(_ *rpc.StartEgressRequest) {
	m.requests.Dec()
}
2 changes: 1 addition & 1 deletion test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type testCase struct {
func (r *Runner) awaitIdle(t *testing.T) {
r.svc.KillAll()
for i := 0; i < 30; i++ {
if r.svc.GetCPUHold() == 0 {
if r.svc.GetRequestCount() == 0 {
return
}
time.Sleep(time.Second)
Expand Down

0 comments on commit 0c587a5

Please sign in to comment.