Skip to content

Commit

Permalink
Add cpu kill error message (#576)
Browse files: browse the repository at this point in the history
* add kill error message

* log error with usage
  • Loading branch information
frostbyte73 authored Jan 8, 2024
1 parent 56f73eb commit d6b560b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
var (
ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config")
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrStreamAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "stream already exists")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
Expand All @@ -38,6 +37,7 @@ var (
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found")
ErrCPUExhausted = psrpc.NewErrorf(psrpc.Unavailable, "CPU exhausted")
)

func New(err string) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/version"
Expand Down Expand Up @@ -183,11 +184,13 @@ func (s *Service) KillAll() {
}
}

func (s *Service) killProcess(egressID string) {
func (s *Service) killProcess(egressID string, maxUsage float64) {
s.mu.RLock()
defer s.mu.RUnlock()

if h, ok := s.activeHandlers[egressID]; ok {
logger.Errorw("killing egress", errors.ErrCPUExhausted, "egressID", egressID, "usage", maxUsage)
h.info.Error = errors.ErrCPUExhausted.Error()
h.kill()
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/service/service_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ func (s *Service) processEnded(p *Process, err error) {
p.info.UpdatedAt = now
p.info.EndedAt = now
p.info.Status = livekit.EgressStatus_EGRESS_FAILED
p.info.Error = "internal error"
if p.info.Error == "" {
p.info.Error = "internal error"
}
_, _ = s.ioClient.UpdateEgress(p.ctx, p.info)
s.Stop(false)
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type Monitor struct {
cpuStats *utils.CPUStats
requests atomic.Int32

mu sync.Mutex
highCPUCount int
killThreshold float64
killProcess func(string)
pending map[string]*processStats
procStats map[int]*processStats
mu sync.Mutex
highCPUDuration int
killThreshold float64
killProcess func(string, float64)
pending map[string]*processStats
procStats map[int]*processStats
}

type processStats struct {
Expand All @@ -64,6 +64,7 @@ type processStats struct {
// Tuning constants for the CPU monitor.
const (
// cpuHoldDuration is a 30-second span; where it is applied is not visible
// in this hunk.
cpuHoldDuration = time.Second * 30
// defaultKillThreshold is presumably the fallback for Monitor.killThreshold
// (a float64 field) when none is configured — TODO confirm against NewMonitor/Start.
defaultKillThreshold = 0.95
// minKillDuration is compared against Monitor.highCPUDuration, an int
// counter that updateEgressStats increments once per call while the
// high-CPU branch is taken and resets to 0 otherwise. It is therefore a
// count of consecutive updates, not a time.Duration; the wall-clock time it
// represents depends on the update interval, which is not shown here.
minKillDuration = 10
)

func NewMonitor(conf *config.ServiceConfig) *Monitor {
Expand All @@ -84,7 +85,7 @@ func (m *Monitor) Start(
conf *config.ServiceConfig,
isIdle func() float64,
canAcceptRequest func() float64,
killProcess func(string),
killProcess func(string, float64),
) error {
m.killProcess = killProcess

Expand Down Expand Up @@ -236,16 +237,15 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) {
)

if m.requests.Load() > 1 {
if m.highCPUCount < 3 {
m.highCPUCount++
m.highCPUDuration++
if m.highCPUDuration < minKillDuration {
return
} else {
m.killProcess(maxEgress)
}
m.killProcess(maxEgress, maxUsage)
}
}

m.highCPUCount = 0
m.highCPUDuration = 0
}

func (m *Monitor) CloseEgressStats(egressID string) (float64, float64) {
Expand Down

0 comments on commit d6b560b

Please sign in to comment.