Skip to content

Commit

Permalink
add retries for updates
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Sep 6, 2024
1 parent 83e1973 commit 89be0e9
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions pkg/info/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
"github.com/livekit/psrpc"
)

const (
ioTimeout = time.Second * 30
)

type IOClient interface {
rpc.IOInfoClient
Drain()
Expand All @@ -52,10 +56,11 @@ type egressIOClient struct {
}

func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
client, err := rpc.NewIOInfoClient(bus)
client, err := rpc.NewIOInfoClient(bus, psrpc.WithClientTimeout(ioTimeout))
if err != nil {
return nil, err
}

return &ioClient{
IOInfoClient: client,
egresses: make(map[string]*egressIOClient),
Expand Down Expand Up @@ -110,9 +115,16 @@ func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, o
for {
select {
case update := <-e.pending:
_, err := c.IOInfoClient.UpdateEgress(ctx, update, opts...)
var err error
for i := 0; i < 10; i++ {
_, err = c.IOInfoClient.UpdateEgress(ctx, update, opts...)
if err == nil {
break
}
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
if err != nil {
logger.Errorw("failed to update egress", err)
logger.Warnw("failed to update egress", err, "egressID", update.EgressId)
return nil, err
}

Expand Down

0 comments on commit 89be0e9

Please sign in to comment.