-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdynamic_workers_pool.go
126 lines (105 loc) · 2.82 KB
/
dynamic_workers_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package dynamicworkerspool
import (
"context"
"errors"
"sync"
"go.uber.org/atomic"
)
// DynamicWorkersPool пул воркеров с возможностью на ходу менять количество работающих воркеров
type DynamicWorkersPool struct {
current *atomic.Uint64
mx sync.Mutex
tasks chan func()
wg *sync.WaitGroup
quit chan struct{}
closer func()
once sync.Once
}
// NewDynamicWorkersPool создает пул воркеров
func NewDynamicWorkersPool(workersNum, taskPoolSize uint) (*DynamicWorkersPool, error) {
if workersNum == 0 {
return nil, errors.New("workers ammount is requiered")
}
if taskPoolSize == 0 {
return nil, errors.New("task pool size is requiered")
}
wp := &DynamicWorkersPool{
current: atomic.NewUint64(0),
tasks: make(chan func(), taskPoolSize),
wg: new(sync.WaitGroup),
quit: make(chan struct{}),
}
wp.closer = func() {
close(wp.tasks)
wp.wg.Wait()
close(wp.quit)
wp.current.Store(0)
}
_ = wp.ResetWorkersNum(workersNum)
return wp, nil
}
// Close - закрывает пул и ждет пока все воркеры доработают
// После закрытия пул не пригоден для работы, при попытке добавить задание упадет с паникой
func (dwp *DynamicWorkersPool) Close() error {
if dwp.closer != nil {
dwp.once.Do(dwp.closer)
}
return nil
}
// PushTask - добавляет задачу в очередь.
// PushTask блокирующая операция, и стоит использовать context.WithCancel/context.WithTimeout
func (dwp *DynamicWorkersPool) PushTask(ctx context.Context, task func()) error {
select {
case dwp.tasks <- task:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// ResetWorkersNum переопределение числа воркеров
func (dwp *DynamicWorkersPool) ResetWorkersNum(workersNum uint) error {
if workersNum == 0 {
return errors.New("negative workers num")
}
dwp.mx.Lock()
defer dwp.mx.Unlock()
curr := uint(dwp.current.Load())
if workersNum > curr {
dwp.increaseWorkers(workersNum - curr)
} else if workersNum < curr {
dwp.decreaseWorkers(curr - workersNum)
}
return nil
}
func (dwp *DynamicWorkersPool) worker() {
defer dwp.wg.Done()
for {
select {
case <-dwp.quit:
return
case task, ok := <-dwp.tasks:
if ok {
task()
} else {
return
}
}
}
}
func (dwp *DynamicWorkersPool) decreaseWorkers(delta uint) {
curr := uint(dwp.current.Load())
if delta > curr {
delta = curr
}
dwp.current.Sub(uint64(delta))
for i := uint(0); i < delta; i++ {
dwp.quit <- struct{}{}
}
}
func (dwp *DynamicWorkersPool) increaseWorkers(delta uint) {
dwp.current.Add(uint64(delta))
dwp.wg.Add(int(delta))
for i := uint(0); i < delta; i++ {
go dwp.worker()
}
}