forked from Restream/reindexer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
describer.go
442 lines (409 loc) · 16 KB
/
describer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
package reindexer
import (
"context"
"strings"
)
// Names of reindexer's system namespaces.
const (
// ConfigNamespaceName is the system namespace that stores DBConfigItem records.
ConfigNamespaceName = "#config"
// MemstatsNamespaceName is the system namespace that holds NamespaceMemStat records.
MemstatsNamespaceName = "#memstats"
// NamespacesNamespaceName is the system namespace queried for NamespaceDescription records.
NamespacesNamespaceName = "#namespaces"
// PerfstatsNamespaceName is the system namespace that holds NamespacePerfStat records.
PerfstatsNamespaceName = "#perfstats"
// QueriesperfstatsNamespaceName is the system namespace that holds QueryPerfStat records.
QueriesperfstatsNamespaceName = "#queriesperfstats"
// ClientsStatsNamespaceName is presumably the system namespace that holds
// ClientConnectionStat records - TODO confirm.
ClientsStatsNamespaceName = "#clientsstats"
)
// queryTypes maps an upper-case condition name to its condition constant.
// GetCondType upper-cases the requested name before looking it up here.
var queryTypes = map[string]int{
"EQ": EQ,
"GT": GT,
"LT": LT,
"GE": GE,
"LE": LE,
"SET": SET,
"RANGE": RANGE,
"ANY": ANY,
"EMPTY": EMPTY,
"ALLSET": ALLSET,
// NOTE(review): "MATCH" resolves to the same constant as "EQ"; this table
// has no separate MATCH constant.
"MATCH": EQ,
"LIKE": LIKE,
}
// GetCondType resolves a condition name to its condition constant.
//
// The name is upper-cased before the lookup in queryTypes, so "eq", "Eq" and
// "EQ" all resolve to the same constant. For a name that is not present in
// queryTypes it returns 0 and ErrCondType.
func GetCondType(name string) (int, error) {
	if cond, found := queryTypes[strings.ToUpper(name)]; found {
		return cond, nil
	}
	return 0, ErrCondType
}
// queryNames maps a condition constant back to its upper-case condition name.
// NOTE(review): queryTypes also knows "ALLSET" (and the "MATCH" alias of EQ),
// but ALLSET has no entry here, so looking it up yields the empty string.
// Confirm whether the omission is intentional.
var queryNames = map[int]string{
EQ: "EQ",
GT: "GT",
LT: "LT",
GE: "GE",
LE: "LE",
SET: "SET",
RANGE: "RANGE",
ANY: "ANY",
EMPTY: "EMPTY",
LIKE: "LIKE",
}
// IndexDescription extends IndexDef with the extra fields decoded from the
// "is_sortable", "is_fulltext" and "conditions" JSON keys.
type IndexDescription struct {
// Embedded index definition
IndexDef
// Whether the index is sortable
IsSortable bool `json:"is_sortable"`
// Whether the index is a fulltext index
IsFulltext bool `json:"is_fulltext"`
// Conditions reported for the index; presumably condition names such as
// those listed in queryNames - TODO confirm
Conditions []string `json:"conditions"`
}
// NamespaceDescription describes one namespace. It is the item type that
// DescribeNamespaces and DescribeNamespace expect from the '#namespaces'
// system namespace.
type NamespaceDescription struct {
// Name of namespace
Name string `json:"name"`
// Descriptions of the namespace's indexes
Indexes []IndexDescription `json:"indexes"`
// Whether storage is enabled for the namespace
StorageEnabled bool `json:"storage_enabled"`
}
// CacheMemStat is information about the memory consumption of one of
// reindexer's caches.
type CacheMemStat struct {
// Total memory consumption by this cache
TotalSize int64 `json:"total_size"`
// Count of used elements stored in this cache
ItemsCount int64 `json:"items_count"`
// Count of empty element slots in this cache
EmptyCount int64 `json:"empty_count"`
// Number of hits of queries, to store results in cache
HitCountLimit int64 `json:"hit_count_limit"`
}
// LsnT holds an operation counter together with a server id. It is the type of
// the Log Sequence Number (LSN) fields of the replication statistics.
type LsnT struct {
// Operation counter
Counter int64 `json:"counter"`
// Node identifier
ServerId int `json:"server_id"`
}
// MasterReplicationState is information about the reindexer-master state.
type MasterReplicationState struct {
// Last Log Sequence Number (LSN) of applied namespace modification
// (decoded from the "last_upstream_lsn" JSON key)
LastLSN LsnT `json:"last_upstream_lsn"`
// Hashsum of all records in namespace
DataHash uint64 `json:"data_hash"`
// Data updated timestamp
UpdatedUnixNano int64 `json:"updated_unix_nano"`
// Items count in master namespace
DataCount int64 `json:"data_count"`
}
// NamespaceMemStat is information about a namespace's memory statistics.
// It is located in the '#memstats' system namespace.
type NamespaceMemStat struct {
// Name of namespace
Name string `json:"name"`
// Deprecated: do not use.
StorageError string `json:"storage_error"`
// Filesystem path to namespace storage
StoragePath string `json:"storage_path"`
// Status of disk storage
StorageOK bool `json:"storage_ok"`
// Background indexes optimization has been completed
OptimizationCompleted bool `json:"optimization_completed"`
// Total count of documents in namespace
ItemsCount int64 `json:"items_count,omitempty"`
// Count of empty (unused) slots in namespace
EmptyItemsCount int64 `json:"empty_items_count"`
// Raw size of documents stored in the namespace, except string fields
DataSize int64 `json:"data_size"`
// Summary of total namespace memory consumption
Total struct {
// Total memory size of stored documents, including system structures
DataSize int64 `json:"data_size"`
// Total memory consumption of namespace's indexes
IndexesSize int64 `json:"indexes_size"`
// Total memory consumption of namespace's caches, e.g. idset and join caches
CacheSize int64 `json:"cache_size"`
} `json:"total"`
// Replication status of namespace
Replication struct {
// Last Log Sequence Number (LSN) of applied namespace modification
LastLSN LsnT `json:"last_lsn_v2"`
// If true, then namespace is in slave mode
SlaveMode bool `json:"slave_mode"`
// True if replication is enabled
ReplicatorEnabled bool `json:"replicator_enabled"`
// Temporary namespace flag
Temporary bool `json:"temporary"`
// Number of storage's master <-> slave switches
IncarnationCounter int64 `json:"incarnation_counter"`
// Hashsum of all records in namespace
DataHash uint64 `json:"data_hash"`
// Data count
DataCount int `json:"data_count"`
// Write Ahead Log (WAL) records count
WalCount int64 `json:"wal_count"`
// Total memory consumption of Write Ahead Log (WAL)
WalSize int64 `json:"wal_size"`
// Data updated timestamp
UpdatedUnixNano int64 `json:"updated_unix_nano"`
// Current replication status
Status string `json:"status"`
// Origin LSN of last replication operation
OriginLSN LsnT `json:"origin_lsn"`
// Last LSN of api operation (not from replication)
LastSelfLSN LsnT `json:"last_self_lsn"`
// Last Log Sequence Number (LSN) of applied namespace modification
LastUpstreamLSN LsnT `json:"last_upstream_lsn"`
// Last replication error code
ErrorCode int64 `json:"error_code"`
// Last replication error message
ErrorMessage string `json:"error_message"`
// Master's state
MasterState MasterReplicationState `json:"master_state"`
} `json:"replication"`
// Indexes memory statistics
Indexes []struct {
// Name of index. There is a special index named `-tuple`. It stores the original document's json structure with non-indexed fields
Name string `json:"name"`
// Count of unique key values stored in index
UniqKeysCount int64 `json:"unique_keys_count"`
// Total memory consumption of the documents' data held by index
DataSize int64 `json:"data_size"`
// Total memory consumption of SORT statement and `GT`, `LT` conditions optimized structures. Applicable only to `tree` indexes
SortOrdresSize int64 `json:"sort_orders_size"`
// Total memory consumption of reverse index vectors. For `store` indexes always 0
IDSetPlainSize int64 `json:"idset_plain_size"`
// Total memory consumption of reverse index b-tree structures. For `dense` and `store` indexes always 0
IDSetBTreeSize int64 `json:"idset_btree_size"`
// Total memory consumption of fulltext search structures
FulltextSize int64 `json:"fulltext_size"`
// Idset cache stats. Stores merged reverse index results of SELECT field IN(...) by IN(...) keys
IDSetCache CacheMemStat `json:"idset_cache"`
} `json:"indexes"`
// Join cache stats. Stores results of selects to right table by ON condition
JoinCache CacheMemStat `json:"join_cache"`
// Query cache stats. Stores results of SELECT COUNT(*) by Where conditions
QueryCache CacheMemStat `json:"query_cache"`
}
// PerfStat is information about the performance statistics of different
// reindexer objects. It is used for the update and select statistics of
// NamespacePerfStat and is embedded into QueryPerfStat.
type PerfStat struct {
// Total count of queries to this object
TotalQueriesCount int64 `json:"total_queries_count"`
// Average latency (execution time) for queries to this object
TotalAvgLatencyUs int64 `json:"total_avg_latency_us"`
// Average waiting time for acquiring lock to this object
TotalAvgLockTimeUs int64 `json:"total_avg_lock_time_us"`
// Count of queries to this object, requested at last second
LastSecQPS int64 `json:"last_sec_qps"`
// Average latency (execution time) for queries to this object at last second
LastSecAvgLatencyUs int64 `json:"last_sec_avg_latency_us"`
// Average waiting time for acquiring lock to this object at last second
LastSecAvgLockTimeUs int64 `json:"last_sec_avg_lock_time_us"`
// Minimal latency value
MinLatencyUs int64 `json:"min_latency_us"`
// Maximum latency value
MaxLatencyUs int64 `json:"max_latency_us"`
// Standard deviation of latency values
LatencyStddev int64 `json:"latency_stddev"`
}
// TxPerfStat is information about transactions performance statistics.
// Fields with the Us suffix are times in microseconds (usec).
type TxPerfStat struct {
// Total transactions count for namespace
TotalCount int64 `json:"total_count"`
// Total namespace copy operations
TotalCopyCount int64 `json:"total_copy_count"`
// Average steps count in transactions for this namespace
AvgStepsCount int64 `json:"avg_steps_count"`
// Minimum steps count in transactions for this namespace
MinStepsCount int64 `json:"min_steps_count"`
// Maximum steps count in transactions for this namespace
MaxStepsCount int64 `json:"max_steps_count"`
// Average transaction preparation time usec
AvgPrepareTimeUs int64 `json:"avg_prepare_time_us"`
// Minimum transaction preparation time usec
MinPrepareTimeUs int64 `json:"min_prepare_time_us"`
// Maximum transaction preparation time usec
MaxPrepareTimeUs int64 `json:"max_prepare_time_us"`
// Average transaction commit time usec
AvgCommitTimeUs int64 `json:"avg_commit_time_us"`
// Minimum transaction commit time usec
MinCommitTimeUs int64 `json:"min_commit_time_us"`
// Maximum transaction commit time usec
MaxCommitTimeUs int64 `json:"max_commit_time_us"`
// Average namespace copy time usec
AvgCopyTimeUs int64 `json:"avg_copy_time_us"`
// Minimum namespace copy time usec
MinCopyTimeUs int64 `json:"min_copy_time_us"`
// Maximum namespace copy time usec
MaxCopyTimeUs int64 `json:"max_copy_time_us"`
}
// NamespacePerfStat is information about a namespace's performance statistics.
// It is located in the '#perfstats' system namespace.
type NamespacePerfStat struct {
// Name of namespace
Name string `json:"name"`
// Performance statistics for update operations
Updates PerfStat `json:"updates"`
// Performance statistics for select operations
Selects PerfStat `json:"selects"`
// Performance statistics for transactions
Transactions TxPerfStat `json:"transactions"`
}
// ClientConnectionStat is information about a client connection.
// NOTE(review): presumably these records live in the '#clientsstats' system
// namespace (see ClientsStatsNamespaceName) - confirm.
type ClientConnectionStat struct {
// Connection identifier
ConnectionId int64 `json:"connection_id"`
// Client IP address
Ip string `json:"ip"`
// User name
UserName string `json:"user_name"`
// User rights
UserRights string `json:"user_rights"`
// Database name
DbName string `json:"db_name"`
// Current activity
CurrentActivity string `json:"current_activity"`
// Server start time in unix timestamp
StartTime int64 `json:"start_time"`
// Received bytes
RecvBytes int64 `json:"recv_bytes"`
// Sent bytes
SentBytes int64 `json:"sent_bytes"`
// Client version string
ClientVersion string `json:"client_version"`
// Send buffer size
SendBufBytes int64 `json:"send_buf_bytes"`
// Pending updates count
PendedUpdates int64 `json:"pended_updates"`
// Timestamp of last send operation (ms)
LastSendTs int64 `json:"last_send_ts"`
// Timestamp of last recv operation (ms)
LastRecvTs int64 `json:"last_recv_ts"`
// Current send rate (bytes/s)
SendRate int `json:"send_rate"`
// Current recv rate (bytes/s)
RecvRate int `json:"recv_rate"`
// Active transactions count
TxCount int `json:"tx_count"`
// Status of updates subscription
IsSubscribed bool `json:"is_subscribed"`
// Updates filter for this client
UpdatesFilter struct {
// Per-namespace filter entries
Namespaces []struct {
// Namespace name
Name string `json:"name"`
// Filtering conditions set
Filters []struct {
// Empty: the filter entries declare no fields
} `json:"filters"`
} `json:"namespaces"`
} `json:"updates_filter"`
}
// QueryPerfStat is information about a query's performance statistics.
// It is located in the '#queriesperfstats' system namespace.
type QueryPerfStat struct {
// Query the statistics belong to; presumably the query text - TODO confirm
Query string `json:"query"`
// Embedded performance counters for this query
PerfStat
}
// DBConfigItem is the structure stored in the system '#config' namespace.
// The Profiling, Namespaces and Replication sections are pointers tagged with
// omitempty, so a nil section is left out of the encoded JSON.
type DBConfigItem struct {
// Type of the configuration item; presumably names which of the sections
// below is filled in - TODO confirm
Type string `json:"type"`
// Profiling options
Profiling *DBProfilingConfig `json:"profiling,omitempty"`
// Namespaces options
Namespaces *[]DBNamespacesConfig `json:"namespaces,omitempty"`
// Replication options
Replication *DBReplicationConfig `json:"replication,omitempty"`
}
// DBProfilingConfig is the part of reindexer configuration that contains profiling options
type DBProfilingConfig struct {
// Minimum query execution time to be recorded in #queriesperfstats namespace
QueriesThresholdUS int `json:"queries_threshold_us"`
// Enables tracking memory statistics
MemStats bool `json:"memstats"`
// Enables tracking overall performance statistics
PerfStats bool `json:"perfstats"`
// Enables recording of queries performance statistics
QueriesPerfStats bool `json:"queriesperfstats"`
}
// DBNamespacesConfig is the part of reindexer configuration that contains namespaces options
type DBNamespacesConfig struct {
// Name of namespace, or `*` for setting to all namespaces
Namespace string `json:"namespace"`
// Log level of queries core logger
LogLevel string `json:"log_level"`
// Join cache mode. Can be one of on, off, aggressive
JoinCacheMode string `json:"join_cache_mode"`
// Enable namespace lazy load (namespace should be loaded from disk on first call, not at reindexer startup)
Lazyload bool `json:"lazyload"`
// Unload namespace data from RAM after this idle timeout in seconds. If 0, then data should not be unloaded
UnloadIdleThreshold int `json:"unload_idle_threshold"`
// Enable namespace copying for transaction with steps count greater than this value (if copy_policy_multiplier also allows this)
StartCopyPolicyTxSize int `json:"start_copy_policy_tx_size"`
// Disables copy policy if namespace size is greater than copy_policy_multiplier * start_copy_policy_tx_size
CopyPolicyMultiplier int `json:"copy_policy_multiplier"`
// Force namespace copying for transaction with steps count greater than this value
TxSizeToAlwaysCopy int `json:"tx_size_to_always_copy"`
// Timeout before background indexes optimization start after last update. 0 - disable optimizations
// (decoded from the "optimization_timeout_ms" JSON key)
OptimizationTimeout int `json:"optimization_timeout_ms"`
// Maximum number of background threads of sort indexes optimization. 0 - disable sort optimizations
OptimizationSortWorkers int `json:"optimization_sort_workers"`
// Maximum WAL size for this namespace (maximum count of WAL records)
WALSize int64 `json:"wal_size"`
}
// DBReplicationConfig is the part of reindexer configuration that contains replication options
type DBReplicationConfig struct {
// Replication role. One of none, slave, master
Role string `json:"role"`
// DSN to master. Only cproto schema is supported
MasterDSN string `json:"master_dsn"`
// Cluster ID - must be same for client and for master
ClusterID int `json:"cluster_id"`
// Force resync on logic error conditions
ForceSyncOnLogicError bool `json:"force_sync_on_logic_error"`
// Force resync on wrong data hash conditions
ForceSyncOnWrongDataHash bool `json:"force_sync_on_wrong_data_hash"`
// List of namespaces for replication. If empty, all namespaces. All replicated namespaces will become read only for slave
Namespaces []string `json:"namespaces"`
}
// DescribeNamespaces fetches every item of the '#namespaces' system namespace
// (the equivalent of 'SELECT * FROM #namespaces'), using the context stored
// in db.
//
// Fetched items that are not *NamespaceDescription are skipped. When the
// query fails it returns nil and the query error; otherwise the returned
// slice is non-nil, even if it is empty.
func (db *Reindexer) DescribeNamespaces() ([]*NamespaceDescription, error) {
	items, err := db.Query(NamespacesNamespaceName).ExecCtx(db.ctx).FetchAll()
	if err != nil {
		return nil, err
	}

	namespaces := []*NamespaceDescription{}
	for i := range items {
		if ns, isDesc := items[i].(*NamespaceDescription); isDesc {
			namespaces = append(namespaces, ns)
		}
	}
	return namespaces, nil
}
// DescribeNamespace returns the description of the namespace with the given
// name. It delegates to db.impl.describeNamespace with the context stored in
// db, which queries the '#namespaces' system namespace for the item whose
// "name" field equals namespace.
// Return NamespaceDescription result, error
func (db *Reindexer) DescribeNamespace(namespace string) (*NamespaceDescription, error) {
return db.impl.describeNamespace(db.ctx, namespace)
}
// describeNamespace looks up, in the '#namespaces' system namespace, the item
// whose "name" field equals namespace, executing the query with ctx.
//
// A fetch error is returned as is, together with a nil description.
// NOTE(review): the type assertion below is unchecked, so a fetched item of
// any type other than *NamespaceDescription makes this method panic.
func (db *reindexerImpl) describeNamespace(ctx context.Context, namespace string) (*NamespaceDescription, error) {
	byName := db.query(NamespacesNamespaceName).Where("name", EQ, namespace)
	item, err := byName.ExecCtx(ctx).FetchOne()
	if err != nil {
		return nil, err
	}
	return item.(*NamespaceDescription), nil
}
// GetNamespacesMemStat fetches every item of the '#memstats' system namespace
// (the equivalent of 'SELECT * FROM #memstats'), using the context stored in
// db.
//
// Fetched items that are not *NamespaceMemStat are skipped. When the query
// fails it returns nil and the query error; otherwise the returned slice is
// non-nil, even if it is empty.
func (db *Reindexer) GetNamespacesMemStat() ([]*NamespaceMemStat, error) {
	items, err := db.Query(MemstatsNamespaceName).ExecCtx(db.ctx).FetchAll()
	if err != nil {
		return nil, err
	}

	stats := []*NamespaceMemStat{}
	for _, item := range items {
		stat, isStat := item.(*NamespaceMemStat)
		if !isStat {
			continue
		}
		stats = append(stats, stat)
	}
	return stats, nil
}
// GetNamespaceMemStat looks up, in the '#memstats' system namespace, the item
// whose "name" field equals namespace, using the context stored in db.
//
// A fetch error is returned as is, together with a nil result.
// NOTE(review): the type assertion below is unchecked, so a fetched item of
// any type other than *NamespaceMemStat makes this method panic.
func (db *Reindexer) GetNamespaceMemStat(namespace string) (*NamespaceMemStat, error) {
	byName := db.Query(MemstatsNamespaceName).Where("name", EQ, namespace)
	item, err := byName.ExecCtx(db.ctx).FetchOne()
	if err != nil {
		return nil, err
	}
	return item.(*NamespaceMemStat), nil
}