Skip to content

Commit

Permalink
Implement exponential backoff in egress RPC client retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
biglittlebigben committed Dec 13, 2023
1 parent 8950f73 commit d1867b9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 13 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/lithammer/shortuuid/v4 v4.0.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/psrpc v0.5.2
github.com/livekit/psrpc v0.5.3-0.20231213223846-bc354498735c
github.com/mackerelio/go-osstat v0.2.4
github.com/maxbrunsfeld/counterfeiter/v6 v6.7.0
github.com/pion/logging v0.2.2
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U=
github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/psrpc v0.5.3-0.20231213034606-72c092a7ae28 h1:vb558KoaxeDQ1ojLmRg+PxmNWbqa3Gy+083ECMq2C6Y=
github.com/livekit/psrpc v0.5.3-0.20231213034606-72c092a7ae28/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/livekit/psrpc v0.5.3-0.20231213223846-bc354498735c h1:tfLuE8yp8KrQydTIcyr3DktmMLs6yv+skyGskxaGQgE=
github.com/livekit/psrpc v0.5.3-0.20231213223846-bc354498735c/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
Expand Down
39 changes: 29 additions & 10 deletions rpc/egress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ package rpc
import (
"context"
"errors"
"math"
"math/rand"
"time"

"github.com/livekit/protocol/livekit"
"github.com/livekit/psrpc"
"github.com/livekit/psrpc/pkg/middleware"
)

const retries = 3
const (
retries = 3
backoffBase = 1 * time.Second
)

type EgressClient interface {
EgressInternalClient
Expand All @@ -36,6 +41,16 @@ type egressClient struct {
EgressHandlerClient
}

func isErrRecoverable(err error) bool {
var e psrpc.Error
if !errors.As(err, &e) {
return true
}
return e.Code() == psrpc.DeadlineExceeded ||
e.Code() == psrpc.ResourceExhausted ||
e.Code() == psrpc.Unavailable
}

func NewEgressClient(params ClientParams) (EgressClient, error) {
if params.Bus == nil {
return nil, nil
Expand All @@ -46,17 +61,21 @@ func NewEgressClient(params ClientParams) (EgressClient, error) {
timeout = 10 * time.Second
}
internalOpts := append(opts, middleware.WithRPCRetries(middleware.RetryOptions{
MaxAttempts: params.MaxAttempts,
// use longer retry timeout
Timeout: timeout,
IsRecoverable: func(err error) bool {
var e psrpc.Error
if !errors.As(err, &e) {
return true
GetRetryParameters: func(err error, attempt int) (retry bool, timeout time.Duration, waitTime time.Duration) {
if !isErrRecoverable(err) {
return false, 0, 0
}
return e.Code() == psrpc.DeadlineExceeded ||
e.Code() == psrpc.ResourceExhausted ||
e.Code() == psrpc.Unavailable

if attempt >= retries {
return false, 0, 0
}

// backoff = base * 2 ^ (attempt - 1) * rand[1,2)
backoff := time.Duration(float64(backoffBase) * math.Pow(2, float64(attempt-1)) * (rand.Float64() + 1))
timeout = time.Duration(float64(timeout) * math.Pow(2, float64(attempt-1)) * (rand.Float64() + 1))

return true, timeout, backoff
},
}))
internalClient, err := NewEgressInternalClient(params.Bus, internalOpts...)
Expand Down

0 comments on commit d1867b9

Please sign in to comment.