Skip to content

Commit

Permalink
Merge pull request #80 from suyuan32/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
suyuan32 authored May 9, 2023
2 parents 3c4a2e9 + 39d8d6d commit 758a84b
Show file tree
Hide file tree
Showing 54 changed files with 337 additions and 180 deletions.
12 changes: 5 additions & 7 deletions core/discov/internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,11 @@ func (c *cluster) watchConnState(cli EtcdClient) {
// DialClient dials an etcd cluster with given endpoints.
func DialClient(endpoints []string) (EtcdClient, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
AutoSyncInterval: autoSyncInterval,
DialTimeout: DialTimeout,
DialKeepAliveTime: dialKeepAliveTime,
DialKeepAliveTimeout: DialTimeout,
RejectOldCluster: true,
PermitWithoutStream: true,
Endpoints: endpoints,
AutoSyncInterval: autoSyncInterval,
DialTimeout: DialTimeout,
RejectOldCluster: true,
PermitWithoutStream: true,
}
if account, ok := GetAccount(endpoints); ok {
cfg.Username = account.User
Expand Down
1 change: 0 additions & 1 deletion core/discov/internal/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const (
autoSyncInterval = time.Minute
coolDownInterval = time.Second
dialTimeout = 5 * time.Second
dialKeepAliveTime = 5 * time.Second
requestTimeout = 3 * time.Second
endpointsSeparator = ","
)
Expand Down
9 changes: 2 additions & 7 deletions core/logx/logtest/logtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (b *Buffer) Bytes() []byte {
func (b *Buffer) Content() string {
var m map[string]interface{}
if err := json.Unmarshal(b.buf.Bytes(), &m); err != nil {
b.t.Error(err)
return ""
}

Expand All @@ -59,12 +58,8 @@ func (b *Buffer) Content() string {
case string:
return val
default:
bs, err := json.Marshal(content)
if err != nil {
b.t.Error(err)
return ""
}

// err is impossible to be not nil, unmarshaled from b.buf.Bytes()
bs, _ := json.Marshal(content)
return string(bs)
}
}
Expand Down
16 changes: 16 additions & 0 deletions core/logx/logtest/logtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,31 @@ func TestCollector(t *testing.T) {
logx.Info(input)
assert.Equal(t, input, c.Content())
assert.Contains(t, c.String(), input)
c.Reset()
assert.Empty(t, c.Bytes())
}

func TestPanicOnFatal(t *testing.T) {
const input = "hello"
Discard(t)
logx.Info(input)

PanicOnFatal(t)
PanicOnFatal(t)
assert.Panics(t, func() {
logx.Must(errors.New("foo"))
})
}

func TestCollectorContent(t *testing.T) {
const input = "hello"
c := NewCollector(t)
c.buf.WriteString(input)
assert.Empty(t, c.Content())
c.Reset()
c.buf.WriteString(`{}`)
assert.Empty(t, c.Content())
c.Reset()
c.buf.WriteString(`{"content":1}`)
assert.Equal(t, "1", c.Content())
}
2 changes: 1 addition & 1 deletion core/logx/richlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (l *richLogger) Errorf(format string, v ...any) {
}

func (l *richLogger) Errorv(v any) {
l.err(fmt.Sprint(v))
l.err(v)
}

func (l *richLogger) Errorw(msg string, fields ...LogField) {
Expand Down
45 changes: 42 additions & 3 deletions core/logx/richlogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func TestTraceDebug(t *testing.T) {
l.WithDuration(time.Second).Debugv(testlog)
validate(t, w.String(), true, true)
w.Reset()
l.WithDuration(time.Second).Debugv(testobj)
validateContentType(t, w.String(), map[string]any{}, true, true)
w.Reset()
l.WithDuration(time.Second).Debugw(testlog, Field("foo", "bar"))
validate(t, w.String(), true, true)
assert.True(t, strings.Contains(w.String(), "foo"), w.String())
Expand Down Expand Up @@ -103,6 +106,9 @@ func TestTraceError(t *testing.T) {
l.WithDuration(time.Second).Errorv(testlog)
validate(t, w.String(), true, true)
w.Reset()
l.WithDuration(time.Second).Errorv(testobj)
validateContentType(t, w.String(), map[string]any{}, true, true)
w.Reset()
l.WithDuration(time.Second).Errorw(testlog, Field("basket", "ball"))
validate(t, w.String(), true, true)
assert.True(t, strings.Contains(w.String(), "basket"), w.String())
Expand Down Expand Up @@ -137,6 +143,9 @@ func TestTraceInfo(t *testing.T) {
l.WithDuration(time.Second).Infov(testlog)
validate(t, w.String(), true, true)
w.Reset()
l.WithDuration(time.Second).Infov(testobj)
validateContentType(t, w.String(), map[string]any{}, true, true)
w.Reset()
l.WithDuration(time.Second).Infow(testlog, Field("basket", "ball"))
validate(t, w.String(), true, true)
assert.True(t, strings.Contains(w.String(), "basket"), w.String())
Expand Down Expand Up @@ -173,6 +182,9 @@ func TestTraceInfoConsole(t *testing.T) {
w.Reset()
l.WithDuration(time.Second).Infov(testlog)
validate(t, w.String(), true, true)
w.Reset()
l.WithDuration(time.Second).Infov(testobj)
validateContentType(t, w.String(), map[string]any{}, true, true)
}

func TestTraceSlow(t *testing.T) {
Expand Down Expand Up @@ -204,6 +216,9 @@ func TestTraceSlow(t *testing.T) {
l.WithDuration(time.Second).Slowv(testlog)
validate(t, w.String(), true, true)
w.Reset()
l.WithDuration(time.Second).Slowv(testobj)
validateContentType(t, w.String(), map[string]any{}, true, true)
w.Reset()
l.WithDuration(time.Second).Sloww(testlog, Field("basket", "ball"))
validate(t, w.String(), true, true)
assert.True(t, strings.Contains(w.String(), "basket"), w.String())
Expand Down Expand Up @@ -311,8 +326,32 @@ func validate(t *testing.T, body string, expectedTrace, expectedSpan bool) {
assert.Equal(t, expectedSpan, len(val.Span) > 0, body)
}

func validateContentType(t *testing.T, body string, expectedType any, expectedTrace, expectedSpan bool) {
var val mockValue
dec := json.NewDecoder(strings.NewReader(body))

for {
var doc mockValue
err := dec.Decode(&doc)
if err == io.EOF {
// all done
break
}
if err != nil {
continue
}

val = doc
}

assert.IsType(t, expectedType, val.Content, body)
assert.Equal(t, expectedTrace, len(val.Trace) > 0, body)
assert.Equal(t, expectedSpan, len(val.Span) > 0, body)
}

type mockValue struct {
Trace string `json:"trace"`
Span string `json:"span"`
Foo string `json:"foo"`
Trace string `json:"trace"`
Span string `json:"span"`
Foo string `json:"foo"`
Content any `json:"content"`
}
2 changes: 2 additions & 0 deletions core/logx/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

const testlog = "Stay hungry, stay foolish."

var testobj = map[string]any{"foo": "bar"}

func TestCollectSysLog(t *testing.T) {
CollectSysLog()
content := getContent(captureOutput(func() {
Expand Down
4 changes: 2 additions & 2 deletions core/mr/mapreduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mr
import (
"context"
"errors"
"io/ioutil"
"io"
"log"
"runtime"
"sync/atomic"
Expand All @@ -17,7 +17,7 @@ import (
var errDummy = errors.New("dummy")

func init() {
log.SetOutput(ioutil.Discard)
log.SetOutput(io.Discard)
}

func TestFinish(t *testing.T) {
Expand Down
18 changes: 17 additions & 1 deletion core/rescue/recover.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package rescue

import "github.com/zeromicro/go-zero/core/logx"
import (
"context"
"runtime/debug"

"github.com/zeromicro/go-zero/core/logx"
)

// Recover is used with defer to do cleanup on panics.
// Use it like:
Expand All @@ -15,3 +20,14 @@ func Recover(cleanups ...func()) {
logx.ErrorStack(p)
}
}

// RecoverCtx is used with defer to do cleanup on panics.
func RecoverCtx(ctx context.Context, cleanups ...func()) {
for _, cleanup := range cleanups {
cleanup()
}

if p := recover(); p != nil {
logx.WithContext(ctx).Errorf("%+v\n%s", p, debug.Stack())
}
}
15 changes: 15 additions & 0 deletions core/rescue/recover_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rescue

import (
"context"
"sync/atomic"
"testing"

Expand All @@ -25,3 +26,17 @@ func TestRescue(t *testing.T) {
})
assert.Equal(t, int32(5), atomic.LoadInt32(&count))
}

func TestRescueCtx(t *testing.T) {
var count int32
assert.NotPanics(t, func() {
defer RecoverCtx(context.Background(), func() {
atomic.AddInt32(&count, 2)
}, func() {
atomic.AddInt32(&count, 3)
})

panic("hello")
})
assert.Equal(t, int32(5), atomic.LoadInt32(&count))
}
4 changes: 2 additions & 2 deletions core/stat/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *metricsContainer) Execute(v any) {
report.Median = float32(medianTask.Duration) / float32(time.Millisecond)
tenPercent := fiftyPercent / 5
if tenPercent > 0 {
top10pTasks := topK(tasks, tenPercent)
top10pTasks := topK(top50pTasks, tenPercent)
task90th := top10pTasks[0]
report.Top90th = float32(task90th.Duration) / float32(time.Millisecond)
onePercent := tenPercent / 10
Expand All @@ -163,7 +163,7 @@ func (c *metricsContainer) Execute(v any) {
report.Top99p9th = mostDuration
}
} else {
mostDuration := getTopDuration(tasks)
mostDuration := getTopDuration(top50pTasks)
report.Top90th = mostDuration
report.Top99th = mostDuration
report.Top99p9th = mostDuration
Expand Down
13 changes: 13 additions & 0 deletions core/threading/routines.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package threading

import (
"bytes"
"context"
"runtime"
"strconv"

Expand All @@ -13,6 +14,11 @@ func GoSafe(fn func()) {
go RunSafe(fn)
}

// GoSafeCtx runs the given fn using another goroutine, recovers if fn panics with ctx.
func GoSafeCtx(ctx context.Context, fn func()) {
go RunSafeCtx(ctx, fn)
}

// RoutineId is only for debug, never use it in production.
func RoutineId() uint64 {
b := make([]byte, 64)
Expand All @@ -31,3 +37,10 @@ func RunSafe(fn func()) {

fn()
}

// RunSafeCtx runs the given fn, recovers if fn panics with ctx.
func RunSafeCtx(ctx context.Context, fn func()) {
defer rescue.RecoverCtx(ctx)

fn()
}
51 changes: 51 additions & 0 deletions core/threading/routines_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package threading

import (
"bytes"
"context"
"io"
"log"
"testing"

"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
)

func TestRoutineId(t *testing.T) {
Expand Down Expand Up @@ -34,3 +37,51 @@ func TestRunSafe(t *testing.T) {
<-ch
i++
}

func TestRunSafeCtx(t *testing.T) {
var buf bytes.Buffer
logx.SetWriter(logx.NewWriter(&buf))
ctx := context.Background()
ch := make(chan lang.PlaceholderType)

i := 0

defer func() {
assert.Equal(t, 1, i)
}()

go RunSafeCtx(ctx, func() {
defer func() {
ch <- lang.Placeholder
}()

panic("panic")
})

<-ch
i++
}

func TestGoSafeCtx(t *testing.T) {
var buf bytes.Buffer
logx.SetWriter(logx.NewWriter(&buf))
ctx := context.Background()
ch := make(chan lang.PlaceholderType)

i := 0

defer func() {
assert.Equal(t, 1, i)
}()

GoSafeCtx(ctx, func() {
defer func() {
ch <- lang.Placeholder
}()

panic("panic")
})

<-ch
i++
}
Loading

0 comments on commit 758a84b

Please sign in to comment.