Skip to content

Commit

Permalink
add backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jan 31, 2024
1 parent 09b9b4e commit 38ac6b5
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 92 deletions.
25 changes: 9 additions & 16 deletions internal/clientv2/interceptor_retry_hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,23 @@ import (
)

type HostsRetryConfig struct {
RetryConfig RetryConfig // 主备域名重试参数
RetryMax int // 最大重试次数
ShouldRetry func(req *http.Request, resp *http.Response, err error) bool
HostFreezeDuration time.Duration // 主备域名冻结时间(默认:600s),当一个域名请求失败被冻结的时间,最小 time.Millisecond
HostProvider hostprovider.HostProvider // 备用域名获取方法
ShouldFreezeHost func(req *http.Request, resp *http.Response, err error) bool
}

func (c *HostsRetryConfig) init() {
if c.RetryConfig.ShouldRetry == nil {
c.RetryConfig.ShouldRetry = func(req *http.Request, resp *http.Response, err error) bool {
if c.ShouldRetry == nil {
c.ShouldRetry = func(req *http.Request, resp *http.Response, err error) bool {
return isHostRetryable(req, resp, err)
}
}
if c.RetryConfig.RetryMax < 0 {
c.RetryConfig.RetryMax = 1
if c.RetryMax < 0 {
c.RetryMax = 1
}

c.RetryConfig.init()

if c.HostFreezeDuration < time.Millisecond {
c.HostFreezeDuration = 600 * time.Second
}
Expand Down Expand Up @@ -62,7 +61,7 @@ func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler H
interceptor.options.init()

// 不重试
if interceptor.options.RetryConfig.RetryMax <= 0 {
if interceptor.options.RetryMax <= 0 {
return handler(req)
}

Expand All @@ -72,7 +71,7 @@ func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler H

resp, err = handler(req)

if !interceptor.options.RetryConfig.ShouldRetry(reqBefore, resp, err) {
if !interceptor.options.ShouldRetry(reqBefore, resp, err) {
return resp, err
}

Expand All @@ -84,7 +83,7 @@ func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler H
}
}

if i >= interceptor.options.RetryConfig.RetryMax {
if i >= interceptor.options.RetryMax {
break
}

Expand Down Expand Up @@ -121,12 +120,6 @@ func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler H
internal_io.SinkAll(resp.Body)
resp.Body.Close()
}

retryInterval := interceptor.options.RetryConfig.RetryInterval(&RetryInfo{Retried: i})
if retryInterval < time.Microsecond {
continue
}
time.Sleep(retryInterval)
}
return resp, err
}
Expand Down
41 changes: 8 additions & 33 deletions internal/clientv2/interceptor_retry_hosts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@ func TestHostsAlwaysRetryInterceptor(t *testing.T) {
hostB := "bbb.bb.com"
hRetryMax := 2
hRetryInterceptor := NewHostsRetryInterceptor(HostsRetryConfig{
RetryConfig: RetryConfig{
RetryMax: hRetryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
return time.Second
},
ShouldRetry: func(req *http.Request, resp *http.Response, err error) bool {
return true
},
RetryMax: hRetryMax,
ShouldRetry: func(req *http.Request, resp *http.Response, err error) bool {
return true
},
ShouldFreezeHost: nil,
HostFreezeDuration: 0,
Expand All @@ -39,7 +34,7 @@ func TestHostsAlwaysRetryInterceptor(t *testing.T) {
retryMax := 1
sRetryInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
RetryInterval: func() time.Duration {
return time.Second
},
ShouldRetry: func(req *http.Request, resp *http.Response, err error) bool {
Expand All @@ -65,25 +60,18 @@ func TestHostsAlwaysRetryInterceptor(t *testing.T) {

c := NewClient(&testClient{}, interceptor, hRetryInterceptor, sRetryInterceptor)

start := time.Now()

resp, _ := Do(c, RequestParams{
Context: nil,
Method: RequestMethodGet,
Url: "https://" + hostA + "/path/123",
Header: nil,
GetBody: nil,
})
duration := float32(time.Now().UnixNano()-start.UnixNano()) / 1e9

if (retryMax+1)*2 != doCount {
t.Fatalf("retry count is not error:%d", doCount)
}

if duration > float32(doCount-1)+0.3 || duration < float32(doCount-1)-0.3 {
t.Fatalf("retry interval may be error:%f", duration)
}

value := resp.Header.Get(headerKey)
if value != " -> request -> Do -> response" {
t.Fatalf("retry flow error")
Expand All @@ -104,15 +92,7 @@ func TestHostsNotRetryInterceptor(t *testing.T) {
hostB := "bbb.bb.com"
hRetryMax := 2
hRetryInterceptor := NewHostsRetryInterceptor(HostsRetryConfig{
RetryConfig: RetryConfig{
RetryMax: hRetryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
return time.Second
},
//ShouldRetry: func(req *http.Request, resp *http.Response, err error) bool {
// return true
//},
},
RetryMax: hRetryMax,
ShouldFreezeHost: nil,
HostFreezeDuration: 0,
HostProvider: hostprovider.NewWithHosts([]string{hostA, hostB}),
Expand All @@ -121,7 +101,7 @@ func TestHostsNotRetryInterceptor(t *testing.T) {
retryMax := 1
sRetryInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
RetryInterval: func() time.Duration {
return time.Second
},
//ShouldRetry: func(req *http.Request, resp *http.Response, err error) bool {
Expand Down Expand Up @@ -186,12 +166,7 @@ func TestHostsRetryInterceptorByRequest(t *testing.T) {
hostB := "www.qiniu.com"
hRetryMax := 30
hRetryInterceptor := NewHostsRetryInterceptor(HostsRetryConfig{
RetryConfig: RetryConfig{
RetryMax: hRetryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
return time.Second
},
},
RetryMax: hRetryMax,
ShouldFreezeHost: nil,
HostFreezeDuration: 0,
HostProvider: hostprovider.NewWithHosts([]string{hostA, hostB}),
Expand All @@ -200,7 +175,7 @@ func TestHostsRetryInterceptorByRequest(t *testing.T) {
retryMax := 1
sRetryInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
RetryInterval: func() time.Duration {
return time.Second
},
})
Expand Down
42 changes: 22 additions & 20 deletions internal/clientv2/interceptor_retry_simple.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clientv2

import (
"context"
"io"
"math/rand"
"net"
Expand All @@ -13,20 +14,18 @@ import (

clientv1 "github.com/qiniu/go-sdk/v7/client"
internal_io "github.com/qiniu/go-sdk/v7/internal/io"
"github.com/qiniu/go-sdk/v7/storagev2/backoff"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

type (
contextKeyBufferResponse struct{}

RetryInfo struct {
Retried int
}

SimpleRetryConfig struct {
RetryMax int // 最大重试次数
RetryInterval func(*RetryInfo) time.Duration // 重试时间间隔
RetryMax int // 最大重试次数
RetryInterval func() time.Duration // 重试时间间隔 v1
Backoff backoff.Backoff // 重试时间间隔 v2,优先级高于 RetryInterval
ShouldRetry func(req *http.Request, resp *http.Response, err error) bool
Resolver resolver.Resolver // 主备域名解析器
Chooser chooser.Chooser // 主备域名选择器
Expand All @@ -37,8 +36,9 @@ type (
}

RetryConfig struct {
RetryMax int // 最大重试次数
RetryInterval func(*RetryInfo) time.Duration // 重试时间间隔
RetryMax int // 最大重试次数
RetryInterval func() time.Duration // 重试时间间隔 v1
Backoff backoff.Backoff // 重试时间间隔 v2,优先级高于 RetryInterval
ShouldRetry func(req *http.Request, resp *http.Response, err error) bool
}
)
Expand All @@ -52,10 +52,6 @@ func (c *RetryConfig) init() {
c.RetryMax = 0
}

if c.RetryInterval == nil {
c.RetryInterval = defaultRetryInterval
}

if c.ShouldRetry == nil {
c.ShouldRetry = isSimpleRetryable
}
Expand All @@ -70,15 +66,21 @@ func (c *SimpleRetryConfig) init() {
c.RetryMax = 0
}

if c.RetryInterval == nil {
c.RetryInterval = defaultRetryInterval
}

if c.ShouldRetry == nil {
c.ShouldRetry = isSimpleRetryable
}
}

func (c *SimpleRetryConfig) getRetryInterval(ctx context.Context, attempts int) time.Duration {
if bf := c.Backoff; bf != nil {
return bf.Time(ctx, &backoff.BackoffOptions{Attempts: attempts})
}
if ri := c.RetryInterval; ri != nil {
return ri()
}
return defaultRetryInterval()
}

func NewSimpleRetryInterceptor(config SimpleRetryConfig) Interceptor {
return &simpleRetryInterceptor{config: config}
}
Expand Down Expand Up @@ -146,11 +148,11 @@ func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler
resp.Body.Close()
}

retryInterval := interceptor.config.RetryInterval(&RetryInfo{Retried: i})
if retryInterval < time.Microsecond {
if retryInterval := interceptor.config.getRetryInterval(req.Context(), i); retryInterval < time.Microsecond {
continue
} else {
time.Sleep(retryInterval)
}
time.Sleep(retryInterval)
}
return resp, err
}
Expand Down Expand Up @@ -265,6 +267,6 @@ func isNetworkErrorWithOpError(err *net.OpError) bool {
return false
}

func defaultRetryInterval(_ *RetryInfo) time.Duration {
func defaultRetryInterval() time.Duration {
return time.Duration(50+rand.Int()%50) * time.Millisecond
}
58 changes: 56 additions & 2 deletions internal/clientv2/interceptor_retry_simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"net/http"
"testing"
"time"

"github.com/qiniu/go-sdk/v7/storagev2/backoff"
)

func TestSimpleAlwaysRetryInterceptor(t *testing.T) {

retryMax := 1
rInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
RetryInterval: func() time.Duration {
return time.Second
},
ShouldRetry: func(req *http.Request, resp *http.Response, err error) bool {
Expand Down Expand Up @@ -69,7 +71,7 @@ func TestSimpleNotRetryInterceptor(t *testing.T) {
retryMax := 1
rInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func(_ *RetryInfo) time.Duration {
RetryInterval: func() time.Duration {
return time.Second
},
// 默认状态码是 400,400 不重试
Expand Down Expand Up @@ -120,3 +122,55 @@ func TestSimpleNotRetryInterceptor(t *testing.T) {
t.Fatalf("retry flow error")
}
}

func TestRetryInterceptorWithBackoff(t *testing.T) {
retryMax := 5
rInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
Backoff: backoff.NewExponentialBackoff(100*time.Millisecond, 2),
ShouldRetry: func(req *http.Request, resp *http.Response, err error) bool {
return true
},
})

doCount := 0
interceptor := NewSimpleInterceptor(func(req *http.Request, handler Handler) (*http.Response, error) {
doCount += 1

value := req.Header.Get(headerKey)
value += " -> request"
req.Header.Set(headerKey, value)

resp, err := handler(req)

value = resp.Header.Get(headerKey)
value += " -> response"
resp.Header.Set(headerKey, value)
return resp, err
})

c := NewClient(&testClient{}, rInterceptor, interceptor)

start := time.Now()
resp, _ := Do(c, RequestParams{
Context: nil,
Method: "",
Url: "https://aaa.com",
Header: nil,
GetBody: nil,
})
duration := float32(time.Now().UnixNano()-start.UnixNano()) / float32(time.Millisecond)

if duration > 3100+10 || duration < 3100-10 {
t.Fatalf("retry interval may be error:%f", duration)
}

if (retryMax + 1) != doCount {
t.Fatalf("retry count is not 2")
}

value := resp.Header.Get(headerKey)
if value != " -> request -> Do -> response" {
t.Fatalf("retry flow error")
}
}
Loading

0 comments on commit 38ac6b5

Please sign in to comment.