Skip to content

Commit

Permalink
update io client
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Sep 24, 2024
1 parent 978b7da commit c2a9566
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 84 deletions.
173 changes: 95 additions & 78 deletions pkg/info/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/frostbyte73/core"
"go.uber.org/atomic"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/egress"
Expand All @@ -31,29 +32,37 @@ import (
)

const (
ioTimeout = time.Second * 30
ioTimeout = time.Second * 30
maxBackoff = time.Minute * 10
)

// IOClient is the interface through which egress info and metrics are handed
// to the IO RPC client, and through which that client's health and shutdown
// are controlled.
type IOClient interface {
// CreateEgress issues the create request for info asynchronously and returns
// a channel on which the result is delivered: the error on failure, nil on
// success.
CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error
// UpdateEgress submits an updated EgressInfo for delivery.
UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error
// UpdateMetrics takes a metrics update request.
// NOTE(review): presumably forwards req to the IO service — the
// implementation body is not visible here; confirm.
UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error
// IsHealthy reports the client's current health flag.
IsHealthy() bool
// Drain signals the client to stop processing and waits for it to finish.
Drain()
}

type ioClient struct {
rpc.IOInfoClient

mu sync.Mutex
egresses map[string]*egressIOClient
egresses map[string]*egressCreation
updates chan *update

healthy atomic.Bool
draining core.Fuse
done core.Fuse
}

type egressIOClient struct {
created core.Fuse
aborted core.Fuse
// egressCreation tracks an egress whose create request is still in flight.
// It is stored in ioClient.egresses under the egress ID while the create
// call runs and removed once that call returns.
type egressCreation struct {
// pending is the most recent update received while creation is in flight
// (each new update overwrites the previous one); nil if none arrived.
// It is forwarded to the updates channel after a successful create.
pending *update
}

mu sync.Mutex
pending chan *livekit.EgressInfo
// update is a queued egress update: the EgressInfo to send together with the
// context that was passed to UpdateEgress when it was submitted.
type update struct {
// ctx is the caller's context; it is passed to the UpdateEgress RPC when the
// update is eventually sent.
ctx context.Context
// info is the egress state to report.
info *livekit.EgressInfo
}

func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
Expand All @@ -62,101 +71,113 @@ func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
return nil, err
}

return &ioClient{
c := &ioClient{
IOInfoClient: client,
egresses: make(map[string]*egressIOClient),
}, nil
egresses: make(map[string]*egressCreation),
updates: make(chan *update, 1000),
}
c.healthy.Store(true)
go c.updateWorker()

return c, nil
}

func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error {
e := &egressIOClient{
pending: make(chan *livekit.EgressInfo, 10),
}
e := &egressCreation{}

c.mu.Lock()
c.egresses[info.EgressId] = e
c.mu.Unlock()

errChan := make(chan error, 1)
go func() {
_, err := c.IOInfoClient.CreateEgress(ctx, info)

c.mu.Lock()
defer c.mu.Unlock()

delete(c.egresses, info.EgressId)

if err != nil {
logger.Errorw("failed to create egress", err)
e.aborted.Break()
if errors.Is(err, psrpc.ErrRequestTimedOut) && c.healthy.Swap(false) {
logger.Infow("io connection unhealthy")
}
errChan <- err
return
} else if !c.healthy.Swap(true) {
logger.Infow("io connection restored")
}

c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()
} else {
e.created.Break()
errChan <- nil
if e.pending != nil {
c.updates <- e.pending
}
errChan <- nil
}()

return errChan
}

func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error {
c.mu.Lock()
e, ok := c.egresses[info.EgressId]
c.mu.Unlock()
if !ok {
return errors.ErrEgressNotFound
u := &update{
ctx: ctx,
info: info,
}

// ensure updates are sent sequentially
e.pending <- info

select {
case <-e.created.Watch():
// egress was created, continue
case <-e.aborted.Watch():
// egress was aborted, ignore
c.mu.Lock()
if e, ok := c.egresses[info.EgressId]; ok {
e.pending = u
c.mu.Unlock()
return nil
}
c.mu.Unlock()

// ensure only one thread is sending updates sequentially
e.mu.Lock()
defer e.mu.Unlock()
c.updates <- u
return nil
}

func (c *ioClient) updateWorker() {
draining := c.draining.Watch()
for {
select {
case update := <-e.pending:
var err error
for i := 0; i < 10; i++ {
_, err = c.IOInfoClient.UpdateEgress(ctx, update)
if err == nil {
break
case u := <-c.updates:
c.sendUpdate(u)
case <-draining:
c.done.Break()
return
}
}
}

func (c *ioClient) sendUpdate(u *update) {
d := time.Millisecond * 250
for {
if _, err := c.IOInfoClient.UpdateEgress(u.ctx, u.info); err != nil {
if errors.Is(err, psrpc.ErrRequestTimedOut) {
if c.healthy.Swap(false) {
logger.Infow("io connection unhealthy")
}
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
if err != nil {
logger.Warnw("failed to update egress", err, "egressID", update.EgressId)
return err
d = min(d*2, maxBackoff)
time.Sleep(d)
continue
}

requestType, outputType := egress.GetTypes(update.Request)
logger.Infow(strings.ToLower(update.Status.String()),
"requestType", requestType,
"outputType", outputType,
"error", update.Error,
"code", update.ErrorCode,
"details", update.Details,
)

switch update.Status {
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
// egress is done, delete ioEgressClient
c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()
}
logger.Errorw("failed to update egress", err)
return
}

default:
return nil
if !c.healthy.Swap(true) {
logger.Infow("io connection restored")
}
requestType, outputType := egress.GetTypes(u.info.Request)
logger.Infow(strings.ToLower(u.info.Status.String()),
"requestType", requestType,
"outputType", outputType,
"error", u.info.Error,
"code", u.info.ErrorCode,
"details", u.info.Details,
)
return
}
}

Expand All @@ -170,15 +191,11 @@ func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequ
return nil
}

// IsHealthy returns the current value of the client's healthy flag.
// Elsewhere in this file the flag is cleared when an RPC fails with
// psrpc.ErrRequestTimedOut and set again after an RPC succeeds.
func (c *ioClient) IsHealthy() bool {
return c.healthy.Load()
}

func (c *ioClient) Drain() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
c.mu.Lock()
if len(c.egresses) == 0 {
c.mu.Unlock()
return
}
c.mu.Unlock()
}
c.draining.Break()
<-c.done.Watch()
}
10 changes: 4 additions & 6 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,19 @@ func (s *Server) Run() error {
}

func (s *Server) Status() ([]byte, error) {
info := map[string]interface{}{
status := map[string]interface{}{
"CpuLoad": s.monitor.GetAvailableCPU(),
}

s.GetStatus(info)

return json.Marshal(info)
s.GetStatus(status)
return json.Marshal(status)
}

// IsIdle reports whether the server's activeRequests counter is currently zero.
func (s *Server) IsIdle() bool {
return s.activeRequests.Load() == 0
}

func (s *Server) IsDisabled() bool {
return s.shutdown.IsBroken()
return s.shutdown.IsBroken() || !s.ioClient.IsHealthy()
}

func (s *Server) IsTerminating() bool {
Expand Down

0 comments on commit c2a9566

Please sign in to comment.