Skip to content

Commit

Permalink
add defaultDialer
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jan 17, 2024
1 parent bb8ce50 commit f2c7b7f
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 34 deletions.
16 changes: 7 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ import (
"github.com/qiniu/go-sdk/v7/reqid"
)

var UserAgent = getUserAgentWithAppName("default")
var DefaultClient = Client{
&http.Client{
Transport: http.DefaultTransport,
},
}
var (
UserAgent = getUserAgentWithAppName("default")
DefaultClient = Client{&http.Client{Transport: defaultTransport}}

// 用来打印调试信息
var DebugMode = false
var DeepDebugInfo = false
// 用来打印调试信息
DebugMode = false
DeepDebugInfo = false
)

// --------------------------------------------------------------------

Expand Down
18 changes: 18 additions & 0 deletions client/client_1.12.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//go:build !1.13
// +build !1.13

package client

import (
"net/http"
"time"
)

var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: defaultDialFunc,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
19 changes: 19 additions & 0 deletions client/client_1.13.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//go:build 1.13
// +build 1.13

package client

import (
"net/http"
"time"
)

var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: defaultDialFunc,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
64 changes: 64 additions & 0 deletions client/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package client

import (
"context"
"fmt"
"net"
"time"
)

type (
resolverContextKey struct{}
dialTimeoutContextKey struct{}
keepAliveIntervalContextKey struct{}
resolverValues struct {
ips []net.IP
domain string
}
)

func defaultDialFunc(ctx context.Context, network string, address string) (net.Conn, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
host = address
}
dialTimeout, ok := ctx.Value(dialTimeoutContextKey{}).(time.Duration)
if !ok {
dialTimeout = 30 * time.Second
}
keepAliveInterval, ok := ctx.Value(keepAliveIntervalContextKey{}).(time.Duration)
if !ok {
keepAliveInterval = 15 * time.Second
}
if resolved, ok := ctx.Value(resolverContextKey{}).(resolverValues); ok {
if resolved.domain == host && len(resolved.ips) > 0 {
dialer := net.Dialer{Timeout: dialTimeout / time.Duration(len(resolved.ips)), KeepAlive: keepAliveInterval}
for _, ip := range resolved.ips {
newAddr := ip.String()
if port != "" {
newAddr = net.JoinHostPort(newAddr, port)
}
fmt.Printf("******* 1: dialer.DialContext(%s, %s), address = %s\n", network, newAddr, address)
if conn, err := dialer.DialContext(ctx, network, newAddr); err == nil {
return conn, nil
}
}
} else {
fmt.Printf("******* 2: resolved = %#v, host = %s\n", resolved, host)
}
}
fmt.Printf("******* 3: dialer.DialContext(%s, %s)\n", network, address)
return (&net.Dialer{Timeout: dialTimeout, KeepAlive: keepAliveInterval}).DialContext(ctx, network, address)
}

func WithResolvedIPs(ctx context.Context, domain string, ips []net.IP) context.Context {
return context.WithValue(ctx, resolverContextKey{}, resolverValues{ips: ips, domain: domain})
}

func WithDialTimeout(ctx context.Context, timeout time.Duration) context.Context {
return context.WithValue(ctx, dialTimeoutContextKey{}, timeout)
}

func WithKeepAliveInterval(ctx context.Context, interval time.Duration) context.Context {
return context.WithValue(ctx, keepAliveIntervalContextKey{}, interval)
}
37 changes: 37 additions & 0 deletions client/dialer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//go:build unit
// +build unit

package client

import (
"context"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
)

func TestDefaultDialer(t *testing.T) {
var responseBody struct {
Status string `json:"status"`
}
mux := http.NewServeMux()
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
w.Write([]byte(`{"status":"ok"}`))
}))
server := httptest.NewServer(mux)
defer server.Close()

port := server.Listener.Addr().(*net.TCPAddr).Port

ctx := WithResolvedIPs(context.Background(), "www.qiniu.com", []net.IP{net.IPv4(127, 0, 0, 1)})
err := DefaultClient.Call(ctx, &responseBody, http.MethodGet, fmt.Sprintf("http://www.qiniu.com:%d/", port), nil)
if err != nil {
t.Fatal(err)
}
if responseBody.Status != "ok" {
t.Fatal("unexpected response")
}
}
17 changes: 1 addition & 16 deletions internal/clientv2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@ type client struct {

func NewClient(cli Client, interceptors ...Interceptor) Client {
if cli == nil {
if clientV1.DefaultClient.Client != nil {
cli = NewClientWithClientV1(&clientV1.DefaultClient)
} else if http.DefaultClient != nil {
cli = http.DefaultClient
} else {
cli = &http.Client{}
}
cli = NewClientWithClientV1(&clientV1.DefaultClient)
}

var is interceptorList = interceptors
Expand Down Expand Up @@ -130,15 +124,6 @@ func NewClientWithClientV1(c *clientV1.Client) Client {
if c == nil {
c = &clientV1.DefaultClient
}

if c.Client == nil {
if clientV1.DefaultClient.Client != nil {
c.Client = clientV1.DefaultClient.Client
} else {
c.Client = &http.Client{}
}
}

return &clientV1Wrapper{
c: c,
}
Expand Down
9 changes: 9 additions & 0 deletions internal/clientv2/interceptor_retry_hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"strings"
"time"

clientV1 "github.com/qiniu/go-sdk/v7/client"
"github.com/qiniu/go-sdk/v7/internal/hostprovider"
internal_io "github.com/qiniu/go-sdk/v7/internal/io"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

type HostsRetryConfig struct {
Resolver resolver.Resolver // 主备域名解析器
RetryConfig RetryConfig // 主备域名重试参数
HostFreezeDuration time.Duration // 主备域名冻结时间(默认:600s),当一个域名请求失败被冻结的时间,最小 time.Millisecond
HostProvider hostprovider.HostProvider // 备用域名获取方法
Expand Down Expand Up @@ -112,6 +115,12 @@ func (interceptor *hostsRetryInterceptor) Intercept(req *http.Request, handler H

reqBefore.Host = u.Host
reqBefore.URL = u

if resolver := interceptor.options.Resolver; resolver != nil {
if ips, err := resolver.Resolve(reqBefore.Context(), u.Hostname()); err == nil && len(ips) > 0 {
reqBefore = reqBefore.WithContext(clientV1.WithResolvedIPs(reqBefore.Context(), u.Hostname(), ips))
}
}
}

req = reqBefore
Expand Down
3 changes: 2 additions & 1 deletion storage/backward_compatible.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package storage

import (
"fmt"
"runtime"

"github.com/qiniu/go-sdk/v7/client"
"github.com/qiniu/go-sdk/v7/conf"
"runtime"
)

var DefaultClient = client.DefaultClient
Expand Down
22 changes: 18 additions & 4 deletions storagev2/http_client/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
compatible_io "github.com/qiniu/go-sdk/v7/internal/io"
"github.com/qiniu/go-sdk/v7/storagev2/credentials"
"github.com/qiniu/go-sdk/v7/storagev2/region"
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
"github.com/qiniu/go-sdk/v7/storagev2/uptoken"
)

Expand All @@ -39,6 +40,7 @@ type (
bucketQuery region.BucketRegionsQuery
regions region.RegionsProvider
credentials credentials.CredentialsProvider
resolver resolver.Resolver
hostRetryConfig *RetryConfig
hostsRetryConfig *RetryConfig
hostFreezeDuration time.Duration
Expand All @@ -53,6 +55,7 @@ type (
Credentials credentials.CredentialsProvider
Interceptors []Interceptor
UseInsecureProtocol bool
Resolver resolver.Resolver
HostRetryConfig *RetryConfig
HostsRetryConfig *RetryConfig
HostFreezeDuration time.Duration
Expand Down Expand Up @@ -95,6 +98,7 @@ func NewClient(options *Options) *Client {
bucketQuery: options.BucketQuery,
regions: options.Regions,
credentials: options.Credentials,
resolver: options.Resolver,
hostRetryConfig: options.HostRetryConfig,
hostsRetryConfig: options.HostsRetryConfig,
hostFreezeDuration: options.HostFreezeDuration,
Expand Down Expand Up @@ -204,7 +208,7 @@ func (httpClient *Client) makeReq(ctx context.Context, request *Request) (*http.
return nil, err
}
hostProvider := endpoints.ToHostProvider()
url, err := httpClient.generateUrl(request, hostProvider)
url, host, err := httpClient.generateUrl(request, hostProvider)
if err != nil {
return nil, err
}
Expand All @@ -216,7 +220,17 @@ func (httpClient *Client) makeReq(ctx context.Context, request *Request) (*http.
RetryMax: len(endpoints.Preferred) + len(endpoints.Alternative),
}
}
r := httpClient.resolver
if r == nil {
if r, err = resolver.NewCacheResolver(nil, nil); err != nil {
return nil, err
}
}
if ips, err := r.Resolve(ctx, host); err == nil && len(ips) > 0 {
ctx = clientv1.WithResolvedIPs(ctx, host, ips)
}
interceptors = append(interceptors, clientv2.NewHostsRetryInterceptor(clientv2.HostsRetryConfig{
Resolver: r,
RetryConfig: *hostsRetryConfig,
HostProvider: hostProvider,
HostFreezeDuration: httpClient.hostFreezeDuration,
Expand All @@ -239,11 +253,11 @@ func (httpClient *Client) makeReq(ctx context.Context, request *Request) (*http.
return clientv2.WithInterceptors(req, interceptors...), nil
}

func (httpClient *Client) generateUrl(request *Request, hostProvider hostprovider.HostProvider) (string, error) {
func (httpClient *Client) generateUrl(request *Request, hostProvider hostprovider.HostProvider) (string, string, error) {
var url string
host, err := hostProvider.Provider()
if err != nil {
return "", err
return "", "", err
}
if strings.Contains(host, "://") {
url = host
Expand Down Expand Up @@ -273,7 +287,7 @@ func (httpClient *Client) generateUrl(request *Request, hostProvider hostprovide
}
url += rawQuery
}
return url, nil
return url, host, nil
}

func (options *Options) SetBucketHosts(bucketHosts region.Endpoints) (err error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package http_client
package resolver

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
//go:build unit
// +build unit

package http_client
package resolver_test

import (
"context"
"net"
"testing"

"github.com/qiniu/go-sdk/v7/storagev2/resolver"
)

func TestDefaultResolver(t *testing.T) {
ips, err := new(DefaultResolver).Resolve(context.Background(), "upload.qiniup.com")
ips, err := new(resolver.DefaultResolver).Resolve(context.Background(), "upload.qiniup.com")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if len(ips) == 0 {
Expand All @@ -30,7 +32,7 @@ func (mr *mockResolver) Resolve(ctx context.Context, host string) ([]net.IP, err

func TestCacheResolver(t *testing.T) {
mr := &mockResolver{m: map[string][]net.IP{"upload.qiniup.com": {net.IPv4(1, 1, 1, 1)}}, c: make(map[string]int)}
resolver, err := NewCacheResolver(mr, nil)
resolver, err := resolver.NewCacheResolver(mr, nil)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit f2c7b7f

Please sign in to comment.