Skip to content

Commit

Permalink
Merge pull request #107 from suyuan32/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
suyuan32 authored May 31, 2023
2 parents 1c118ac + 203d9c9 commit 52942f4
Show file tree
Hide file tree
Showing 39 changed files with 1,759 additions and 530 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ jobs:
run: |
go mod verify
go mod download
go test -v -race ./...
go test ./...
cd tools/goctl && go build -v goctl.go
13 changes: 5 additions & 8 deletions core/conf/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func TestConfigJson(t *testing.T) {
"c": "${FOO}",
"d": "abcd!@#$112"
}`
t.Setenv("FOO", "2")

for _, test := range tests {
test := test
t.Run(test, func(t *testing.T) {
os.Setenv("FOO", "2")
defer os.Unsetenv("FOO")
tmpfile, err := createTempFile(test, text)
assert.Nil(t, err)
defer os.Remove(tmpfile)
Expand Down Expand Up @@ -82,8 +82,7 @@ b = 1
c = "${FOO}"
d = "abcd!@#$112"
`
os.Setenv("FOO", "2")
defer os.Unsetenv("FOO")
t.Setenv("FOO", "2")
tmpfile, err := createTempFile(".toml", text)
assert.Nil(t, err)
defer os.Remove(tmpfile)
Expand Down Expand Up @@ -208,8 +207,7 @@ b = 1
c = "${FOO}"
d = "abcd!@#112"
`
os.Setenv("FOO", "2")
defer os.Unsetenv("FOO")
t.Setenv("FOO", "2")
tmpfile, err := createTempFile(".toml", text)
assert.Nil(t, err)
defer os.Remove(tmpfile)
Expand Down Expand Up @@ -240,11 +238,10 @@ func TestConfigJsonEnv(t *testing.T) {
"c": "${FOO}",
"d": "abcd!@#$a12 3"
}`
t.Setenv("FOO", "2")
for _, test := range tests {
test := test
t.Run(test, func(t *testing.T) {
os.Setenv("FOO", "2")
defer os.Unsetenv("FOO")
tmpfile, err := createTempFile(test, text)
assert.Nil(t, err)
defer os.Remove(tmpfile)
Expand Down
3 changes: 1 addition & 2 deletions core/conf/properties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ func TestPropertiesEnv(t *testing.T) {
assert.Nil(t, err)
defer os.Remove(tmpfile)

os.Setenv("FOO", "2")
defer os.Unsetenv("FOO")
t.Setenv("FOO", "2")

props, err := LoadProperties(tmpfile, UseEnv())
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion core/executors/bulkexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestBulkExecutorFlush(t *testing.T) {
wait.Wait()
}

func TestBuldExecutorFlushSlowTasks(t *testing.T) {
func TestBulkExecutorFlushSlowTasks(t *testing.T) {
const total = 1500
lock := new(sync.Mutex)
result := make([]any, 0, 10000)
Expand Down
16 changes: 8 additions & 8 deletions core/executors/periodicalexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,23 @@ func TestPeriodicalExecutor_FlushPanic(t *testing.T) {

func TestPeriodicalExecutor_Wait(t *testing.T) {
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []any) {
executor := NewBulkExecutor(func(tasks []any) {
lock.Lock()
defer lock.Unlock()
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(time.Second))
for i := 0; i < 10; i++ {
executer.Add(1)
executor.Add(1)
}
executer.Flush()
executer.Wait()
executor.Flush()
executor.Wait()
}

func TestPeriodicalExecutor_WaitFast(t *testing.T) {
const total = 3
var cnt int
var lock sync.Mutex
executer := NewBulkExecutor(func(tasks []any) {
executor := NewBulkExecutor(func(tasks []any) {
defer func() {
cnt++
}()
Expand All @@ -193,10 +193,10 @@ func TestPeriodicalExecutor_WaitFast(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}, WithBulkTasks(1), WithBulkInterval(10*time.Millisecond))
for i := 0; i < total; i++ {
executer.Add(2)
executor.Add(2)
}
executer.Flush()
executer.Wait()
executor.Flush()
executor.Wait()
assert.Equal(t, total, cnt)
}

Expand Down
12 changes: 6 additions & 6 deletions core/fs/temps.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@ import (
// The file is kept as open, the caller should close the file handle,
// and remove the file by name.
func TempFileWithText(text string) (*os.File, error) {
tmpfile, err := os.CreateTemp(os.TempDir(), hash.Md5Hex([]byte(text)))
tmpFile, err := os.CreateTemp(os.TempDir(), hash.Md5Hex([]byte(text)))
if err != nil {
return nil, err
}

if err := os.WriteFile(tmpfile.Name(), []byte(text), os.ModeTemporary); err != nil {
if err := os.WriteFile(tmpFile.Name(), []byte(text), os.ModeTemporary); err != nil {
return nil, err
}

return tmpfile, nil
return tmpFile, nil
}

// TempFilenameWithText creates the file with the given content,
// and returns the filename (full path).
// The caller should remove the file after use.
func TempFilenameWithText(text string) (string, error) {
tmpfile, err := TempFileWithText(text)
tmpFile, err := TempFileWithText(text)
if err != nil {
return "", err
}

filename := tmpfile.Name()
if err = tmpfile.Close(); err != nil {
filename := tmpFile.Name()
if err = tmpFile.Close(); err != nil {
return "", err
}

Expand Down
80 changes: 74 additions & 6 deletions core/fx/retry.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,87 @@
package fx

import "github.com/zeromicro/go-zero/core/errorx"
import (
"context"
"errors"
"time"

"github.com/zeromicro/go-zero/core/errorx"
)

const defaultRetryTimes = 3

var errTimeout = errors.New("retry timeout")

type (
// RetryOption defines the method to customize DoWithRetry.
RetryOption func(*retryOptions)

retryOptions struct {
times int
times int
interval time.Duration
timeout time.Duration
}
)

// DoWithRetry runs fn, and retries if failed. Default to retry 3 times.
// Note that if the fn function accesses global variables outside the function
// and performs modification operations, it is best to lock them,
// otherwise there may be data race issues
func DoWithRetry(fn func() error, opts ...RetryOption) error {
return retry(func(errChan chan error, retryCount int) {
errChan <- fn()
}, opts...)
}

// DoWithRetryCtx runs fn, and retries if failed. Default to retry 3 times.
// fn retryCount indicates the current number of retries, starting from 0
// Note that if the fn function accesses global variables outside the function
// and performs modification operations, it is best to lock them,
// otherwise there may be data race issues
func DoWithRetryCtx(ctx context.Context, fn func(ctx context.Context, retryCount int) error,
opts ...RetryOption) error {
return retry(func(errChan chan error, retryCount int) {
errChan <- fn(ctx, retryCount)
}, opts...)
}

func retry(fn func(errChan chan error, retryCount int), opts ...RetryOption) error {
options := newRetryOptions()
for _, opt := range opts {
opt(options)
}

var berr errorx.BatchError
var cancelFunc context.CancelFunc
ctx := context.Background()
if options.timeout > 0 {
ctx, cancelFunc = context.WithTimeout(ctx, options.timeout)
defer cancelFunc()
}

errChan := make(chan error, 1)
for i := 0; i < options.times; i++ {
if err := fn(); err != nil {
berr.Add(err)
} else {
return nil
go fn(errChan, i)

select {
case err := <-errChan:
if err != nil {
berr.Add(err)
} else {
return nil
}
case <-ctx.Done():
berr.Add(errTimeout)
return berr.Err()
}

if options.interval > 0 {
select {
case <-ctx.Done():
berr.Add(errTimeout)
return berr.Err()
case <-time.After(options.interval):
}
}
}

Expand All @@ -39,6 +95,18 @@ func WithRetry(times int) RetryOption {
}
}

func WithInterval(interval time.Duration) RetryOption {
return func(options *retryOptions) {
options.interval = interval
}
}

func WithTimeout(timeout time.Duration) RetryOption {
return func(options *retryOptions) {
options.timeout = timeout
}
}

func newRetryOptions() *retryOptions {
return &retryOptions{
times: defaultRetryTimes,
Expand Down
92 changes: 83 additions & 9 deletions core/fx/retry_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package fx

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -12,31 +14,103 @@ func TestRetry(t *testing.T) {
return errors.New("any")
}))

var times int
times1 := 0
assert.Nil(t, DoWithRetry(func() error {
times++
if times == defaultRetryTimes {
times1++
if times1 == defaultRetryTimes {
return nil
}
return errors.New("any")
}))

times = 0
times2 := 0
assert.NotNil(t, DoWithRetry(func() error {
times++
if times == defaultRetryTimes+1 {
times2++
if times2 == defaultRetryTimes+1 {
return nil
}
return errors.New("any")
}))

total := 2 * defaultRetryTimes
times = 0
times3 := 0
assert.Nil(t, DoWithRetry(func() error {
times++
if times == total {
times3++
if times3 == total {
return nil
}
return errors.New("any")
}, WithRetry(total)))
}

func TestRetryWithTimeout(t *testing.T) {
assert.Nil(t, DoWithRetry(func() error {
return nil
}, WithTimeout(time.Millisecond*500)))

times1 := 0
assert.Nil(t, DoWithRetry(func() error {
times1++
if times1 == 1 {
return errors.New("any ")
}
time.Sleep(time.Millisecond * 150)
return nil
}, WithTimeout(time.Millisecond*250)))

total := defaultRetryTimes
times2 := 0
assert.Nil(t, DoWithRetry(func() error {
times2++
if times2 == total {
return nil
}
time.Sleep(time.Millisecond * 50)
return errors.New("any")
}, WithTimeout(time.Millisecond*50*(time.Duration(total)+2))))

assert.NotNil(t, DoWithRetry(func() error {
return errors.New("any")
}, WithTimeout(time.Millisecond*250)))
}

func TestRetryWithInterval(t *testing.T) {
times1 := 0
assert.NotNil(t, DoWithRetry(func() error {
times1++
if times1 == 1 {
return errors.New("any")
}
time.Sleep(time.Millisecond * 150)
return nil
}, WithTimeout(time.Millisecond*250), WithInterval(time.Millisecond*150)))

times2 := 0
assert.NotNil(t, DoWithRetry(func() error {
times2++
if times2 == 2 {
return nil
}
time.Sleep(time.Millisecond * 150)
return errors.New("any ")
}, WithTimeout(time.Millisecond*250), WithInterval(time.Millisecond*150)))

}

func TestRetryCtx(t *testing.T) {
assert.NotNil(t, DoWithRetryCtx(context.Background(), func(ctx context.Context, retryCount int) error {
if retryCount == 0 {
return errors.New("any")
}
time.Sleep(time.Millisecond * 150)
return nil
}, WithTimeout(time.Millisecond*250), WithInterval(time.Millisecond*150)))

assert.NotNil(t, DoWithRetryCtx(context.Background(), func(ctx context.Context, retryCount int) error {
if retryCount == 1 {
return nil
}
time.Sleep(time.Millisecond * 150)
return errors.New("any ")
}, WithTimeout(time.Millisecond*250), WithInterval(time.Millisecond*150)))
}
Loading

0 comments on commit 52942f4

Please sign in to comment.