forked from linxGnu/mssqlx
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbalancer.go
134 lines (110 loc) · 2.67 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package mssqlx
import (
"context"
"sync/atomic"
"time"
)
// balancer is the database balancer and health checker. It hands out
// wrappers from dbs to callers and keeps a queue of failed wrappers that the
// health-checker goroutines re-test periodically.
type balancer struct {
	// healthCheckPeriod is read and written with sync/atomic 64-bit
	// operations. It is kept at the front of the struct, behind exactly 64
	// bytes of padding, so that its offset is a multiple of 8: on 32-bit
	// platforms 64-bit atomics require 64-bit alignment, which is only
	// guaranteed relative to the first word of an allocated struct.
	// The value is a duration in milliseconds.
	_p1               [8]uint64 // prevent false sharing
	healthCheckPeriod uint64
	_p2               [8]uint64

	// ctx is cancelled by cancel to stop the health checkers.
	ctx    context.Context
	cancel context.CancelFunc

	driverName string

	// dbs holds the wrappers currently considered healthy.
	dbs *dbList
	// fail queues wrappers that were removed from dbs and await re-checking.
	fail chan *wrapper

	// isWsrep makes the health checker additionally require
	// checkWsrepReady() before re-admitting a wrapper.
	isWsrep bool
	// isMulti records whether the balancer was created for more than one
	// database instance.
	isMulti bool

	// numberOfHealthChecker is the number of health-checker goroutines
	// started by newBalancer.
	numberOfHealthChecker int
}
// newBalancer creates a balancer and starts its health-checker goroutines.
//
// ctx is the parent context; a nil ctx is replaced by context.Background().
// The balancer derives its own cancellable context from it.
// numHealthChecker is the number of health-checker goroutines to start;
// values <= 0 fall back to 2.
// numDbInstance sizes the buffer of the failure queue and determines isMulti
// (numDbInstance > 1). Negative values are treated as 0 for the buffer size,
// because make panics on a negative channel capacity.
// isWsrep enables the extra wsrep readiness check in the health checker.
func newBalancer(ctx context.Context, numHealthChecker int, numDbInstance int, isWsrep bool) *balancer {
	if ctx == nil {
		ctx = context.Background()
	}

	if numHealthChecker <= 0 {
		numHealthChecker = 2 // at least two checkers
	}

	// Guard the channel capacity: make(chan T, n) panics at runtime for n < 0.
	failBuffer := numDbInstance
	if failBuffer < 0 {
		failBuffer = 0
	}

	c := &balancer{
		numberOfHealthChecker: numHealthChecker,
		dbs:                   &dbList{},
		fail:                  make(chan *wrapper, failBuffer),
		isWsrep:               isWsrep,
		isMulti:               numDbInstance > 1,
		healthCheckPeriod:     DefaultHealthCheckPeriodInMilli,
	}

	// setup context
	c.ctx, c.cancel = context.WithCancel(ctx)

	// run health checkers; they exit when c.ctx is cancelled
	for i := 0; i < numHealthChecker; i++ {
		go c.healthChecker()
	}

	return c
}
// size reports the value of size() on the balancer's db list.
func (c *balancer) size() int {
	n := c.dbs.size()
	return n
}
// getHealthCheckPeriod atomically loads the current health-check period.
// The health checker interprets the value as milliseconds.
func (c *balancer) getHealthCheckPeriod() uint64 {
	current := atomic.LoadUint64(&c.healthCheckPeriod)
	return current
}
// setHealthCheckPeriod atomically stores a new health-check period.
// A period of 0 selects DefaultHealthCheckPeriodInMilli instead.
func (c *balancer) setHealthCheckPeriod(period uint64) {
	if period != 0 {
		atomic.StoreUint64(&c.healthCheckPeriod, period)
		return
	}

	// zero means "use the default"
	atomic.StoreUint64(&c.healthCheckPeriod, DefaultHealthCheckPeriodInMilli)
}
// add registers the wrapper w with the balancer by adding it to the db list.
func (c *balancer) add(w *wrapper) {
	list := c.dbs
	list.add(w)
}
// get picks a wrapper to serve a query. When shouldBalancing is false the
// list's current() entry is returned; otherwise the list's next() entry is.
func (c *balancer) get(shouldBalancing bool) *wrapper {
	if !shouldBalancing {
		return c.dbs.current()
	}
	return c.dbs.next()
}
// failure marks the node w as failed: it is removed from the db list and, if
// that removal reports true, handed to the health checkers for tracking.
func (c *balancer) failure(w *wrapper) {
	removed := c.dbs.remove(w)
	if !removed {
		// remove reported false; nothing is queued
		return
	}
	c.sendFailure(w)
}
// sendFailure queues w on the fail channel for the health checkers. It blocks
// until the send succeeds or the balancer's context is done, whichever
// happens first.
func (c *balancer) sendFailure(w *wrapper) {
	stopped := c.ctx.Done()

	select {
	case c.fail <- w:
		// handed over to a health checker
	case <-stopped:
		// balancer is shutting down; drop the wrapper
	}
}
// healthChecker is the body of one health-checker goroutine. It loops until
// the balancer's context is done:
//
//  1. take a failed wrapper from the fail channel;
//  2. if ping succeeds (and, when isWsrep is set, checkWsrepReady reports
//     true), add the wrapper back to the db list;
//  3. otherwise wait one health-check period and put the wrapper back on the
//     fail channel so it is checked again later.
//
// Every blocking step also watches the context so the goroutine exits
// promptly on destroy.
func (c *balancer) healthChecker() {
	doneCh := c.ctx.Done()

	for {
		var db *wrapper
		select {
		case <-doneCh:
			return
		case db = <-c.fail:
		}

		// Healthy again: put it back into rotation.
		if ping(db) == nil && (!c.isWsrep || db.checkWsrepReady()) {
			c.dbs.add(db)
			continue
		}

		// Still failing: back off for one period before re-queueing. An
		// explicit timer (instead of time.After) lets us release it
		// immediately when the context is cancelled mid-wait.
		period := time.Duration(c.getHealthCheckPeriod()) * time.Millisecond
		timer := time.NewTimer(period)
		select {
		case <-doneCh:
			timer.Stop()
			return
		case <-timer.C:
		}

		// Re-queue for another check (possibly by a different checker).
		select {
		case <-doneCh:
			return
		case c.fail <- db:
		}
	}
}
// destroy shuts the balancer down: it first clears the db list and then
// cancels the balancer's context, which stops the health checkers.
func (c *balancer) destroy() {
	list, stop := c.dbs, c.cancel

	list.clear()
	stop()
}