Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cpu kill error message #576

Merged
merged 2 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
var (
ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config")
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrStreamAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "stream already exists")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
Expand All @@ -38,6 +37,7 @@ var (
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found")
ErrCPUExhausted = psrpc.NewErrorf(psrpc.Unavailable, "CPU exhausted")
)

func New(err string) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/stats"
"github.com/livekit/egress/version"
Expand Down Expand Up @@ -183,11 +184,13 @@ func (s *Service) KillAll() {
}
}

func (s *Service) killProcess(egressID string) {
// killProcess kills the active egress handler registered under egressID.
// maxUsage is only used for logging: it is emitted as the "usage" field of the
// error log entry. If no handler is registered for egressID, nothing happens.
func (s *Service) killProcess(egressID string, maxUsage float64) {
s.mu.RLock()
defer s.mu.RUnlock()

if h, ok := s.activeHandlers[egressID]; ok {
logger.Errorw("killing egress", errors.ErrCPUExhausted, "egressID", egressID, "usage", maxUsage)
// Record the kill reason on the handler's info before killing it.
// Presumably this lets processEnded report it instead of the generic
// "internal error" - verify that h.info is the same object processEnded reads.
// NOTE(review): this write happens while only the read lock is held;
// confirm nothing else reads or writes h.info.Error concurrently.
h.info.Error = errors.ErrCPUExhausted.Error()
h.kill()
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/service/service_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ func (s *Service) processEnded(p *Process, err error) {
p.info.UpdatedAt = now
p.info.EndedAt = now
p.info.Status = livekit.EgressStatus_EGRESS_FAILED
p.info.Error = "internal error"
if p.info.Error == "" {
p.info.Error = "internal error"
}
_, _ = s.ioClient.UpdateEgress(p.ctx, p.info)
s.Stop(false)
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ type Monitor struct {
cpuStats *utils.CPUStats
requests atomic.Int32

mu sync.Mutex
highCPUCount int
killThreshold float64
killProcess func(string)
pending map[string]*processStats
procStats map[int]*processStats
mu sync.Mutex
highCPUDuration int
killThreshold float64
killProcess func(string, float64)
pending map[string]*processStats
procStats map[int]*processStats
}

type processStats struct {
Expand All @@ -64,6 +64,7 @@ type processStats struct {
const (
// cpuHoldDuration is 30 seconds; where it is applied is not visible in this excerpt.
cpuHoldDuration = time.Second * 30
// defaultKillThreshold is presumably the fallback for Monitor.killThreshold,
// expressed as a fraction of CPU - TODO confirm against NewMonitor/Start.
defaultKillThreshold = 0.95
// minKillDuration is compared against Monitor.highCPUDuration in updateEgressStats:
// the counter is incremented on each pass through the kill branch, and killProcess
// is called once it reaches this value. It is an update count, not a time.Duration;
// the interval between updates is not visible here.
minKillDuration = 10
)

func NewMonitor(conf *config.ServiceConfig) *Monitor {
Expand All @@ -84,7 +85,7 @@ func (m *Monitor) Start(
conf *config.ServiceConfig,
isIdle func() float64,
canAcceptRequest func() float64,
killProcess func(string),
killProcess func(string, float64),
) error {
m.killProcess = killProcess

Expand Down Expand Up @@ -236,16 +237,15 @@ func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) {
)

if m.requests.Load() > 1 {
if m.highCPUCount < 3 {
m.highCPUCount++
m.highCPUDuration++
if m.highCPUDuration < minKillDuration {
return
} else {
m.killProcess(maxEgress)
}
m.killProcess(maxEgress, maxUsage)
}
}

m.highCPUCount = 0
m.highCPUDuration = 0
}

func (m *Monitor) CloseEgressStats(egressID string) (float64, float64) {
Expand Down