This repository has been archived by the owner on May 31, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdefault_worker.go
132 lines (115 loc) · 2.71 KB
/
default_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package workers
import (
"context"
"errors"
"sync"
"time"
)
// Sentinel errors returned by DefaultWorker methods.
var (
	// ErrInvalidContext is returned by Start when the supplied context is nil
	// or has already ended.
	ErrInvalidContext = errors.New("Cannot start worker with invalid context")

	// ErrJobChannelHasNoReceiver is returned by AddJob when the job cannot be
	// handed to the job channel without blocking.
	ErrJobChannelHasNoReceiver = errors.New("This workers job channel has no receiver")

	// ErrJobHandlerExists is returned by JobHandler when a handler has already
	// been registered.
	ErrJobHandlerExists = errors.New("This workers job handler already exists")
)
// DefaultWorker implements the Worker interface (NewDefaultWorker returns it
// as one). It receives jobs on an internal channel and passes them to a single
// registered handler until its context is cancelled.
type DefaultWorker struct {
// Embedded lock; within this file it is taken only around the metadata field.
sync.RWMutex
// id is the identifier returned by GetID.
id string
// jobChan is the unbuffered job queue; Start creates it and handleJobs drains it.
jobChan chan interface{}
// closeHandler is an optional callback invoked by Stop.
closeHandler func()
// jobHandler is an optional callback invoked by handleJobs for each received job.
jobHandler func(interface{})
// metadata is an arbitrary caller-supplied value, accessed under the embedded lock.
metadata interface{}
// ctx is the worker's context, derived from the caller's context in Start;
// it is nil until Start succeeds.
ctx context.Context
// cancel cancels ctx; it is assigned together with ctx in Start.
cancel context.CancelFunc
}
// handleJobs is the worker's receive loop. It takes jobs from jobChan and
// passes each one to jobHandler, returning once the worker's context is done.
// A job received while no handler is registered is discarded.
func (d *DefaultWorker) handleJobs() {
	for {
		select {
		case job := <-d.jobChan:
			if handle := d.jobHandler; handle != nil {
				handle(job)
			}
		case <-d.ctx.Done():
			return
		}
	}
}
// GetID reports the worker's identifier.
func (d *DefaultWorker) GetID() string {
	workerID := d.id
	return workerID
}
// Context reports the context the worker is currently bound to. The result is
// nil when no context has been assigned yet.
func (d *DefaultWorker) Context() context.Context {
	current := d.ctx
	return current
}
// Start launches the worker's job loop under a cancellable child of ctx.
//
// It returns ErrInvalidContext, leaving the worker untouched, when ctx is nil
// or has already ended. Otherwise it installs a fresh child context and a new
// unbuffered job channel, starts handleJobs in its own goroutine and returns
// nil.
//
// If the worker was started before, the previous run's context is cancelled
// first. Overwriting the stored cancel function without calling it would leak
// that context and leave the earlier job loop waiting until the parent ends.
func (d *DefaultWorker) Start(ctx context.Context) error {
	// Reject a missing or already-finished context.
	if ctx == nil || ctx.Err() != nil {
		return ErrInvalidContext
	}

	// Release the previous run, if any, before its cancel function is replaced.
	// Calling a CancelFunc more than once is a no-op, so this is safe after Stop.
	if d.cancel != nil {
		d.cancel()
	}

	d.ctx, d.cancel = context.WithCancel(ctx)
	d.jobChan = make(chan interface{})
	go d.handleJobs()
	return nil
}
// Stop cancels the worker's context if one is set and still active, then
// invokes the close handler when one is registered. The close handler runs on
// every call, whether or not the worker was ever started. The job channel is
// left open.
func (d *DefaultWorker) Stop() {
	if active := d.ctx != nil && d.ctx.Err() == nil; active {
		d.cancel()
	}

	onClose := d.closeHandler
	if onClose == nil {
		return
	}
	onClose()
}
// AddJob offers j to the worker without blocking.
//
// It returns:
//   - ErrJobChannelHasNoReceiver if the worker has no context yet (it was never
//     started), or if the job channel cannot accept j immediately;
//   - the context's error if the worker's context has already ended;
//   - nil once j has been handed to the job channel.
func (d *DefaultWorker) AddJob(j interface{}) error {
	// An unstarted worker has a nil context; calling Err on it would panic.
	// Nothing is receiving in that state, so report it as such.
	if d.ctx == nil {
		return ErrJobChannelHasNoReceiver
	}
	if err := d.ctx.Err(); err != nil {
		return err
	}

	// Non-blocking send: the default branch is taken when the send cannot
	// proceed right away.
	select {
	case d.jobChan <- j:
		return nil
	default:
		return ErrJobChannelHasNoReceiver
	}
}
// ScheduleJob submits j through AddJob once the channel returned by
// GetScheduledTicker(t) delivers a value (GetScheduledTicker is defined
// elsewhere; presumably it fires at time t, confirm against its definition).
// The wait runs in its own goroutine and is abandoned if the worker's context
// ends first.
//
// On a worker that has no context yet (never started) the call is a no-op:
// there is nothing to wait on, and AddJob would not accept the job either.
func (d *DefaultWorker) ScheduleJob(j interface{}, t time.Time) {
	// Capture the context up front. Dereferencing a nil context inside the
	// goroutine would panic there and crash the whole process.
	ctx := d.ctx
	if ctx == nil {
		return
	}

	go func() {
		select {
		case <-ctx.Done():
			return
		case <-GetScheduledTicker(t):
			// Best effort: this method has no way to report a failure, so a
			// job rejected by AddJob is dropped.
			_ = d.AddJob(j)
		}
	}()
}
// JobHandler registers jh as the callback for received jobs. Only one handler
// can be registered: if a non-nil handler is already stored the call leaves it
// in place and returns ErrJobHandlerExists; otherwise jh is stored and nil is
// returned.
func (d *DefaultWorker) JobHandler(jh func(interface{})) error {
	if d.jobHandler == nil {
		d.jobHandler = jh
		return nil
	}
	return ErrJobHandlerExists
}
// CloseHandler stores ch as the callback that Stop invokes. A later call
// replaces the previously stored callback; passing nil clears it.
func (d *DefaultWorker) CloseHandler(ch func()) {
	onClose := ch
	d.closeHandler = onClose
}
// SetMetadata replaces the worker's metadata with md while holding the
// worker's write lock.
func (d *DefaultWorker) SetMetadata(md interface{}) {
	d.RWMutex.Lock()
	d.metadata = md
	d.RWMutex.Unlock()
}
// GetMetadata returns the value most recently stored with SetMetadata, or nil
// if none has been stored. It only reads, so it takes the shared read lock:
// concurrent readers do not block one another, while a writer holding the
// exclusive lock is still waited for.
func (d *DefaultWorker) GetMetadata() interface{} {
	d.RLock()
	defer d.RUnlock()
	return d.metadata
}
// NewDefaultWorker builds a DefaultWorker carrying the given id and returns it
// as a Worker. Only the id is set; every other field keeps its zero value.
func NewDefaultWorker(id string) Worker {
	worker := new(DefaultWorker)
	worker.id = id
	return worker
}