diff --git a/pkg/info/io.go b/pkg/info/io.go index 62998c97..561a7de5 100644 --- a/pkg/info/io.go +++ b/pkg/info/io.go @@ -21,6 +21,7 @@ import ( "time" "github.com/frostbyte73/core" + "go.uber.org/atomic" "github.com/livekit/egress/pkg/errors" "github.com/livekit/protocol/egress" @@ -31,13 +32,15 @@ import ( ) const ( - ioTimeout = time.Second * 30 + ioTimeout = time.Second * 30 + maxBackoff = time.Minute * 10 ) type IOClient interface { CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error + IsHealthy() bool Drain() } @@ -45,15 +48,21 @@ type ioClient struct { rpc.IOInfoClient mu sync.Mutex - egresses map[string]*egressIOClient + egresses map[string]*egressCreation + updates chan *update + + healthy atomic.Bool + draining core.Fuse + done core.Fuse } -type egressIOClient struct { - created core.Fuse - aborted core.Fuse +type egressCreation struct { + pending *update +} - mu sync.Mutex - pending chan *livekit.EgressInfo +type update struct { + ctx context.Context + info *livekit.EgressInfo } func NewIOClient(bus psrpc.MessageBus) (IOClient, error) { @@ -62,16 +71,20 @@ func NewIOClient(bus psrpc.MessageBus) (IOClient, error) { return nil, err } - return &ioClient{ + c := &ioClient{ IOInfoClient: client, - egresses: make(map[string]*egressIOClient), - }, nil + egresses: make(map[string]*egressCreation), + updates: make(chan *update, 1000), + } + c.healthy.Store(true) + go c.updateWorker() + + return c, nil } func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error { - e := &egressIOClient{ - pending: make(chan *livekit.EgressInfo, 10), - } + e := &egressCreation{} + c.mu.Lock() c.egresses[info.EgressId] = e c.mu.Unlock() @@ -79,84 +92,92 @@ func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) c errChan := make(chan error, 1) go func() { _, err := c.IOInfoClient.CreateEgress(ctx, info) + + c.mu.Lock() + defer c.mu.Unlock() + + delete(c.egresses, info.EgressId) + if err != nil { logger.Errorw("failed to create egress", err) - e.aborted.Break() + if errors.Is(err, psrpc.ErrRequestTimedOut) && c.healthy.Swap(false) { + logger.Infow("io connection unhealthy") + } errChan <- err + return + } else if !c.healthy.Swap(true) { + logger.Infow("io connection restored") + } - c.mu.Lock() - delete(c.egresses, info.EgressId) - c.mu.Unlock() - } else { - e.created.Break() - errChan <- nil + if e.pending != nil { + c.updates <- e.pending } + errChan <- nil }() return errChan } func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error { - c.mu.Lock() - e, ok := c.egresses[info.EgressId] - c.mu.Unlock() - if !ok { - return errors.ErrEgressNotFound + u := &update{ + ctx: ctx, + info: info, } - // ensure updates are sent sequentially - e.pending <- info - - select { - case <-e.created.Watch(): - // egress was created, continue - case <-e.aborted.Watch(): - // egress was aborted, ignore + c.mu.Lock() + if e, ok := c.egresses[info.EgressId]; ok { + e.pending = u + c.mu.Unlock() return nil } + c.mu.Unlock() - // ensure only one thread is sending updates sequentially - e.mu.Lock() - defer e.mu.Unlock() + c.updates <- u + return nil +} + +func (c *ioClient) updateWorker() { + draining := c.draining.Watch() for { select { - case update := <-e.pending: - var err error - for i := 0; i < 10; i++ { - _, err = c.IOInfoClient.UpdateEgress(ctx, update) - if err == nil { - break + case u := <-c.updates: + c.sendUpdate(u) + case <-draining: + c.done.Break() + return + } + } +} + +func (c *ioClient) sendUpdate(u *update) { + d := time.Millisecond * 250 + for { + if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info); err != nil { + if errors.Is(err, psrpc.ErrRequestTimedOut) { + if c.healthy.Swap(false) { + logger.Infow("io connection unhealthy") } - time.Sleep(time.Millisecond * 100 * time.Duration(i)) - } - if err != nil { - logger.Warnw("failed to update egress", err, "egressID", update.EgressId) - return err + d = min(d*2, maxBackoff) + time.Sleep(d) + continue } - requestType, outputType := egress.GetTypes(update.Request) - logger.Infow(strings.ToLower(update.Status.String()), - "requestType", requestType, - "outputType", outputType, - "error", update.Error, - "code", update.ErrorCode, - "details", update.Details, - ) - - switch update.Status { - case livekit.EgressStatus_EGRESS_COMPLETE, - livekit.EgressStatus_EGRESS_FAILED, - livekit.EgressStatus_EGRESS_ABORTED, - livekit.EgressStatus_EGRESS_LIMIT_REACHED: - // egress is done, delete ioEgressClient - c.mu.Lock() - delete(c.egresses, info.EgressId) - c.mu.Unlock() - } + logger.Errorw("failed to update egress", err) + return + } - default: - return nil + if !c.healthy.Swap(true) { + logger.Infow("io connection restored") } + requestType, outputType := egress.GetTypes(u.info.Request) + logger.Infow(strings.ToLower(u.info.Status.String()), + "requestType", requestType, + "outputType", outputType, + "error", u.info.Error, + "code", u.info.ErrorCode, + "details", u.info.Details, + ) + return } } @@ -170,15 +191,11 @@ func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequ return nil } +func (c *ioClient) IsHealthy() bool { + return c.healthy.Load() +} + func (c *ioClient) Drain() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for range ticker.C { - c.mu.Lock() - if len(c.egresses) == 0 { - c.mu.Unlock() - return - } - c.mu.Unlock() - } + c.draining.Break() + <-c.done.Watch() } diff --git a/pkg/server/server.go b/pkg/server/server.go index 6ba43e6f..9f9035a1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -154,13 +154,11 @@ func (s *Server) Run() error { } func (s *Server) Status() ([]byte, error) { - info := map[string]interface{}{ + status := map[string]interface{}{ "CpuLoad": s.monitor.GetAvailableCPU(), } - - s.GetStatus(info) - - return json.Marshal(info) + s.GetStatus(status) + return json.Marshal(status) } func (s *Server) IsIdle() bool { @@ -168,7 +166,7 @@ func (s *Server) IsIdle() bool { } func (s *Server) IsDisabled() bool { - return s.shutdown.IsBroken() + return s.shutdown.IsBroken() || !s.ioClient.IsHealthy() } func (s *Server) IsTerminating() bool {