diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index ea883edd..f427f6dc 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -27,7 +27,6 @@ import ( var ( ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config") ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin") - ErrStreamAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "stream already exists") ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline") ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both") ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress") @@ -38,6 +37,7 @@ var ( ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track") ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen") ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found") + ErrCPUExhausted = psrpc.NewErrorf(psrpc.Unavailable, "CPU exhausted") ) func New(err string) error { diff --git a/pkg/service/service.go b/pkg/service/service.go index dd1ae190..96e8bae9 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc" "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/version" @@ -183,11 +184,13 @@ func (s *Service) KillAll() { } } -func (s *Service) killProcess(egressID string) { +func (s *Service) killProcess(egressID string, maxUsage float64) { s.mu.RLock() defer s.mu.RUnlock() if h, ok := s.activeHandlers[egressID]; ok { + logger.Errorw("killing egress", errors.ErrCPUExhausted, "egressID", egressID, "usage", maxUsage) + h.info.Error = errors.ErrCPUExhausted.Error() h.kill() } } diff --git a/pkg/service/service_rpc.go b/pkg/service/service_rpc.go index 3ccb2736..127b403b 100644 --- a/pkg/service/service_rpc.go +++ b/pkg/service/service_rpc.go @@ -174,7 +174,9 @@ func (s *Service) processEnded(p *Process, err error) { p.info.UpdatedAt = now p.info.EndedAt = now p.info.Status = livekit.EgressStatus_EGRESS_FAILED - p.info.Error = "internal error" + if p.info.Error == "" { + p.info.Error = "internal error" + } _, _ = s.ioClient.UpdateEgress(p.ctx, p.info) s.Stop(false) } diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 53015b27..fa7e3ce3 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -41,12 +41,12 @@ type Monitor struct { cpuStats *utils.CPUStats requests atomic.Int32 - mu sync.Mutex - highCPUCount int - killThreshold float64 - killProcess func(string) - pending map[string]*processStats - procStats map[int]*processStats + mu sync.Mutex + highCPUDuration int + killThreshold float64 + killProcess func(string, float64) + pending map[string]*processStats + procStats map[int]*processStats } type processStats struct { @@ -64,6 +64,7 @@ type processStats struct { const ( cpuHoldDuration = time.Second * 30 defaultKillThreshold = 0.95 + minKillDuration = 10 ) func NewMonitor(conf *config.ServiceConfig) *Monitor { @@ -84,7 +85,7 @@ func (m *Monitor) Start( conf *config.ServiceConfig, isIdle func() float64, canAcceptRequest func() float64, - killProcess func(string), + killProcess func(string, float64), ) error { m.killProcess = killProcess @@ -236,16 +237,15 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) { ) if m.requests.Load() > 1 { - if m.highCPUCount < 3 { - m.highCPUCount++ + m.highCPUDuration++ + if m.highCPUDuration < minKillDuration { return - } else { - m.killProcess(maxEgress) } + m.killProcess(maxEgress, maxUsage) } } - m.highCPUCount = 0 + m.highCPUDuration = 0 } func (m *Monitor) CloseEgressStats(egressID string) (float64, float64) {