-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutil.go
117 lines (107 loc) · 2.8 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package event
import (
"context"
"fmt"
"log"
"os"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync/atomic"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Attribute keys for tracing spans. In this file, spanKeyEventID,
// spanKeyEventName and spanKeyEventSource are attached to the span started by
// AsyncHandler. spanKeyEventRegistry and spanKeyEventSubscriptionID are not
// referenced here; presumably other files in the package use them — confirm
// before removing.
const (
// spanKeyEventID is the attribute key carrying the event ID.
spanKeyEventID = "event.id"
// spanKeyEventName is the attribute key carrying the event name.
spanKeyEventName = "event.name"
// spanKeyEventSource is the attribute key carrying the event source.
spanKeyEventSource = "event.source"
// spanKeyEventRegistry is the "event.registry" attribute key.
spanKeyEventRegistry = "event.registry"
// spanKeyEventSubscriptionID is the "subscription.id" attribute key; note that
// its value, unlike the others, has no "event." prefix.
spanKeyEventSubscriptionID = "subscription.id"
)
var (
// counter is the sequence NewID increments atomically to build a fallback
// ID when UUID generation fails. Access it only through sync/atomic.
counter uint64
// DefaultLoggerFlags is the flag set Logger passes to log.New: date and
// time (log.LstdFlags), the file's base name and line number
// (log.Lshortfile), and the prefix placed directly before the message
// instead of at the start of the line (log.Lmsgprefix).
DefaultLoggerFlags = log.LstdFlags | log.Lshortfile | log.Lmsgprefix
)
// NewID returns a new event ID.
//
// The ID is the string form of the UUID produced by uuid.NewRandom. If that
// call fails, NewID falls back to the decimal representation of the
// package-level counter after atomically incrementing it, so the fallback IDs
// are unique within the process but not across processes.
func NewID() string {
	id, err := uuid.NewRandom()
	if err != nil {
		// Random source unavailable: hand out the next sequence number instead.
		seq := atomic.AddUint64(&counter, 1)
		return strconv.FormatUint(seq, 10)
	}
	return id.String()
}
// Sanitize returns a copy of s in which every byte that is not an ASCII
// letter or digit is replaced by an underscore.
//
// The substitution is byte-wise: the result always has the same byte length
// as s, and a multi-byte UTF-8 character yields one underscore per encoded
// byte.
func Sanitize(s string) string {
	var sb strings.Builder
	sb.Grow(len(s))
	for _, c := range []byte(s) {
		switch {
		case 'a' <= c && c <= 'z',
			'A' <= c && c <= 'Z',
			'0' <= c && c <= '9':
			sb.WriteByte(c)
		default:
			sb.WriteByte('_')
		}
	}
	return sb.String()
}
// Logger returns a new *log.Logger that writes to os.Stdout, uses prefix as
// its prefix, and is configured with the value DefaultLoggerFlags holds at the
// time of the call.
func Logger(prefix string) *log.Logger {
	stdout := os.Stdout
	l := log.New(stdout, prefix, DefaultLoggerFlags)
	return l
}
// AsyncHandler wraps handler so that each invocation runs in its own
// goroutine; the returned Handler starts that goroutine and returns
// immediately without waiting for handler to finish.
//
// Inside the goroutine a new context is built with NewContext(ctx) and then
// passed through each of copyContextFns in order. Every function receives the
// context built so far (to) and the original ctx (from) and returns the
// context to continue with, which lets callers carry selected values over.
//
// A span named "<event name>.subscribe.async" is started on the new context,
// tagged with the event ID, source and name, and linked to the span context
// found in the original ctx. handler is then called with the resulting
// context.
//
// A panic raised by a copy function or by handler is recovered and logged, so
// it does not take the process down.
func AsyncHandler(handler Handler, copyContextFns ...func(to, from context.Context) context.Context) Handler {
	return func(ctx context.Context, ev Event, data Data) {
		// Run the handler on its own goroutine.
		go func() {
			// Registered first, so it runs last: after the span (if any) has
			// been ended by its own deferred call.
			defer func() {
				rec := recover()
				if rec == nil {
					return
				}
				// Resolve the panic site only when a panic actually happened.
				// The frames directly above this closure belong to the
				// runtime's panic machinery, so walk outward to the first
				// frame that is not in package runtime.
				line := 0
				var pcs [32]uintptr
				// skip=2 drops runtime.Callers itself and this closure.
				n := runtime.Callers(2, pcs[:])
				frames := runtime.CallersFrames(pcs[:n])
				for {
					frame, more := frames.Next()
					if frame.Function != "" && !strings.HasPrefix(frame.Function, "runtime.") {
						line = frame.Line
						break
					}
					if !more {
						break
					}
				}
				name := ev.Name()
				logger.Printf("Event[%s] Recover panic line => %v\n", name, line)
				logger.Printf("Event[%s] Recover err => %v\n", name, rec)
				debug.PrintStack()
			}()
			// Capture the caller's span context so the async span can link to it.
			spanCtx := trace.SpanContextFromContext(ctx)
			// Build the context the handler will run with.
			newCtx := NewContext(ctx)
			for _, fn := range copyContextFns {
				// Let the caller carry additional data over from ctx.
				newCtx = fn(newCtx, ctx)
			}
			// Enable tracing.
			if tracer := otel.Tracer("event"); tracer != nil {
				var span trace.Span
				newCtx, span = tracer.Start(newCtx, fmt.Sprintf("%s.subscribe.async", ev.Name()),
					trace.WithAttributes(attribute.String(spanKeyEventID, ContextEventID(ctx)),
						attribute.String(spanKeyEventSource, ContextSource(ctx)),
						attribute.String(spanKeyEventName, ev.Name())),
					trace.WithSpanKind(trace.SpanKindInternal),
					trace.WithLinks(trace.Link{
						SpanContext: spanCtx,
					}))
				// Deferred at goroutine scope: the span ends when the handler
				// returns or panics.
				defer span.End()
			}
			handler(newCtx, ev, data)
		}()
	}
}
// Caller get caller function name
func Caller(depth int) string {
pc, _, _, ok := runtime.Caller(depth)
if !ok {
return ""
}
details := runtime.FuncForPC(pc)
if details != nil {
return details.Name()
}
return ""
}