Skip to content

Commit

Permalink
create async then wait
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Sep 6, 2024
1 parent 89be0e9 commit 388d49b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 43 deletions.
55 changes: 31 additions & 24 deletions pkg/info/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/frostbyte73/core"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/egress"
Expand All @@ -36,7 +35,9 @@ const (
)

type IOClient interface {
rpc.IOInfoClient
CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error
UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error
UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error
Drain()
}

Expand Down Expand Up @@ -67,35 +68,40 @@ func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
}, nil
}

func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error {
e := &egressIOClient{
pending: make(chan *livekit.EgressInfo),
}
c.mu.Lock()
c.egresses[info.EgressId] = e
c.mu.Unlock()

_, err := c.IOInfoClient.CreateEgress(ctx, info, opts...)
if err != nil {
logger.Errorw("failed to create egress", err)
e.aborted.Break()

c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()
errChan := make(chan error, 1)
go func() {
_, err := c.IOInfoClient.CreateEgress(ctx, info)
if err != nil {
logger.Errorw("failed to create egress", err)
e.aborted.Break()
errChan <- err

return nil, err
}
c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()
} else {
e.created.Break()
errChan <- nil
}
}()

return &emptypb.Empty{}, nil
return errChan
}

func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error {
c.mu.Lock()
e, ok := c.egresses[info.EgressId]
c.mu.Unlock()
if !ok {
return nil, errors.ErrEgressNotFound
return errors.ErrEgressNotFound
}

// ensure updates are sent sequentially
Expand All @@ -106,7 +112,7 @@ func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, o
// egress was created, continue
case <-e.aborted.Watch():
// egress was aborted, ignore
return &emptypb.Empty{}, nil
return nil
}

// ensure only one thread is sending updates sequentially
Expand All @@ -117,15 +123,15 @@ func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, o
case update := <-e.pending:
var err error
for i := 0; i < 10; i++ {
_, err = c.IOInfoClient.UpdateEgress(ctx, update, opts...)
_, err = c.IOInfoClient.UpdateEgress(ctx, update)
if err == nil {
break
}
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
if err != nil {
logger.Warnw("failed to update egress", err, "egressID", update.EgressId)
return nil, err
return err
}

requestType, outputType := egress.GetTypes(update.Request)
Expand All @@ -149,18 +155,19 @@ func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, o
}

default:
return &emptypb.Empty{}, nil
return nil
}
}
}

func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
_, err := c.IOInfoClient.UpdateMetrics(ctx, req, opts...)
func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error {
_, err := c.IOInfoClient.UpdateMetrics(ctx, req)
if err != nil {
logger.Errorw("failed to update ms", err)
return nil, err
return err
}
return &emptypb.Empty{}, nil

return nil
}

func (c *ioClient) Drain() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *Server) HandlerReady(_ context.Context, req *ipc.HandlerReadyRequest) (
}

func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) {
if _, err := s.ioClient.UpdateEgress(ctx, info); err != nil {
if err := s.ioClient.UpdateEgress(ctx, info); err != nil {
logger.Errorw("failed to update egress", err)
}

Expand Down
41 changes: 23 additions & 18 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,21 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
"request", p.Info.Request,
)

err = s.launchProcess(req, (*livekit.EgressInfo)(p.Info))
if err != nil {
errChan := s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
launchErr := s.launchProcess(req, (*livekit.EgressInfo)(p.Info))
createErr := <-errChan

if createErr != nil {
s.AbortProcess(req.EgressId, createErr)
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
return nil, createErr
}

_, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
if err != nil {
s.AbortProcess(req.EgressId, err)
if launchErr != nil {
s.processFailed((*livekit.EgressInfo)(p.Info))
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
return nil, launchErr
}

return (*livekit.EgressInfo)(p.Info), nil
Expand Down Expand Up @@ -143,22 +145,14 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress

func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressInfo, err error) {
if err != nil {
// should only happen if process failed catashrophically
now := time.Now().UnixNano()
info.UpdatedAt = now
info.EndedAt = now
info.Status = livekit.EgressStatus_EGRESS_FAILED
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_, _ = s.ioClient.UpdateEgress(context.Background(), info)

s.processFailed(info)
logger.Errorw("process failed, shutting down", err)
s.Shutdown(false, false)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
if maxCPU > 0 {
_, _ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{
_ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{
Info: info,
AvgCpuUsage: float32(avgCPU),
MaxCpuUsage: float32(maxCPU),
Expand All @@ -169,6 +163,17 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
s.activeRequests.Dec()
}

func (s *Server) processFailed(info *livekit.EgressInfo) {
// should only happen if process failed catashrophically
now := time.Now().UnixNano()
info.UpdatedAt = now
info.EndedAt = now
info.Status = livekit.EgressStatus_EGRESS_FAILED
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_ = s.ioClient.UpdateEgress(context.Background(), info)
}

func (s *Server) StartEgressAffinity(_ context.Context, req *rpc.StartEgressRequest) float32 {
if s.IsDisabled() || !s.monitor.CanAcceptRequest(req) {
// cannot accept
Expand Down

0 comments on commit 388d49b

Please sign in to comment.