-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathelasticsearch_query.go
253 lines (232 loc) · 7.71 KB
/
elasticsearch_query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package esasg
import (
"context"
"errors"
"strings"
"sync"
elastic "github.com/olivere/elastic/v7" // Elasticsearch client
"go.uber.org/zap" // Logging
"github.com/mintel/elasticsearch-asg/pkg/es" // Elasticsearch client extensions
"github.com/mintel/elasticsearch-asg/pkg/str" // String utilities
)
var (
	// ErrInconsistentNodes is reported by ElasticsearchQueryService.Nodes when
	// the Elasticsearch endpoints it queries disagree about which nodes exist
	// (for example, node stats and node info list different nodes).
	ErrInconsistentNodes = errors.New("got inconsistent nodes from Elasticsearch")
)

const (
	// defaultInconsistentNodesRetries is the maximum total number of attempts
	// ElasticsearchQueryService.Nodes makes when an attempt fails with any
	// error (not only ErrInconsistentNodes) before returning the last error.
	defaultInconsistentNodesRetries = 3
)
// ElasticsearchQueryService groups read-only queries against an Elasticsearch
// cluster: node stats, node info, shard listings and cluster settings.
type ElasticsearchQueryService struct {
	// client is the connection every request is issued through.
	client *elastic.Client
	// logger is a named child of the global zap logger.
	// NOTE(review): the methods in this file log via zap.L() rather than this
	// field — confirm whether anything else in the package uses it.
	logger *zap.Logger
}

// NewElasticsearchQueryService creates an ElasticsearchQueryService that sends
// its requests through client. Its logger is derived from the global zap
// logger as it is at construction time, named "ElasticsearchQueryService".
func NewElasticsearchQueryService(client *elastic.Client) *ElasticsearchQueryService {
	svc := new(ElasticsearchQueryService)
	svc.client = client
	svc.logger = zap.L().Named("ElasticsearchQueryService")
	return svc
}
// Node looks up a single node by name via Nodes.
// It returns (nil, nil) when the response contains no node with that name,
// and (nil, err) when the underlying query fails.
func (s *ElasticsearchQueryService) Node(ctx context.Context, name string) (*Node, error) {
	var found *Node
	byName, err := s.Nodes(ctx, name)
	if err == nil {
		found = byName[name]
	}
	return found, err
}
// Nodes returns info and stats about the nodes in the Elasticsearch cluster,
// as a map from node name to Node.
// If names are passed, limit to nodes with those names.
// It's left up to the caller to check if all the names are in the response.
//
// A failed attempt (for any error, including ErrInconsistentNodes) is retried,
// for at most defaultInconsistentNodesRetries attempts in total. Retrying stops
// early once ctx is cancelled or its deadline has passed, since further
// attempts could not succeed. On failure the last error is returned together
// with a nil map.
func (s *ElasticsearchQueryService) Nodes(ctx context.Context, names ...string) (map[string]*Node, error) {
	tries := defaultInconsistentNodesRetries
	var err error
	for tryCounter := 0; tryCounter < tries; tryCounter++ {
		if tryCounter > 0 {
			zap.L().Warn("got error describing Elasticsearch nodes",
				zap.Error(err),
				zap.Int("try", tryCounter+1),
				zap.Int("max_tries", tries),
			)
		}
		var result map[string]*Node
		result, err = s.nodes(ctx, names...)
		if err == nil {
			return result, nil
		}
		// Once the context is done every request would fail immediately, so
		// don't burn the remaining attempts.
		if ctx.Err() != nil {
			return nil, err
		}
	}
	return nil, err
}
// nodes makes a single attempt at describing the cluster's nodes.
//
// It sends four requests concurrently: node stats and node info (both filtered
// by names, if any), /_cat/shards, and the cluster's shard-allocation exclude
// settings. It then merges the responses into a map from node name to Node.
// If any request fails, the remaining ones are cancelled and the first error
// received is returned.
//
// ErrInconsistentNodes is returned if the stats and info responses don't list
// the same nodes or, when names is empty, if a shard is assigned to a node that
// neither response contains.
func (s *ElasticsearchQueryService) nodes(ctx context.Context, names ...string) (map[string]*Node, error) {
	// Responses from each endpoint. They are written by the goroutines below
	// and only read after all of those goroutines have finished.
	var statsResp *elastic.NodesStatsResponse
	var infoResp *elastic.NodesInfoResponse
	var shardsResp es.CatShardsResponse
	var settings *shardAllocationExcludeSettings

	// wg counts the 4 request goroutines, and done is closed once all of them
	// have returned. errc has room for one error per goroutine, so a send never
	// blocks, even after this function has returned early.
	wg := sync.WaitGroup{}
	wg.Add(4)
	done := make(chan struct{})
	errc := make(chan error, 4)
	go func() {
		wg.Wait()
		close(done)
	}()

	// If one request fails, the deferred cancel aborts the others through the
	// shared context.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Get node stats
	go func() {
		defer wg.Done()
		var err error
		statsResp, err = s.client.NodesStats().NodeId(names...).Do(ctx)
		if err != nil {
			errc <- err
		}
	}()

	// Get node info
	go func() {
		defer wg.Done()
		var err error
		infoResp, err = s.client.NodesInfo().NodeId(names...).Do(ctx)
		if err != nil {
			errc <- err
		}
	}()

	// Get shards
	go func() {
		defer wg.Done()
		var err error
		shardsResp, err = es.NewCatShardsService(s.client).Do(ctx)
		if err != nil {
			errc <- err
		}
	}()

	// Get cluster settings. Non-empty transient values override the
	// corresponding persistent ones.
	go func() {
		defer wg.Done()
		resp, err := es.NewClusterGetSettingsService(s.client).FilterPath("*." + shardAllocExcludeSetting + ".*").Do(ctx)
		if err != nil {
			errc <- err
			return
		}
		settings = newShardAllocationExcludeSettings(resp.Persistent)
		tSettings := newShardAllocationExcludeSettings(resp.Transient)
		if len(tSettings.Name) > 0 {
			settings.Name = tSettings.Name
		}
		if len(tSettings.Host) > 0 {
			settings.Host = tSettings.Host
		}
		if len(tSettings.IP) > 0 {
			settings.IP = tSettings.IP
		}
		// NOTE(review): assumes newShardAllocationExcludeSettings always returns
		// a non-nil Attr map; assigning into a nil map would panic — confirm.
		for k, v := range tSettings.Attr {
			if len(v) > 0 {
				settings.Attr[k] = v
			}
		}
	}()

	// Wait for the first error, or for all the goroutines to finish.
	select {
	case err := <-errc:
		return nil, err
	case <-done:
		// select picks pseudo-randomly among ready cases. So done can win even
		// though a goroutine reported an error before finishing, which would
		// leave one of the responses nil. Each send to errc happens before that
		// goroutine's wg.Done, so any error is already buffered here. Drain it
		// before touching the responses.
		close(errc)
		if err := <-errc; err != nil {
			return nil, err
		}
	}

	// Check if results have the same number of nodes.
	if len(statsResp.Nodes) != len(infoResp.Nodes) {
		statsNodes := make([]string, 0, len(statsResp.Nodes))
		for name := range statsResp.Nodes {
			statsNodes = append(statsNodes, name)
		}
		infoNodes := make([]string, 0, len(infoResp.Nodes))
		for name := range infoResp.Nodes {
			infoNodes = append(infoNodes, name)
		}
		zap.L().Error("got info and stats responses of different lengths",
			zap.Strings("stats_nodes", statsNodes),
			zap.Strings("info_nodes", infoNodes),
		)
		return nil, ErrInconsistentNodes
	}

	// Merge all endpoint responses into a single set of Nodes.
	nodes := make(map[string]*Node, len(statsResp.Nodes))
	for _, ni := range infoResp.Nodes {
		ip := strings.Split(ni.IP, ":")[0] // Remove port number
		// A node is excluded from shard allocation if its name, IP, host, or
		// any of its attributes matches the merged exclude settings.
		excluded := str.In(ni.Name, settings.Name...) || str.In(ip, settings.IP...) || str.In(ni.Host, settings.Host...)
		if !excluded {
			for a, v := range ni.Attributes {
				if sv, ok := settings.Attr[a]; ok && str.In(v, sv...) {
					excluded = true
					break
				}
			}
		}
		nodes[ni.Name] = &Node{
			ClusterName:             infoResp.ClusterName,
			NodesInfoNode:           *ni,
			ExcludedShardAllocation: excluded,
		}
	}

	for _, ns := range statsResp.Nodes {
		n, ok := nodes[ns.Name]
		if !ok {
			nodeNames := make([]string, 0, len(nodes))
			for name := range nodes {
				nodeNames = append(nodeNames, name)
			}
			zap.L().Error("got node in stats response that isn't in info response",
				zap.String("name", ns.Name),
				zap.Strings("nodes", nodeNames),
			)
			return nil, ErrInconsistentNodes
		}
		n.Stats = *ns
	}

	for _, sr := range shardsResp {
		shardNodes, err := parseShardNodes(sr.Node)
		if err != nil {
			zap.L().Error(err.Error(), zap.String("name", sr.Node))
			return nil, err
		}
		if len(shardNodes) == 0 {
			// Unassigned shard. Ignore.
			continue
		}
		for _, node := range shardNodes {
			if n, ok := nodes[node]; ok {
				n.Shards = append(n.Shards, sr)
			} else if len(names) == 0 {
				// Only an inconsistency when unfiltered: with a names filter,
				// shards on other nodes are expected and skipped.
				nodeNames := make([]string, 0, len(nodes))
				for name := range nodes {
					nodeNames = append(nodeNames, name)
				}
				zap.L().Error("got node in shards response that isn't in info or stats response",
					zap.String("name", node),
					zap.Strings("nodes", nodeNames),
				)
				return nil, ErrInconsistentNodes
			}
		}
	}

	return nodes, nil
}
// parseShardNodes parses the node column of a /_cat/shards response row into
// the names of the node(s) holding the shard.
//
// The value is expected to be one of:
//   - An empty (or all-whitespace) string for an unassigned shard; nil is
//     returned.
//   - A single node name for a normally assigned shard.
//   - "<source> -> <target IP> <target node ID> <target>" for a shard being
//     relocated; both the source and target node names are returned.
//
// Any other shape results in an error.
func parseShardNodes(node string) ([]string, error) {
	parts := strings.Fields(node)
	switch {
	case len(parts) == 0:
		// Unassigned shard.
		return nil, nil
	case len(parts) == 1:
		return parts, nil
	case len(parts) == 5 && parts[1] == "->":
		// Example: "i-0968d7621b79cd73d -> 10.2.4.58 kNe49LLvSqGXBn2s8Ffgyw i-0a2ed08df0e5cfff6"
		return []string{parts[0], parts[4]}, nil
	}
	return nil, errors.New("couldn't parse /_cat/shards response node name")
}