diff --git a/cmd/server/http.go b/cmd/server/http.go index 9fd1c5b7..93290fb4 100644 --- a/cmd/server/http.go +++ b/cmd/server/http.go @@ -17,12 +17,12 @@ package main import ( "net/http" - "github.com/livekit/egress/pkg/service" + "github.com/livekit/egress/pkg/server" "github.com/livekit/protocol/logger" ) type httpHandler struct { - svc *service.Service + svc *server.Server } func (h *httpHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { diff --git a/cmd/server/main.go b/cmd/server/main.go index 4b54dff1..6255e941 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -29,6 +29,7 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/handler" + "github.com/livekit/egress/pkg/server" "github.com/livekit/egress/pkg/service" "github.com/livekit/egress/version" "github.com/livekit/protocol/logger" @@ -114,15 +115,10 @@ func runService(c *cli.Context) error { if err != nil { return err } - svc, err := service.NewService(conf, ioClient) + svc, err := server.NewServer(conf, bus, ioClient) if err != nil { return err } - psrpcServer, err := rpc.NewEgressInternalServer(svc, bus) - if err != nil { - return err - } - svc.Register(psrpcServer) if conf.HealthPort != 0 { go func() { @@ -140,10 +136,10 @@ func runService(c *cli.Context) error { select { case sig := <-stopChan: logger.Infow("exit requested, finishing recording then shutting down", "signal", sig) - svc.Stop(false) + svc.Shutdown(false) case sig := <-killChan: logger.Infow("exit requested, stopping recording and shutting down", "signal", sig) - svc.Stop(true) + svc.Shutdown(true) } }() @@ -157,14 +153,7 @@ func runService(c *cli.Context) error { return err } - svc.StartDebugHandlers() - - if err = svc.RegisterListEgress(""); err != nil { - return err - } - err = svc.Run() - svc.Close() - return err + return svc.Run() } func runHandler(c *cli.Context) error { @@ -202,11 +191,7 @@ func runHandler(c *cli.Context) error { 
signal.Notify(killChan, syscall.SIGINT) bus := psrpc.NewRedisMessageBus(rc) - ioClient, err := rpc.NewIOInfoClient(bus) - if err != nil { - return err - } - h, err := handler.NewHandler(conf, bus, ioClient) + h, err := handler.NewHandler(conf, bus) if err != nil { if errors.IsFatal(err) { // service will send info update and shut down @@ -224,5 +209,6 @@ func runHandler(c *cli.Context) error { h.Kill() }() - return h.Run() + h.Run() + return nil } diff --git a/magefile.go b/magefile.go index c9f47c30..d911f1e5 100644 --- a/magefile.go +++ b/magefile.go @@ -77,7 +77,7 @@ func Proto() error { " --plugin=go=%s"+ " --plugin=go-grpc=%s"+ " -I%s -I=. ipc.proto", - protocGoPath, protocGrpcGoPath, pi.Dir, + protocGoPath, protocGrpcGoPath, pi.Dir+"/protobufs", )) } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 56484dae..4d23a7c6 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -32,9 +32,10 @@ var ( ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress") ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs") ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") - ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track") ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found") + ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track") ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU") + ErrShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server is shutting down") ) func New(err string) error { diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index ac00812f..b0dd8f5c 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -24,7 +24,6 @@ import ( "github.com/livekit/egress/pkg/config" 
"github.com/livekit/egress/pkg/errors" - "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/egress/pkg/pipeline" "github.com/livekit/protocol/livekit" @@ -42,12 +41,11 @@ type Handler struct { rpcServer rpc.EgressHandlerServer ipcHandlerServer *grpc.Server ipcServiceClient ipc.EgressServiceClient - ioClient rpc.IOInfoClient initialized core.Fuse kill core.Fuse } -func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Handler, error) { +func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus) (*Handler, error) { ipcClient, err := ipc.NewServiceClient(path.Join(conf.TmpDir[:strings.LastIndex(conf.TmpDir, "/")], conf.NodeID)) if err != nil { return nil, err @@ -55,7 +53,6 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc. h := &Handler{ conf: conf, - ioClient: ioClient, ipcHandlerServer: grpc.NewServer(), ipcServiceClient: ipcClient, } @@ -83,13 +80,13 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc. return nil, err } - h.controller, err = pipeline.New(context.Background(), conf, h.ioClient) + h.controller, err = pipeline.New(context.Background(), conf, h.ipcServiceClient) h.initialized.Break() if err != nil { if !errors.IsFatal(err) { // user error, send update conf.Info.SetFailed(err) - _, _ = h.ioClient.UpdateEgress(context.Background(), (*livekit.EgressInfo)(conf.Info)) + _, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(conf.Info)) } return nil, err } @@ -97,45 +94,29 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc. 
return h, nil } -func (h *Handler) Run() error { +func (h *Handler) Run() { ctx, span := tracer.Start(context.Background(), "Handler.Run") defer span.End() // start egress - result := make(chan *info.EgressInfo, 1) - go func() { - result <- h.controller.Run(ctx) - }() - - kill := h.kill.Watch() - for { - select { - case <-kill: - // kill signal received - h.conf.Info.Details = "service terminated by deployment" - h.controller.SendEOS(ctx) - - case res := <-result: - // recording finished - _, _ = h.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(res)) - - m, err := h.GenerateMetrics(ctx) - if err == nil { - h.ipcServiceClient.HandlerShuttingDown(ctx, &ipc.HandlerShuttingDownRequest{ - EgressId: h.conf.Info.EgressId, - Metrics: m, - }) - } else { - logger.Errorw("failed generating handler metrics", err) - } - - h.rpcServer.Shutdown() - h.ipcHandlerServer.Stop() - return nil - } + res := h.controller.Run(ctx) + m, err := h.GenerateMetrics(ctx) + if err != nil { + logger.Errorw("failed to generate handler metrics", err) } + + _, _ = h.ipcServiceClient.HandlerFinished(ctx, &ipc.HandlerFinishedRequest{ + EgressId: h.conf.Info.EgressId, + Metrics: m, + Info: (*livekit.EgressInfo)(res), + }) + + h.rpcServer.Shutdown() + h.ipcHandlerServer.Stop() } func (h *Handler) Kill() { - h.kill.Break() + // kill signal received + h.conf.Info.Details = "service terminated by deployment" + h.controller.SendEOS(context.Background()) } diff --git a/pkg/ipc/ipc.pb.go b/pkg/ipc/ipc.pb.go index 5704bdaa..99a4fdb6 100644 --- a/pkg/ipc/ipc.pb.go +++ b/pkg/ipc/ipc.pb.go @@ -15,12 +15,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.31.0 -// protoc v3.21.12 +// protoc v4.25.3 // source: ipc.proto package ipc import ( + livekit "github.com/livekit/protocol/livekit" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -82,17 +83,18 @@ func (x *HandlerReadyRequest) GetEgressId() string { return "" } -type HandlerShuttingDownRequest struct { +type HandlerFinishedRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - EgressId string `protobuf:"bytes,1,opt,name=egress_id,json=egressId,proto3" json:"egress_id,omitempty"` - Metrics string `protobuf:"bytes,2,opt,name=metrics,proto3" json:"metrics,omitempty"` + EgressId string `protobuf:"bytes,1,opt,name=egress_id,json=egressId,proto3" json:"egress_id,omitempty"` + Metrics string `protobuf:"bytes,2,opt,name=metrics,proto3" json:"metrics,omitempty"` + Info *livekit.EgressInfo `protobuf:"bytes,3,opt,name=info,proto3" json:"info,omitempty"` } -func (x *HandlerShuttingDownRequest) Reset() { - *x = HandlerShuttingDownRequest{} +func (x *HandlerFinishedRequest) Reset() { + *x = HandlerFinishedRequest{} if protoimpl.UnsafeEnabled { mi := &file_ipc_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -100,13 +102,13 @@ func (x *HandlerShuttingDownRequest) Reset() { } } -func (x *HandlerShuttingDownRequest) String() string { +func (x *HandlerFinishedRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*HandlerShuttingDownRequest) ProtoMessage() {} +func (*HandlerFinishedRequest) ProtoMessage() {} -func (x *HandlerShuttingDownRequest) ProtoReflect() protoreflect.Message { +func (x *HandlerFinishedRequest) ProtoReflect() protoreflect.Message { mi := &file_ipc_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -118,25 +120,32 @@ func (x 
*HandlerShuttingDownRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use HandlerShuttingDownRequest.ProtoReflect.Descriptor instead. -func (*HandlerShuttingDownRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use HandlerFinishedRequest.ProtoReflect.Descriptor instead. +func (*HandlerFinishedRequest) Descriptor() ([]byte, []int) { return file_ipc_proto_rawDescGZIP(), []int{1} } -func (x *HandlerShuttingDownRequest) GetEgressId() string { +func (x *HandlerFinishedRequest) GetEgressId() string { if x != nil { return x.EgressId } return "" } -func (x *HandlerShuttingDownRequest) GetMetrics() string { +func (x *HandlerFinishedRequest) GetMetrics() string { if x != nil { return x.Metrics } return "" } +func (x *HandlerFinishedRequest) GetInfo() *livekit.EgressInfo { + if x != nil { + return x.Info + } + return nil +} + type GstPipelineDebugDotRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -422,61 +431,69 @@ var File_ipc_proto protoreflect.FileDescriptor var file_ipc_proto_rawDesc = []byte{ 0x0a, 0x09, 0x69, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x69, 0x70, 0x63, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x32, 0x0a, - 0x13, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, - 0x64, 0x22, 0x53, 0x0a, 0x1a, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x53, 0x68, 0x75, 0x74, - 0x74, 0x69, 0x6e, 0x67, 0x44, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x1b, 0x0a, 0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x65, 0x67, 0x72, 
0x65, 0x73, 0x73, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, - 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, - 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x1c, 0x0a, 0x1a, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, - 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x22, 0x38, 0x0a, 0x1b, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, - 0x69, 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x6f, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x64, 0x6f, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x22, 0x61, - 0x0a, 0x0c, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, - 0x0a, 0x0c, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, - 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x64, - 0x65, 0x62, 0x75, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x64, 0x65, 0x62, 0x75, - 0x67, 0x22, 0x2e, 0x0a, 0x0d, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x5f, 0x66, 0x69, 0x6c, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x46, 0x69, 0x6c, - 0x65, 0x22, 0x10, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x22, 0x2b, 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, - 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 
0x69, 0x63, 0x73, - 0x32, 0xa5, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x42, 0x0a, 0x0c, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, - 0x64, 0x79, 0x12, 0x18, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, - 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x50, 0x0a, 0x13, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, - 0x72, 0x53, 0x68, 0x75, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x44, 0x6f, 0x77, 0x6e, 0x12, 0x1f, 0x2e, - 0x69, 0x70, 0x63, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x53, 0x68, 0x75, 0x74, 0x74, - 0x69, 0x6e, 0x67, 0x44, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x32, 0xd6, 0x01, 0x0a, 0x0d, 0x45, 0x67, 0x72, - 0x65, 0x73, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x55, 0x0a, 0x0e, 0x47, 0x65, - 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x6f, 0x74, 0x12, 0x1f, 0x2e, 0x69, - 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, - 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, - 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, - 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x33, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x12, 0x11, 0x2e, - 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x12, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 
0x65, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x73, 0x12, 0x13, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x69, 0x70, 0x63, 0x2e, - 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x70, - 0x6b, 0x67, 0x2f, 0x69, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, 0x6c, + 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x32, 0x0a, 0x13, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x67, + 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, + 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x64, 0x22, 0x78, 0x0a, 0x16, 0x48, 0x61, 0x6e, 0x64, 0x6c, + 0x65, 0x72, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x64, 0x12, 0x18, + 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x27, 0x0a, 0x04, 0x69, 0x6e, 0x66, 0x6f, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, + 0x2e, 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x69, 0x6e, 0x66, + 0x6f, 0x22, 0x1c, 0x0a, 0x1a, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 
0x65, 0x6c, 0x69, 0x6e, 0x65, + 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, + 0x38, 0x0a, 0x1b, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x65, + 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x19, + 0x0a, 0x08, 0x64, 0x6f, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x64, 0x6f, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x22, 0x61, 0x0a, 0x0c, 0x50, 0x50, 0x72, + 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x72, 0x6f, + 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0b, 0x70, 0x72, 0x6f, 0x66, 0x69, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, + 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x74, + 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x64, 0x65, 0x62, 0x75, 0x67, 0x22, 0x2e, 0x0a, 0x0d, + 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, + 0x0a, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x09, 0x70, 0x70, 0x72, 0x6f, 0x66, 0x46, 0x69, 0x6c, 0x65, 0x22, 0x10, 0x0a, 0x0e, + 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x2b, + 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0xdd, 0x01, 0x0a, 0x0d, + 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x42, 0x0a, + 0x0c, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x18, 0x2e, + 0x69, 
0x70, 0x63, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, + 0x00, 0x12, 0x3e, 0x0a, 0x0d, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x12, 0x13, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x45, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, + 0x00, 0x12, 0x48, 0x0a, 0x0f, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x46, 0x69, 0x6e, 0x69, + 0x73, 0x68, 0x65, 0x64, 0x12, 0x1b, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x6c, + 0x65, 0x72, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x32, 0xd6, 0x01, 0x0a, 0x0d, + 0x45, 0x67, 0x72, 0x65, 0x73, 0x73, 0x48, 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x55, 0x0a, + 0x0e, 0x47, 0x65, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, 0x65, 0x44, 0x6f, 0x74, 0x12, + 0x1f, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, 0x6e, + 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x20, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x47, 0x73, 0x74, 0x50, 0x69, 0x70, 0x65, 0x6c, 0x69, + 0x6e, 0x65, 0x44, 0x65, 0x62, 0x75, 0x67, 0x44, 0x6f, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x50, 0x50, 0x72, 0x6f, 0x66, + 0x12, 0x11, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x69, 
0x70, 0x63, 0x2e, 0x50, 0x50, 0x72, 0x6f, 0x66, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0a, 0x47, 0x65, 0x74, + 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x13, 0x2e, 0x69, 0x70, 0x63, 0x2e, 0x4d, 0x65, + 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x69, + 0x70, 0x63, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x65, 0x67, 0x72, 0x65, 0x73, + 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x69, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -494,31 +511,35 @@ func file_ipc_proto_rawDescGZIP() []byte { var file_ipc_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_ipc_proto_goTypes = []interface{}{ (*HandlerReadyRequest)(nil), // 0: ipc.HandlerReadyRequest - (*HandlerShuttingDownRequest)(nil), // 1: ipc.HandlerShuttingDownRequest + (*HandlerFinishedRequest)(nil), // 1: ipc.HandlerFinishedRequest (*GstPipelineDebugDotRequest)(nil), // 2: ipc.GstPipelineDebugDotRequest (*GstPipelineDebugDotResponse)(nil), // 3: ipc.GstPipelineDebugDotResponse (*PProfRequest)(nil), // 4: ipc.PProfRequest (*PProfResponse)(nil), // 5: ipc.PProfResponse (*MetricsRequest)(nil), // 6: ipc.MetricsRequest (*MetricsResponse)(nil), // 7: ipc.MetricsResponse - (*emptypb.Empty)(nil), // 8: google.protobuf.Empty + (*livekit.EgressInfo)(nil), // 8: livekit.EgressInfo + (*emptypb.Empty)(nil), // 9: google.protobuf.Empty } var file_ipc_proto_depIdxs = []int32{ - 0, // 0: ipc.EgressService.HandlerReady:input_type -> ipc.HandlerReadyRequest - 1, // 1: ipc.EgressService.HandlerShuttingDown:input_type -> ipc.HandlerShuttingDownRequest - 2, // 2: ipc.EgressHandler.GetPipelineDot:input_type -> ipc.GstPipelineDebugDotRequest - 4, // 3: ipc.EgressHandler.GetPProf:input_type -> 
ipc.PProfRequest - 6, // 4: ipc.EgressHandler.GetMetrics:input_type -> ipc.MetricsRequest - 8, // 5: ipc.EgressService.HandlerReady:output_type -> google.protobuf.Empty - 8, // 6: ipc.EgressService.HandlerShuttingDown:output_type -> google.protobuf.Empty - 3, // 7: ipc.EgressHandler.GetPipelineDot:output_type -> ipc.GstPipelineDebugDotResponse - 5, // 8: ipc.EgressHandler.GetPProf:output_type -> ipc.PProfResponse - 7, // 9: ipc.EgressHandler.GetMetrics:output_type -> ipc.MetricsResponse - 5, // [5:10] is the sub-list for method output_type - 0, // [0:5] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 8, // 0: ipc.HandlerFinishedRequest.info:type_name -> livekit.EgressInfo + 0, // 1: ipc.EgressService.HandlerReady:input_type -> ipc.HandlerReadyRequest + 8, // 2: ipc.EgressService.HandlerUpdate:input_type -> livekit.EgressInfo + 1, // 3: ipc.EgressService.HandlerFinished:input_type -> ipc.HandlerFinishedRequest + 2, // 4: ipc.EgressHandler.GetPipelineDot:input_type -> ipc.GstPipelineDebugDotRequest + 4, // 5: ipc.EgressHandler.GetPProf:input_type -> ipc.PProfRequest + 6, // 6: ipc.EgressHandler.GetMetrics:input_type -> ipc.MetricsRequest + 9, // 7: ipc.EgressService.HandlerReady:output_type -> google.protobuf.Empty + 9, // 8: ipc.EgressService.HandlerUpdate:output_type -> google.protobuf.Empty + 9, // 9: ipc.EgressService.HandlerFinished:output_type -> google.protobuf.Empty + 3, // 10: ipc.EgressHandler.GetPipelineDot:output_type -> ipc.GstPipelineDebugDotResponse + 5, // 11: ipc.EgressHandler.GetPProf:output_type -> ipc.PProfResponse + 7, // 12: ipc.EgressHandler.GetMetrics:output_type -> ipc.MetricsResponse + 7, // [7:13] is the sub-list for method output_type + 1, // [1:7] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 
0, // [0:1] is the sub-list for field type_name } func init() { file_ipc_proto_init() } @@ -540,7 +561,7 @@ func file_ipc_proto_init() { } } file_ipc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*HandlerShuttingDownRequest); i { + switch v := v.(*HandlerFinishedRequest); i { case 0: return &v.state case 1: diff --git a/pkg/ipc/ipc.proto b/pkg/ipc/ipc.proto index ccda578c..6ddb8ffb 100644 --- a/pkg/ipc/ipc.proto +++ b/pkg/ipc/ipc.proto @@ -18,19 +18,22 @@ package ipc; option go_package = "github.com/livekit/egress/pkg/ipc"; import "google/protobuf/empty.proto"; +import "livekit_egress.proto"; service EgressService { rpc HandlerReady(HandlerReadyRequest) returns (google.protobuf.Empty) {}; - rpc HandlerShuttingDown(HandlerShuttingDownRequest) returns (google.protobuf.Empty) {}; + rpc HandlerUpdate(livekit.EgressInfo) returns (google.protobuf.Empty) {}; + rpc HandlerFinished(HandlerFinishedRequest) returns (google.protobuf.Empty) {}; } message HandlerReadyRequest { string egress_id = 1; } -message HandlerShuttingDownRequest { +message HandlerFinishedRequest { string egress_id = 1; string metrics = 2; + livekit.EgressInfo info = 3; } service EgressHandler { diff --git a/pkg/ipc/ipc_grpc.pb.go b/pkg/ipc/ipc_grpc.pb.go index 84802445..3dd28d42 100644 --- a/pkg/ipc/ipc_grpc.pb.go +++ b/pkg/ipc/ipc_grpc.pb.go @@ -15,13 +15,14 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v3.21.12 +// - protoc v4.25.3 // source: ipc.proto package ipc import ( context "context" + livekit "github.com/livekit/protocol/livekit" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -34,8 +35,9 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - EgressService_HandlerReady_FullMethodName = "/ipc.EgressService/HandlerReady" - EgressService_HandlerShuttingDown_FullMethodName = "/ipc.EgressService/HandlerShuttingDown" + EgressService_HandlerReady_FullMethodName = "/ipc.EgressService/HandlerReady" + EgressService_HandlerUpdate_FullMethodName = "/ipc.EgressService/HandlerUpdate" + EgressService_HandlerFinished_FullMethodName = "/ipc.EgressService/HandlerFinished" ) // EgressServiceClient is the client API for EgressService service. @@ -43,7 +45,8 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
type EgressServiceClient interface { HandlerReady(ctx context.Context, in *HandlerReadyRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) - HandlerShuttingDown(ctx context.Context, in *HandlerShuttingDownRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + HandlerUpdate(ctx context.Context, in *livekit.EgressInfo, opts ...grpc.CallOption) (*emptypb.Empty, error) + HandlerFinished(ctx context.Context, in *HandlerFinishedRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type egressServiceClient struct { @@ -63,9 +66,18 @@ func (c *egressServiceClient) HandlerReady(ctx context.Context, in *HandlerReady return out, nil } -func (c *egressServiceClient) HandlerShuttingDown(ctx context.Context, in *HandlerShuttingDownRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *egressServiceClient) HandlerUpdate(ctx context.Context, in *livekit.EgressInfo, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, EgressService_HandlerShuttingDown_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, EgressService_HandlerUpdate_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *egressServiceClient) HandlerFinished(ctx context.Context, in *HandlerFinishedRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, EgressService_HandlerFinished_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -77,7 +89,8 @@ func (c *egressServiceClient) HandlerShuttingDown(ctx context.Context, in *Handl // for forward compatibility type EgressServiceServer interface { HandlerReady(context.Context, *HandlerReadyRequest) (*emptypb.Empty, error) - HandlerShuttingDown(context.Context, *HandlerShuttingDownRequest) (*emptypb.Empty, error) + HandlerUpdate(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error) + HandlerFinished(context.Context, *HandlerFinishedRequest) (*emptypb.Empty, error) mustEmbedUnimplementedEgressServiceServer() } @@ -88,8 +101,11 @@ type UnimplementedEgressServiceServer struct { func (UnimplementedEgressServiceServer) HandlerReady(context.Context, *HandlerReadyRequest) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method HandlerReady not implemented") } -func (UnimplementedEgressServiceServer) HandlerShuttingDown(context.Context, *HandlerShuttingDownRequest) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method HandlerShuttingDown not implemented") +func (UnimplementedEgressServiceServer) HandlerUpdate(context.Context, *livekit.EgressInfo) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method HandlerUpdate not implemented") +} +func (UnimplementedEgressServiceServer) HandlerFinished(context.Context, *HandlerFinishedRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method HandlerFinished not implemented") } func (UnimplementedEgressServiceServer) mustEmbedUnimplementedEgressServiceServer() {} @@ -122,20 +138,38 @@ func _EgressService_HandlerReady_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } -func _EgressService_HandlerShuttingDown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(HandlerShuttingDownRequest) +func 
_EgressService_HandlerUpdate_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(livekit.EgressInfo) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(EgressServiceServer).HandlerShuttingDown(ctx, in) + return srv.(EgressServiceServer).HandlerUpdate(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: EgressService_HandlerShuttingDown_FullMethodName, + FullMethod: EgressService_HandlerUpdate_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(EgressServiceServer).HandlerShuttingDown(ctx, req.(*HandlerShuttingDownRequest)) + return srv.(EgressServiceServer).HandlerUpdate(ctx, req.(*livekit.EgressInfo)) + } + return interceptor(ctx, in, info, handler) +} + +func _EgressService_HandlerFinished_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HandlerFinishedRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EgressServiceServer).HandlerFinished(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: EgressService_HandlerFinished_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EgressServiceServer).HandlerFinished(ctx, req.(*HandlerFinishedRequest)) } return interceptor(ctx, in, info, handler) } @@ -152,8 +186,12 @@ var EgressService_ServiceDesc = grpc.ServiceDesc{ Handler: _EgressService_HandlerReady_Handler, }, { - MethodName: "HandlerShuttingDown", - Handler: _EgressService_HandlerShuttingDown_Handler, + MethodName: "HandlerUpdate", + Handler: _EgressService_HandlerUpdate_Handler, + }, + { + MethodName: "HandlerFinished", + Handler: _EgressService_HandlerFinished_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git 
a/pkg/pipeline/controller.go b/pkg/pipeline/controller.go index 51d4519a..33f6914a 100644 --- a/pkg/pipeline/controller.go +++ b/pkg/pipeline/controller.go @@ -27,6 +27,7 @@ import ( "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/gstreamer" "github.com/livekit/egress/pkg/info" + "github.com/livekit/egress/pkg/ipc" "github.com/livekit/egress/pkg/pipeline/builder" "github.com/livekit/egress/pkg/pipeline/sink" "github.com/livekit/egress/pkg/pipeline/source" @@ -34,7 +35,6 @@ import ( "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/tracer" "github.com/livekit/protocol/utils" ) @@ -47,12 +47,12 @@ type Controller struct { *config.PipelineConfig // gstreamer - src source.Source - p *gstreamer.Pipeline - sinks map[types.EgressType][]sink.Sink - streamBin *builder.StreamBin - callbacks *gstreamer.Callbacks - ioClient rpc.IOInfoClient + src source.Source + p *gstreamer.Pipeline + sinks map[types.EgressType][]sink.Sink + streamBin *builder.StreamBin + callbacks *gstreamer.Callbacks + ipcServiceClient ipc.EgressServiceClient // internal mu sync.Mutex @@ -65,7 +65,7 @@ type Controller struct { stopped core.Fuse } -func New(ctx context.Context, conf *config.PipelineConfig, ioClient rpc.IOInfoClient) (*Controller, error) { +func New(ctx context.Context, conf *config.PipelineConfig, ipcServiceClient ipc.EgressServiceClient) (*Controller, error) { ctx, span := tracer.Start(ctx, "Pipeline.New") defer span.End() @@ -76,9 +76,9 @@ func New(ctx context.Context, conf *config.PipelineConfig, ioClient rpc.IOInfoCl GstReady: make(chan struct{}), BuildReady: make(chan struct{}), }, - ioClient: ioClient, - gstLogger: logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), - monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId), + ipcServiceClient: ipcServiceClient, + gstLogger: 
logger.GetLogger().(logger.ZapLogger).ToZap().WithOptions(zap.WithCaller(false)), + monitor: stats.NewHandlerMonitor(conf.NodeID, conf.ClusterID, conf.Info.EgressId), } c.callbacks.SetOnError(c.OnError) @@ -312,7 +312,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream if sendUpdate { c.Info.UpdatedAt = time.Now().UnixNano() - _, _ = c.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) } return errs.ToError() @@ -372,7 +372,7 @@ func (c *Controller) removeSink(ctx context.Context, url string, streamErr error // only send updates if the egress will continue, otherwise it's handled by UpdateStream RPC if streamErr != nil { c.Info.UpdatedAt = time.Now().UnixNano() - _, _ = c.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) } return c.streamBin.RemoveStream(url) @@ -403,7 +403,7 @@ func (c *Controller) SendEOS(ctx context.Context) { case livekit.EgressStatus_EGRESS_ENDING, livekit.EgressStatus_EGRESS_LIMIT_REACHED: - _, _ = c.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info)) go func() { c.eosTimer = time.AfterFunc(time.Second*30, func() { @@ -532,7 +532,7 @@ func (c *Controller) updateStartTime(startedAt int64) { if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING { c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ACTIVE) - _, _ = c.ioClient.UpdateEgress(context.Background(), (*livekit.EgressInfo)(c.Info)) + _, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info)) } } diff --git a/pkg/service/service.go b/pkg/server/server.go similarity index 54% rename from pkg/service/service.go rename to pkg/server/server.go index 66fc7559..43bbaaac 100644 --- a/pkg/service/service.go +++ b/pkg/server/server.go @@ -12,7 +12,7 @@ // See the License for the 
specific language governing permissions and // limitations under the License. -package service +package server import ( "encoding/json" @@ -22,58 +22,61 @@ import ( "net/http" "os" "path" - "sync" "time" "github.com/frostbyte73/core" - dto "github.com/prometheus/client_model/go" + "go.uber.org/atomic" "google.golang.org/grpc" "github.com/livekit/egress/pkg/config" - "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/ipc" + "github.com/livekit/egress/pkg/service" "github.com/livekit/egress/pkg/stats" "github.com/livekit/egress/version" - "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/psrpc" ) -type Service struct { +type Server struct { ipc.UnimplementedEgressServiceServer - *stats.Monitor + conf *config.ServiceConfig + + *service.ProcessManager + *service.MetricsService + *service.DebugService + monitor *stats.Monitor - conf *config.ServiceConfig psrpcServer rpc.EgressInternalServer ipcServiceServer *grpc.Server - ioClient rpc.IOInfoClient promServer *http.Server + ioClient rpc.IOInfoClient - mu sync.RWMutex - activeHandlers map[string]*Process - pendingMetrics []*dto.MetricFamily - - shutdown core.Fuse + activeRequests atomic.Int32 + shutdown core.Fuse } -func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service, error) { - s := &Service{ - Monitor: stats.NewMonitor(conf), +func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Server, error) { + pm := service.NewProcessManager() + + s := &Server{ conf: conf, + ProcessManager: pm, + MetricsService: service.NewMetricsService(pm), + DebugService: service.NewDebugService(pm), ipcServiceServer: grpc.NewServer(), ioClient: ioClient, - activeHandlers: make(map[string]*Process), } - tmpDir := path.Join(os.TempDir(), conf.NodeID) - if err := os.MkdirAll(tmpDir, 0755); err != nil { + monitor, err := stats.NewMonitor(conf, s) + if err != nil { return nil, err } 
+ s.monitor = monitor - ipc.RegisterEgressServiceServer(s.ipcServiceServer, s) - if err := ipc.StartServiceListener(s.ipcServiceServer, tmpDir); err != nil { - return nil, err + if conf.DebugHandlerPort > 0 { + s.StartDebugHandlers(conf.DebugHandlerPort) } if conf.PrometheusPort > 0 { @@ -81,18 +84,7 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service Addr: fmt.Sprintf(":%d", conf.PrometheusPort), Handler: s.PromHandler(), } - } - if err := s.Start(s.conf, - s.promIsIdle, - s.promCanAcceptRequest, - s.promIsDisabled, - s.killProcess, - ); err != nil { - return nil, err - } - - if s.promServer != nil { promListener, err := net.Listen("tcp", s.promServer.Addr) if err != nil { return nil, err @@ -102,14 +94,29 @@ func NewService(conf *config.ServiceConfig, ioClient rpc.IOInfoClient) (*Service }() } - return s, nil -} + tmpDir := path.Join(os.TempDir(), s.conf.NodeID) + if err := os.MkdirAll(tmpDir, 0755); err != nil { + return nil, err + } -func (s *Service) Register(psrpcServer rpc.EgressInternalServer) { + ipc.RegisterEgressServiceServer(s.ipcServiceServer, s) + if err := ipc.StartServiceListener(s.ipcServiceServer, tmpDir); err != nil { + return nil, err + } + + psrpcServer, err := rpc.NewEgressInternalServer(s, bus) + if err != nil { + return nil, err + } + if err = psrpcServer.RegisterListActiveEgressTopic(""); err != nil { + return nil, err + } s.psrpcServer = psrpcServer + + return s, nil } -func (s *Service) StartTemplatesServer(fs fs.FS) error { +func (s *Server) StartTemplatesServer(fs fs.FS) error { if s.conf.TemplatePort == 0 { logger.Debugw("templates server disabled") return nil @@ -129,11 +136,7 @@ func (s *Service) StartTemplatesServer(fs fs.FS) error { return nil } -func (s *Service) RegisterListEgress(topic string) error { - return s.psrpcServer.RegisterListActiveEgressTopic(topic) -} - -func (s *Service) Run() error { +func (s *Server) Run() error { logger.Debugw("starting service", "version", version.Version) if err 
:= s.psrpcServer.RegisterStartEgressTopic(s.conf.ClusterID); err != nil { @@ -144,32 +147,29 @@ func (s *Service) Run() error { <-s.shutdown.Watch() logger.Infow("shutting down") + s.Drain() return nil } -func (s *Service) Reset() { - if !s.shutdown.IsBroken() { - s.Stop(false) - } - - s.shutdown = core.Fuse{} -} - -func (s *Service) Status() ([]byte, error) { +func (s *Server) Status() ([]byte, error) { info := map[string]interface{}{ - "CpuLoad": s.GetCPULoad(), + "CpuLoad": s.monitor.GetAvailableCPU(), } - s.mu.RLock() - defer s.mu.RUnlock() + s.GetStatus(info) - for _, h := range s.activeHandlers { - info[h.req.EgressId] = h.req.Request - } return json.Marshal(info) } -func (s *Service) Stop(kill bool) { +func (s *Server) IsIdle() bool { + return s.activeRequests.Load() == 0 +} + +func (s *Server) IsDisabled() bool { + return s.shutdown.IsBroken() +} + +func (s *Server) Shutdown(kill bool) { s.shutdown.Once(func() { s.psrpcServer.DeregisterStartEgressTopic(s.conf.ClusterID) }) @@ -178,53 +178,11 @@ func (s *Service) Stop(kill bool) { } } -func (s *Service) KillAll() { - s.mu.RLock() - defer s.mu.RUnlock() - - for _, h := range s.activeHandlers { - h.kill() - } -} - -func (s *Service) killProcess(egressID string, maxUsage float64) { - s.mu.RLock() - defer s.mu.RUnlock() - - if h, ok := s.activeHandlers[egressID]; ok { - err := errors.ErrCPUExhausted(maxUsage) - logger.Errorw("killing egress", err, "egressID", egressID) - - now := time.Now().UnixNano() - h.info.Status = livekit.EgressStatus_EGRESS_FAILED - h.info.Error = err.Error() - h.info.ErrorCode = int32(http.StatusForbidden) - h.info.UpdatedAt = now - h.info.EndedAt = now - h.kill() - } -} - -func (s *Service) Close() { - s.Monitor.Close() - - // activeHandlers might be empty if a request was just accepted - for s.GetRequestCount() > 0 { +func (s *Server) Drain() { + for !s.IsIdle() { time.Sleep(time.Second) } - // wait for final handler(s) to finish - for { - s.mu.RLock() - isIdle := len(s.activeHandlers) 
== 0 - s.mu.RUnlock() - - if isIdle { - logger.Infow("closing server") - s.psrpcServer.Shutdown() - return - } - - time.Sleep(time.Second) - } + logger.Infow("closing server") + s.psrpcServer.Shutdown() } diff --git a/pkg/server/server_ipc.go b/pkg/server/server_ipc.go new file mode 100644 index 00000000..06eef65d --- /dev/null +++ b/pkg/server/server_ipc.go @@ -0,0 +1,54 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/livekit/egress/pkg/ipc" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" +) + +func (s *Server) HandlerReady(_ context.Context, req *ipc.HandlerReadyRequest) (*emptypb.Empty, error) { + if err := s.HandlerStarted(req.EgressId); err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil +} + +func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { + if _, err := s.ioClient.UpdateEgress(ctx, info); err != nil { + logger.Errorw("failed to update egress", err) + } + + return &emptypb.Empty{}, nil +} + +func (s *Server) HandlerFinished(ctx context.Context, req *ipc.HandlerFinishedRequest) (*emptypb.Empty, error) { + _, err := s.ioClient.UpdateEgress(ctx, req.Info) + if err != nil { + logger.Errorw("failed to update egress", err) + } + + if err = s.StoreProcessEndedMetrics(req.EgressId, req.Metrics); err != nil { + 
logger.Errorw("failed to store ms", err) + } + + return &emptypb.Empty{}, nil +} diff --git a/pkg/service/service_rpc.go b/pkg/server/server_rpc.go similarity index 56% rename from pkg/service/service_rpc.go rename to pkg/server/server_rpc.go index 6536488e..fb098edc 100644 --- a/pkg/service/service_rpc.go +++ b/pkg/server/server_rpc.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package service +package server import ( "context" @@ -35,11 +35,19 @@ import ( "github.com/livekit/protocol/utils" ) -func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { +func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { + s.activeRequests.Inc() + ctx, span := tracer.Start(ctx, "Service.StartEgress") defer span.End() - if err := s.AcceptRequest(req); err != nil { + if s.IsDisabled() { + s.activeRequests.Dec() + return nil, errors.ErrShuttingDown + } + + if err := s.monitor.AcceptRequest(req); err != nil { + s.activeRequests.Dec() return nil, err } @@ -47,13 +55,15 @@ func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) p, err := config.GetValidatedPipelineConfig(s.conf, req) if err != nil { - s.EgressAborted(req) + s.monitor.EgressAborted(req) + s.activeRequests.Dec() return nil, err } _, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info)) if err != nil { - s.EgressAborted(req) + s.monitor.EgressAborted(req) + s.activeRequests.Dec() return nil, err } @@ -66,34 +76,18 @@ func (s *Service) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) "request", p.Info.Request, ) - err = s.launchHandler(req, (*livekit.EgressInfo)(p.Info)) + err = s.launchProcess(req, (*livekit.EgressInfo)(p.Info)) if err != nil { - s.EgressAborted(req) + s.monitor.EgressAborted(req) + s.activeRequests.Dec() return nil, err } return (*livekit.EgressInfo)(p.Info), nil } 
-func (s *Service) StartEgressAffinity(_ context.Context, req *rpc.StartEgressRequest) float32 { - if !s.CanAcceptRequest(req) { - // cannot accept - return -1 - } - - if s.GetRequestCount() == 0 { - // group multiple track and track composite requests. - // if this instance is idle and another is already handling some, the request will go to that server. - // this avoids having many instances with one track request each, taking availability from room composite. - return 0.5 - } else { - // already handling a request and has available cpu - return 1 - } -} - -func (s *Service) launchHandler(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error { - _, span := tracer.Start(context.Background(), "Service.launchHandler") +func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error { + _, span := tracer.Start(context.Background(), "Service.launchProcess") defer span.End() handlerID := utils.NewGuid("EGH_") @@ -126,96 +120,72 @@ func (s *Service) launchHandler(req *rpc.StartEgressRequest, info *livekit.Egres cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - s.EgressStarted(req) - - h, err := NewProcess(context.Background(), handlerID, req, info, cmd, p.TmpDir) - if err != nil { - span.RecordError(err) - return err - } - - err = s.AddHandler(req.EgressId, h) - if err != nil { - span.RecordError(err) - return err - } - - return nil -} + s.monitor.EgressStarted(req) -func (s *Service) AddHandler(egressID string, p *Process) error { - s.mu.Lock() - s.activeHandlers[egressID] = p - s.mu.Unlock() - - if err := p.cmd.Start(); err != nil { - logger.Errorw("could not launch process", err) - return err - } - - select { - case <-p.ready: - s.UpdatePID(egressID, p.cmd.Process.Pid) + if err = s.Launch(context.Background(), handlerID, req, info, cmd, p.TmpDir); err != nil { + s.processEnded(req, info, err) + } else { + s.monitor.UpdatePID(info.EgressId, cmd.Process.Pid) go func() { - err := p.cmd.Wait() - s.processEnded(p, err) + err = cmd.Wait() + 
s.processEnded(req, info, err) }() - - case <-time.After(10 * time.Second): - logger.Warnw("no response from handler", nil, "egressID", egressID) - _ = p.cmd.Process.Kill() - _ = p.cmd.Wait() - s.processEnded(p, errors.ErrEgressNotFound) } return nil } -func (s *Service) processEnded(p *Process, err error) { +func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressInfo, err error) { if err != nil { now := time.Now().UnixNano() - p.info.UpdatedAt = now - p.info.EndedAt = now - p.info.Status = livekit.EgressStatus_EGRESS_FAILED - if p.info.Error == "" { - p.info.Error = "internal error" - p.info.ErrorCode = int32(http.StatusInternalServerError) + info.UpdatedAt = now + info.EndedAt = now + info.Status = livekit.EgressStatus_EGRESS_FAILED + if info.Error == "" { + info.Error = "internal error" + info.ErrorCode = int32(http.StatusInternalServerError) } - _, _ = s.ioClient.UpdateEgress(p.ctx, p.info) - if p.info.Error == "internal error" { - s.Stop(false) + _, _ = s.ioClient.UpdateEgress(context.Background(), info) + if info.Error == "internal error" { + s.Shutdown(false) } } - avgCPU, maxCPU := s.EgressEnded(p.req) + avgCPU, maxCPU := s.monitor.EgressEnded(req) if maxCPU > 0 { - _, _ = s.ioClient.UpdateMetrics(p.ctx, &rpc.UpdateMetricsRequest{ - Info: p.info, + _, _ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{ + Info: info, AvgCpuUsage: float32(avgCPU), MaxCpuUsage: float32(maxCPU), }) } - p.closed.Break() + s.ProcessFinished(info.EgressId) + s.activeRequests.Dec() +} + +func (s *Server) StartEgressAffinity(_ context.Context, req *rpc.StartEgressRequest) float32 { + if s.IsDisabled() || !s.monitor.CanAcceptRequest(req) { + // cannot accept + return -1 + } - s.mu.Lock() - delete(s.activeHandlers, p.req.EgressId) - s.mu.Unlock() + if s.activeRequests.Load() == 0 { + // group multiple track and track composite requests. 
+ // if this instance is idle and another is already handling some, the request will go to that server. + // this avoids having many instances with one track request each, taking availability from room composite. + return 0.5 + } else { + // already handling a request and has available cpu + return 1 + } } -func (s *Service) ListActiveEgress(ctx context.Context, _ *rpc.ListActiveEgressRequest) (*rpc.ListActiveEgressResponse, error) { +func (s *Server) ListActiveEgress(ctx context.Context, _ *rpc.ListActiveEgressRequest) (*rpc.ListActiveEgressResponse, error) { ctx, span := tracer.Start(ctx, "Service.ListActiveEgress") defer span.End() - s.mu.RLock() - defer s.mu.RUnlock() - - egressIDs := make([]string, 0, len(s.activeHandlers)) - for egressID := range s.activeHandlers { - egressIDs = append(egressIDs, egressID) - } - return &rpc.ListActiveEgressResponse{ - EgressIds: egressIDs, + EgressIds: s.GetActiveEgressIDs(), }, nil } diff --git a/pkg/service/service_debug.go b/pkg/service/debug.go similarity index 82% rename from pkg/service/service_debug.go rename to pkg/service/debug.go index bc5cb003..c07af9d6 100644 --- a/pkg/service/service_debug.go +++ b/pkg/service/debug.go @@ -33,8 +33,18 @@ const ( pprofApp = "pprof" ) -func (s *Service) StartDebugHandlers() { - if s.conf.DebugHandlerPort == 0 { +type DebugService struct { + pm *ProcessManager +} + +func NewDebugService(pm *ProcessManager) *DebugService { + return &DebugService{ + pm: pm, + } +} + +func (s *DebugService) StartDebugHandlers(port int) { + if port == 0 { logger.Debugw("debug handler disabled") return } @@ -44,27 +54,14 @@ func (s *Service) StartDebugHandlers() { mux.HandleFunc(fmt.Sprintf("/%s/", pprofApp), s.handlePProf) go func() { - addr := fmt.Sprintf(":%d", s.conf.DebugHandlerPort) + addr := fmt.Sprintf(":%d", port) logger.Debugw(fmt.Sprintf("starting debug handler on address %s", addr)) _ = http.ListenAndServe(addr, mux) }() } -func (s *Service) GetGstPipelineDotFile(egressID string) (string, 
error) { - c, err := s.getGRPCClient(egressID) - if err != nil { - return "", err - } - - res, err := c.GetPipelineDot(context.Background(), &ipc.GstPipelineDebugDotRequest{}) - if err != nil { - return "", err - } - return res.DotFile, nil -} - // URL path format is "///" -func (s *Service) handleGstPipelineDotFile(w http.ResponseWriter, r *http.Request) { +func (s *DebugService) handleGstPipelineDotFile(w http.ResponseWriter, r *http.Request) { pathElements := strings.Split(r.URL.Path, "/") if len(pathElements) < 3 { http.Error(w, "malformed url", http.StatusNotFound) @@ -80,8 +77,21 @@ func (s *Service) handleGstPipelineDotFile(w http.ResponseWriter, r *http.Reques _, _ = w.Write([]byte(dotFile)) } +func (s *DebugService) GetGstPipelineDotFile(egressID string) (string, error) { + c, err := s.pm.GetGRPCClient(egressID) + if err != nil { + return "", err + } + + res, err := c.GetPipelineDot(context.Background(), &ipc.GstPipelineDebugDotRequest{}) + if err != nil { + return "", err + } + return res.DotFile, nil +} + // URL path format is "///" or "//" to profile the service -func (s *Service) handlePProf(w http.ResponseWriter, r *http.Request) { +func (s *DebugService) handlePProf(w http.ResponseWriter, r *http.Request) { var err error var b []byte @@ -96,7 +106,7 @@ func (s *Service) handlePProf(w http.ResponseWriter, r *http.Request) { case 4: egressID := pathElements[2] - c, err := s.getGRPCClient(egressID) + c, err := s.pm.GetGRPCClient(egressID) if err != nil { http.Error(w, "handler not found", http.StatusNotFound) return @@ -126,17 +136,6 @@ func (s *Service) handlePProf(w http.ResponseWriter, r *http.Request) { } } -func (s *Service) getGRPCClient(egressID string) (ipc.EgressHandlerClient, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - h, ok := s.activeHandlers[egressID] - if !ok { - return nil, errors.ErrEgressNotFound - } - return h.ipcHandlerClient, nil -} - func getErrorCode(err error) int { var e psrpc.Error diff --git a/pkg/service/ioclient.go 
b/pkg/service/io.go similarity index 76% rename from pkg/service/ioclient.go rename to pkg/service/io.go index d124ff48..26dbd3f3 100644 --- a/pkg/service/ioclient.go +++ b/pkg/service/io.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package service import ( @@ -72,7 +86,7 @@ func (c *IOClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, o func (c *IOClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { _, err := c.IOInfoClient.UpdateMetrics(ctx, req, opts...) 
if err != nil { - logger.Errorw("failed to update metrics", err) + logger.Errorw("failed to update ms", err) return nil, err } return &emptypb.Empty{}, nil diff --git a/pkg/service/service_prom.go b/pkg/service/metrics.go similarity index 66% rename from pkg/service/service_prom.go rename to pkg/service/metrics.go index 1dbd14b2..382ba634 100644 --- a/pkg/service/service_prom.go +++ b/pkg/service/metrics.go @@ -18,6 +18,7 @@ import ( "context" "net/http" "strings" + "sync" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -25,13 +26,30 @@ import ( "github.com/prometheus/common/expfmt" "golang.org/x/exp/maps" - "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/tracer" ) -func (s *Service) CreateGatherer() prometheus.Gatherer { +type MetricsService struct { + pm *ProcessManager + + mu sync.Mutex + pendingMetrics []*dto.MetricFamily +} + +func NewMetricsService(pm *ProcessManager) *MetricsService { + return &MetricsService{ + pm: pm, + } +} + +func (s *MetricsService) PromHandler() http.Handler { + return promhttp.InstrumentMetricHandler( + prometheus.DefaultRegisterer, promhttp.HandlerFor(s.CreateGatherer(), promhttp.HandlerOpts{}), + ) +} + +func (s *MetricsService) CreateGatherer() prometheus.Gatherer { return prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { _, span := tracer.Start(context.Background(), "Service.GathererOfHandlerMetrics") defer span.End() @@ -39,73 +57,30 @@ func (s *Service) CreateGatherer() prometheus.Gatherer { gatherers := prometheus.Gatherers{} // Include the default repo gatherers = append(gatherers, prometheus.DefaultGatherer) - // Include process ended metrics + // Include Process ended ms gatherers = append(gatherers, prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { s.mu.Lock() - m := s.pendingMetrics s.pendingMetrics = nil - s.mu.Unlock() - return m, nil })) - 
s.mu.RLock() - // add all the active handlers as sources - for _, v := range s.activeHandlers { - gatherers = append(gatherers, v) - } - s.mu.RUnlock() + gatherers = append(gatherers, s.pm.GetGatherers()...) return gatherers.Gather() }) } -func (s *Service) PromHandler() http.Handler { - return promhttp.InstrumentMetricHandler( - prometheus.DefaultRegisterer, promhttp.HandlerFor(s.CreateGatherer(), promhttp.HandlerOpts{}), - ) -} - -func (s *Service) promIsIdle() float64 { - if !s.shutdown.IsBroken() && s.GetRequestCount() == 0 { - return 1 - } - return 0 -} - -func (s *Service) promCanAcceptRequest() float64 { - if s.shutdown.IsBroken() { - return 0 - } - if s.CanAcceptRequest(&rpc.StartEgressRequest{ - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: &livekit.RoomCompositeEgressRequest{}, - }, - }) { - return 1 - } - return 0 -} - -func (s *Service) promIsDisabled() float64 { - if s.shutdown.IsBroken() { - return 1 - } - return 0 -} - -func (s *Service) storeProcessEndedMetrics(egressID string, metrics string) error { +func (s *MetricsService) StoreProcessEndedMetrics(egressID string, metrics string) error { m, err := deserializeMetrics(egressID, metrics) if err != nil { return err } s.mu.Lock() - defer s.mu.Unlock() - s.pendingMetrics = append(s.pendingMetrics, m...) 
+ s.mu.Unlock() return nil } @@ -114,7 +89,7 @@ func deserializeMetrics(egressID string, s string) ([]*dto.MetricFamily, error) parser := &expfmt.TextParser{} families, err := parser.TextToMetricFamilies(strings.NewReader(s)) if err != nil { - logger.Warnw("failed to parse metrics from handler", err, "egress_id", egressID) + logger.Warnw("failed to parse ms from handler", err, "egress_id", egressID) return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler } @@ -123,3 +98,28 @@ func deserializeMetrics(egressID string, s string) ([]*dto.MetricFamily, error) return maps.Values(families), nil } + +func applyDefaultLabel(egressID string, families map[string]*dto.MetricFamily) { + egressIDLabel := "egress_id" + egressLabelPair := &dto.LabelPair{ + Name: &egressIDLabel, + Value: &egressID, + } + for _, family := range families { + for _, metric := range family.Metric { + if metric.Label == nil { + metric.Label = make([]*dto.LabelPair, 0) + } + found := false + for _, label := range metric.Label { + if label.GetName() == "egress_id" { + found = true + break + } + } + if !found { + metric.Label = append(metric.Label, egressLabelPair) + } + } + } +} diff --git a/pkg/service/process.go b/pkg/service/process.go index a2ba4ff9..293c0c31 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -16,40 +16,47 @@ package service import ( "context" + "net/http" "os/exec" + "sync" "syscall" + "time" "github.com/frostbyte73/core" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" ) -type Process struct { - ctx context.Context - handlerID string - req *rpc.StartEgressRequest - info *livekit.EgressInfo - cmd *exec.Cmd - ipcHandlerClient ipc.EgressHandlerClient - ready chan struct{} - closed core.Fuse +const 
launchTimeout = 10 * time.Second + +type ProcessManager struct { + mu sync.RWMutex + activeHandlers map[string]*Process } -func NewProcess( +func NewProcessManager() *ProcessManager { + return &ProcessManager{ + activeHandlers: make(map[string]*Process), + } +} + +func (pm *ProcessManager) Launch( ctx context.Context, handlerID string, req *rpc.StartEgressRequest, info *livekit.EgressInfo, cmd *exec.Cmd, tmpDir string, -) (*Process, error) { +) error { ipcClient, err := ipc.NewHandlerClient(tmpDir) if err != nil { - return nil, err + return err } p := &Process{ @@ -62,15 +69,150 @@ func NewProcess( ready: make(chan struct{}), } - return p, nil + pm.mu.Lock() + pm.activeHandlers[info.EgressId] = p + pm.mu.Unlock() + + if err = cmd.Start(); err != nil { + logger.Errorw("could not launch process", err) + return err + } + + select { + case <-p.ready: + return nil + + case <-time.After(launchTimeout): + logger.Warnw("no response from handler", nil, "egressID", info.EgressId) + _ = cmd.Process.Kill() + _ = cmd.Wait() + return errors.ErrEgressNotFound + } } -// Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics +func (pm *ProcessManager) GetContext(egressID string) context.Context { + pm.mu.RLock() + defer pm.mu.RUnlock() + + if p, ok := pm.activeHandlers[egressID]; ok { + return p.ctx + } + + return context.Background() +} + +func (pm *ProcessManager) HandlerStarted(egressID string) error { + pm.mu.RLock() + defer pm.mu.RUnlock() + + if p, ok := pm.activeHandlers[egressID]; ok { + close(p.ready) + return nil + } + + return errors.ErrEgressNotFound +} + +func (pm *ProcessManager) GetActiveEgressIDs() []string { + pm.mu.RLock() + defer pm.mu.RUnlock() + + egressIDs := make([]string, 0, len(pm.activeHandlers)) + for egressID := range pm.activeHandlers { + egressIDs = append(egressIDs, egressID) + } + + return egressIDs +} + +func (pm *ProcessManager) GetStatus(info map[string]interface{}) { + pm.mu.RLock() + defer 
pm.mu.RUnlock() + + for _, h := range pm.activeHandlers { + info[h.req.EgressId] = h.req.Request + } +} + +func (pm *ProcessManager) GetGatherers() []prometheus.Gatherer { + pm.mu.RLock() + defer pm.mu.RUnlock() + + handlers := make([]prometheus.Gatherer, 0, len(pm.activeHandlers)) + for _, p := range pm.activeHandlers { + handlers = append(handlers, p) + } + + return handlers +} + +func (pm *ProcessManager) GetGRPCClient(egressID string) (ipc.EgressHandlerClient, error) { + pm.mu.RLock() + defer pm.mu.RUnlock() + + h, ok := pm.activeHandlers[egressID] + if !ok { + return nil, errors.ErrEgressNotFound + } + return h.ipcHandlerClient, nil +} + +func (pm *ProcessManager) KillAll() { + pm.mu.RLock() + defer pm.mu.RUnlock() + + for _, h := range pm.activeHandlers { + h.kill() + } +} + +func (pm *ProcessManager) KillProcess(egressID string, maxUsage float64) { + pm.mu.RLock() + defer pm.mu.RUnlock() + + if h, ok := pm.activeHandlers[egressID]; ok { + err := errors.ErrCPUExhausted(maxUsage) + logger.Errorw("killing egress", err, "egressID", egressID) + + now := time.Now().UnixNano() + h.info.Status = livekit.EgressStatus_EGRESS_FAILED + h.info.Error = err.Error() + h.info.ErrorCode = int32(http.StatusForbidden) + h.info.UpdatedAt = now + h.info.EndedAt = now + h.kill() + } +} + +func (pm *ProcessManager) ProcessFinished(egressID string) { + pm.mu.Lock() + defer pm.mu.Unlock() + + p, ok := pm.activeHandlers[egressID] + if ok { + p.closed.Break() + } + + delete(pm.activeHandlers, egressID) +} + +type Process struct { + ctx context.Context + handlerID string + req *rpc.StartEgressRequest + info *livekit.EgressInfo + cmd *exec.Cmd + ipcHandlerClient ipc.EgressHandlerClient + ready chan struct{} + closed core.Fuse +} + +// Gather implements the prometheus.Gatherer interface on server-side to allow aggregation of handler metrics func (p *Process) Gather() ([]*dto.MetricFamily, error) { - // Get the metrics from the handler via IPC + // Get the metrics from the handler via IPC
metricsResponse, err := p.ipcHandlerClient.GetMetrics(context.Background(), &ipc.MetricsRequest{}) if err != nil { - logger.Warnw("failed to obtain metrics from handler", err, "egressID", p.req.EgressId) + logger.Warnw("failed to obtain metrics from handler", err, "egressID", p.req.EgressId) return make([]*dto.MetricFamily, 0), nil // don't return an error, just skip this handler } @@ -79,34 +221,9 @@ func (p *Process) Gather() ([]*dto.MetricFamily, error) { } func (p *Process) kill() { - if !p.closed.IsBroken() { + p.closed.Once(func() { if err := p.cmd.Process.Signal(syscall.SIGINT); err != nil { logger.Errorw("failed to kill Process", err, "egressID", p.req.EgressId) } - } -} - -func applyDefaultLabel(egressID string, families map[string]*dto.MetricFamily) { - egressIDLabel := "egress_id" - egressLabelPair := &dto.LabelPair{ - Name: &egressIDLabel, - Value: &egressID, - } - for _, family := range families { - for _, metric := range family.Metric { - if metric.Label == nil { - metric.Label = make([]*dto.LabelPair, 0) - } - found := false - for _, label := range metric.Label { - if label.GetName() == "egress_id" { - found = true - break - } - } - if !found { - metric.Label = append(metric.Label, egressLabelPair) - } - } - } + }) } diff --git a/pkg/service/service_ipc.go b/pkg/service/service_ipc.go deleted file mode 100644 index a0f5dc4b..00000000 --- a/pkg/service/service_ipc.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and -// limitations under the License. - -package service - -import ( - "context" - - "google.golang.org/protobuf/types/known/emptypb" - - "github.com/livekit/egress/pkg/errors" - "github.com/livekit/egress/pkg/ipc" -) - -func (s *Service) HandlerReady(_ context.Context, req *ipc.HandlerReadyRequest) (*emptypb.Empty, error) { - s.mu.RLock() - p, ok := s.activeHandlers[req.EgressId] - s.mu.RUnlock() - - if !ok { - return nil, errors.ErrEgressNotFound - } - - close(p.ready) - return &emptypb.Empty{}, nil -} - -func (s *Service) HandlerShuttingDown(_ context.Context, req *ipc.HandlerShuttingDownRequest) (*emptypb.Empty, error) { - err := s.storeProcessEndedMetrics(req.EgressId, req.Metrics) - if err != nil { - return nil, err - } - - return &emptypb.Empty{}, nil -} diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index aea5e748..d468bc35 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -31,21 +31,33 @@ import ( "github.com/livekit/protocol/utils/hwstats" ) +const ( + cpuHoldDuration = time.Second * 30 + defaultKillThreshold = 0.95 + minKillDuration = 10 +) + +type Service interface { + IsDisabled() bool + KillProcess(string, float64) +} + type Monitor struct { + nodeID string + clusterID string cpuCostConfig *config.CPUCostConfig promCPULoad prometheus.Gauge requestGauge *prometheus.GaugeVec + svc Service cpuStats *hwstats.CPUStats requests atomic.Int32 mu sync.Mutex highCPUDuration int - killProcess func(string, float64) pending map[string]*processStats procStats map[int]*processStats - closing bool } type processStats struct { @@ -60,80 +72,32 @@ type processStats struct { maxCPU float64 } -const ( - cpuHoldDuration = time.Second * 30 - defaultKillThreshold = 0.95 - minKillDuration = 10 -) - -func NewMonitor(conf *config.ServiceConfig) *Monitor { - return &Monitor{ +func NewMonitor(conf *config.ServiceConfig, svc Service) (*Monitor, error) { + m := &Monitor{ + nodeID: 
conf.NodeID, + clusterID: conf.ClusterID, cpuCostConfig: conf.CPUCostConfig, + svc: svc, pending: make(map[string]*processStats), procStats: make(map[int]*processStats), } -} - -func (m *Monitor) Start( - conf *config.ServiceConfig, - isIdle func() float64, - canAcceptRequest func() float64, - isDisabled func() float64, - killProcess func(string, float64), -) error { - m.killProcess = killProcess procStats, err := hwstats.NewProcCPUStats(m.updateEgressStats) if err != nil { - return err + return nil, err } m.cpuStats = procStats - if err = m.checkCPUConfig(); err != nil { - return err + if err = m.validateCPUConfig(); err != nil { + return nil, err } - promNodeAvailable := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "livekit", - Subsystem: "egress", - Name: "available", - ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "cluster_id": conf.ClusterID}, - }, isIdle) - - promCanAcceptRequest := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "livekit", - Subsystem: "egress", - Name: "can_accept_request", - ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "cluster_id": conf.ClusterID}, - }, canAcceptRequest) - - promIsDisabled := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "livekit", - Subsystem: "egress", - Name: "is_disabled", - ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "cluster_id": conf.ClusterID}, - }, isDisabled) - - m.promCPULoad = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "livekit", - Subsystem: "node", - Name: "cpu_load", - ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "node_type": "EGRESS", "cluster_id": conf.ClusterID}, - }) - - m.requestGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "livekit", - Subsystem: "egress", - Name: "requests", - ConstLabels: prometheus.Labels{"node_id": conf.NodeID, "cluster_id": conf.ClusterID}, - }, []string{"type"}) - - prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, m.promCPULoad, 
m.requestGauge) + m.initPrometheus() - return nil + return m, nil } -func (m *Monitor) checkCPUConfig() error { +func (m *Monitor) validateCPUConfig() error { requirements := []float64{ m.cpuCostConfig.RoomCompositeCpuCost, m.cpuCostConfig.AudioRoomCompositeCpuCost, @@ -172,83 +136,6 @@ func (m *Monitor) checkCPUConfig() error { return nil } -func (m *Monitor) GetCPULoad() float64 { - return (m.cpuStats.NumCPU() - m.cpuStats.GetCPUIdle()) / m.cpuStats.NumCPU() * 100 -} - -func (m *Monitor) GetRequestCount() int { - m.mu.Lock() - defer m.mu.Unlock() - - return int(m.requests.Load()) -} - -func (m *Monitor) UpdatePID(egressID string, pid int) { - m.mu.Lock() - defer m.mu.Unlock() - - ps := m.pending[egressID] - delete(m.pending, egressID) - - if existing := m.procStats[pid]; existing != nil { - ps.maxCPU = existing.maxCPU - ps.totalCPU = existing.totalCPU - ps.cpuCounter = existing.cpuCounter - } - m.procStats[pid] = ps -} - -func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) { - load := 1 - idle/m.cpuStats.NumCPU() - m.promCPULoad.Set(load) - - m.mu.Lock() - defer m.mu.Unlock() - - maxUsage := 0.0 - var maxEgress string - for pid, cpuUsage := range usage { - procStats := m.procStats[pid] - if procStats == nil { - continue - } - - procStats.lastUsage = cpuUsage - procStats.totalCPU += cpuUsage - procStats.cpuCounter++ - if cpuUsage > procStats.maxCPU { - procStats.maxCPU = cpuUsage - } - - if cpuUsage > procStats.allowedUsage && cpuUsage > maxUsage { - maxUsage = cpuUsage - maxEgress = procStats.egressID - } - } - - killThreshold := defaultKillThreshold - if killThreshold <= m.cpuCostConfig.MaxCpuUtilization { - killThreshold = (1 + m.cpuCostConfig.MaxCpuUtilization) / 2 - } - - if load > killThreshold { - logger.Warnw("high cpu usage", nil, - "load", load, - "requests", m.requests.Load(), - ) - - if m.requests.Load() > 1 { - m.highCPUDuration++ - if m.highCPUDuration < minKillDuration { - return - } - m.killProcess(maxEgress, maxUsage) - } - } 
- - m.highCPUDuration = 0 -} - func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool { m.mu.Lock() defer m.mu.Unlock() @@ -257,10 +144,6 @@ func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool { } func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool { - if m.closing { - return false - } - total, available, pending, used := m.getCPUUsageLocked() var accept bool @@ -300,42 +183,6 @@ func (m *Monitor) canAcceptRequestLocked(req *rpc.StartEgressRequest) bool { return accept } -func (m *Monitor) GetAvailableCPU() float64 { - m.mu.Lock() - defer m.mu.Unlock() - - _, available, _, _ := m.getCPUUsageLocked() - return available -} - -func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64) { - total = m.cpuStats.NumCPU() - if m.requests.Load() == 0 { - // if no requests, use total - available = total - return - } - - for _, ps := range m.pending { - if ps.pendingUsage > ps.lastUsage { - pending += ps.pendingUsage - } else { - pending += ps.lastUsage - } - } - for _, ps := range m.procStats { - if ps.pendingUsage > ps.lastUsage { - used += ps.pendingUsage - } else { - used += ps.lastUsage - } - } - - // if already running requests, cap usage at MaxCpuUtilization - available = total*m.cpuCostConfig.MaxCpuUtilization - pending - used - return -} - func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { m.mu.Lock() defer m.mu.Unlock() @@ -379,6 +226,29 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { return nil } +func (m *Monitor) UpdatePID(egressID string, pid int) { + m.mu.Lock() + defer m.mu.Unlock() + + ps := m.pending[egressID] + delete(m.pending, egressID) + + if existing := m.procStats[pid]; existing != nil { + ps.maxCPU = existing.maxCPU + ps.totalCPU = existing.totalCPU + ps.cpuCounter = existing.cpuCounter + } + m.procStats[pid] = ps +} + +func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) { + m.mu.Lock() + defer m.mu.Unlock() + + 
delete(m.pending, req.EgressId) + m.requests.Dec() +} + func (m *Monitor) EgressStarted(req *rpc.StartEgressRequest) { switch req.Request.(type) { case *rpc.StartEgressRequest_RoomComposite: @@ -424,17 +294,89 @@ func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64) { return 0, 0 } -func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) { +func (m *Monitor) GetAvailableCPU() float64 { m.mu.Lock() defer m.mu.Unlock() - delete(m.pending, req.EgressId) - m.requests.Dec() + _, available, _, _ := m.getCPUUsageLocked() + return available +} + +func (m *Monitor) getCPUUsageLocked() (total, available, pending, used float64) { + total = m.cpuStats.NumCPU() + if m.requests.Load() == 0 { + // if no requests, use total + available = total + return + } + + for _, ps := range m.pending { + if ps.pendingUsage > ps.lastUsage { + pending += ps.pendingUsage + } else { + pending += ps.lastUsage + } + } + for _, ps := range m.procStats { + if ps.pendingUsage > ps.lastUsage { + used += ps.pendingUsage + } else { + used += ps.lastUsage + } + } + + // if already running requests, cap usage at MaxCpuUtilization + available = total*m.cpuCostConfig.MaxCpuUtilization - pending - used + return } -func (m *Monitor) Close() { +func (m *Monitor) updateEgressStats(idle float64, usage map[int]float64) { + load := 1 - idle/m.cpuStats.NumCPU() + m.promCPULoad.Set(load) + m.mu.Lock() defer m.mu.Unlock() - m.closing = true + maxUsage := 0.0 + var maxEgress string + for pid, cpuUsage := range usage { + procStats := m.procStats[pid] + if procStats == nil { + continue + } + + procStats.lastUsage = cpuUsage + procStats.totalCPU += cpuUsage + procStats.cpuCounter++ + if cpuUsage > procStats.maxCPU { + procStats.maxCPU = cpuUsage + } + + if cpuUsage > procStats.allowedUsage && cpuUsage > maxUsage { + maxUsage = cpuUsage + maxEgress = procStats.egressID + } + } + + killThreshold := defaultKillThreshold + if killThreshold <= m.cpuCostConfig.MaxCpuUtilization { + killThreshold = 
(1 + m.cpuCostConfig.MaxCpuUtilization) / 2 + } + + if load > killThreshold { + logger.Warnw("high cpu usage", nil, + "load", load, + "requests", m.requests.Load(), + ) + + if m.requests.Load() > 1 { + m.highCPUDuration++ + if m.highCPUDuration < minKillDuration { + return + } + m.svc.KillProcess(maxEgress, maxUsage) + } + } + + m.highCPUDuration = 0 } diff --git a/pkg/stats/monitor_prom.go b/pkg/stats/monitor_prom.go new file mode 100644 index 00000000..70f522a9 --- /dev/null +++ b/pkg/stats/monitor_prom.go @@ -0,0 +1,86 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package stats + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" +) + +func (m *Monitor) initPrometheus() { + promNodeAvailable := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "available", + ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID}, + }, m.promIsIdle) + + promCanAcceptRequest := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "can_accept_request", + ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID}, + }, m.promCanAcceptRequest) + + promIsDisabled := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "is_disabled", + ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID}, + }, m.promIsDisabled) + + m.promCPULoad = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "node", + Name: "cpu_load", + ConstLabels: prometheus.Labels{"node_id": m.nodeID, "node_type": "EGRESS", "cluster_id": m.clusterID}, + }) + + m.requestGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "livekit", + Subsystem: "egress", + Name: "requests", + ConstLabels: prometheus.Labels{"node_id": m.nodeID, "cluster_id": m.clusterID}, + }, []string{"type"}) + + prometheus.MustRegister(promNodeAvailable, promCanAcceptRequest, promIsDisabled, m.promCPULoad, m.requestGauge) +} + +func (m *Monitor) promIsIdle() float64 { + if !m.svc.IsDisabled() && m.requests.Load() == 0 { + return 1 + } + return 0 +} + +func (m *Monitor) promCanAcceptRequest() float64 { + if !m.svc.IsDisabled() && m.CanAcceptRequest(&rpc.StartEgressRequest{ + Request: &rpc.StartEgressRequest_RoomComposite{ + RoomComposite: &livekit.RoomCompositeEgressRequest{}, + }, + }) { + return 1 + } + return 0 +} + +func (m *Monitor) promIsDisabled() float64 { + if 
m.svc.IsDisabled() { + return 1 + } + return 0 +} diff --git a/test/integration.go b/test/integration.go index 4b9edd86..fd96230e 100644 --- a/test/integration.go +++ b/test/integration.go @@ -102,7 +102,7 @@ type testCase struct { func (r *Runner) awaitIdle(t *testing.T) { r.svc.KillAll() for i := 0; i < 30; i++ { - if r.svc.GetRequestCount() == 0 { + if r.svc.IsIdle() { return } time.Sleep(time.Second) diff --git a/test/integration_test.go b/test/integration_test.go index 9acf2a6a..1f96c9a4 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -23,9 +23,9 @@ import ( "github.com/stretchr/testify/require" + "github.com/livekit/egress/pkg/server" "github.com/livekit/egress/pkg/service" "github.com/livekit/protocol/redis" - "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" ) @@ -48,12 +48,8 @@ func TestEgress(t *testing.T) { ioClient, err := service.NewIOClient(bus) require.NoError(t, err) - svc, err := service.NewService(r.ServiceConfig, ioClient) + svc, err := server.NewServer(r.ServiceConfig, bus, ioClient) require.NoError(t, err) - psrpcServer, err := rpc.NewEgressInternalServer(svc, bus) - require.NoError(t, err) - svc.Register(psrpcServer) - r.Run(t, svc, bus, rfs) } diff --git a/test/runner.go b/test/runner.go index f330b259..0a6e3032 100644 --- a/test/runner.go +++ b/test/runner.go @@ -30,7 +30,6 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/egress/pkg/config" - "github.com/livekit/egress/pkg/service" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" @@ -39,7 +38,7 @@ import ( ) type Runner struct { - svc *service.Service `yaml:"-"` + svc Server `yaml:"-"` client rpc.EgressClient `yaml:"-"` room *lksdk.Room `yaml:"-"` updates chan *livekit.EgressInfo `yaml:"-"` @@ -69,6 +68,17 @@ type Runner struct { Short bool `yaml:"short"` } +type Server interface { + StartTemplatesServer(fs.FS) error + Run() error + Status() ([]byte, error) + GetGstPipelineDotFile(string) 
(string, error) + IsIdle() bool + KillAll() + Shutdown(bool) + Drain() +} + func NewRunner(t *testing.T) *Runner { confString := os.Getenv("EGRESS_CONFIG_STRING") if confString == "" { @@ -144,15 +154,15 @@ func NewRunner(t *testing.T) *Runner { return r } -func (r *Runner) Run(t *testing.T, svc *service.Service, bus psrpc.MessageBus, templateFs fs.FS) { +func (r *Runner) Run(t *testing.T, svc Server, bus psrpc.MessageBus, templateFs fs.FS) { lksdk.SetLogger(logger.LogRLogger(logr.Discard())) r.svc = svc t.Cleanup(func() { if r.room != nil { r.room.Disconnect() } - r.svc.Stop(true) - r.svc.Close() + r.svc.Shutdown(true) + r.svc.Drain() }) // connect to room @@ -168,9 +178,6 @@ func (r *Runner) Run(t *testing.T, svc *service.Service, bus psrpc.MessageBus, t psrpcClient, err := rpc.NewEgressClient(rpc.ClientParams{Bus: bus}) require.NoError(t, err) - // start debug handler - r.svc.StartDebugHandlers() - // start templates handler err = r.svc.StartTemplatesServer(templateFs) require.NoError(t, err)