-
Notifications
You must be signed in to change notification settings - Fork 17
/
first-success.go
131 lines (117 loc) · 2.75 KB
/
first-success.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
"context"
"strconv"
"strings"
"golang.org/x/sync/errgroup"
)
// FirstSuccess runs fns concurrently and returns the value produced by the
// first one that finishes without an error.
//
// concurrency caps how many fns run at the same time; a value <= 0 means no
// limit. A result counts as a success whenever its error is nil, even if the
// value is the zero value of T (0, "", nil, ...).
//
// If no fn succeeds, the zero value of T is returned together with an
// ErrorSlice holding the errors in the order the results arrived. A fn that
// would start after ctx is already done is not called; ctx.Err() is recorded
// for it instead. With no fns at all the returned error is an empty ErrorSlice.
//
// NOTE: ctx is handed to the fns unchanged and is not cancelled on the first
// success, so the remaining fns still run to completion in the background
// after FirstSuccess has returned.
func FirstSuccess[T comparable](
	ctx context.Context,
	concurrency int,
	fns ...JobFunc[T],
) (T, error) {
	type result struct {
		val T
		err error
	}

	// One buffer slot per job: each job sends exactly once, so a send can never
	// block, even when FirstSuccess has already returned and nobody receives.
	results := make(chan result, len(fns))

	var wg errgroup.Group
	if concurrency > 0 {
		wg.SetLimit(concurrency)
	}

	// The scheduling goroutine may outlive this call, so give it a private copy
	// of the job list instead of the caller's backing array.
	jobs := make([]JobFunc[T], len(fns))
	copy(jobs, fns)

	// Schedule from a separate goroutine: once a limit is set, wg.Go blocks
	// until a slot is free, which would otherwise stop the caller from reading
	// results (and returning the first success) until most jobs had finished.
	go func() {
		for _, fn := range jobs {
			fn := fn // per-iteration copy for pre-Go 1.22 loop semantics
			wg.Go(func() error {
				if err := ctx.Err(); err != nil {
					var empty T
					results <- result{empty, err}
					return nil
				}
				val, err := fn(ctx)
				// Send unconditionally. Racing this send against ctx.Done()
				// in a select would discard results at random once ctx is
				// cancelled, because both cases are ready at the same time.
				results <- result{val, err}
				return nil
			})
		}
		wg.Wait()
		close(results)
	}()

	var errs ErrorSlice
	for res := range results {
		if res.err == nil {
			return res.val, nil
		}
		errs = append(errs, res.err)
	}

	var zero T
	return zero, errs
}
// IsErrorSlice reports whether the dynamic type of err is ErrorSlice.
// Only err itself is inspected (no unwrapping); a nil err yields false.
func IsErrorSlice(err error) bool {
	switch err.(type) {
	case ErrorSlice:
		return true
	default:
		return false
	}
}
// ErrorSlice is a list of errors that itself satisfies the error interface.
type ErrorSlice []error

// Error renders the slice as `ErrorSlice{"error1", "error2", "error3"}`.
// Each message is quoted with strconv.Quote, a nil entry is printed as nil,
// and an empty slice yields "ErrorSlice{}".
func (e ErrorSlice) Error() string {
	parts := make([]string, len(e))
	for idx, item := range e {
		if item == nil {
			parts[idx] = "nil"
		} else {
			parts[idx] = strconv.Quote(item.Error())
		}
	}
	return "ErrorSlice{" + strings.Join(parts, ", ") + "}"
}

// Filter returns a new ErrorSlice containing, in their original order, only
// the entries for which predicate returns true. The result is nil when no
// entry matches.
func (e ErrorSlice) Filter(predicate func(error) bool) ErrorSlice {
	var kept ErrorSlice
	for idx := range e {
		if candidate := e[idx]; predicate(candidate) {
			kept = append(kept, candidate)
		}
	}
	return kept
}

// All reports whether predicate returns true for every entry. It stops at the
// first entry that fails and returns true for an empty slice.
func (e ErrorSlice) All(predicate func(error) bool) bool {
	for idx := 0; idx < len(e); idx++ {
		if predicate(e[idx]) {
			continue
		}
		return false
	}
	return true
}
// JobFunc is the signature of a single job: it receives a context and returns
// either a value of type T or an error.
type JobFunc[T comparable] func(context.Context) (T, error)
// NewJobGroup returns a pointer to a new, empty JobGroup.
func NewJobGroup[T comparable]() *JobGroup[T] {
	group := JobGroup[T]{}
	return &group
}
// JobGroup is an ordered list of jobs that can be handed to FirstSuccess.
type JobGroup[T comparable] []JobFunc[T]

// Add appends fn to the end of the group.
func (r *JobGroup[T]) Add(fn JobFunc[T]) {
	jobs := *r
	jobs = append(jobs, fn)
	*r = jobs
}

// Run passes every job in the group to FirstSuccess with a concurrency of -1
// and returns its result.
func (r *JobGroup[T]) Run(ctx context.Context) (T, error) {
	const concurrency = -1
	return FirstSuccess(ctx, concurrency, (*r)...)
}

// RunWithConcurrency passes every job in the group to FirstSuccess using the
// given concurrency value and returns its result.
func (r *JobGroup[T]) RunWithConcurrency(ctx context.Context, concurrency int) (T, error) {
	jobs := []JobFunc[T](*r)
	return FirstSuccess(ctx, concurrency, jobs...)
}