Skip to content

Commit

Permalink
service refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed May 15, 2024
1 parent dd35b36 commit 132673b
Show file tree
Hide file tree
Showing 22 changed files with 875 additions and 748 deletions.
4 changes: 2 additions & 2 deletions cmd/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package main
import (
"net/http"

"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/pkg/server"
"github.com/livekit/protocol/logger"
)

type httpHandler struct {
svc *service.Service
svc *server.Server
}

func (h *httpHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
Expand Down
30 changes: 8 additions & 22 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/handler"
"github.com/livekit/egress/pkg/server"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/version"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -114,15 +115,10 @@ func runService(c *cli.Context) error {
if err != nil {
return err
}
svc, err := service.NewService(conf, ioClient)
svc, err := server.NewServer(conf, bus, ioClient)
if err != nil {
return err
}
psrpcServer, err := rpc.NewEgressInternalServer(svc, bus)
if err != nil {
return err
}
svc.Register(psrpcServer)

if conf.HealthPort != 0 {
go func() {
Expand All @@ -140,10 +136,10 @@ func runService(c *cli.Context) error {
select {
case sig := <-stopChan:
logger.Infow("exit requested, finishing recording then shutting down", "signal", sig)
svc.Stop(false)
svc.Shutdown(false)
case sig := <-killChan:
logger.Infow("exit requested, stopping recording and shutting down", "signal", sig)
svc.Stop(true)
svc.Shutdown(true)
}
}()

Expand All @@ -157,14 +153,7 @@ func runService(c *cli.Context) error {
return err
}

svc.StartDebugHandlers()

if err = svc.RegisterListEgress(""); err != nil {
return err
}
err = svc.Run()
svc.Close()
return err
return svc.Run()
}

func runHandler(c *cli.Context) error {
Expand Down Expand Up @@ -202,11 +191,7 @@ func runHandler(c *cli.Context) error {
signal.Notify(killChan, syscall.SIGINT)

bus := psrpc.NewRedisMessageBus(rc)
ioClient, err := rpc.NewIOInfoClient(bus)
if err != nil {
return err
}
h, err := handler.NewHandler(conf, bus, ioClient)
h, err := handler.NewHandler(conf, bus)
if err != nil {
if errors.IsFatal(err) {
// service will send info update and shut down
Expand All @@ -224,5 +209,6 @@ func runHandler(c *cli.Context) error {
h.Kill()
}()

return h.Run()
h.Run()
return nil
}
2 changes: 1 addition & 1 deletion magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Proto() error {
" --plugin=go=%s"+
" --plugin=go-grpc=%s"+
" -I%s -I=. ipc.proto",
protocGoPath, protocGrpcGoPath, pi.Dir,
protocGoPath, protocGrpcGoPath, pi.Dir+"/protobufs",
))
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ var (
ErrNonStreamingPipeline = psrpc.NewErrorf(psrpc.InvalidArgument, "UpdateStream called on non-streaming egress")
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU")
ErrShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server is shutting down")
)

func New(err string) error {
Expand Down
59 changes: 20 additions & 39 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline"
"github.com/livekit/protocol/livekit"
Expand All @@ -42,20 +41,18 @@ type Handler struct {
rpcServer rpc.EgressHandlerServer
ipcHandlerServer *grpc.Server
ipcServiceClient ipc.EgressServiceClient
ioClient rpc.IOInfoClient
initialized core.Fuse
kill core.Fuse
}

func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Handler, error) {
func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus) (*Handler, error) {
ipcClient, err := ipc.NewServiceClient(path.Join(conf.TmpDir[:strings.LastIndex(conf.TmpDir, "/")], conf.NodeID))
if err != nil {
return nil, err
}

h := &Handler{
conf: conf,
ioClient: ioClient,
ipcHandlerServer: grpc.NewServer(),
ipcServiceClient: ipcClient,
}
Expand Down Expand Up @@ -83,59 +80,43 @@ func NewHandler(conf *config.PipelineConfig, bus psrpc.MessageBus, ioClient rpc.
return nil, err
}

h.controller, err = pipeline.New(context.Background(), conf, h.ioClient)
h.controller, err = pipeline.New(context.Background(), conf, h.ipcServiceClient)
h.initialized.Break()
if err != nil {
if !errors.IsFatal(err) {
// user error, send update
conf.Info.SetFailed(err)
_, _ = h.ioClient.UpdateEgress(context.Background(), (*livekit.EgressInfo)(conf.Info))
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(conf.Info))
}
return nil, err
}

return h, nil
}

func (h *Handler) Run() error {
func (h *Handler) Run() {
ctx, span := tracer.Start(context.Background(), "Handler.Run")
defer span.End()

// start egress
result := make(chan *info.EgressInfo, 1)
go func() {
result <- h.controller.Run(ctx)
}()

kill := h.kill.Watch()
for {
select {
case <-kill:
// kill signal received
h.conf.Info.Details = "service terminated by deployment"
h.controller.SendEOS(ctx)

case res := <-result:
// recording finished
_, _ = h.ioClient.UpdateEgress(ctx, (*livekit.EgressInfo)(res))

m, err := h.GenerateMetrics(ctx)
if err == nil {
h.ipcServiceClient.HandlerShuttingDown(ctx, &ipc.HandlerShuttingDownRequest{
EgressId: h.conf.Info.EgressId,
Metrics: m,
})
} else {
logger.Errorw("failed generating handler metrics", err)
}

h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
return nil
}
res := h.controller.Run(ctx)
m, err := h.GenerateMetrics(ctx)
if err != nil {
logger.Errorw("failed to generate handler metrics", err)
}

_, _ = h.ipcServiceClient.HandlerFinished(ctx, &ipc.HandlerFinishedRequest{
EgressId: h.conf.Info.EgressId,
Metrics: m,
Info: (*livekit.EgressInfo)(res),
})

h.rpcServer.Shutdown()
h.ipcHandlerServer.Stop()
}

func (h *Handler) Kill() {
h.kill.Break()
// kill signal received
h.conf.Info.Details = "service terminated by deployment"
h.controller.SendEOS(context.Background())
}
Loading

0 comments on commit 132673b

Please sign in to comment.