-
Notifications
You must be signed in to change notification settings - Fork 20
/
retry-concurrent.go
113 lines (100 loc) · 1.79 KB
/
retry-concurrent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
"errors"
"log"
"math/rand"
"sync"
"time"
)
// Job is one unit of work handed to the worker pool, together with the
// bookkeeping needed to retry it.
type Job struct {
	// run performs the work; a non-nil error marks the attempt as failed.
	run func() error

	// retry is the number of attempts the job still has; index identifies the
	// job in log output and in the completion report.
	retry, index int
}
// work simulates an unreliable task. It draws a random number from the global
// math/rand source and fails whenever the draw exceeds 0.2, so roughly four
// out of five calls return an error.
//
// It returns nil on success and a freshly allocated error on failure.
func work() error {
	// rand.Float32 yields a value in [0.0, 1.0); only draws of 0.2 or less succeed.
	if rand.Float32() > 0.2 {
		return errors.New("an error occurred")
	}
	return nil
}
// init seeds the global math/rand source from the current wall-clock time in
// nanoseconds, so the random outcomes of work differ from run to run.
//
// NOTE(review): rand.Seed is documented as deprecated in newer Go releases,
// where the global source is seeded automatically; confirm the target Go
// version before removing this.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// main runs a small retrying worker pool.
//
// A producer queues jobCount jobs, each allowed maxAttempts attempts. A fixed
// number of workers pull jobs from the queue, run them, and push failed jobs
// back onto the same queue until their attempts are used up. Every job is
// reported exactly once on the count channel, whether it succeeded or ran out
// of attempts. Once all reports have been collected the queue is closed, the
// workers drain out, and main waits for them before logging "done".
func main() {
	const (
		concurrent  = 3  // number of worker goroutines
		jobCount    = 10 // number of jobs produced; also the buffer size of both channels
		maxAttempts = 3  // total attempts a job gets before it is given up on
	)

	// producer starts a goroutine that queues jobCount jobs and returns the
	// queue. The queue is buffered to jobCount so that neither the producer
	// nor a requeueing worker can block: at most jobCount jobs ever exist.
	producer := func(done <-chan struct{}) chan Job {
		jobs := make(chan Job, jobCount)
		go func() {
			for i := 0; i < jobCount; i++ {
				job := Job{run: work, retry: maxAttempts, index: i}
				select {
				case <-done:
					return
				case jobs <- job:
				}
			}
		}()
		return jobs
	}

	// workers tracks the worker goroutines so main can wait for them to exit
	// instead of racing them to process termination.
	var workers sync.WaitGroup

	// worker starts one goroutine that processes jobs until the queue is
	// closed or done is signalled. Every blocking channel operation also
	// watches done so the goroutine can never be stranded.
	worker := func(id int, done <-chan struct{}, count chan<- int, jobs chan Job) {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for {
				select {
				case <-done:
					return
				case j, ok := <-jobs:
					if !ok {
						log.Println("closing job channels")
						return
					}
					if err := j.run(); err != nil {
						j.retry--
						if j.retry > 0 {
							log.Printf("retrying %d, attempt %d worker %d", j.index, maxAttempts-j.retry, id)
							// Push the job back onto the queue for another attempt.
							select {
							case <-done:
								return
							case jobs <- j:
							}
							continue
						}
						// Out of attempts: surface the final error rather than dropping it.
						log.Printf("giving up on %d after %d attempts: %v", j.index, maxAttempts, err)
					}
					// Report the job as finished (succeeded or exhausted).
					select {
					case <-done:
						return
					case count <- j.index:
					}
				}
			}
		}()
	}

	// cleaner collects one report per job. After the last report no job is
	// queued or in flight and the producer has finished, so nothing can send
	// on jobs any more and closing it is safe; the close tells the workers to
	// exit. If done fires first the queue is left open, because a worker may
	// still be about to requeue a job.
	cleaner := func(done <-chan struct{}, count <-chan int, jobs chan Job) {
		for i := 0; i < jobCount; i++ {
			select {
			case <-done:
				return
			case idx := <-count:
				log.Println("got", idx)
			}
		}
		close(jobs)
		log.Println("Complete")
	}

	done := make(chan struct{})
	defer close(done)

	prod := producer(done)
	count := make(chan int, jobCount)
	for i := 0; i < concurrent; i++ {
		worker(i, done, count, prod)
	}
	cleaner(done, count, prod)

	// Let every worker observe the closed queue and return before exiting.
	workers.Wait()
	log.Println("done")
}