-
Notifications
You must be signed in to change notification settings - Fork 12
/
dispatch.go
197 lines (150 loc) · 3.92 KB
/
dispatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package litejob
import (
"fmt"
"runtime"
"runtime/debug"
"sync"
"time"
)
// JobCallbackConfigure holds the optional hooks the dispatcher runs around
// each job. A nil hook is skipped.
type JobCallbackConfigure struct {
	After  JobCallback // invoked after the handler returns, once job.Status holds the handler's result
	Before JobCallback // invoked right after a job is popped and marked as doing, before its handler is looked up
}
// EngineConfigure is a free-form bag of engine settings keyed by option name.
type EngineConfigure map[string]interface{}

// Get looks up name in the configuration and returns the stored value. When
// the key is absent, defaultVal is returned instead. A key that is present
// with a nil value yields nil, not defaultVal.
func (this *EngineConfigure) Get(name string, defaultVal interface{}) interface{} {
	value, found := (*this)[name]
	if !found {
		return defaultVal
	}
	return value
}
// DispatchConfigure carries the settings used to build and run a Dispatch.
type DispatchConfigure struct {
	Engine          string               // storage engine name, e.g. redis, sqlite, memory
	MaxConcurrency  uint32               // upper bound on the number of job goroutines in flight at once
	MaxReplyCount   uint32               // maximum number of times a job that asks to run again is pushed back onto the job list
	HeartInterval   time.Duration        // how long the dispatch loop sleeps when the job list is empty
	DumpInterval    time.Duration        // not used
	Logfile         string               // log file path
	EngineConfigure EngineConfigure      // engine-specific settings
	Callback        JobCallbackConfigure // hooks run before and after each job
}
// DispatchStatus describes the dispatch engine status for monitoring.
type DispatchStatus struct {
	Running uint32 // running goroutine count
	Len     uint32 // job list length
}
// Dispatch pops jobs from a Storage backend and runs each one through the
// handler registered under the job's name.
type Dispatch struct {
	sync.Mutex                        // locked around updates to running
	configure   *DispatchConfigure    // settings supplied to NewDispatch
	storage     Storage               // job list backend obtained from GetStorage
	running     uint32                // number of next() goroutines currently in flight
	handlerList map[string]JobHandler // handlers keyed by job name
	log         *Log                  // logger created from configure.Logfile
	Count       uint32                // incremented by next() each time a job's state has been stored
}
// NewDispatch builds a Dispatch from configure: it creates the logger from
// configure.Logfile and resolves the storage backend named by
// configure.Engine through GetStorage.
//
// It returns an error when configure is nil or when GetStorage fails; in both
// cases the returned *Dispatch is nil.
func NewDispatch(configure *DispatchConfigure) (*Dispatch, error) {
	// Every step below dereferences configure. A constructor that already
	// reports errors should return one here rather than panic.
	if configure == nil {
		return nil, fmt.Errorf("litejob: nil dispatch configure")
	}

	dispatch := &Dispatch{
		configure:   configure,
		handlerList: make(map[string]JobHandler),
		log:         NewLog(configure.Logfile),
	}

	storage, err := GetStorage(configure.Engine, configure)
	if err != nil {
		return nil, err
	}
	dispatch.storage = storage
	return dispatch, nil
}
// RegisterHandler associates handler with the job name, replacing any handler
// previously stored under the same name.
//
// NOTE(review): the handler map is written here without taking the Dispatch
// lock, and next() reads it without the lock as well — presumably all
// handlers are meant to be registered before Start is called; confirm with
// callers.
func (this *Dispatch) RegisterHandler(name string, handler JobHandler) {
	this.handlerList[name] = handler
}
// JobNew creates a job called name that carries param, stamps it with a fresh
// id, the waiting status and the current time, and pushes it to the storage.
//
// The job is returned even when the push fails; in that case the push error
// is logged and returned alongside it.
func (this *Dispatch) JobNew(name string, param interface{}) (*Job, error) {
	job := new(Job)
	job.Id = Guid()
	job.Name = name
	job.Param = param
	job.Status = JobStatusWating
	job.CreateTime = time.Now()

	err := this.storage.JobPush(job)
	if err != nil {
		this.log.Error("push job error:" + err.Error())
	}
	return job, err
}
// Start runs the dispatch loop by calling Loop. It blocks the calling
// goroutine, because Loop contains no exit path.
func (this *Dispatch) Start() {
	this.Loop()
}
// Loop polls the storage for pending jobs forever; it never returns.
//
// While the job list is non-empty it starts at most one next() goroutine per
// iteration, as long as fewer than MaxConcurrency are in flight, and yields
// the processor between iterations. When the list is empty it sleeps for
// HeartInterval before polling again.
//
// NOTE(review): when the list is non-empty and every slot is busy, the loop
// spins on runtime.Gosched instead of sleeping.
func (this *Dispatch) Loop() {
	for {
		// Job list is not empty.
		if this.storage.JobLen() > 0 {
			// Check the limit and claim the slot inside one critical section:
			// next() decrements running under this lock on other goroutines,
			// so reading it unlocked is a data race.
			this.Lock()
			spawn := this.running < this.configure.MaxConcurrency
			if spawn {
				this.running++
			}
			this.Unlock()

			if spawn {
				go this.next()
			}
			// Yield the CPU so the job goroutines get a chance to run.
			runtime.Gosched()
			continue
		}

		// Nothing to do: back off before polling the storage again.
		time.Sleep(this.configure.HeartInterval)
		runtime.Gosched()
	}
}
// JobState returns the state recorded for the job with the given id by
// delegating to the storage backend; the storage's result and error are
// passed through unchanged.
func (this *Dispatch) JobState(jobId string) (*JobState, error) {
	return this.storage.JobState(jobId)
}
// next pops one job from the storage and runs it on the calling goroutine.
// Loop starts it with `go` after incrementing running; next decrements
// running again when it returns.
//
// Steps: mark the job as doing, run the Before callback, look up and call the
// handler registered under job.Name, run the After callback, push the job
// back when the handler asks for another attempt (while ReplyCount is below
// MaxReplyCount; otherwise the result becomes failed), store the resulting
// JobState and write one log line.
//
// A panic raised anywhere in this sequence is recovered and logged.
func (this *Dispatch) next() {
	// Deferred first, therefore executed last: the concurrency slot is
	// released even when the job panicked.
	defer func() {
		this.Lock()
		this.running--
		this.Unlock()
	}()
	defer func() {
		if r := recover(); r != nil {
			this.log.Error("run job error:" + fmt.Sprint(r))
			debug.PrintStack()
		}
	}()

	job, err := this.storage.JobPop()
	if err != nil {
		return
	}

	job.Status = JobStatusDoing
	if this.configure.Callback.Before != nil {
		this.configure.Callback.Before(job)
	}

	// NOTE(review): a job without a registered handler is dropped here after
	// being popped, and no JobState is stored for it — confirm this is
	// intended.
	handler, ok := this.handlerList[job.Name]
	if !ok {
		this.log.Error("unknow handler %s", job.Name)
		return
	}

	startTime := time.Now()
	ret := handler(job)
	endTime := time.Now()

	job.Status = ret.Status
	if this.configure.Callback.After != nil {
		this.configure.Callback.After(job)
	}

	if ret.Status == JobStatusAgain {
		if job.ReplyCount < this.configure.MaxReplyCount {
			job.ReplyCount++
			// A failed re-queue used to be discarded silently; report it the
			// same way JobNew reports a failed push.
			if err := this.storage.JobPush(job); err != nil {
				this.log.Error("push job error:" + err.Error())
			}
		} else {
			ret.Status = JobStatusFailed
			ret.Msg = "over max reply times."
		}
	}

	// NOTE(review): WaitTime is measured from job creation to the end of the
	// run, so it includes the run time — confirm that is the intended meaning.
	state := &JobState{
		JobId:    job.Id,
		Status:   ret.Status.String(),
		Msg:      ret.Msg,
		WaitTime: int(endTime.Sub(job.CreateTime).Seconds()),
		RunTime:  int(endTime.Sub(startTime) / time.Millisecond),
	}
	this.storage.JobStateUpdate(state)

	// Count and running are shared with the other next() goroutines and with
	// Loop, so update and snapshot them under the lock.
	this.Lock()
	this.Count++
	count, running := this.Count, this.running
	this.Unlock()

	this.log.Normal("%6d %3d %32s %32s %10s %8dms %8ds %s", count, running, job.Name, job.Id, ret.Status, state.RunTime, state.WaitTime, ret.Msg)
}