-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcontext.go
117 lines (107 loc) · 2.61 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Copyright 2022 YBCZ, Inc. All rights reserved.
//
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file in the root of the source
// tree.
package mqms
import (
"context"
"encoding/json"
"github.com/google/uuid"
"math"
"net/http"
"time"
)
// Context is the function context: the state passed to every handler while an
// event is being processed.
type Context struct {
// ctx is the standard-library context; reset sets it to context.TODO().
ctx context.Context
// evt is the event being handled. Bind decodes its Body, and Emit/EmitDefer
// copy its TransactionID and link to its ID when they create child events.
evt Event
// engine is the Engine that Emit/EmitDefer use to dispatch, trace, store and
// publish events.
engine *Engine
// Http is an exported HTTP client.
// NOTE(review): it is not referenced by any method visible in this file —
// confirm who sets it and whether a timeout is configured.
Http *http.Client
// index is the position in handlers of the handler being run; reset puts it
// at -1 and Abort sets it to a large value so Next stops iterating.
index int
// handlers is the chain that Next executes in order.
handlers []HandlerFunc
// err holds the message of the last non-nil error passed to Error, or nil.
err *string
// stack holds the result of stack() captured when err was recorded.
stack []string
}
// reset restores the per-run fields of the Context to their initial values.
// evt, engine and Http are left untouched.
func (c *Context) reset() {
	// Forget any error recorded through Error.
	c.err, c.stack = nil, nil

	// Empty (non-nil) handler chain with the cursor placed before its first
	// entry, so that the first Next call starts at index 0.
	c.handlers, c.index = []HandlerFunc{}, -1

	c.ctx = context.TODO()
}
// Bind decodes the JSON body of the current event into o.
//
// o is passed to json.Unmarshal as is, so it must be something Unmarshal can
// fill (normally a non-nil pointer). The error from json.Unmarshal is returned
// unchanged.
func (c *Context) Bind(o interface{}) (err error) {
	payload := c.evt.Body
	err = json.Unmarshal(payload, o)
	return err
}
// Next runs the handlers that follow the current one before returning to the
// caller.
//
// It advances the cursor and invokes every remaining handler in order. A
// handler may itself call Next; because the cursor is shared, the handlers it
// runs are not run again by the outer call. The returned error is the one
// produced by the last handler invoked by this call: errors from earlier
// handlers are overwritten and do not stop the chain.
func (c *Context) Next() (err error) {
	for c.index++; c.index < len(c.handlers); c.index++ {
		next := c.handlers[c.index]
		err = next(c)
	}
	return err
}
// Abort cuts off the rest of the handler chain.
//
// It moves the cursor to math.MaxInt16 >> 1 (16383). Once the running handler
// returns, the loop in Next sees a cursor beyond the end of the chain and
// stops, provided the chain holds fewer handlers than that value.
func (c *Context) Abort() {
	const abortIndex = math.MaxInt16 >> 1
	c.index = abortIndex
}
// Error records es on the Context and hands it back to the caller.
//
// For a non-nil es the error message and the current result of stack() are
// stored on the Context. A nil es leaves the Context untouched, so a previously
// recorded error is not cleared. The return value is always es itself, which
// lets callers write `return c.Error(err)`.
func (c *Context) Error(es error) error {
	if es == nil {
		return nil
	}

	msg := es.Error()
	c.err = stringPtr(msg)
	c.stack = stack()
	return es
}
// Emit publishes an internal child event and handles it right away in a new
// goroutine.
//
// The new event gets a fresh ID, inherits the TransactionID of the event being
// handled and refers to that event through ParentID. body is JSON-encoded into
// the event body and path becomes the event path.
//
// If body, or the event as a whole, cannot be JSON-encoded, nothing is
// dispatched and no emit trace is written; the failure is reported through the
// engine handler's Fail hook, the same hook EmitDefer uses when storing or
// publishing fails.
func (c *Context) Emit(path string, body interface{}) {
	// Copy the parent ID instead of pointing into c.evt: a pointer to the
	// Context's own field would silently change under the child event (and the
	// Trace built below) if this Context is later given another event.
	parentID := c.evt.ID

	var evt Event
	evt.TransactionID = c.evt.TransactionID
	evt.ID = uuid.New()
	evt.ParentID = &parentID
	evt.Path = path
	evt.CreateAt = time.Now()
	evt.CallerTrace = stack()

	var (
		raw []byte
		err error
	)
	if evt.Body, err = json.Marshal(body); err == nil {
		raw, err = json.Marshal(evt)
	}
	if err != nil {
		// Dispatching an event whose payload could not be encoded would hand
		// the target handler data the caller never supplied.
		c.engine.handler.Fail(evt.ID, raw, err, stack())
		return
	}

	// The Trace value (including BeginAt) is built here, but the call itself
	// runs when Emit returns, i.e. after the goroutine below has been started.
	defer c.engine.handler.Trace(Trace{
		Status:  TraceStatusEmit,
		Event:   evt,
		BeginAt: time.Now(),
	})
	go c.engine.Handle(raw)
}
// EmitDefer publishes an internal delayed child event, sending it either to
// storage or to the queue depending on the delay.
//
//   - duration <= 0: the event is emitted immediately through Emit. A negative
//     delay is treated like zero instead of being forwarded to the queue.
//   - duration > time.Minute: the encoded event is passed to the handler's Save.
//   - otherwise: the encoded event is passed to the handler's Pub.
//
// The new event gets a fresh ID, inherits the TransactionID of the event being
// handled and refers to that event through ParentID. Failures (encoding,
// storing or publishing) are logged and reported through the handler's Fail
// hook; nothing is returned to the caller.
func (c *Context) EmitDefer(path string, body interface{}, duration time.Duration) {
	if duration <= 0 {
		c.Emit(path, body)
		return
	}

	// Copy the parent ID instead of pointing into c.evt: a pointer to the
	// Context's own field would silently change under the child event (and the
	// Trace built below) if this Context is later given another event.
	parentID := c.evt.ID

	var evt Event
	evt.Path = path
	evt.TransactionID = c.evt.TransactionID
	evt.ID = uuid.New()
	evt.Delay = duration
	evt.ParentID = &parentID
	evt.CreateAt = time.Now()
	evt.CallerTrace = stack()

	var (
		raw []byte
		err error
	)
	if evt.Body, err = json.Marshal(body); err == nil {
		raw, err = json.Marshal(evt)
	}
	if err != nil {
		// "event serialization error": storing or publishing an event whose
		// payload could not be encoded would deliver data the caller never
		// supplied, so stop here.
		c.engine.handler.Log(normalLogFormat("事件序列化错误:%v", err.Error()))
		c.engine.handler.Fail(evt.ID, raw, err, stack())
		return
	}

	// The Trace value (including BeginAt) is built here, but the call itself
	// runs when EmitDefer returns, i.e. after the Save/Pub attempt below.
	defer c.engine.handler.Trace(Trace{
		Status:  TraceStatusEmit,
		Event:   evt,
		BeginAt: time.Now(),
	})

	if duration > time.Minute {
		if saveErr := c.engine.handler.Save(evt.ID, raw, duration); saveErr != nil {
			c.engine.handler.Log(normalLogFormat("事件存储错误:%v", saveErr.Error()))
			c.engine.handler.Fail(evt.ID, raw, saveErr, stack())
		}
		return
	}

	if pubErr := c.engine.handler.Pub(raw, duration); pubErr != nil {
		c.engine.handler.Log(normalLogFormat("事件发布错误:%v", pubErr.Error()))
		c.engine.handler.Fail(evt.ID, raw, pubErr, stack())
	}
}