Skip to content

Commit

Permalink
add chooser
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jan 22, 2024
1 parent 343f0e8 commit 80e9973
Show file tree
Hide file tree
Showing 28 changed files with 843 additions and 62 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
GOPATH=$GITHUB_WORKSPACE go get golang.org/x/sync/singleflight
GOPATH=$GITHUB_WORKSPACE go get github.com/qiniu/dyn
GOPATH=$GITHUB_WORKSPACE go get github.com/gofrs/flock
GOPATH=$GITHUB_WORKSPACE go get github.com/alex-ant/gomath/rational
# FIXME special package
# github.com/go-playground/validator/v10
Expand Down
5 changes: 0 additions & 5 deletions client/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"fmt"
"net"
"time"
)
Expand Down Expand Up @@ -38,14 +37,10 @@ func defaultDialFunc(ctx context.Context, network string, address string) (net.C
if port != "" {
newAddr = net.JoinHostPort(newAddr, port)
}
fmt.Printf("***** defaultDialFunc 1: network: %s, newAddr: %s\n", network, newAddr)
if conn, err := dialer.DialContext(ctx, network, newAddr); err == nil {
return conn, nil
}
}
fmt.Printf("***** defaultDialFunc 2: network: %s, host: %s\n", network, host)
} else {
fmt.Printf("***** defaultDialFunc 3: network: %s, host: %s\n", network, host)
}
return (&net.Dialer{Timeout: dialTimeout, KeepAlive: keepAliveInterval}).DialContext(ctx, network, address)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/qiniu/go-sdk/v7
go 1.14

require (
github.com/alex-ant/gomath v0.0.0-20160516115720-89013a210a82
github.com/dave/jennifer v1.6.1
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/alex-ant/gomath v0.0.0-20160516115720-89013a210a82 h1:7dONQ3WNZ1zy960TmkxJPuwoolZwL7xKtpcM04MBnt4=
github.com/alex-ant/gomath v0.0.0-20160516115720-89013a210a82/go.mod h1:nLnM0KdK1CmygvjpDUO6m1TjSsiQtL61juhNsvV/JVI=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dave/jennifer v1.6.1 h1:T4T/67t6RAA5AIV6+NP8Uk/BIsXgDoqEowgycdQQLuk=
github.com/dave/jennifer v1.6.1/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc=
Expand Down
9 changes: 0 additions & 9 deletions internal/clientv2/interceptor_retry_hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ import (
"strings"
"time"

clientV1 "github.com/qiniu/go-sdk/v7/client"
"github.com/qiniu/go-sdk/v7/internal/hostprovider"
internal_io "github.com/qiniu/go-sdk/v7/internal/io"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

type HostsRetryConfig struct {
Resolver resolver.Resolver // 主备域名解析器
RetryConfig RetryConfig // 主备域名重试参数
HostFreezeDuration time.Duration // 主备域名冻结时间(默认:600s),当一个域名请求失败被冻结的时间,最小 time.Millisecond
HostProvider hostprovider.HostProvider // 备用域名获取方法
Expand Down Expand Up @@ -73,12 +70,6 @@ func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler H
// Clone 防止后面 Handler 处理对 req 有污染
reqBefore := cloneReq(req)

if resolver := interceptor.options.Resolver; resolver != nil {
if ips, err := resolver.Resolve(req.Context(), req.URL.Hostname()); err == nil && len(ips) > 0 {
req = req.WithContext(clientV1.WithResolvedIPs(req.Context(), req.URL.Hostname(), ips))
}
}

resp, err = handler(req)

if !interceptor.options.RetryConfig.ShouldRetry(reqBefore, resp, err) {
Expand Down
6 changes: 3 additions & 3 deletions internal/clientv2/interceptor_retry_hosts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestHostsAlwaysRetryInterceptor(t *testing.T) {
})

retryMax := 1
sRetryInterceptor := NewSimpleRetryInterceptor(RetryConfig{
sRetryInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func() time.Duration {
return time.Second
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestHostsNotRetryInterceptor(t *testing.T) {
})

retryMax := 1
sRetryInterceptor := NewSimpleRetryInterceptor(RetryConfig{
sRetryInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func() time.Duration {
return time.Second
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestHostsRetryInterceptorByRequest(t *testing.T) {
})

retryMax := 1
sRetryInterceptor := NewSimpleRetryInterceptor(RetryConfig{
sRetryInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func() time.Duration {
return time.Second
Expand Down
68 changes: 59 additions & 9 deletions internal/clientv2/interceptor_retry_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

clientv1 "github.com/qiniu/go-sdk/v7/client"
internal_io "github.com/qiniu/go-sdk/v7/internal/io"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

type contextKeyBufferResponse struct{}
Expand Down Expand Up @@ -43,16 +45,44 @@ func (c *RetryConfig) init() {
}
}

type simpleRetryInterceptor struct {
config RetryConfig
}
type (
SimpleRetryConfig struct {
RetryMax int // 最大重试次数
RetryInterval func() time.Duration // 重试时间间隔
ShouldRetry func(req *http.Request, resp *http.Response, err error) bool
Resolver resolver.Resolver // 主备域名解析器
Chooser chooser.Chooser // 主备域名选择器
}

simpleRetryInterceptor struct {
config SimpleRetryConfig
}
)

func (c *SimpleRetryConfig) init() {
if c == nil {
return
}

if c.RetryMax < 0 {
c.RetryMax = 0
}

if c.RetryInterval == nil {
c.RetryInterval = func() time.Duration {
return time.Duration(50+rand.Int()%50) * time.Millisecond
}
}

func NewSimpleRetryInterceptor(config RetryConfig) Interceptor {
return &simpleRetryInterceptor{
config: config,
if c.ShouldRetry == nil {
c.ShouldRetry = isSimpleRetryable
}
}

func NewSimpleRetryInterceptor(config SimpleRetryConfig) Interceptor {
return &simpleRetryInterceptor{config: config}
}

func (interceptor *simpleRetryInterceptor) Priority() InterceptorPriority {
return InterceptorPriorityRetrySimple
}
Expand All @@ -65,9 +95,18 @@ func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler

interceptor.config.init()

// 不重试
if interceptor.config.RetryMax <= 0 {
return handler(req)
var ips []net.IP
hostname := req.URL.Hostname()

if resolver := interceptor.config.Resolver; resolver != nil {
if ips, err = resolver.Resolve(req.Context(), hostname); err == nil && len(ips) > 0 {
if cs := interceptor.config.Chooser; cs != nil {
ips = cs.Choose(req.Context(), &chooser.ChooseOptions{IPs: ips, Domain: hostname})
}
if len(ips) > 0 {
req = req.WithContext(clientv1.WithResolvedIPs(req.Context(), hostname, ips))
}
}
}

// 可能会被重试多次
Expand All @@ -83,8 +122,19 @@ func (interceptor *simpleRetryInterceptor) Intercept(req *http.Request, handler
}

if !interceptor.config.ShouldRetry(reqBefore, resp, err) {
if len(ips) > 0 {
if cs := interceptor.config.Chooser; cs != nil {
cs.FeedbackGood(req.Context(), &chooser.FeedbackOptions{IPs: ips, Domain: hostname})
}
}
return resp, err
}
if len(ips) > 0 {
if cs := interceptor.config.Chooser; cs != nil {
cs.FeedbackBad(req.Context(), &chooser.FeedbackOptions{IPs: ips, Domain: hostname})
}
}

req = reqBefore

if i >= interceptor.config.RetryMax {
Expand Down
4 changes: 2 additions & 2 deletions internal/clientv2/interceptor_retry_simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestSimpleAlwaysRetryInterceptor(t *testing.T) {

retryMax := 1
rInterceptor := NewSimpleRetryInterceptor(RetryConfig{
rInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func() time.Duration {
return time.Second
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestSimpleAlwaysRetryInterceptor(t *testing.T) {
func TestSimpleNotRetryInterceptor(t *testing.T) {

retryMax := 1
rInterceptor := NewSimpleRetryInterceptor(RetryConfig{
rInterceptor := NewSimpleRetryInterceptor(SimpleRetryConfig{
RetryMax: retryMax,
RetryInterval: func() time.Duration {
return time.Second
Expand Down
17 changes: 15 additions & 2 deletions storage/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"strings"
"time"

"github.com/alex-ant/gomath/rational"
"github.com/qiniu/go-sdk/v7/internal/clientv2"
"github.com/qiniu/go-sdk/v7/storagev2/apis"
"github.com/qiniu/go-sdk/v7/storagev2/apis/batch_ops"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/http_client"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"

Expand Down Expand Up @@ -285,10 +287,14 @@ type BatchOpRet struct {
}

type BucketManagerOptions struct {
RetryMax int // 单域名重试次数,当前只有 uc 相关的服务有多域名
// 单域名重试次数,当前只有 uc 相关的服务有多域名
RetryMax int
// 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。
HostFreezeDuration time.Duration
Resolver resolver.Resolver
// 域名解析器
Resolver resolver.Resolver
// 域名选择器
Chooser chooser.Chooser
}

// BucketManager 提供了对资源进行管理的操作
Expand Down Expand Up @@ -1056,6 +1062,13 @@ func (m *BucketManager) resolver() (resolver.Resolver, error) {
}
}

func (m *BucketManager) chooser() chooser.Chooser {
if m.options.Chooser != nil {
return m.options.Chooser
}
return chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
}

// 构建op的方法,导出的方法支持在Batch操作中使用

// URIStat 构建 stat 接口的请求命令
Expand Down
1 change: 1 addition & 0 deletions storage/bucket_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (m *BucketManager) Get(bucket, key string, options *GetObjectInput) (*GetOb
RetryMax: m.options.RetryMax,
HostFreezeDuration: m.options.HostFreezeDuration,
Resolver: resolver,
Chooser: m.chooser(),
}); e != nil {
return nil, e
} else if len(rg.regions) == 0 {
Expand Down
8 changes: 6 additions & 2 deletions storage/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/qiniu/go-sdk/v7/internal/clientv2"
"github.com/qiniu/go-sdk/v7/internal/hostprovider"
"github.com/qiniu/go-sdk/v7/storagev2/apis"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/http_client"
region_v2 "github.com/qiniu/go-sdk/v7/storagev2/region"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
Expand Down Expand Up @@ -383,6 +384,8 @@ type ucClientConfig struct {

Resolver resolver.Resolver

Chooser chooser.Chooser

Client *client.Client
}

Expand Down Expand Up @@ -410,12 +413,13 @@ func getUCClient(config ucClientConfig, mac *auth.Credentials) clientv2.Client {
ShouldFreezeHost: nil,
HostFreezeDuration: config.HostFreezeDuration,
HostProvider: hostprovider.NewWithHosts(hosts),
Resolver: config.Resolver,
}),
clientv2.NewSimpleRetryInterceptor(clientv2.RetryConfig{
clientv2.NewSimpleRetryInterceptor(clientv2.SimpleRetryConfig{
RetryMax: config.RetryMax,
RetryInterval: nil,
ShouldRetry: nil,
Resolver: config.Resolver,
Chooser: config.Chooser,
}),
}

Expand Down
7 changes: 7 additions & 0 deletions storage/region_uc_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (

"golang.org/x/sync/singleflight"

"github.com/alex-ant/gomath/rational"
"github.com/qiniu/go-sdk/v7/internal/clientv2"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

Expand Down Expand Up @@ -214,6 +216,7 @@ type UCApiOptions struct {
// 主备域名冻结时间(默认:600s),当一个域名请求失败(单个域名会被重试 TryTimes 次),会被冻结一段时间,使用备用域名进行重试,在冻结时间内,域名不能被使用,当一个操作中所有域名竣备冻结操作不在进行重试,返回最后一次操作的错误。
HostFreezeDuration time.Duration
Resolver resolver.Resolver
Chooser chooser.Chooser
}

func DefaultUCApiOptions() UCApiOptions {
Expand Down Expand Up @@ -258,13 +261,17 @@ func getRegionByV2(ak, bucket string, options UCApiOptions) (*Region, error) {
return nil, err
}
}
if options.Chooser == nil {
options.Chooser = chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
}

var ret UcQueryRet
c := getUCClient(ucClientConfig{
IsUcQueryApi: true,
RetryMax: options.RetryMax,
HostFreezeDuration: options.HostFreezeDuration,
Resolver: options.Resolver,
Chooser: options.Chooser,
}, nil)
err = clientv2.DoAndDecodeJsonResponse(c, clientv2.RequestParams{
Context: context.Background(),
Expand Down
6 changes: 6 additions & 0 deletions storage/region_uc_v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (

"golang.org/x/sync/singleflight"

"github.com/alex-ant/gomath/rational"
"github.com/qiniu/go-sdk/v7/internal/clientv2"
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

Expand Down Expand Up @@ -157,13 +159,17 @@ func getRegionByV4(ak, bucket string, options UCApiOptions) (*RegionGroup, error
return nil, err
}
}
if options.Chooser == nil {
options.Chooser = chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
}

var ret ucQueryV4Ret
c := getUCClient(ucClientConfig{
IsUcQueryApi: true,
RetryMax: options.RetryMax,
HostFreezeDuration: options.HostFreezeDuration,
Resolver: options.Resolver,
Chooser: options.Chooser,
}, nil)
err = clientv2.DoAndDecodeJsonResponse(c, clientv2.RequestParams{
Context: context.Background(),
Expand Down
1 change: 1 addition & 0 deletions storage/uc.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,5 +800,6 @@ func (m *BucketManager) getUCClient() (clientv2.Client, error) {
HostFreezeDuration: m.options.HostFreezeDuration,
Client: m.Client,
Resolver: resolver,
Chooser: m.chooser(),
}, m.Mac), nil
}
Loading

0 comments on commit 80e9973

Please sign in to comment.