From 89be0e9214cac815d63ec281419681cddaea35f7 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 6 Sep 2024 12:52:10 -0700 Subject: [PATCH] add retries for updates --- pkg/info/io.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/info/io.go b/pkg/info/io.go index 6e3918da..54c09c24 100644 --- a/pkg/info/io.go +++ b/pkg/info/io.go @@ -31,6 +31,10 @@ import ( "github.com/livekit/psrpc" ) +const ( + ioTimeout = time.Second * 30 +) + type IOClient interface { rpc.IOInfoClient Drain() @@ -52,10 +56,11 @@ type egressIOClient struct { } func NewIOClient(bus psrpc.MessageBus) (IOClient, error) { - client, err := rpc.NewIOInfoClient(bus) + client, err := rpc.NewIOInfoClient(bus, psrpc.WithClientTimeout(ioTimeout)) if err != nil { return nil, err } + return &ioClient{ IOInfoClient: client, egresses: make(map[string]*egressIOClient), @@ -110,9 +115,16 @@ func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, o for { select { case update := <-e.pending: - _, err := c.IOInfoClient.UpdateEgress(ctx, update, opts...) + var err error + for i := 0; i < 10; i++ { + _, err = c.IOInfoClient.UpdateEgress(ctx, update, opts...) + if err == nil { + break + } + time.Sleep(time.Millisecond * 100 * time.Duration(i)) + } if err != nil { - logger.Errorw("failed to update egress", err) + logger.Warnw("failed to update egress", err, "egressID", update.EgressId) return nil, err }