Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service refactor #674

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package main
import (
"net/http"

"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/pkg/server"
"github.com/livekit/protocol/logger"
)

type httpHandler struct {
svc *service.Service
svc *server.Server
}

func (h *httpHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
Expand Down
30 changes: 8 additions & 22 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/handler"
"github.com/livekit/egress/pkg/server"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/version"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -114,15 +115,10 @@ func runService(c *cli.Context) error {
if err != nil {
return err
}
svc, err := service.NewService(conf, ioClient)
svc, err := server.NewServer(conf, bus, ioClient)
if err != nil {
return err
}
psrpcServer, err := rpc.NewEgressInternalServer(svc, bus)
if err != nil {
return err
}
svc.Register(psrpcServer)

if conf.HealthPort != 0 {
go func() {
Expand All @@ -140,10 +136,10 @@ func runService(c *cli.Context) error {
select {
case sig := <-stopChan:
logger.Infow("exit requested, finishing recording then shutting down", "signal", sig)
svc.Stop(false)
svc.Shutdown(false)
case sig := <-killChan:
logger.Infow("exit requested, stopping recording and shutting down", "signal", sig)
svc.Stop(true)
svc.Shutdown(true)
}
}()

Expand All @@ -157,14 +153,7 @@ func runService(c *cli.Context) error {
return err
}

svc.StartDebugHandlers()

if err = svc.RegisterListEgress(""); err != nil {
return err
}
err = svc.Run()
svc.Close()
return err
return svc.Run()
}

func runHandler(c *cli.Context) error {
Expand Down Expand Up @@ -202,11 +191,7 @@ func runHandler(c *cli.Context) error {
signal.Notify(killChan, syscall.SIGINT)

bus := psrpc.NewRedisMessageBus(rc)
ioClient, err := rpc.NewIOInfoClient(bus)
if err != nil {
return err
}
h, err := handler.NewHandler(conf, bus, ioClient)
h, err := handler.NewHandler(conf, bus)
if err != nil {
if errors.IsFatal(err) {
// service will send info update and shut down
Expand All @@ -224,5 +209,6 @@ func runHandler(c *cli.Context) error {
h.Kill()
}()

return h.Run()
h.Run()
return nil
}
2 changes: 1 addition & 1 deletion magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Proto() error {
" --plugin=go=%s"+
" --plugin=go-grpc=%s"+
" -I%s -I=. ipc.proto",
protocGoPath, protocGrpcGoPath, pi.Dir,
protocGoPath, protocGrpcGoPath, pi.Dir+"/protobufs",
))
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ var (
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU")
ErrShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server is shutting down")
)

func New(err string) error {
Expand Down
59 changes: 20 additions & 39 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline"
"github.com/livekit/protocol/livekit"
Expand All @@ -42,20 +41,18 @@ type Handler struct {
rpcServer rpc.EgressHandlerServer
ipcHandlerServer *grpc.Server
ipcServiceClient ipc.EgressServiceClient
ioClient rpc.IOInfoClient
initialized core.Fuse
kill core.Fuse
}

func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Handler, error) {
func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus) (*Handler, error) {
ipcClient, err := ipc.NewServiceClient(path.Join(conf.TmpDir[:strings.LastIndex(conf.TmpDir, "/")], conf.NodeID))
if err != nil {
return nil, err
}

h := &Handler{
conf: conf,
ioClient: ioClient,
ipcHandlerServer: grpc.NewServer(),
ipcServiceClient: ipcClient,
}
Expand Down Expand Up @@ -83,59 +80,43 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.
return nil, err
}

h.controller, err = pipeline.New(context.Background(), conf, h.ioClient)
h.controller, err = pipeline.New(context.Background(), conf, h.ipcServiceClient)
h.initialized.Break()
if err != nil {
if !errors.IsFatal(err) {
// user error, send update
conf.Info.SetFailed(err)
_, _ = h.ioClient.UpdateEgress(context.Background(), (*livekit.EgressInfo)(conf.Info))
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(conf.Info))
}
return nil, err
}

return h, nil
}

func (h *Handler) Run() error {
func (h *Handler) Run() {
ctx, span := tracer.Start(context.Background(), "Handler.Run")
defer span.End()

// start egress
result := make(chan *info.EgressInfo, 1)
go func() {
result <- h.controller.Run(ctx)
}()

kill := h.kill.Watch()
for {
select {
case <-kill:
// kill signal received
h.conf.Info.Details = "service terminated by deployment"
h.controller.SendEOS(ctx)

case res := <-result:
// recording finished
_, _ = h.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(res))

m, err := h.GenerateMetrics(ctx)
if err == nil {
h.ipcServiceClient.HandlerShuttingDown(ctx, &ipc.HandlerShuttingDownRequest{
EgressId: h.conf.Info.EgressId,
Metrics: m,
})
} else {
logger.Errorw("failed generating handler metrics", err)
}

h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
return nil
}
res := h.controller.Run(ctx)
m, err := h.GenerateMetrics(ctx)
if err != nil {
logger.Errorw("failed to generate handler metrics", err)
}

_, _ = h.ipcServiceClient.HandlerFinished(ctx, &ipc.HandlerFinishedRequest{
EgressId: h.conf.Info.EgressId,
Metrics: m,
Info: (*livekit.EgressInfo)(res),
})

h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
}

func (h *Handler) Kill() {
h.kill.Break()
// kill signal received
h.conf.Info.Details = "service terminated by deployment"
h.controller.SendEOS(context.Background())
}
Loading
Loading