forked from constabulary/gb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecutor.go
103 lines (87 loc) · 2.1 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package gb
import (
"errors"
"sync"
)
// Execute runs the tree of Actions rooted at a sequentially, visiting
// dependencies depth first. It starts from an empty record of completed
// Actions and returns whatever error the traversal of a reports.
func Execute(a *Action) error {
	return execute(map[*Action]error{}, a)
}
// execute runs a after all of its dependencies, depth first.
//
// seen records the result of every Action whose Run has already been called;
// an Action found there is not run again and its recorded error is returned.
// The first dependency that fails stops the traversal: its error is returned
// and a itself is neither run nor recorded in seen.
func execute(seen map[*Action]error, a *Action) error {
	// Already run: hand back the remembered outcome.
	if previous, done := seen[a]; done {
		return previous
	}

	// Dependencies come first, in declaration order.
	deps := a.Deps
	for i := 0; i < len(deps); i++ {
		depErr := execute(seen, deps[i])
		if depErr != nil {
			return depErr
		}
	}

	// Every dependency succeeded; run this action and remember the outcome.
	outcome := a.Run()
	seen[a] = outcome
	return outcome
}
// ExecuteConcurrent executes all actions in a tree concurrently.
// Each Action waits until the Actions it depends on are complete, and is
// run at most once even when several Actions depend on it.
//
// n bounds how many Actions may be running at the same time; values below 1
// are treated as 1. When interrupt is closed, Actions that have not started
// yet are not run and report an "interrupted" error instead; Actions that
// are already running are left to finish. A nil interrupt never fires.
//
// The returned error is the outcome of the root Action a: the first non-nil
// error among its dependencies (checked in Deps order), otherwise the error
// returned by its own Run. ExecuteConcurrent does not return until every
// goroutine it started has finished.
func ExecuteConcurrent(a *Action, n int, interrupt <-chan struct{}) error {
	// An empty permit pool would block every action forever, and a negative
	// size makes make panic, so always allow at least one running action.
	if n < 1 {
		n = 1
	}

	errInterrupted := errors.New("interrupted")

	var mu sync.Mutex // protects seen
	seen := make(map[*Action]chan error)

	// get waits for the outcome on result and then puts it back, so that
	// every other waiter on the same channel observes the same value.
	get := func(result chan error) error {
		err := <-result
		result <- err
		return err
	}

	// permits is a counting semaphore holding n tokens.
	permits := make(chan bool, n)
	for i := 0; i < cap(permits); i++ {
		permits <- true
	}

	// wg tracks all the outstanding actions
	var wg sync.WaitGroup

	var schedule func(a *Action) chan error
	schedule = func(a *Action) chan error {
		// step 0, have we seen this action before ?
		mu.Lock()
		if result, ok := seen[a]; ok {
			// yes! return the result channel so others can wait
			// on our action
			mu.Unlock()
			return result
		}

		// step 1, we're the first to run this action
		result := make(chan error, 1)
		seen[a] = result
		mu.Unlock()

		// queue all dependent actions.
		results := make([]chan error, 0, len(a.Deps))
		for _, dep := range a.Deps {
			results = append(results, schedule(dep))
		}

		wg.Add(1)
		go func() {
			defer wg.Done()

			// wait for dependent actions
			for _, r := range results {
				if err := get(r); err != nil {
					result <- err
					return
				}
			}

			// wait for a permit, giving up if we are interrupted first
			select {
			case <-permits:
			case <-interrupt:
				result <- errInterrupted
				return
			}
			// permits has capacity for every token, so this never blocks.
			defer func() { permits <- true }()

			// select picks at random when both a permit and interrupt are
			// ready, so check interrupt again before starting new work.
			select {
			case <-interrupt:
				result <- errInterrupted
				return
			default:
			}

			result <- a.Run()
		}()

		return result
	}

	err := get(schedule(a))
	wg.Wait()
	return err
}