Skip to content

Commit

Permalink
shut down in internal error
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed May 16, 2024
1 parent 34fe4be commit 2dddfec
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 134 deletions.
11 changes: 3 additions & 8 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,9 @@ func runHandler(c *cli.Context) error {
bus := psrpc.NewRedisMessageBus(rc)
h, err := handler.NewHandler(conf, bus)
if err != nil {
if errors.IsFatal(err) {
// service will send info update and shut down
logger.Errorw("fatal error", err)
return err
} else {
// update sent by handler
return nil
}
// service will send info update and shut down
logger.Errorw("failed to create handler", err)
return err
}

go func() {
Expand Down
136 changes: 60 additions & 76 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,11 @@ package errors

import (
"errors"
"fmt"
"strings"

"github.com/livekit/psrpc"
)

var (
ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config")
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found")
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU")
ErrShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server is shutting down")
)

func New(err string) error {
return errors.New(err)
}
Expand All @@ -50,28 +33,77 @@ func As(err error, target any) bool {
return errors.As(err, target)
}

type FatalError struct {
err error
type ErrArray struct {
errs []error
}

func (e *FatalError) Error() string {
return fmt.Sprintf("FATAL: %s", e.err.Error())
func (e *ErrArray) AppendErr(err error) {
e.errs = append(e.errs, err)
}

func (e *FatalError) Unwrap() error {
return e.err
func (e *ErrArray) Check(err error) {
if err != nil {
e.errs = append(e.errs, err)
}
}

func Fatal(err error) error {
return &FatalError{err}
func (e *ErrArray) ToError() psrpc.Error {
if len(e.errs) == 0 {
return nil
}

code := psrpc.Unknown
var errStr []string

// Return the code for the first error of type psrpc.Error
for _, err := range e.errs {
var psrpcErr psrpc.Error

if code == psrpc.Unknown && errors.As(err, &psrpcErr) {
code = psrpcErr.Code()
}

errStr = append(errStr, err.Error())
}

return psrpc.NewErrorf(code, "%s", strings.Join(errStr, "\n"))
}

func IsFatal(err error) bool {
e := &FatalError{}
// internal errors

return errors.As(err, &e)
var (
ErrNoConfig = psrpc.NewErrorf(psrpc.Internal, "missing config")
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found")
)

func ErrPadLinkFailed(src, sink, status string) error {
return psrpc.NewErrorf(psrpc.Internal, "failed to link %s to %s: %s", src, sink, status)
}

func ErrGstPipelineError(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

func ErrProcessStartFailed(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

// other errors

var (
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU")
ErrShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server is shutting down")
)

func ErrCouldNotParseConfig(err error) error {
return psrpc.NewErrorf(psrpc.InvalidArgument, "could not parse config: %v", err)
}
Expand Down Expand Up @@ -104,60 +136,12 @@ func ErrParticipantNotFound(identity string) error {
return psrpc.NewErrorf(psrpc.NotFound, "participant %s not found", identity)
}

func ErrPadLinkFailed(src, sink, status string) error {
return psrpc.NewErrorf(psrpc.Internal, "failed to link %s to %s: %s", src, sink, status)
}

func ErrGstPipelineError(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

// This can have many reasons, some related to invalid parameters, other because of system failure.
// Do not provide an error code until we have code to analyze the error from the underlying upload library further.
func ErrUploadFailed(location string, err error) error {
return psrpc.NewErrorf(psrpc.Unknown, "%s upload failed: %v", location, err)
}

func ErrProcessStartFailed(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

func ErrCPUExhausted(usage float64) error {
return psrpc.NewErrorf(psrpc.PermissionDenied, "CPU exhausted: %.2f cores used", usage)
}

type ErrArray struct {
errs []error
}

func (e *ErrArray) AppendErr(err error) {
e.errs = append(e.errs, err)
}

func (e *ErrArray) Check(err error) {
if err != nil {
e.errs = append(e.errs, err)
}
}

func (e *ErrArray) ToError() psrpc.Error {
if len(e.errs) == 0 {
return nil
}

code := psrpc.Unknown
var errStr []string

// Return the code for the first error of type psrpc.Error
for _, err := range e.errs {
var psrpcErr psrpc.Error

if code == psrpc.Unknown && errors.As(err, &psrpcErr) {
code = psrpcErr.Code()
}

errStr = append(errStr, err.Error())
}

return psrpc.NewErrorf(code, "%s", strings.Join(errStr, "\n"))
}
27 changes: 0 additions & 27 deletions pkg/errors/errors_test.go

This file was deleted.

35 changes: 17 additions & 18 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline"
"github.com/livekit/protocol/livekit"
Expand Down Expand Up @@ -64,13 +63,13 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus) (*Handler, er

rpcServer, err := rpc.NewEgressHandlerServer(h, bus)
if err != nil {
return nil, errors.Fatal(err)
return nil, err
}
if err = rpcServer.RegisterUpdateStreamTopic(conf.Info.EgressId); err != nil {
return nil, errors.Fatal(err)
return nil, err
}
if err = rpcServer.RegisterStopEgressTopic(conf.Info.EgressId); err != nil {
return nil, errors.Fatal(err)
return nil, err
}
h.rpcServer = rpcServer

Expand All @@ -80,24 +79,27 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus) (*Handler, er
return nil, err
}

h.controller, err = pipeline.New(context.Background(), conf, h.ipcServiceClient)
h.initialized.Break()
if err != nil {
if !errors.IsFatal(err) {
// user error, send update
conf.Info.SetFailed(err)
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(conf.Info))
}
return nil, err
}

return h, nil
}

func (h *Handler) Run() {
ctx, span := tracer.Start(context.Background(), "Handler.Run")
defer span.End()

defer func() {
h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
}()

var err error
h.controller, err = pipeline.New(context.Background(), h.conf, h.ipcServiceClient)
h.initialized.Break()
if err != nil {
h.conf.Info.SetFailed(err)
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(h.conf.Info))
return
}

// start egress
res := h.controller.Run(ctx)
m, err := h.GenerateMetrics(ctx)
Expand All @@ -110,9 +112,6 @@ func (h *Handler) Run() {
Metrics: m,
Info: (*livekit.EgressInfo)(res),
})

h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
}

func (h *Handler) Kill() {
Expand Down
13 changes: 8 additions & 5 deletions pkg/pipeline/source/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *WebSource) createPulseSink(ctx context.Context, p *config.PipelineConfi
cmd.Stderr = &infoLogger{cmd: "pactl"}
err := cmd.Run()
if err != nil {
return errors.Fatal(errors.ErrProcessStartFailed(err))
return errors.ErrProcessStartFailed(err)
}

s.pulseSink = strings.TrimRight(b.String(), "\n")
Expand All @@ -170,7 +170,7 @@ func (s *WebSource) launchXvfb(ctx context.Context, p *config.PipelineConfig) er
xvfb := exec.Command("Xvfb", p.Display, "-screen", "0", dims, "-ac", "-nolisten", "tcp", "-nolisten", "unix")
xvfb.Stderr = &infoLogger{cmd: "xvfb"}
if err := xvfb.Start(); err != nil {
return errors.Fatal(errors.ErrProcessStartFailed(err))
return errors.ErrProcessStartFailed(err)
}

s.xvfb = xvfb
Expand Down Expand Up @@ -313,10 +313,13 @@ func (s *WebSource) launchChrome(ctx context.Context, p *config.PipelineConfig,
}`, &errString,
),
)
if err == nil && errString != "" {
err = errors.New(errString)
if err != nil {
return errors.ErrProcessStartFailed(err)
}
if errString != "" {
return errors.New(errString)
}
return err
return nil
}

func logChrome(eventType string, ev interface{ MarshalJSON() ([]byte, error) }) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/server/server_ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package server

import (
"context"
"net/http"

"google.golang.org/protobuf/types/known/emptypb"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
Expand All @@ -37,6 +39,11 @@ func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*
logger.Errorw("failed to update egress", err)
}

if info.ErrorCode == int32(http.StatusInternalServerError) {
logger.Errorw("internal error, shutting down", errors.New(info.Error))
s.Shutdown(false)
}

return &emptypb.Empty{}, nil
}

Expand Down

0 comments on commit 2dddfec

Please sign in to comment.