-
Notifications
You must be signed in to change notification settings - Fork 1
/
selector.go
173 lines (133 loc) · 4.13 KB
/
selector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package makroud
import (
"strings"
"sync"
"github.com/pkg/errors"
)
// DefaultSelector defines the default selector alias.
const DefaultSelector = MasterSelector
// MasterSelector defines the master alias.
const MasterSelector = "master"
// SlaveSelector defines the slave alias.
const SlaveSelector = "slave"
// SelectorConfigurations define a list of configurations for a pool of drivers.
type SelectorConfigurations map[string]*ClientOptions
// Selector contains a pool of drivers indexed by their name.
type Selector struct {
mutex sync.RWMutex
cache *DriverCache
configurations map[string]*ClientOptions
connections map[string]Driver
}
// NewSelector returns a new selector containing a pool of drivers with given configuration.
func NewSelector(configurations map[string]*ClientOptions) (*Selector, error) {
connections := map[string]Driver{}
selector := &Selector{
configurations: configurations,
cache: NewDriverCache(),
connections: connections,
}
return selector, nil
}
// NewSelectorWithDriver returns a new selector containing the given connection.
func NewSelectorWithDriver(driver Driver) (*Selector, error) {
selector := &Selector{
configurations: map[string]*ClientOptions{},
cache: driver.GetCache(),
connections: map[string]Driver{
DefaultSelector: driver,
},
}
return selector, nil
}
// NewSelectorWithDrivers returns a new selector containing the given connections.
func NewSelectorWithDrivers(drivers map[string]Driver) (*Selector, error) {
selector := &Selector{
configurations: map[string]*ClientOptions{},
cache: NewDriverCache(),
connections: drivers,
}
return selector, nil
}
// Using returns the underlying drivers if it's alias exists.
func (selector *Selector) Using(alias string) (Driver, error) {
alias = strings.ToLower(alias)
selector.mutex.RLock()
connection, found := selector.connections[alias]
selector.mutex.RUnlock()
if found {
return connection, nil
}
selector.mutex.Lock()
defer selector.mutex.Unlock()
connection, found = selector.connections[alias]
if found {
return connection, nil
}
for name, configuration := range selector.configurations {
if alias == strings.ToLower(name) {
connection, err := NewWithOptions(configuration)
if err != nil {
return nil, err
}
if selector.cache != nil {
connection.SetCache(selector.cache)
}
selector.connections[alias] = connection
return connection, nil
}
}
return nil, errors.Wrapf(ErrSelectorNotFoundConnection, "connection alias '%s' not found", alias)
}
// RetryAliases is an helper calling Retry with a list of aliases.
func (selector *Selector) RetryAliases(handler func(Driver) error, aliases ...string) error {
drivers := []Driver{}
for _, alias := range aliases {
connection, err := selector.Using(alias)
if err != nil {
continue
}
drivers = append(drivers, connection)
}
return Retry(handler, drivers...)
}
// RetryMaster is an helper calling RetryAliases with a slave then a master connection.
func (selector *Selector) RetryMaster(handler func(Driver) error) error {
return selector.RetryAliases(handler, SlaveSelector, MasterSelector)
}
// Close closes all drivers connections.
func (selector *Selector) Close() error {
selector.mutex.Lock()
defer selector.mutex.Unlock()
failures := []error{}
for alias, connection := range selector.connections {
err := connection.Close()
if err != nil {
failures = append(failures, errors.Wrapf(err, "cannot close drivers connection for %s", alias))
}
}
selector.connections = map[string]Driver{}
if len(failures) > 0 {
return failures[0]
}
return nil
}
// Ping checks if a connection is available.
func (selector *Selector) Ping() error {
return selector.RetryMaster(func(driver Driver) error {
return driver.Ping()
})
}
// Retry execute given handler on several drivers until it succeeds on a connection.
func Retry(handler func(Driver) error, drivers ...Driver) (err error) {
if len(drivers) == 0 {
return errors.WithStack(ErrSelectorMissingRetryConnection)
}
for _, driver := range drivers {
err = handler(driver)
if err == nil {
return nil
}
}
return err
}