From 52abee32f252a5090642b5b935d9ae9e5345ddec Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 13 May 2024 11:30:13 -0400 Subject: [PATCH 1/5] error codes --- go.mod | 2 +- go.sum | 4 +-- pkg/config/config_test.go | 9 ++--- pkg/config/pipeline.go | 5 +-- pkg/errors/errors.go | 9 +++-- pkg/handler/handler.go | 14 +++----- pkg/handler/handler_rpc.go | 4 +-- pkg/info/info.go | 58 ++++++++++++++++++++++++++++++ pkg/pipeline/controller.go | 73 ++++++++++++++------------------------ pkg/pipeline/source/web.go | 4 +-- pkg/service/service.go | 1 + pkg/service/service_rpc.go | 6 ++-- pkg/stats/monitor.go | 2 +- 13 files changed, 113 insertions(+), 78 deletions(-) create mode 100644 pkg/info/info.go diff --git a/go.mod b/go.mod index 0d908a63..c1386dd9 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/livekit/livekit-server v1.6.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/protocol v1.15.1-0.20240510165606-93a26f478d00 + github.com/livekit/protocol v1.16.1-0.20240513145257-511f517b1abf github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9 github.com/livekit/server-sdk-go/v2 v2.1.2-0.20240425022832-17b2be53a0d7 github.com/pion/rtp v1.8.6 diff --git a/go.sum b/go.sum index 7ac18d57..db4cb75d 100644 --- a/go.sum +++ b/go.sum @@ -168,8 +168,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df h1:DVhJRlF6/CtiyxJVy3QsbS9bf7GyUMuRZONMwZxIWpY= github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.15.1-0.20240510165606-93a26f478d00 h1:c5VOR2XrAgxjwvWpQIA0lDUX+YcpxGzxXtaRfhu510E= -github.com/livekit/protocol v1.15.1-0.20240510165606-93a26f478d00/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= +github.com/livekit/protocol v1.16.1-0.20240513145257-511f517b1abf h1:dUJ3PGgesuyWju8g3TL4WF9wuTdF758lL6UXFTVeO6E= +github.com/livekit/protocol v1.16.1-0.20240513145257-511f517b1abf/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w= github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9 h1:4CngtPIJ58WcQ1sUDGdxJDkTndQpN6M/T8jXvRAd7Oc= github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.1.2-0.20240425022832-17b2be53a0d7 h1:iZJnxZ0ZHMbiiUKHR3mV2CBWR5quNWv1disk7H4Iw5A= diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 246820bd..39fc00a8 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/livekit/egress/pkg/info" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" ) @@ -62,7 +63,7 @@ func TestRedactUpload(t *testing.T) { p, err := GetValidatedPipelineConfig(conf, fileReq) require.NoError(t, err) - require.Equal(t, "******", p.Info.GetRoomComposite().GetFile().GetS3().AccessKey) + require.Equal(t, "******", (*livekit.EgressInfo)(p.Info).GetRoomComposite().GetFile().GetS3().AccessKey) require.Len(t, p.Outputs, 1) output := p.GetFileConfig() @@ -111,12 +112,12 @@ func TestRedactStreamKeys(t *testing.T) { p, err := GetValidatedPipelineConfig(conf, streamReq) require.NoError(t, err) - urls := p.Info.GetRoomComposite().GetStream().GetUrls() + urls := (*livekit.EgressInfo)(p.Info).GetRoomComposite().GetStream().GetUrls() require.Len(t, urls, 2) require.Equal(t, redactedUrl1, urls[0]) require.Equal(t, redactedUrl2, urls[1]) - streamInfo := p.Info.GetStream() + streamInfo := (*livekit.EgressInfo)(p.Info).GetStream() require.Len(t, streamInfo.Info, 2) require.Equal(t, redactedUrl1, streamInfo.Info[0].Url) require.Equal(t, redactedUrl2, streamInfo.Info[1].Url) @@ -183,7 +184,7 @@ func TestSegmentNaming(t *testing.T) { expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "conf_test_2/filename", }, } { - p := &PipelineConfig{Info: &livekit.EgressInfo{EgressId: "egress_ID"}} + p := &PipelineConfig{Info: &info.EgressInfo{EgressId: "egress_ID"}} o, err := p.getSegmentConfig(&livekit.SegmentedFileOutput{ FilenamePrefix: test.filenamePrefix, PlaylistName: test.playlistName, diff --git a/pkg/config/pipeline.go b/pkg/config/pipeline.go index 72095276..b2b4182b 100644 --- a/pkg/config/pipeline.go +++ b/pkg/config/pipeline.go @@ -27,6 +27,7 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/egress/pkg/errors" + "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" @@ -54,7 +55,7 @@ type PipelineConfig struct { OutputCount int `yaml:"-"` FinalizationRequired bool `yaml:"-"` - Info *livekit.EgressInfo `yaml:"-"` + Info *info.EgressInfo `yaml:"-"` } type SourceConfig struct { @@ -160,7 +161,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error { } // start with defaults - p.Info = &livekit.EgressInfo{ + p.Info = &info.EgressInfo{ EgressId: request.EgressId, RoomId: request.RoomId, Status: livekit.EgressStatus_EGRESS_STARTING, diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 01c51f87..633b198c 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -27,15 +27,14 @@ var ( ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin") ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline") ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both") + ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen") + ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found") ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress") - ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found") ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs") ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") - ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU") - ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track") - ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen") - ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found") + ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track") ErrCPUExhausted = psrpc.NewErrorf(psrpc.Unavailable, "CPU exhausted") + ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found") ) func New(err string) error { diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 4af9a81e..ac00812f 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -18,13 +18,13 @@ import ( "context" "path" "strings" - "time" "github.com/frostbyte73/core" "google.golang.org/grpc" "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" + "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/egress/pkg/pipeline" "github.com/livekit/protocol/livekit" @@ -88,12 +88,8 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc. if err != nil { if !errors.IsFatal(err) { // user error, send update - now := time.Now().UnixNano() - conf.Info.Status = livekit.EgressStatus_EGRESS_FAILED - conf.Info.Error = err.Error() - conf.Info.UpdatedAt = now - conf.Info.EndedAt = now - _, _ = h.ioClient.UpdateEgress(context.Background(), conf.Info) + conf.Info.SetFailed(err) + _, _ = h.ioClient.UpdateEgress(context.Background(), (*livekit.EgressInfo)(conf.Info)) } return nil, err } @@ -106,7 +102,7 @@ func (h *Handler) Run() error { defer span.End() // start egress - result := make(chan *livekit.EgressInfo, 1) + result := make(chan *info.EgressInfo, 1) go func() { result <- h.controller.Run(ctx) }() @@ -121,7 +117,7 @@ func (h *Handler) Run() error { case res := <-result: // recording finished - _, _ = h.ioClient.UpdateEgress(ctx, res) + _, _ = h.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(res)) m, err := h.GenerateMetrics(ctx) if err == nil { diff --git a/pkg/handler/handler_rpc.go b/pkg/handler/handler_rpc.go index c21ab2c3..96c3532f 100644 --- a/pkg/handler/handler_rpc.go +++ b/pkg/handler/handler_rpc.go @@ -35,7 +35,7 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq if err != nil { return nil, err } - return h.controller.Info, nil + return (*livekit.EgressInfo)(h.controller.Info), nil } func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) { @@ -48,5 +48,5 @@ func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) } h.controller.SendEOS(ctx) - return h.controller.Info, nil + return (*livekit.EgressInfo)(h.controller.Info), nil } diff --git a/pkg/info/info.go b/pkg/info/info.go new file mode 100644 index 00000000..c2c329ac --- /dev/null +++ b/pkg/info/info.go @@ -0,0 +1,58 @@ +package info + +import ( + "net/http" + "time" + + "github.com/livekit/egress/pkg/errors" + "github.com/livekit/protocol/livekit" + "github.com/livekit/psrpc" +) + +type EgressInfo livekit.EgressInfo + +const ( + MsgStartNotReceived = "Start signal not received" + MsgLimitReached = "Session limit reached" + MsgLimitReachedWithoutStart = "Session limit reached before start signal" + MsgStoppedBeforeStarted = "Stop called before pipeline could start" +) + +func (e *EgressInfo) UpdateStatus(status livekit.EgressStatus) { + e.Status = status + e.UpdatedAt = time.Now().UnixNano() +} + +func (e *EgressInfo) SetLimitReached() { + now := time.Now().UnixNano() + e.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED + e.Details = MsgLimitReached + e.UpdatedAt = now + e.EndedAt = now +} + +func (e *EgressInfo) SetAborted(msg string) { + now := time.Now().UnixNano() + e.Status = livekit.EgressStatus_EGRESS_ABORTED + if e.Details == "" { + e.Details = msg + } else { + e.Details = e.Details + "; " + msg + } + e.UpdatedAt = now + e.EndedAt = now +} + +func (e *EgressInfo) SetFailed(err error) { + now := time.Now().UnixNano() + e.Status = livekit.EgressStatus_EGRESS_FAILED + e.UpdatedAt = now + e.EndedAt = now + e.Error = err.Error() + var perr psrpc.Error + if errors.As(err, &perr) { + e.ErrorCode = int32(perr.ToHttp()) + } else { + e.ErrorCode = int32(http.StatusInternalServerError) + } +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 0c4dac77..4a652605 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/gstreamer" + "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/pipeline/builder" "github.com/livekit/egress/pkg/pipeline/sink" "github.com/livekit/egress/pkg/pipeline/source" @@ -191,7 +192,7 @@ func (c *Controller) BuildPipeline() error { return nil } -func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { +func (c *Controller) Run(ctx context.Context) *info.EgressInfo { ctx, span := tracer.Start(ctx, "Pipeline.Run") defer span.End() @@ -213,10 +214,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { logger.Debugw("waiting for start signal") select { case <-c.stopped.Watch(): - c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - if c.Info.Details == "" { - c.Info.Details = "Start signal not received" - } + c.Info.SetAborted(info.MsgStartNotReceived) return c.Info case <-start: // continue @@ -226,16 +224,14 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { for _, si := range c.sinks { for _, s := range si { if err := s.Start(); err != nil { - c.Info.Status = livekit.EgressStatus_EGRESS_FAILED - c.Info.Error = err.Error() + c.Info.SetFailed(err) return c.Info } } } if err := c.p.Run(); err != nil { - c.Info.Status = livekit.EgressStatus_EGRESS_FAILED - c.Info.Error = err.Error() + c.Info.SetFailed(err) return c.Info } @@ -243,8 +239,7 @@ func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo { for _, si := range c.sinks { for _, s := range si { if err := s.Close(); err != nil && c.playing.IsBroken() { - c.Info.Status = livekit.EgressStatus_EGRESS_FAILED - c.Info.Error = err.Error() + c.Info.SetFailed(err) return c.Info } } @@ -293,7 +288,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream } o.StreamInfo[url] = streamInfo c.Info.StreamResults = append(c.Info.StreamResults, streamInfo) - if list := c.Info.GetStream(); list != nil { + if list := (*livekit.EgressInfo)(c.Info).GetStream(); list != nil { list.Info = append(list.Info, streamInfo) } c.mu.Unlock() @@ -317,7 +312,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream if sendUpdate { c.Info.UpdatedAt = time.Now().UnixNano() - _, _ = c.ioClient.UpdateEgress(ctx, c.Info) + _, _ = c.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(c.Info)) } return errs.ToError() @@ -377,7 +372,7 @@ func (c *Controller) removeSink(ctx context.Context, url string, streamErr error // only send updates if the egress will continue, otherwise it's handled by UpdateStream RPC if streamErr != nil { c.Info.UpdatedAt = time.Now().UnixNano() - _, _ = c.ioClient.UpdateEgress(ctx, c.Info) + _, _ = c.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(c.Info)) } return c.streamBin.RemoveStream(url) @@ -395,10 +390,7 @@ func (c *Controller) SendEOS(ctx context.Context) { } switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: - c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - if c.Info.Details == "" { - c.Info.Details = "Stop called before pipeline could start" - } + c.Info.SetAborted(info.MsgStoppedBeforeStarted) fallthrough case livekit.EgressStatus_EGRESS_ABORTED, @@ -406,13 +398,12 @@ func (c *Controller) SendEOS(ctx context.Context) { c.p.Stop() case livekit.EgressStatus_EGRESS_ACTIVE: - c.Info.Status = livekit.EgressStatus_EGRESS_ENDING + c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING) fallthrough case livekit.EgressStatus_EGRESS_ENDING, livekit.EgressStatus_EGRESS_LIMIT_REACHED: - c.Info.UpdatedAt = time.Now().UnixNano() - _, _ = c.ioClient.UpdateEgress(ctx, c.Info) + _, _ = c.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(c.Info)) go func() { c.eosTimer = time.AfterFunc(time.Second*30, func() { @@ -434,8 +425,7 @@ func (c *Controller) OnError(err error) { } if c.Info.Status != livekit.EgressStatus_EGRESS_FAILED && (!c.eos.IsBroken() || c.FinalizationRequired) { - c.Info.Status = livekit.EgressStatus_EGRESS_FAILED - c.Info.Error = err.Error() + c.Info.SetFailed(err) } go c.p.Stop() @@ -449,10 +439,6 @@ func (c *Controller) Close() { logger.Debugw("closing source") c.src.Close() - now := time.Now().UnixNano() - c.Info.UpdatedAt = now - c.Info.EndedAt = now - // update status if c.Info.Status == livekit.EgressStatus_EGRESS_FAILED { if o := c.GetStreamConfig(); o != nil { @@ -465,14 +451,11 @@ func (c *Controller) Close() { // ensure egress ends with a final state switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: - c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - if c.Info.Details == "" { - c.Info.Details = "Stop called before pipeline could start" - } + c.Info.SetAborted(info.MsgStoppedBeforeStarted) case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING: - c.Info.Status = livekit.EgressStatus_EGRESS_COMPLETE + c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_COMPLETE) } for _, si := range c.sinks { @@ -506,13 +489,10 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) { c.limitTimer = time.AfterFunc(timeout, func() { switch c.Info.Status { case livekit.EgressStatus_EGRESS_STARTING: - c.Info.Status = livekit.EgressStatus_EGRESS_ABORTED - if c.Info.Details == "" { - c.Info.Details = "Session limit reached before start signal" - } + c.Info.SetAborted(info.MsgLimitReachedWithoutStart) + case livekit.EgressStatus_EGRESS_ACTIVE: - c.Info.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED - c.Info.Details = "Session limit reached" + c.Info.SetLimitReached() } if c.playing.IsBroken() { c.SendEOS(ctx) @@ -551,9 +531,8 @@ func (c *Controller) updateStartTime(startedAt int64) { } if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING { - c.Info.Status = livekit.EgressStatus_EGRESS_ACTIVE - c.Info.UpdatedAt = time.Now().UnixNano() - _, _ = c.ioClient.UpdateEgress(context.Background(), c.Info) + c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ACTIVE) + _, _ = c.ioClient.UpdateEgress(context.Background(), (*livekit.EgressInfo)(c.Info)) } } @@ -564,13 +543,13 @@ func (c *Controller) updateDuration(endedAt int64) { } switch egressType { case types.EgressTypeStream, types.EgressTypeWebsocket: - for _, info := range o[0].(*config.StreamConfig).StreamInfo { - info.Status = livekit.StreamInfo_FINISHED - if info.StartedAt == 0 { - info.StartedAt = endedAt + for _, streamInfo := range o[0].(*config.StreamConfig).StreamInfo { + streamInfo.Status = livekit.StreamInfo_FINISHED + if streamInfo.StartedAt == 0 { + streamInfo.StartedAt = endedAt } - info.EndedAt = endedAt - info.Duration = endedAt - info.StartedAt + streamInfo.EndedAt = endedAt + streamInfo.Duration = endedAt - streamInfo.StartedAt } case types.EgressTypeFile: diff --git a/pkg/pipeline/source/web.go b/pkg/pipeline/source/web.go index 7ba54622..f83d8b49 100644 --- a/pkg/pipeline/source/web.go +++ b/pkg/pipeline/source/web.go @@ -31,7 +31,7 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" - "github.com/livekit/protocol/livekit" + "github.com/livekit/egress/pkg/info" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/tracer" ) @@ -49,7 +49,7 @@ type WebSource struct { startRecording chan struct{} endRecording chan struct{} - info *livekit.EgressInfo + info *info.EgressInfo } func init() { diff --git a/pkg/service/service.go b/pkg/service/service.go index d0e8a4e7..83df5a46 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -196,6 +196,7 @@ func (s *Service) killProcess(egressID string, maxUsage float64) { now := time.Now().UnixNano() h.info.Status = livekit.EgressStatus_EGRESS_FAILED h.info.Error = errors.ErrCPUExhausted.Error() + h.info.ErrorCode = int32(http.StatusServiceUnavailable) h.info.UpdatedAt = now h.info.EndedAt = now h.kill() diff --git a/pkg/service/service_rpc.go b/pkg/service/service_rpc.go index 482649cd..4218c769 100644 --- a/pkg/service/service_rpc.go +++ b/pkg/service/service_rpc.go @@ -50,7 +50,7 @@ func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) return nil, err } - _, err = s.ioClient.CreateEgress(ctx, p.Info) + _, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info)) if err != nil { s.EgressAborted(req) return nil, err @@ -65,13 +65,13 @@ func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) "request", p.Info.Request, ) - err = s.launchHandler(req, p.Info) + err = s.launchHandler(req, (*livekit.EgressInfo)(p.Info)) if err != nil { s.EgressAborted(req) return nil, err } - return p.Info, nil + return (*livekit.EgressInfo)(p.Info), nil } func (s *Service) StartEgressAffinity(_ context.Context, req *rpc.StartEgressRequest) float32 { diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 6a9d29e9..bbc25b4e 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -341,7 +341,7 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { defer m.mu.Unlock() if !m.canAcceptRequestLocked(req) { - return errors.ErrResourceExhausted + return errors.ErrCPUExhausted } m.requests.Inc() From 3e7dd4773d7086aa214ad4d5366c6425770602e3 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 13 May 2024 11:37:15 -0400 Subject: [PATCH 2/5] update cpu exhausted --- pkg/errors/errors.go | 5 ++++- pkg/service/service.go | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 633b198c..2e2d09f6 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -33,7 +33,6 @@ var ( ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs") ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track") - ErrCPUExhausted = psrpc.NewErrorf(psrpc.Unavailable, "CPU exhausted") ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found") ) @@ -121,6 +120,10 @@ func ErrProcessStartFailed(err error) error { return psrpc.NewError(psrpc.Internal, err) } +func ErrCPUExhausted(usage float64) error { + return psrpc.NewErrorf(psrpc.PermissionDenied, "CPU exhausted: %.2f cores used", usage) +} + type ErrArray struct { errs []error } diff --git a/pkg/service/service.go b/pkg/service/service.go index 83df5a46..66fc7559 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -192,11 +192,13 @@ func (s *Service) killProcess(egressID string, maxUsage float64) { defer s.mu.RUnlock() if h, ok := s.activeHandlers[egressID]; ok { - logger.Errorw("killing egress", errors.ErrCPUExhausted, "egressID", egressID, "usage", maxUsage) + err := errors.ErrCPUExhausted(maxUsage) + logger.Errorw("killing egress", err, "egressID", egressID) + now := time.Now().UnixNano() h.info.Status = livekit.EgressStatus_EGRESS_FAILED - h.info.Error = errors.ErrCPUExhausted.Error() - h.info.ErrorCode = int32(http.StatusServiceUnavailable) + h.info.Error = err.Error() + h.info.ErrorCode = int32(http.StatusForbidden) h.info.UpdatedAt = now h.info.EndedAt = now h.kill() From f303f1a1135dca3f5355346e81ee599c9736c3f8 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 13 May 2024 11:38:16 -0400 Subject: [PATCH 3/5] add code for internal error --- pkg/service/service_rpc.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/service/service_rpc.go b/pkg/service/service_rpc.go index 4218c769..48410d53 100644 --- a/pkg/service/service_rpc.go +++ b/pkg/service/service_rpc.go @@ -16,6 +16,7 @@ package service import ( "context" + "net/http" "os" "os/exec" "path" @@ -177,6 +178,7 @@ func (s *Service) processEnded(p *Process, err error) { p.info.Status = livekit.EgressStatus_EGRESS_FAILED if p.info.Error == "" { p.info.Error = "internal error" + p.info.ErrorCode = int32(http.StatusInternalServerError) } _, _ = s.ioClient.UpdateEgress(p.ctx, p.info) if p.info.Error == "internal error" { From 96087945dce294430e2ed36bf2e26c16b7e92815 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 13 May 2024 11:48:53 -0400 Subject: [PATCH 4/5] fix Accept error --- pkg/errors/errors.go | 1 + pkg/stats/monitor.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 2e2d09f6..56484dae 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -34,6 +34,7 @@ var ( ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track") ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found") + ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU") ) func New(err string) error { diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index bbc25b4e..aea5e748 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -341,7 +341,7 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { defer m.mu.Unlock() if !m.canAcceptRequestLocked(req) { - return errors.ErrCPUExhausted + return errors.ErrNotEnoughCPU } m.requests.Inc() From f241e0f08ffac7ca17a8c3ba23e0865665a82c2b Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 13 May 2024 12:06:16 -0400 Subject: [PATCH 5/5] add SetComplete --- pkg/info/info.go | 7 +++++++ pkg/pipeline/controller.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/info/info.go b/pkg/info/info.go index c2c329ac..681e8436 100644 --- a/pkg/info/info.go +++ b/pkg/info/info.go @@ -56,3 +56,10 @@ func (e *EgressInfo) SetFailed(err error) { e.ErrorCode = int32(http.StatusInternalServerError) } } + +func (e *EgressInfo) SetComplete() { + now := time.Now().UnixNano() + e.Status = livekit.EgressStatus_EGRESS_COMPLETE + e.UpdatedAt = now + e.EndedAt = now +} diff --git a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 4a652605..51d4519a 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -455,7 +455,7 @@ func (c *Controller) Close() { case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING: - c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_COMPLETE) + c.Info.SetComplete() } for _, si := range c.sinks {