Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error codes #671

Merged
merged 5 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/livekit/livekit-server v1.6.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.15.1-0.20240510165606-93a26f478d00
github.com/livekit/protocol v1.16.1-0.20240513145257-511f517b1abf
github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9
github.com/livekit/server-sdk-go/v2 v2.1.2-0.20240425022832-17b2be53a0d7
github.com/pion/rtp v1.8.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df h1:DVhJRlF6/CtiyxJVy3QsbS9bf7GyUMuRZONMwZxIWpY=
github.com/livekit/mediatransportutil v0.0.0-20240406063423-a67d961689df/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.15.1-0.20240510165606-93a26f478d00 h1:c5VOR2XrAgxjwvWpQIA0lDUX+YcpxGzxXtaRfhu510E=
github.com/livekit/protocol v1.15.1-0.20240510165606-93a26f478d00/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/protocol v1.16.1-0.20240513145257-511f517b1abf h1:dUJ3PGgesuyWju8g3TL4WF9wuTdF758lL6UXFTVeO6E=
github.com/livekit/protocol v1.16.1-0.20240513145257-511f517b1abf/go.mod h1:pnn0Dv+/0K0OFqKHX6J6SreYO1dZxl6tDuAZ1ns8L/w=
github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9 h1:4CngtPIJ58WcQ1sUDGdxJDkTndQpN6M/T8jXvRAd7Oc=
github.com/livekit/psrpc v0.5.3-0.20240403150641-811331b106d9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.1.2-0.20240425022832-17b2be53a0d7 h1:iZJnxZ0ZHMbiiUKHR3mV2CBWR5quNWv1disk7H4Iw5A=
Expand Down
9 changes: 5 additions & 4 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/livekit/egress/pkg/info"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
)
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestRedactUpload(t *testing.T) {
p, err := GetValidatedPipelineConfig(conf, fileReq)
require.NoError(t, err)

require.Equal(t, "******", p.Info.GetRoomComposite().GetFile().GetS3().AccessKey)
require.Equal(t, "******", (*livekit.EgressInfo)(p.Info).GetRoomComposite().GetFile().GetS3().AccessKey)

require.Len(t, p.Outputs, 1)
output := p.GetFileConfig()
Expand Down Expand Up @@ -111,12 +112,12 @@ func TestRedactStreamKeys(t *testing.T) {
p, err := GetValidatedPipelineConfig(conf, streamReq)
require.NoError(t, err)

urls := p.Info.GetRoomComposite().GetStream().GetUrls()
urls := (*livekit.EgressInfo)(p.Info).GetRoomComposite().GetStream().GetUrls()
require.Len(t, urls, 2)
require.Equal(t, redactedUrl1, urls[0])
require.Equal(t, redactedUrl2, urls[1])

streamInfo := p.Info.GetStream()
streamInfo := (*livekit.EgressInfo)(p.Info).GetStream()
require.Len(t, streamInfo.Info, 2)
require.Equal(t, redactedUrl1, streamInfo.Info[0].Url)
require.Equal(t, redactedUrl2, streamInfo.Info[1].Url)
Expand Down Expand Up @@ -183,7 +184,7 @@ func TestSegmentNaming(t *testing.T) {
expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "conf_test_2/filename",
},
} {
p := &PipelineConfig{Info: &livekit.EgressInfo{EgressId: "egress_ID"}}
p := &PipelineConfig{Info: &info.EgressInfo{EgressId: "egress_ID"}}
o, err := p.getSegmentConfig(&livekit.SegmentedFileOutput{
FilenamePrefix: test.filenamePrefix,
PlaylistName: test.playlistName,
Expand Down
5 changes: 3 additions & 2 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"gopkg.in/yaml.v3"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
Expand Down Expand Up @@ -54,7 +55,7 @@ type PipelineConfig struct {
OutputCount int `yaml:"-"`
FinalizationRequired bool `yaml:"-"`

Info *livekit.EgressInfo `yaml:"-"`
Info *info.EgressInfo `yaml:"-"`
}

type SourceConfig struct {
Expand Down Expand Up @@ -160,7 +161,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {
}

// start with defaults
p.Info = &livekit.EgressInfo{
p.Info = &info.EgressInfo{
EgressId: request.EgressId,
RoomId: request.RoomId,
Status: livekit.EgressStatus_EGRESS_STARTING,
Expand Down
14 changes: 8 additions & 6 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ var (
ErrGhostPadFailed = psrpc.NewErrorf(psrpc.Internal, "failed to add ghost pad to bin")
ErrBinAlreadyAdded = psrpc.NewErrorf(psrpc.Internal, "bin already added to pipeline")
ErrWrongHierarchy = psrpc.NewErrorf(psrpc.Internal, "pipeline can contain bins or elements, not both")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found")
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrResourceExhausted = psrpc.NewErrorf(psrpc.ResourceExhausted, "not enough CPU")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Internal, "failed to subscribe to track")
ErrPipelineFrozen = psrpc.NewErrorf(psrpc.Internal, "pipeline frozen")
ErrSinkNotFound = psrpc.NewErrorf(psrpc.Internal, "sink not found")
ErrCPUExhausted = psrpc.NewErrorf(psrpc.Unavailable, "CPU exhausted")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
)

func New(err string) error {
Expand Down Expand Up @@ -122,6 +120,10 @@ func ErrProcessStartFailed(err error) error {
return psrpc.NewError(psrpc.Internal, err)
}

func ErrCPUExhausted(usage float64) error {
return psrpc.NewErrorf(psrpc.PermissionDenied, "CPU exhausted: %.2f cores used", usage)
}

type ErrArray struct {
errs []error
}
Expand Down
14 changes: 5 additions & 9 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"context"
"path"
"strings"
"time"

"github.com/frostbyte73/core"
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline"
"github.com/livekit/protocol/livekit"
Expand Down Expand Up @@ -88,12 +88,8 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.
if err != nil {
if !errors.IsFatal(err) {
// user error, send update
now := time.Now().UnixNano()
conf.Info.Status = livekit.EgressStatus_EGRESS_FAILED
conf.Info.Error = err.Error()
conf.Info.UpdatedAt = now
conf.Info.EndedAt = now
_, _ = h.ioClient.UpdateEgress(context.Background(), conf.Info)
conf.Info.SetFailed(err)
_, _ = h.ioClient.UpdateEgress(context.Background(), (*livekit.EgressInfo)(conf.Info))
}
return nil, err
}
Expand All @@ -106,7 +102,7 @@ func (h *Handler) Run() error {
defer span.End()

// start egress
result := make(chan *livekit.EgressInfo, 1)
result := make(chan *info.EgressInfo, 1)
go func() {
result <- h.controller.Run(ctx)
}()
Expand All @@ -121,7 +117,7 @@ func (h *Handler) Run() error {

case res := <-result:
// recording finished
_, _ = h.ioClient.UpdateEgress(ctx, res)
_, _ = h.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(res))

m, err := h.GenerateMetrics(ctx)
if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/handler_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq
if err != nil {
return nil, err
}
return h.controller.Info, nil
return (*livekit.EgressInfo)(h.controller.Info), nil
}

func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) {
Expand All @@ -48,5 +48,5 @@ func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest)
}

h.controller.SendEOS(ctx)
return h.controller.Info, nil
return (*livekit.EgressInfo)(h.controller.Info), nil
}
58 changes: 58 additions & 0 deletions pkg/info/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package info

import (
"net/http"
"time"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/livekit"
"github.com/livekit/psrpc"
)

type EgressInfo livekit.EgressInfo

const (
MsgStartNotReceived = "Start signal not received"
MsgLimitReached = "Session limit reached"
MsgLimitReachedWithoutStart = "Session limit reached before start signal"
MsgStoppedBeforeStarted = "Stop called before pipeline could start"
)

func (e *EgressInfo) UpdateStatus(status livekit.EgressStatus) {
e.Status = status
e.UpdatedAt = time.Now().UnixNano()
}

func (e *EgressInfo) SetLimitReached() {
now := time.Now().UnixNano()
e.Status = livekit.EgressStatus_EGRESS_LIMIT_REACHED
e.Details = MsgLimitReached
e.UpdatedAt = now
e.EndedAt = now
}

func (e *EgressInfo) SetAborted(msg string) {
now := time.Now().UnixNano()
e.Status = livekit.EgressStatus_EGRESS_ABORTED
if e.Details == "" {
e.Details = msg
} else {
e.Details = e.Details + "; " + msg
}
e.UpdatedAt = now
e.EndedAt = now
}

func (e *EgressInfo) SetFailed(err error) {
now := time.Now().UnixNano()
e.Status = livekit.EgressStatus_EGRESS_FAILED
e.UpdatedAt = now
e.EndedAt = now
e.Error = err.Error()
var perr psrpc.Error
if errors.As(err, &perr) {
e.ErrorCode = int32(perr.ToHttp())
} else {
e.ErrorCode = int32(http.StatusInternalServerError)
}
}
Loading
Loading