diff --git a/cmd/server/main.go b/cmd/server/main.go index 08806525..447175af 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -29,8 +29,8 @@ import ( "github.com/livekit/egress/pkg/config" "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/handler" + "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/server" - "github.com/livekit/egress/pkg/service" "github.com/livekit/egress/version" "github.com/livekit/protocol/logger" lkredis "github.com/livekit/protocol/redis" @@ -111,7 +111,7 @@ func runService(c *cli.Context) error { } bus := psrpc.NewRedisMessageBus(rc) - ioClient, err := service.NewIOClient(bus) + ioClient, err := info.NewIOClient(bus) if err != nil { return err } diff --git a/pkg/service/io.go b/pkg/info/io.go similarity index 50% rename from pkg/service/io.go rename to pkg/info/io.go index 9cba79b2..6e3918da 100644 --- a/pkg/service/io.go +++ b/pkg/info/io.go @@ -12,12 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License.
-package service +package info import ( "context" + "strings" "sync" + "time" + "github.com/frostbyte73/core" "google.golang.org/protobuf/types/known/emptypb" "github.com/livekit/egress/pkg/errors" @@ -28,42 +31,88 @@ import ( "github.com/livekit/psrpc" ) -type IOClient struct { +type IOClient interface { rpc.IOInfoClient + Drain() +} + +type ioClient struct { + rpc.IOInfoClient + + mu sync.Mutex + egresses map[string]*egressIOClient +} + +type egressIOClient struct { + created core.Fuse + aborted core.Fuse mu sync.Mutex - updates chan *livekit.EgressInfo + pending chan *livekit.EgressInfo } -func NewIOClient(bus psrpc.MessageBus) (rpc.IOInfoClient, error) { +func NewIOClient(bus psrpc.MessageBus) (IOClient, error) { client, err := rpc.NewIOInfoClient(bus) if err != nil { return nil, err } - return &IOClient{ + return &ioClient{ IOInfoClient: client, - updates: make(chan *livekit.EgressInfo, 10), + egresses: make(map[string]*egressIOClient), }, nil } -func (c *IOClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { +func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { + e := &egressIOClient{ + // buffered: UpdateEgress sends before it receives, so an unbuffered channel would block every caller + pending: make(chan *livekit.EgressInfo, 10), + } + c.mu.Lock() + c.egresses[info.EgressId] = e + c.mu.Unlock() + _, err := c.IOInfoClient.CreateEgress(ctx, info, opts...)
if err != nil { logger.Errorw("failed to create egress", err) + e.aborted.Break() + + c.mu.Lock() + delete(c.egresses, info.EgressId) + c.mu.Unlock() + return nil, err } + + // creation succeeded: release UpdateEgress calls waiting on the created fuse + e.created.Break() return &emptypb.Empty{}, nil } -func (c *IOClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { - c.updates <- info +func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { + c.mu.Lock() + e, ok := c.egresses[info.EgressId] + c.mu.Unlock() + if !ok { + return nil, errors.ErrEgressNotFound + } // ensure updates are sent sequentially - c.mu.Lock() - defer c.mu.Unlock() + e.pending <- info + + select { + case <-e.created.Watch(): + // egress was created, continue + case <-e.aborted.Watch(): + // egress was aborted, ignore + return &emptypb.Empty{}, nil + } + + // ensure only one thread is sending updates sequentially + e.mu.Lock() + defer e.mu.Unlock() for { select { - case update := <-c.updates: + case update := <-e.pending: _, err := c.IOInfoClient.UpdateEgress(ctx, update, opts...)
if err != nil { logger.Errorw("failed to update egress", err) @@ -71,34 +120,32 @@ func (c *IOClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, o } requestType, outputType := egress.GetTypes(update.Request) + logger.Infow(strings.ToLower(update.Status.String()), + "requestType", requestType, + "outputType", outputType, + "error", update.Error, + "code", update.ErrorCode, + "details", update.Details, + ) + switch update.Status { - case livekit.EgressStatus_EGRESS_FAILED: - logger.Warnw("egress failed", errors.New(update.Error), - "egressID", update.EgressId, - "requestType", requestType, - "outputType", outputType, - ) - case livekit.EgressStatus_EGRESS_COMPLETE: - logger.Infow("egress completed", - "egressID", update.EgressId, - "requestType", requestType, - "outputType", outputType, - ) - default: - logger.Infow("egress updated", - "egressID", update.EgressId, - "requestType", requestType, - "outputType", outputType, - "status", update.Status, - ) + case livekit.EgressStatus_EGRESS_COMPLETE, + livekit.EgressStatus_EGRESS_FAILED, + livekit.EgressStatus_EGRESS_ABORTED, + livekit.EgressStatus_EGRESS_LIMIT_REACHED: + // egress is done, delete egressIOClient + c.mu.Lock() + delete(c.egresses, info.EgressId) + c.mu.Unlock() } + default: return &emptypb.Empty{}, nil } } } -func (c *IOClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { +func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error) { _, err := c.IOInfoClient.UpdateMetrics(ctx, req, opts...)
if err != nil { logger.Errorw("failed to update ms", err) @@ -106,3 +153,16 @@ func (c *IOClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequ } return &emptypb.Empty{}, nil } + +func (c *ioClient) Drain() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + c.mu.Lock() + if len(c.egresses) == 0 { + c.mu.Unlock() + return + } + c.mu.Unlock() + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 10d2cc0f..6ba43e6f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc" "github.com/livekit/egress/pkg/config" + "github.com/livekit/egress/pkg/info" "github.com/livekit/egress/pkg/ipc" "github.com/livekit/egress/pkg/service" "github.com/livekit/egress/pkg/stats" @@ -51,14 +52,14 @@ type Server struct { psrpcServer rpc.EgressInternalServer ipcServiceServer *grpc.Server promServer *http.Server - ioClient rpc.IOInfoClient + ioClient info.IOClient activeRequests atomic.Int32 terminating core.Fuse shutdown core.Fuse } -func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Server, error) { +func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient info.IOClient) (*Server, error) { pm := service.NewProcessManager() s := &Server{ @@ -146,9 +147,9 @@ func (s *Server) Run() error { logger.Infow("service ready") <-s.shutdown.Watch() - logger.Infow("shutting down") - + logger.Infow("draining") s.Drain() + logger.Infow("service stopped") return nil } @@ -191,6 +192,7 @@ func (s *Server) Drain() { time.Sleep(time.Second) } - logger.Infow("closing server") s.psrpcServer.Shutdown() + logger.Infow("draining io client") + s.ioClient.Drain() } diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index 0c15a7da..3569e1b4 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -64,13 +64,6 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) ( return nil,
err } - _, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info)) - if err != nil { - s.monitor.EgressAborted(req) - s.activeRequests.Dec() - return nil, err - } - requestType, outputType := egress.GetTypes(p.Info.Request) logger.Infow("request validated", "egressID", req.EgressId, @@ -87,6 +80,14 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) ( return nil, err } + _, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info)) + if err != nil { + s.AbortProcess(req.EgressId, err) + s.monitor.EgressAborted(req) + s.activeRequests.Dec() + return nil, err + } + return (*livekit.EgressInfo)(p.Info), nil } diff --git a/pkg/service/process.go b/pkg/service/process.go index 883db5ab..a6b4dd56 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -174,6 +174,16 @@ func (pm *ProcessManager) KillAll() { } } +func (pm *ProcessManager) AbortProcess(egressID string, err error) { + pm.mu.RLock() + defer pm.mu.RUnlock() + + if h, ok := pm.activeHandlers[egressID]; ok { + logger.Warnw("aborting egress", err, "egressID", egressID) + h.kill() + } +} + func (pm *ProcessManager) KillProcess(egressID string, maxUsage float64) { pm.mu.RLock() defer pm.mu.RUnlock()