diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 54ce8b96e..000f1d6c1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -34,26 +34,41 @@ jobs: cache: false # don't use cache for self-hosted runners - name: Unit Test - run: go test -race -covermode=atomic -coverprofile=coverage.txt ./... - - - name: Codecov - run: bash <(curl -s https://codecov.io/bash) + run: go test -race ./... ut-windows: strategy: matrix: version: ["1.20", "1.21", "1.22", "1.23"] runs-on: windows-latest + env: # Fixes https://github.com/actions/setup-go/issues/240 + GOMODCACHE: 'D:\go\pkg\mod' + GOCACHE: 'D:\go\go-build' steps: - uses: actions/checkout@v4 - name: Set up Go uses: actions/setup-go@v5 with: go-version: ${{ matrix.version }} + + - name: Unit Test + run: go test -race ./... + + code-cov: + runs-on: [self-hosted, X64] + steps: + - uses: actions/checkout@v4 + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: stable cache: false # don't use cache for self-hosted runners - name: Unit Test - run: go test -race -covermode=atomic ./... + run: go test -covermode=atomic -coverprofile=coverage.txt ./... + + - name: Codecov + run: bash <(curl -s https://codecov.io/bash) hz-test-unix: runs-on: [ self-hosted, X64 ] diff --git a/.gitignore b/.gitignore index 338957806..d4c580190 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,8 @@ pkg/app/fs.go.hertz.gz coverage.txt coverage.out + +# test benchmark tmp output +cpu.out +mem.out +*.test diff --git a/go.mod b/go.mod index cc537142f..65cd910d6 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/bytedance/gopkg v0.1.0 github.com/bytedance/mockey v1.2.12 github.com/bytedance/sonic v1.12.0 - github.com/cloudwego/netpoll v0.6.2 + github.com/cloudwego/netpoll v0.6.4 github.com/fsnotify/fsnotify v1.5.4 github.com/tidwall/gjson v1.14.4 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c diff --git a/go.sum b/go.sum index 446891e45..98999c738 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/cloudwego/netpoll v0.6.2 h1:+KdILv5ATJU+222wNNXpHapYaBeRvvL8qhJyhcxRxrQ= github.com/cloudwego/netpoll v0.6.2/go.mod h1:kaqvfZ70qd4T2WtIIpCOi5Cxyob8viEpzLhCrTrz3HM= +github.com/cloudwego/netpoll v0.6.4 h1:z/dA4sOTUQof6zZIO4QNnLBXsDFFFEos9OOGloR6kno= +github.com/cloudwego/netpoll v0.6.4/go.mod h1:BtM+GjKTdwKoC8IOzD08/+8eEn2gYoiNLipFca6BVXQ= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/app/client/client.go b/pkg/app/client/client.go index 614b5b5c7..e75009671 100644 --- a/pkg/app/client/client.go +++ b/pkg/app/client/client.go @@ -551,33 +551,29 @@ func (c *Client) CloseIdleConnections() { } func (c *Client) mCleaner() { - mustStop := false - for { time.Sleep(10 * time.Second) - c.mLock.Lock() - for k, v := range c.m { - shouldRemove := v.ShouldRemove() - - if shouldRemove { - delete(c.m, k) - if f, ok := v.(io.Closer); ok { - err := f.Close() - if err != nil { - hlog.Warnf("clean hostclient error, addr: %s, err: %s", k, err.Error()) - } - } - } - } - if len(c.m) == 0 { - mustStop = true + if c.mClean() { + break } - c.mLock.Unlock() + } +} - if mustStop { - break +func (c *Client) mClean() bool { + c.mLock.Lock() + defer c.mLock.Unlock() + for k, v := range c.m { + if v.ShouldRemove() { + delete(c.m, k) + if f, ok := v.(io.Closer); ok { + err := f.Close() + if err != nil { + hlog.Warnf("clean hostclient error, addr: %s, err: %s", k, err.Error()) + } + } } } + return len(c.m) == 0 } func (c *Client) SetClientFactory(cf suite.ClientFactory) { diff --git a/pkg/app/client/client_test.go b/pkg/app/client/client_test.go index ed5959bea..c9332a2ec 100644 --- a/pkg/app/client/client_test.go +++ b/pkg/app/client/client_test.go @@ -57,6 +57,7 @@ import ( "path/filepath" "reflect" "regexp" + "strconv" "strings" "sync" "sync/atomic" @@ -82,9 +83,58 @@ import ( var errTooManyRedirects = errors.New("too many redirects detected when doing the request") +func assertNil(err error) { + if err != nil { + panic(err) + } +} + +var unixsockPath string + +func TestMain(m *testing.M) { + dir, err := os.MkdirTemp("", "tests-*") + assertNil(err) + unixsockPath = dir + defer os.RemoveAll(dir) + + m.Run() +} + +var nextUnixSockID = int32(10000) + +func nextUnixSock() string { + n := atomic.AddInt32(&nextUnixSockID, 1) + return filepath.Join(unixsockPath, strconv.Itoa(int(n))+".sock") +} + +func waitEngineRunning(e *route.Engine) { + for i := 0; i < 100; i++ { + if e.IsRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + opts := e.GetOptions() + network, addr := opts.Network, opts.Addr + if network == "" { + network = "tcp" + } + for i := 0; i < 100; i++ { + conn, err := net.Dial(network, addr) + if err != nil { + time.Sleep(10 * time.Millisecond) + continue + } + conn.Close() + return + } + + panic("not running") +} + func TestCloseIdleConnections(t *testing.T) { opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10000" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -92,12 +142,7 @@ func TestCloseIdleConnections(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) @@ -124,13 +169,21 @@ func TestCloseIdleConnections(t *testing.T) { if conns := connsLen(); conns > 0 { t.Errorf("expected 0 conns got %d", conns) } + + c.mClean() + + func() { + c.mLock.Lock() + defer c.mLock.Unlock() + if len(c.m) != 0 { + t.Errorf("expected 0 conns got %d", len(c.m)) + } + }() } func TestClientInvalidURI(t *testing.T) { - t.Parallel() - opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10001" + opt.Addr = nextUnixSock() opt.Network = "unix" requests := int64(0) engine := route.NewEngine(opt) @@ -141,12 +194,7 @@ func TestClientInvalidURI(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) req, res := protocol.AcquireRequest(), protocol.AcquireResponse() @@ -166,10 +214,8 @@ func TestClientInvalidURI(t *testing.T) { } func TestClientGetWithBody(t *testing.T) { - t.Parallel() - opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10002" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) engine.GET("/", func(c context.Context, ctx *app.RequestContext) { @@ -180,12 +226,7 @@ func TestClientGetWithBody(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) req, res := protocol.AcquireRequest(), protocol.AcquireResponse() @@ -206,10 +247,8 @@ func TestClientGetWithBody(t *testing.T) { } func TestClientPostBodyStream(t *testing.T) { - t.Parallel() - opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10102" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) engine.POST("/", func(c context.Context, ctx *app.RequestContext) { @@ -220,12 +259,7 @@ func TestClientPostBodyStream(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) cStream, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil)), WithResponseBodyStream(true)) args := &protocol.Args{} @@ -244,8 +278,6 @@ func TestClientPostBodyStream(t *testing.T) { } func TestClientURLAuth(t *testing.T) { - t.Parallel() - cases := map[string]string{ "foo:bar@": "Basic Zm9vOmJhcg==", "foo:@": "Basic Zm9vOg==", @@ -256,7 +288,7 @@ func TestClientURLAuth(t *testing.T) { ch := make(chan string, 1) opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10003" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) engine.GET("/foo/bar", func(c context.Context, ctx *app.RequestContext) { @@ -266,12 +298,7 @@ func TestClientURLAuth(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) for up, expected := range cases { @@ -293,7 +320,7 @@ func TestClientURLAuth(t *testing.T) { func TestClientNilResp(t *testing.T) { opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10004" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -303,12 +330,7 @@ func TestClientNilResp(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) @@ -324,7 +346,6 @@ func TestClientNilResp(t *testing.T) { } func TestClientParseConn(t *testing.T) { - t.Parallel() opt := config.NewOptions([]config.Option{}) opt.Addr = "127.0.0.1:10005" engine := route.NewEngine(opt) @@ -334,12 +355,7 @@ func TestClientParseConn(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) req, res := protocol.AcquireRequest(), protocol.AcquireResponse() @@ -365,9 +381,8 @@ func TestClientParseConn(t *testing.T) { } func TestClientPostArgs(t *testing.T) { - t.Parallel() opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10006" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) engine.POST("/", func(c context.Context, ctx *app.RequestContext) { @@ -381,12 +396,8 @@ func TestClientPostArgs(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) + c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) req, res := protocol.AcquireRequest(), protocol.AcquireResponse() defer func() { @@ -408,10 +419,8 @@ func TestClientPostArgs(t *testing.T) { } func TestClientHeaderCase(t *testing.T) { - t.Parallel() - opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10007" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) engine.GET("/", func(c context.Context, ctx *app.RequestContext) { @@ -428,7 +437,7 @@ func TestClientHeaderCase(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(time.Second) + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, time.Second, nil)), WithDisableHeaderNamesNormalizing(true)) code, body, err := c.Get(context.Background(), nil, "http://example.com") @@ -442,32 +451,32 @@ func TestClientHeaderCase(t *testing.T) { } func TestClientReadTimeout(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode") - } - opt := config.NewOptions([]config.Option{}) - opt.Addr = "localhost:10024" + opt.Addr = nextUnixSock() + opt.Network = "unix" engine := route.NewEngine(opt) + readtimeout := 50 * time.Millisecond + sleeptime := 75 * time.Millisecond // must > readtimeout + engine.GET("/normal", func(c context.Context, ctx *app.RequestContext) { ctx.String(201, "ok") }) engine.GET("/timeout", func(c context.Context, ctx *app.RequestContext) { - time.Sleep(time.Second * 60) + time.Sleep(sleeptime) ctx.String(202, "timeout ok") }) go engine.Run() defer func() { engine.Close() }() - time.Sleep(time.Second * 1) + waitEngineRunning(engine) c := &http1.HostClient{ ClientOptions: &http1.ClientOptions{ - ReadTimeout: time.Second * 4, + ReadTimeout: readtimeout, RetryConfig: &retry.Config{MaxAttemptTimes: 1}, - Dialer: standard.NewDialer(), + Dialer: newMockDialerWithCustomFunc(opt.Network, opt.Addr, readtimeout, nil), }, Addr: opt.Addr, } @@ -475,7 +484,7 @@ func TestClientReadTimeout(t *testing.T) { req := protocol.AcquireRequest() res := protocol.AcquireResponse() - req.SetRequestURI("http://" + opt.Addr + "/normal") + req.SetRequestURI("http://example.com/normal") req.Header.SetMethod(consts.MethodGet) // Setting Connection: Close will make the connection be returned to the pool. @@ -485,48 +494,36 @@ func TestClientReadTimeout(t *testing.T) { t.Fatal(err) } - protocol.ReleaseRequest(req) - protocol.ReleaseResponse(res) - - done := make(chan struct{}) - go func() { - req := protocol.AcquireRequest() - res := protocol.AcquireResponse() - - req.SetRequestURI("http://" + opt.Addr + "/timeout") - req.Header.SetMethod(consts.MethodGet) - req.SetConnectionClose() + req.Reset() + req.SetRequestURI("http://example.com/timeout") + req.Header.SetMethod(consts.MethodGet) + req.SetConnectionClose() + res.Reset() - if err := c.Do(context.Background(), req, res); !errors.Is(err, errs.ErrTimeout) { - if err == nil { - t.Errorf("expected ErrTimeout got nil, req url: %s, read resp body: %s, status: %d", string(req.URI().FullURI()), string(res.Body()), res.StatusCode()) - } else { - if !strings.Contains(err.Error(), "timeout") { - t.Errorf("expected ErrTimeout got %#v", err) - } + t0 := time.Now() + err := c.Do(context.Background(), req, res) + t1 := time.Now() + if !errors.Is(err, errs.ErrTimeout) { + if err == nil { + t.Errorf("expected ErrTimeout got nil, req url: %s, read resp body: %s, status: %d", string(req.URI().FullURI()), string(res.Body()), res.StatusCode()) + } else { + if !strings.Contains(err.Error(), "timeout") { + t.Errorf("expected ErrTimeout got %#v", err) } } - - protocol.ReleaseRequest(req) - protocol.ReleaseResponse(res) - close(done) - }() - - select { - case <-done: - // It is abnormal when waiting time exceeds the value of readTimeout times the number of retries. - // Give it extra 2 seconds just to be sure. - case <-time.After(c.ReadTimeout*time.Duration(c.RetryConfig.MaxAttemptTimes) + time.Second*2): - t.Fatal("Client.ReadTimeout didn't work") + } + protocol.ReleaseRequest(req) + protocol.ReleaseResponse(res) + if d := t1.Sub(t0) - readtimeout; d > readtimeout/2 { + t.Errorf("timeout more than expected: %v", d) + } else { + t.Log("latency", d) } } func TestClientDefaultUserAgent(t *testing.T) { - t.Parallel() - opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10009" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -537,12 +534,7 @@ func TestClientDefaultUserAgent(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) req := protocol.AcquireRequest() @@ -561,11 +553,8 @@ func TestClientDefaultUserAgent(t *testing.T) { } func TestClientSetUserAgent(t *testing.T) { - t.Parallel() - opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10010" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -576,12 +565,7 @@ func TestClientSetUserAgent(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) userAgent := "I'm not hertz" c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, time.Second, nil)), WithName(userAgent)) @@ -601,7 +585,7 @@ func TestClientSetUserAgent(t *testing.T) { func TestClientNoUserAgent(t *testing.T) { opt := config.NewOptions([]config.Option{}) - opt.Addr = "unix-test-10011" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -612,12 +596,8 @@ func TestClientNoUserAgent(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) + c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, time.Second, nil)), WithDialTimeout(1*time.Second), WithNoDefaultUserAgentHeader(true)) req := protocol.AcquireRequest() @@ -635,8 +615,6 @@ func TestClientNoUserAgent(t *testing.T) { } func TestClientDoWithCustomHeaders(t *testing.T) { - t.Parallel() - ch := make(chan error) uri := "/foo/bar/baz?a=b&cd=12" headers := map[string]string{ @@ -647,8 +625,7 @@ func TestClientDoWithCustomHeaders(t *testing.T) { } body := "request body" opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10012" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -698,12 +675,7 @@ func TestClientDoWithCustomHeaders(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) // make sure that the client sends all the request headers and body. c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, 1*time.Second, nil))) @@ -731,10 +703,8 @@ func TestClientDoWithCustomHeaders(t *testing.T) { } func TestClientDoTimeoutDisablePathNormalizing(t *testing.T) { - t.Parallel() opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10013" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -748,12 +718,8 @@ func TestClientDoTimeoutDisablePathNormalizing(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) + c, _ := NewClient(WithDialer(newMockDialerWithCustomFunc(opt.Network, opt.Addr, time.Second, nil)), WithDisablePathNormalizing(true)) urlWithEncodedPath := "http://example.com/encoded/Y%2BY%2FY%3D/stuff" @@ -773,14 +739,11 @@ func TestClientDoTimeoutDisablePathNormalizing(t *testing.T) { } func TestHostClientPendingRequests(t *testing.T) { - t.Parallel() - const concurrency = 10 doneCh := make(chan struct{}) readyCh := make(chan struct{}, concurrency) opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10014" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -792,7 +755,7 @@ func TestHostClientPendingRequests(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(time.Second) + waitEngineRunning(engine) c := &http1.HostClient{ ClientOptions: &http1.ClientOptions{ @@ -867,8 +830,7 @@ func TestHostClientMaxConnsWithDeadline(t *testing.T) { wg sync.WaitGroup ) opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10015" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -883,12 +845,7 @@ func TestHostClientMaxConnsWithDeadline(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c := &http1.HostClient{ ClientOptions: &http1.ClientOptions{ @@ -912,7 +869,7 @@ func TestHostClientMaxConnsWithDeadline(t *testing.T) { for { if err := c.DoDeadline(context.Background(), req, resp, time.Now().Add(timeout)); err != nil { if err.Error() == errs.ErrNoFreeConns.Error() { - time.Sleep(time.Millisecond * 500) + time.Sleep(10 * time.Millisecond) continue } t.Errorf("unexpected error: %s", err) @@ -938,12 +895,9 @@ func TestHostClientMaxConnsWithDeadline(t *testing.T) { } func TestHostClientMaxConnDuration(t *testing.T) { - t.Parallel() - connectionCloseCount := uint32(0) opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10016" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -957,12 +911,7 @@ func TestHostClientMaxConnDuration(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c := &http1.HostClient{ ClientOptions: &http1.ClientOptions{ @@ -992,10 +941,8 @@ func TestHostClientMaxConnDuration(t *testing.T) { } func TestHostClientMultipleAddrs(t *testing.T) { - t.Parallel() opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10017" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -1007,12 +954,7 @@ func TestHostClientMultipleAddrs(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) dialsCount := make(map[string]int) c := &http1.HostClient{ @@ -1048,10 +990,8 @@ func TestHostClientMultipleAddrs(t *testing.T) { } func TestClientFollowRedirects(t *testing.T) { - t.Parallel() opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10018" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -1079,7 +1019,7 @@ func TestClientFollowRedirects(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(time.Second * 2) + waitEngineRunning(engine) c := &http1.HostClient{ ClientOptions: &http1.ClientOptions{ @@ -1157,8 +1097,7 @@ func TestHostClientMaxConnWaitTimeoutSuccess(t *testing.T) { wg sync.WaitGroup ) opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10019" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -1173,12 +1112,7 @@ func TestHostClientMaxConnWaitTimeoutSuccess(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c := &http1.HostClient{ ClientOptions: &http1.ClientOptions{ @@ -1231,8 +1165,7 @@ func TestHostClientMaxConnWaitTimeoutError(t *testing.T) { wg sync.WaitGroup ) opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10020" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -1247,12 +1180,7 @@ func TestHostClientMaxConnWaitTimeoutError(t *testing.T) { defer func() { engine.Close() }() - for { - time.Sleep(1 * time.Second) - if engine.IsRunning() { - break - } - } + waitEngineRunning(engine) c := &http1.HostClient{ ClientOptions: &http1.ClientOptions{ @@ -1317,7 +1245,7 @@ func TestNewClient(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) client, err := NewClient(WithDialTimeout(2 * time.Second)) if err != nil { @@ -1345,7 +1273,7 @@ func TestUseShortConnection(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) c, _ := NewClient(WithKeepAlive(false)) var wg sync.WaitGroup @@ -1392,8 +1320,8 @@ func TestPostWithFormData(t *testing.T) { defer func() { engine.Close() }() + waitEngineRunning(engine) - time.Sleep(1 * time.Second) client, _ := NewClient() req := protocol.AcquireRequest() rsp := protocol.AcquireResponse() @@ -1446,8 +1374,8 @@ func TestPostWithMultipartField(t *testing.T) { defer func() { engine.Close() }() + waitEngineRunning(engine) - time.Sleep(1 * time.Second) client, _ := NewClient() req := protocol.AcquireRequest() rsp := protocol.AcquireResponse() @@ -1492,8 +1420,8 @@ func TestSetFiles(t *testing.T) { defer func() { engine.Close() }() + waitEngineRunning(engine) - time.Sleep(1 * time.Second) client, _ := NewClient() req := protocol.AcquireRequest() rsp := protocol.AcquireResponse() @@ -1543,8 +1471,8 @@ func TestSetMultipartFields(t *testing.T) { defer func() { engine.Close() }() + waitEngineRunning(engine) - time.Sleep(1 * time.Second) client, _ := NewClient(WithDialTimeout(50 * time.Millisecond)) req := protocol.AcquireRequest() rsp := protocol.AcquireResponse() @@ -1598,7 +1526,7 @@ func TestClientReadResponseBodyStream(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) client, _ := NewClient(WithResponseBodyStream(true)) req, resp := protocol.AcquireRequest(), protocol.AcquireResponse() @@ -1651,7 +1579,8 @@ func TestWithBasicAuth(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + client, _ := NewClient() req := protocol.AcquireRequest() rsp := protocol.AcquireResponse() @@ -2022,7 +1951,7 @@ func TestClientReadResponseBodyStreamWithDoubleRequest(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) client, _ := NewClient(WithResponseBodyStream(true)) req, resp := protocol.AcquireRequest(), protocol.AcquireResponse() @@ -2095,7 +2024,7 @@ func TestClientReadResponseBodyStreamWithConnectionClose(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) client, _ := NewClient(WithResponseBodyStream(true)) @@ -2150,7 +2079,6 @@ func (m *mockDialer) DialConnection(network, address string, timeout time.Durati } func TestClientRetry(t *testing.T) { - t.Parallel() client, err := NewClient( // Default dial function performs different in different os. So unit the performance of dial function. WithDialFunc(func(addr string) (network.Conn, error) { @@ -2368,14 +2296,11 @@ func TestClientDialerName(t *testing.T) { } func TestClientDoWithDialFunc(t *testing.T) { - t.Parallel() - ch := make(chan error, 1) uri := "/foo/bar/baz" body := "request body" opt := config.NewOptions([]config.Option{}) - - opt.Addr = "unix-test-10021" + opt.Addr = nextUnixSock() opt.Network = "unix" engine := route.NewEngine(opt) @@ -2405,7 +2330,7 @@ func TestClientDoWithDialFunc(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) c, _ := NewClient(WithDialFunc(func(addr string) (network.Conn, error) { return dialer.DialConnection(opt.Network, opt.Addr, time.Second, nil) @@ -2442,11 +2367,13 @@ func TestClientState(t *testing.T) { defer func() { engine.Close() }() + waitEngineRunning(engine) - time.Sleep(1 * time.Second) - + var wg sync.WaitGroup + wg.Add(2) state := int32(0) client, _ := NewClient( + WithMaxIdleConnDuration(75*time.Millisecond), WithConnStateObserve(func(hcs config.HostClientState) { switch atomic.LoadInt32(&state) { case int32(0): @@ -2454,19 +2381,18 @@ func TestClientState(t *testing.T) { assert.DeepEqual(t, 1, hcs.ConnPoolState().PoolConnNum) assert.DeepEqual(t, "127.0.0.1:10037", hcs.ConnPoolState().Addr) atomic.StoreInt32(&state, int32(1)) + wg.Done() case int32(1): assert.DeepEqual(t, 0, hcs.ConnPoolState().TotalConnNum) assert.DeepEqual(t, 0, hcs.ConnPoolState().PoolConnNum) assert.DeepEqual(t, "127.0.0.1:10037", hcs.ConnPoolState().Addr) atomic.StoreInt32(&state, int32(2)) - return - case int32(2): - t.Fatal("It shouldn't go to here") + wg.Done() } - }, time.Second*9)) - + }, 50*time.Millisecond)) client.Get(context.Background(), nil, "http://127.0.0.1:10037") - time.Sleep(time.Second * 22) + wg.Wait() + assert.DeepEqual(t, int32(2), atomic.LoadInt32(&state)) } func TestClientRetryErr(t *testing.T) { @@ -2486,7 +2412,8 @@ func TestClientRetryErr(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + c, _ := NewClient(WithRetryConfig(retry.WithMaxAttemptTimes(3))) _, _, err := c.Get(context.Background(), nil, "http://127.0.0.1:10136/ping") assert.Nil(t, err) @@ -2511,7 +2438,8 @@ func TestClientRetryErr(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + c, _ := NewClient(WithRetryConfig(retry.WithMaxAttemptTimes(3))) c.SetRetryIfFunc(func(req *protocol.Request, resp *protocol.Response, err error) bool { return resp.StatusCode() == 502 diff --git a/pkg/app/client/loadbalance/lbcache_test.go b/pkg/app/client/loadbalance/lbcache_test.go index c1e314f59..4e3fe6f62 100644 --- a/pkg/app/client/loadbalance/lbcache_test.go +++ b/pkg/app/client/loadbalance/lbcache_test.go @@ -19,6 +19,7 @@ package loadbalance import ( "context" "fmt" + "strconv" "sync/atomic" "testing" "time" @@ -114,9 +115,11 @@ func TestBalancerRefresh(t *testing.T) { }, NameFunc: func() string { return t.Name() }, } + opts := DefaultLbOpts + opts.RefreshInterval = 30 * time.Millisecond blf := NewBalancerFactory(Config{ Balancer: NewWeightedBalancer(), - LbOpts: DefaultLbOpts, + LbOpts: opts, Resolver: r, }) req := &protocol.Request{} @@ -128,12 +131,44 @@ func TestBalancerRefresh(t *testing.T) { addr, err = blf.GetInstance(context.Background(), req) assert.Assert(t, err == nil, err) assert.Assert(t, addr.Address().String() == "127.0.0.1:8888") - time.Sleep(6 * time.Second) + time.Sleep(2 * opts.RefreshInterval) addr, err = blf.GetInstance(context.Background(), req) assert.Assert(t, err == nil, err) assert.Assert(t, addr.Address().String() == "127.0.0.1:8889") } +func TestBalancerExpires(t *testing.T) { + n := int32(1000) + r := &discovery.SynthesizedResolver{ + TargetFunc: func(ctx context.Context, target *discovery.TargetInfo) string { + return target.Host + }, + ResolveFunc: func(ctx context.Context, key string) (discovery.Result, error) { + ins := discovery.NewInstance("tcp", "127.0.0.1:"+strconv.Itoa(int(atomic.AddInt32(&n, 1))), 10, nil) + return discovery.Result{CacheKey: "svc1", Instances: []discovery.Instance{ins}}, nil + }, + NameFunc: func() string { return t.Name() }, + } + opts := DefaultLbOpts + opts.ExpireInterval = 30 * time.Millisecond + blf := NewBalancerFactory(Config{ + Balancer: NewWeightedBalancer(), + LbOpts: opts, + Resolver: r, + }) + req := &protocol.Request{} + req.SetHost("svc1") + addr1, err := blf.GetInstance(context.Background(), req) + assert.Assert(t, err == nil, err) + addr2, err := blf.GetInstance(context.Background(), req) + assert.Assert(t, err == nil, err) + assert.Assert(t, addr1.Address().String() == addr2.Address().String()) + time.Sleep(3 * opts.ExpireInterval) + addr3, err := blf.GetInstance(context.Background(), req) + assert.Assert(t, err == nil, err) + assert.Assert(t, addr3.Address().String() != addr2.Address().String()) +} + func TestCacheKey(t *testing.T) { uniqueKey := cacheKey("hello", "world", Options{RefreshInterval: 15 * time.Second, ExpireInterval: 5 * time.Minute}) assert.Assert(t, uniqueKey == "hello|world|{15s 5m0s}") diff --git a/pkg/app/client/loadbalance/weight_random_test.go b/pkg/app/client/loadbalance/weight_random_test.go index 1cab4cb0f..720d06250 100644 --- a/pkg/app/client/loadbalance/weight_random_test.go +++ b/pkg/app/client/loadbalance/weight_random_test.go @@ -55,12 +55,11 @@ func TestWeightedBalancer(t *testing.T) { // multi instances, weightSum > 0 insList = []discovery.Instance{ - discovery.NewInstance("tcp", "127.0.0.1:8881", 10, nil), - discovery.NewInstance("tcp", "127.0.0.1:8882", 20, nil), - discovery.NewInstance("tcp", "127.0.0.1:8883", 50, nil), - discovery.NewInstance("tcp", "127.0.0.1:8884", 100, nil), - discovery.NewInstance("tcp", "127.0.0.1:8885", 200, nil), - discovery.NewInstance("tcp", "127.0.0.1:8886", 500, nil), + discovery.NewInstance("tcp", "127.0.0.1:8881", 100, nil), + discovery.NewInstance("tcp", "127.0.0.1:8882", 200, nil), + discovery.NewInstance("tcp", "127.0.0.1:8883", 300, nil), + discovery.NewInstance("tcp", "127.0.0.1:8884", 400, nil), + discovery.NewInstance("tcp", "127.0.0.1:8885", 500, nil), } var weightSum int @@ -69,7 +68,7 @@ func TestWeightedBalancer(t *testing.T) { weightSum += weight } - n := 10000000 + n := 1000000 pickedStat := map[int]int{} e = discovery.Result{ Instances: insList, @@ -91,7 +90,7 @@ func TestWeightedBalancer(t *testing.T) { expect := float64(weight) / float64(weightSum) * float64(n) actual := float64(pickedStat[weight]) delta := math.Abs(expect - actual) - assert.DeepEqual(t, true, delta/expect < 0.01) + assert.DeepEqual(t, true, delta/expect < 0.05) } // have instances that weight < 0 diff --git a/pkg/app/server/hertz_test.go b/pkg/app/server/hertz_test.go index 09e77dd40..b66043d29 100644 --- a/pkg/app/server/hertz_test.go +++ b/pkg/app/server/hertz_test.go @@ -51,6 +51,35 @@ import ( "github.com/cloudwego/hertz/pkg/route/param" ) +type routeEngine interface { + IsRunning() bool + GetOptions() *config.Options +} + +func waitEngineRunning(e routeEngine) { + for i := 0; i < 100; i++ { + if e.IsRunning() { + break + } + time.Sleep(10 * time.Millisecond) + } + opts := e.GetOptions() + network, addr := opts.Network, opts.Addr + if network == "" { + network = "tcp" + } + for i := 0; i < 100; i++ { + conn, err := net.Dial(network, addr) + if err != nil { + time.Sleep(10 * time.Millisecond) + continue + } + conn.Close() + return + } + panic("not running") +} + func TestHertz_Run(t *testing.T) { hertz := Default(WithHostPorts("127.0.0.1:6666")) hertz.GET("/test", func(c context.Context, ctx *app.RequestContext) { @@ -67,7 +96,7 @@ func TestHertz_Run(t *testing.T) { assert.Assert(t, len(hertz.Handlers) == 1) go hertz.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(hertz) hertz.Close() resp, err := http.Get("http://127.0.0.1:6666/test") @@ -77,9 +106,12 @@ func TestHertz_Run(t *testing.T) { } func TestHertz_GracefulShutdown(t *testing.T) { + handling := make(chan struct{}) + closing := make(chan struct{}) engine := New(WithHostPorts("127.0.0.1:6667")) engine.GET("/test", func(c context.Context, ctx *app.RequestContext) { - time.Sleep(time.Second * 2) + close(handling) + <-closing path := ctx.Request.URI().PathOriginal() ctx.SetBodyString(string(path)) }) @@ -95,12 +127,11 @@ func TestHertz_GracefulShutdown(t *testing.T) { atomic.StoreUint32(&testint2, 2) }) engine.Engine.OnShutdown = append(engine.OnShutdown, func(ctx context.Context) { - time.Sleep(2 * time.Second) atomic.StoreUint32(&testint3, 3) }) go engine.Spin() - time.Sleep(time.Millisecond) + waitEngineRunning(engine) hc := http.Client{Timeout: time.Second} var err error @@ -108,7 +139,7 @@ func TestHertz_GracefulShutdown(t *testing.T) { ch := make(chan struct{}) ch2 := make(chan struct{}) go func() { - ticker := time.NewTicker(time.Millisecond * 100) + ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() for range ticker.C { t.Logf("[%v]begin listening\n", time.Now()) @@ -127,14 +158,16 @@ func TestHertz_GracefulShutdown(t *testing.T) { ch <- struct{}{} }() - time.Sleep(time.Second * 1) + <-handling + start := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) t.Logf("[%v]begin shutdown\n", start) engine.Shutdown(ctx) end := time.Now() t.Logf("[%v]end shutdown\n", end) + close(closing) <-ch assert.Nil(t, err) assert.NotNil(t, resp) @@ -161,7 +194,8 @@ func TestLoadHTMLGlob(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + resp, _ := http.Get("http://127.0.0.1:8893/index") assert.DeepEqual(t, consts.StatusOK, resp.StatusCode) b := make([]byte, 100) @@ -188,7 +222,8 @@ func TestLoadHTMLFiles(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + resp, _ := http.Get("http://127.0.0.1:8891/raw") assert.DeepEqual(t, consts.StatusOK, resp.StatusCode) b := make([]byte, 100) @@ -233,7 +268,8 @@ func TestServer_Run(t *testing.T) { ctx.Redirect(consts.StatusMovedPermanently, []byte("http://127.0.0.1:8899/test")) }) go hertz.Run() - time.Sleep(1 * time.Second) + waitEngineRunning(hertz) + resp, err := http.Get("http://127.0.0.1:8899/test") assert.Nil(t, err) assert.DeepEqual(t, consts.StatusOK, resp.StatusCode) @@ -269,7 +305,7 @@ func TestNotAbsolutePath(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) s := "POST ?a=b HTTP/1.1\r\nHost: a.b.c\r\nContent-Length: 5\r\nContent-Type: foo/bar\r\n\r\nabcdef4343" zr := mock.NewZeroCopyReader(s) @@ -311,7 +347,7 @@ func TestNotAbsolutePathWithRawPath(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) s := "POST ?a=b HTTP/1.1\r\nHost: a.b.c\r\nContent-Length: 5\r\nContent-Type: foo/bar\r\n\r\nabcdef4343" zr := mock.NewZeroCopyReader(s) @@ -389,7 +425,8 @@ func TestWithBasePath(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + var r http.Request r.ParseForm() r.Form.Add("xxxxxx", "xxx") @@ -407,7 +444,8 @@ func TestNotEnoughBodySize(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + var r http.Request r.ParseForm() r.Form.Add("xxxxxx", "xxx") @@ -427,7 +465,8 @@ func TestEnoughBodySize(t *testing.T) { defer func() { engine.Close() }() - time.Sleep(1 * time.Second) + waitEngineRunning(engine) + var r http.Request r.ParseForm() r.Form.Add("xxxxxx", "xxx") @@ -549,18 +588,19 @@ func TestParamInconsist(t *testing.T) { } }) go h.Run() - time.Sleep(time.Millisecond * 50) + waitEngineRunning(h) + client, _ := c.NewClient() wg := sync.WaitGroup{} tr := func() { defer wg.Done() - for i := 0; i < 5000; i++ { + for i := 0; i < 500; i++ { client.Get(context.Background(), nil, "http://localhost:10091/test1") } } ti := func() { defer wg.Done() - for i := 0; i < 5000; i++ { + for i := 0; i < 500; i++ { client.Get(context.Background(), nil, "http://localhost:10091/test2") } } @@ -580,7 +620,8 @@ func TestDuplicateReleaseBodyStream(t *testing.T) { c.Response.SetBodyStream(stream, -1) }) go h.Spin() - time.Sleep(time.Second) + waitEngineRunning(h) + client, _ := c.NewClient(c.WithMaxConnsPerHost(1000000), c.WithDialTimeout(time.Minute)) bodyBytes := make([]byte, 102388) index := 0 @@ -616,6 +657,8 @@ func TestDuplicateReleaseBodyStream(t *testing.T) { } func TestServiceRegisterFailed(t *testing.T) { + t.Parallel() // slow test, make it parallel + mockRegErr := errors.New("mock register error") var rCount int32 var drCount int32 @@ -634,39 +677,50 @@ func TestServiceRegisterFailed(t *testing.T) { opts = append(opts, WithHostPorts("127.0.0.1:9222")) srv := New(opts...) srv.Spin() - time.Sleep(2 * time.Second) assert.Assert(t, atomic.LoadInt32(&rCount) == 1) } func TestServiceDeregisterFailed(t *testing.T) { + t.Parallel() // slow test, make it parallel + mockDeregErr := errors.New("mock deregister error") + + var wg sync.WaitGroup + wg.Add(2) // RegisterFunc && DeregisterFunc var rCount int32 var drCount int32 mockRegistry := MockRegistry{ RegisterFunc: func(info *registry.Info) error { + defer wg.Done() atomic.AddInt32(&rCount, 1) return nil }, DeregisterFunc: func(info *registry.Info) error { + defer wg.Done() atomic.AddInt32(&drCount, 1) return mockDeregErr }, } + var opts []config.Option opts = append(opts, WithRegistry(mockRegistry, nil)) opts = append(opts, WithHostPorts("127.0.0.1:9223")) srv := New(opts...) go srv.Spin() - time.Sleep(1 * time.Second) + waitEngineRunning(srv) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() _ = srv.Shutdown(ctx) - time.Sleep(1 * time.Second) + + wg.Wait() assert.Assert(t, atomic.LoadInt32(&rCount) == 1) assert.Assert(t, atomic.LoadInt32(&drCount) == 1) } func TestServiceRegistryInfo(t *testing.T) { + t.Parallel() // slow test, make it parallel + registryInfo := ®istry.Info{ Weight: 100, Tags: map[string]string{"aa": "bb"}, @@ -678,15 +732,20 @@ func TestServiceRegistryInfo(t *testing.T) { assert.Assert(t, len(info.Tags) == len(registryInfo.Tags), info.Tags) assert.Assert(t, info.Tags["aa"] == registryInfo.Tags["aa"], info.Tags) } + + var wg sync.WaitGroup + wg.Add(2) // RegisterFunc && DeregisterFunc var rCount int32 var drCount int32 mockRegistry := MockRegistry{ RegisterFunc: func(info *registry.Info) error { + defer wg.Done() checkInfo(info) atomic.AddInt32(&rCount, 1) return nil }, DeregisterFunc: func(info *registry.Info) error { + defer wg.Done() checkInfo(info) atomic.AddInt32(&drCount, 1) return nil @@ -697,28 +756,36 @@ func TestServiceRegistryInfo(t *testing.T) { opts = append(opts, WithHostPorts("127.0.0.1:9225")) srv := New(opts...) go srv.Spin() - time.Sleep(2 * time.Second) + waitEngineRunning(srv) + ctx, cancel := context.WithTimeout(context.Background(), 0) defer cancel() _ = srv.Shutdown(ctx) - time.Sleep(2 * time.Second) + wg.Wait() assert.Assert(t, atomic.LoadInt32(&rCount) == 1) assert.Assert(t, atomic.LoadInt32(&drCount) == 1) } func TestServiceRegistryNoInitInfo(t *testing.T) { + t.Parallel() // slow test, make it parallel + checkInfo := func(info *registry.Info) { assert.Assert(t, info == nil) } + + var wg sync.WaitGroup + wg.Add(2) // RegisterFunc && DeregisterFunc var rCount int32 var drCount int32 mockRegistry := MockRegistry{ RegisterFunc: func(info *registry.Info) error { + defer wg.Done() checkInfo(info) atomic.AddInt32(&rCount, 1) return nil }, DeregisterFunc: func(info *registry.Info) error { + defer wg.Done() checkInfo(info) atomic.AddInt32(&drCount, 1) return nil @@ -729,11 +796,12 @@ func TestServiceRegistryNoInitInfo(t *testing.T) { opts = append(opts, WithHostPorts("127.0.0.1:9227")) srv := New(opts...) go srv.Spin() - time.Sleep(2 * time.Second) + waitEngineRunning(srv) + ctx, cancel := context.WithTimeout(context.Background(), 0) defer cancel() _ = srv.Shutdown(ctx) - time.Sleep(2 * time.Second) + wg.Wait() assert.Assert(t, atomic.LoadInt32(&rCount) == 1) assert.Assert(t, atomic.LoadInt32(&drCount) == 1) } @@ -758,7 +826,8 @@ func TestReuseCtx(t *testing.T) { }) go h.Spin() - time.Sleep(time.Second) + waitEngineRunning(h) + for i := 0; i < 1000; i++ { _, _, err := c.Get(context.Background(), nil, "http://127.0.0.1:9228/ping") assert.Nil(t, err) @@ -770,9 +839,15 @@ type CloseWithoutResetBuffer interface { } func TestOnprepare(t *testing.T) { + n := int32(0) h1 := New( WithHostPorts("localhost:9333"), WithOnConnect(func(ctx context.Context, conn network.Conn) context.Context { + if atomic.AddInt32(&n, 1) == 1 { + // the 1st connection is from waitEngineRunning + conn.Close() + return ctx + } b, err := conn.Peek(3) assert.Nil(t, err) assert.DeepEqual(t, string(b), "GET") @@ -788,7 +863,8 @@ func TestOnprepare(t *testing.T) { }) go h1.Spin() - time.Sleep(time.Second) + waitEngineRunning(h1) + _, _, err := c.Get(context.Background(), nil, "http://127.0.0.1:9333/ping") assert.DeepEqual(t, "the server closed connection before returning the first response byte. Make sure the server returns 'Connection: close' response header before closing the connection", err.Error()) @@ -802,7 +878,8 @@ func TestOnprepare(t *testing.T) { c.JSON(consts.StatusOK, utils.H{"ping": "pong"}) }) go h2.Spin() - time.Sleep(time.Second) + waitEngineRunning(h2) + _, _, err = c.Get(context.Background(), nil, "http://127.0.0.1:9331/ping") if err == nil { t.Fatalf("err should not be nil") @@ -819,7 +896,8 @@ func TestOnprepare(t *testing.T) { c.JSON(consts.StatusOK, utils.H{"ping": "pong"}) }) go h3.Spin() - time.Sleep(time.Second) + waitEngineRunning(h3) + c.Get(context.Background(), nil, "http://127.0.0.1:9231/ping") } @@ -851,7 +929,7 @@ func TestSilentMode(t *testing.T) { ctx.Write([]byte("hello, world")) }) go h.Spin() - time.Sleep(time.Second) + waitEngineRunning(h) d := standard.NewDialer() conn, _ := d.DialConnection("tcp", "127.0.0.1:9232", 0, nil) @@ -886,7 +964,7 @@ func TestHertzDisableHeaderNamesNormalizing(t *testing.T) { }) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) cli, _ := c.NewClient(c.WithDisableHeaderNamesNormalizing(true)) @@ -917,7 +995,8 @@ func TestBindConfig(t *testing.T) { }) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) + hc := http.Client{Timeout: time.Second} _, err := hc.Get("http://127.0.0.1:9332/bind?a=") assert.Nil(t, err) @@ -936,7 +1015,7 @@ func TestBindConfig(t *testing.T) { }) go h2.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h2) _, err = hc.Get("http://127.0.0.1:9448/bind?a=") assert.Nil(t, err) @@ -998,7 +1077,8 @@ func TestCustomBinder(t *testing.T) { }) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) + hc := http.Client{Timeout: time.Second} _, err := hc.Get("http://127.0.0.1:9334/bind?a=") assert.Nil(t, err) @@ -1025,7 +1105,8 @@ func TestValidateConfigRegValidateFunc(t *testing.T) { }) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) + hc := http.Client{Timeout: time.Second} _, err := hc.Get("http://127.0.0.1:9229/bind?a=2") assert.Nil(t, err) @@ -1110,7 +1191,8 @@ func TestValidateConfigSetSetErrorFactory(t *testing.T) { }) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) + hc := http.Client{Timeout: time.Second} _, err := hc.Get("http://127.0.0.1:9666/bind?b=1") assert.Nil(t, err) @@ -1136,7 +1218,8 @@ func TestValidateConfigAndBindConfig(t *testing.T) { }) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) + hc := http.Client{Timeout: time.Second} _, err := hc.Get("http://127.0.0.1:9876/bind?a=135") assert.Nil(t, err) @@ -1150,7 +1233,8 @@ func TestWithDisableDefaultDate(t *testing.T) { ) h.GET("/", func(_ context.Context, c *app.RequestContext) {}) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) + hc := http.Client{Timeout: time.Second} r, _ := hc.Get("http://127.0.0.1:8321") //nolint:errcheck assert.DeepEqual(t, "", r.Header.Get("Date")) @@ -1163,7 +1247,8 @@ func TestWithDisableDefaultContentType(t *testing.T) { ) h.GET("/", func(_ context.Context, c *app.RequestContext) {}) go h.Spin() - time.Sleep(100 * time.Millisecond) + waitEngineRunning(h) + hc := http.Client{Timeout: time.Second} r, _ := hc.Get("http://127.0.0.1:8324") //nolint:errcheck assert.DeepEqual(t, "", r.Header.Get("Content-Type")) diff --git a/pkg/app/server/hertz_unix_test.go b/pkg/app/server/hertz_unix_test.go index b1f7d700c..7e1d8d18a 100644 --- a/pkg/app/server/hertz_unix_test.go +++ b/pkg/app/server/hertz_unix_test.go @@ -67,7 +67,10 @@ func TestReusePorts(t *testing.T) { go hb.Run() go hc.Run() go hd.Run() - time.Sleep(time.Second) + waitEngineRunning(ha) + waitEngineRunning(hb) + waitEngineRunning(hc) + waitEngineRunning(hd) client, _ := c.NewClient() for i := 0; i < 1000; i++ { @@ -81,7 +84,7 @@ func TestReusePorts(t *testing.T) { func TestHertz_Spin(t *testing.T) { engine := New(WithHostPorts("127.0.0.1:6668")) engine.GET("/test", func(c context.Context, ctx *app.RequestContext) { - time.Sleep(time.Second * 2) + time.Sleep(40 * time.Millisecond) path := ctx.Request.URI().PathOriginal() ctx.SetBodyString(string(path)) }) @@ -93,7 +96,7 @@ func TestHertz_Spin(t *testing.T) { }) go engine.Spin() - time.Sleep(time.Millisecond) + waitEngineRunning(engine) hc := http.Client{Timeout: time.Second} var err error @@ -101,7 +104,7 @@ func TestHertz_Spin(t *testing.T) { ch := make(chan struct{}) ch2 := make(chan struct{}) go func() { - ticker := time.NewTicker(time.Millisecond * 100) + ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() for range ticker.C { _, err := hc.Get("http://127.0.0.1:6668/test2") @@ -120,7 +123,7 @@ func TestHertz_Spin(t *testing.T) { ch <- struct{}{} }() - time.Sleep(time.Second * 1) + time.Sleep(20 * time.Millisecond) pid := strconv.Itoa(os.Getpid()) cmd := exec.Command("kill", "-SIGHUP", pid) t.Logf("[%v]begin SIGHUP\n", time.Now()) @@ -131,9 +134,9 @@ func TestHertz_Spin(t *testing.T) { <-ch assert.Nil(t, err) assert.NotNil(t, resp) - assert.DeepEqual(t, uint32(1), atomic.LoadUint32(&testint)) <-ch2 + assert.DeepEqual(t, uint32(1), atomic.LoadUint32(&testint)) } func TestWithSenseClientDisconnection(t *testing.T) { @@ -150,15 +153,16 @@ func TestWithSenseClientDisconnection(t *testing.T) { } }) go h.Spin() - time.Sleep(time.Second) + waitEngineRunning(h) + con, err := net.Dial("tcp", "127.0.0.1:6631") assert.Nil(t, err) _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) assert.Nil(t, err) - time.Sleep(time.Second) + time.Sleep(20 * time.Millisecond) assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(0)) assert.Nil(t, con.Close()) - time.Sleep(time.Second) + time.Sleep(20 * time.Millisecond) assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) } @@ -178,14 +182,15 @@ func TestWithSenseClientDisconnectionAndWithOnConnect(t *testing.T) { } }) go h.Spin() - time.Sleep(time.Second) + waitEngineRunning(h) + con, err := net.Dial("tcp", "127.0.0.1:6632") assert.Nil(t, err) _, err = con.Write([]byte("GET /ping HTTP/1.1\r\nHost: aa\r\n\r\n")) assert.Nil(t, err) - time.Sleep(time.Second) + time.Sleep(20 * time.Millisecond) assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(0)) assert.Nil(t, con.Close()) - time.Sleep(time.Second) + time.Sleep(20 * time.Millisecond) assert.DeepEqual(t, atomic.LoadInt32(&closeFlag), int32(1)) }