forked from OneOfOne/otk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
workers.go
113 lines (102 loc) · 1.93 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package otk
import (
"context"
"errors"
"log"
"runtime"
"sync"
"sync/atomic"
"time"
)
// Sentinel errors returned by the Workers pool.
var (
	// ErrPoolIsFull is returned by Exec when the job could not be queued
	// after all of its attempts.
	ErrPoolIsFull = errors.New("the pool is full")

	// ErrClosedPool is returned by Exec and Close when the pool's context
	// has already been cancelled.
	ErrClosedPool = errors.New("the pool is closed, go swim somewhere else")
)
// NewWorkers creates a worker pool bound to a cancellable child of ctx.
//
// initial is both the capacity of the job queue and the number of workers
// started up front; increaseBy is the number of workers added each time Exec
// finds the queue not ready to accept a job. The initial workers are started
// from a background goroutine, so they may not be running yet when NewWorkers
// returns.
func NewWorkers(ctx context.Context, initial, increaseBy int) *Workers {
	poolCtx, cancel := context.WithCancel(ctx)

	pool := new(Workers)
	pool.ch = make(chan func(context.Context), initial)
	pool.ctx = poolCtx
	pool.cfn = cancel
	pool.inc = increaseBy

	// init starts the initial workers and then runs the periodic shrink loop.
	go pool.init(initial)
	return pool
}
// Workers is a pool of goroutines that run functions submitted through Exec.
// Create one with NewWorkers; a Workers value must not be copied because it
// contains a sync.Mutex.
type Workers struct {
	// total is the number of workers the pool believes it has started and
	// not yet retired. It is read and written with the 64-bit sync/atomic
	// functions, so it MUST remain the first field: on 32-bit platforms only
	// the first word of an allocated struct is guaranteed to be 64-bit
	// aligned, and a misaligned atomic access faults at runtime.
	total int64

	// mux is available for serializing changes to the pool's state.
	mux sync.Mutex

	// ch is the job queue. A nil value received from it tells the receiving
	// worker to exit.
	ch chan func(ctx context.Context)

	// ctx is handed to every job; workers stop once it is done.
	ctx context.Context

	// cfn cancels ctx.
	cfn context.CancelFunc

	// inc is how many workers are added per growth step and retired per
	// shrink step.
	inc int
}
// Exec queues fn to be run by a worker, which calls it with the pool's
// context.
//
// It makes up to three attempts to hand fn to the queue. Whenever the queue
// is not immediately ready to accept the job, it starts w.inc additional
// workers and yields the processor before retrying.
//
// It returns ErrClosedPool if the pool's context is done, and ErrPoolIsFull
// (after logging the current worker and queue counts) if fn still could not
// be queued after the third attempt. A nil fn is ignored and reported as
// success.
func (w *Workers) Exec(fn func(context.Context)) error {
	if w.ctx.Err() != nil {
		return ErrClosedPool
	}
	if fn == nil {
		// A nil job is the internal signal that makes a worker exit. Letting
		// one through here would retire a worker without adjusting w.total,
		// so there is nothing to run and nothing to queue.
		return nil
	}

	for attempt := 0; attempt < 3; attempt++ {
		select {
		case w.ch <- fn:
			return nil
		case <-w.ctx.Done():
			return ErrClosedPool
		default:
			// Queue not ready: grow the pool and give the new workers a
			// chance to start receiving before the next attempt.
			w.spawn(w.inc)
			runtime.Gosched()
		}
	}

	log.Printf("workers: we are being overrun :(, count: %v, chan: %v", atomic.LoadInt64(&w.total), len(w.ch))
	return ErrPoolIsFull
}
// Close shuts the pool down by cancelling its context, which makes every
// worker and the background shrink loop return.
//
// It returns ErrClosedPool if the pool's context was already done (either
// from an earlier Close or because the parent context ended), nil otherwise.
// Close is safe to call from multiple goroutines.
func (w *Workers) Close() error {
	// Serialize the check-and-cancel so that exactly one caller observes a
	// successful close.
	w.mux.Lock()
	defer w.mux.Unlock()

	if w.ctx.Err() != nil {
		return ErrClosedPool
	}
	w.cfn()
	// The job channel is deliberately left open: other goroutines may still
	// be in the middle of a send on it, and sending on a closed channel
	// panics. Cancelling the context is enough to stop every receiver.
	return nil
}
// init starts the first n workers and then, every five minutes, tries to
// shrink the pool by w.inc workers. It returns once the pool's context is
// done.
func (w *Workers) init(n int) {
	w.spawn(n)

	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	done := w.ctx.Done()
	for {
		// Wait for the next tick, or stop when the pool is shut down.
		select {
		case <-done:
			return
		case <-ticker.C:
		}

		// Leave the pool alone while jobs are still queued, or when the
		// worker count does not exceed the queue capacity (which is the
		// initial worker count).
		queued := len(w.ch) > 0
		if queued || atomic.LoadInt64(&w.total) <= int64(cap(w.ch)) {
			continue
		}

		// A nil job makes the worker that receives it exit.
		for sent := 0; sent < w.inc; sent++ {
			select {
			case w.ch <- nil:
			case <-done:
				return
			}
		}
		atomic.AddInt64(&w.total, -int64(w.inc))
	}
}
// spawn starts n worker goroutines and adds n to the pool's worker count.
// A non-positive n is a no-op: without the guard, a negative n would start
// no goroutines yet still subtract from the count.
func (w *Workers) spawn(n int) {
	if n <= 0 {
		return
	}
	for i := 0; i < n; i++ {
		go w.worker()
	}
	atomic.AddInt64(&w.total, int64(n))
}
// worker receives jobs from the queue and runs each with the pool's context.
// It returns when it receives a nil job or when the pool's context is done.
func (w *Workers) worker() {
	done := w.ctx.Done()
	for {
		select {
		case <-done:
			return
		case job := <-w.ch:
			if job != nil {
				job(w.ctx)
				continue
			}
			// nil is the signal to retire this worker.
			return
		}
	}
}