-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathworkers.go
100 lines (84 loc) · 2.42 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main
import (
"net/url"
"time"
)
// workerManager manages worker enclaves.
type workerManager struct {
timeout time.Duration
reg, unreg chan *url.URL
len chan int
forAllFunc chan func(*url.URL)
}
// workers maps worker enclaves (identified by a URL) to a timestamp that keeps
// track of when we last got a heartbeat from the worker.
type workers map[url.URL]time.Time
func newWorkerManager(timeout time.Duration) *workerManager {
return &workerManager{
timeout: timeout,
reg: make(chan *url.URL),
unreg: make(chan *url.URL),
len: make(chan int),
forAllFunc: make(chan func(*url.URL)),
}
}
// start starts the worker manager's event loop.
func (w *workerManager) start(stop chan struct{}) {
var (
set = make(workers)
timer = time.NewTicker(w.timeout)
)
elog.Println("Starting worker event loop.")
defer elog.Println("Stopping worker event loop.")
for {
select {
case <-stop:
return
case <-timer.C:
now := time.Now()
for worker, lastSeen := range set {
if now.Sub(lastSeen) > w.timeout {
delete(set, worker)
elog.Printf("Pruned %s from worker set.", worker.Host)
}
}
case worker := <-w.reg:
set[*worker] = time.Now()
elog.Printf("(Re-)registered worker %s; %d worker(s) now registered.",
worker.Host, len(set))
case worker := <-w.unreg:
delete(set, *worker)
elog.Printf("Unregistered worker %s; %d worker(s) left.",
worker.Host, len(set))
case f := <-w.forAllFunc:
w.runForAll(f, set)
case <-w.len:
w.len <- len(set)
}
}
}
// runForAll runs the given function over all workers in our set. For key
// synchronization, this should never take more than a couple seconds.
func (w *workerManager) runForAll(f func(*url.URL), set workers) {
for worker := range set {
go f(&worker)
}
}
// length returns the number of workers that are currently registered.
func (w *workerManager) length() int {
w.len <- 0 // Signal to the event loop that we want the length.
return <-w.len
}
// forAll runs the given function over all registered workers.
func (w *workerManager) forAll(f func(*url.URL)) {
w.forAllFunc <- f
}
// register registers a new worker enclave. It is safe to repeatedly register
// the same worker enclave.
func (w *workerManager) register(worker *url.URL) {
w.reg <- worker
}
// unregister unregisters the given worker enclave.
func (w *workerManager) unregister(worker *url.URL) {
w.unreg <- worker
}