forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery-result.hh
455 lines (395 loc) · 17.1 KB
/
query-result.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "bytes_ostream.hh"
#include "utils/digest_algorithm.hh"
#include "query-request.hh"
#include "full_position.hh"
#include <optional>
#include <fmt/ostream.h>
#include <seastar/util/bool_class.hh>
#include "seastarx.hh"
namespace query {
struct short_read_tag { };
using short_read = bool_class<short_read_tag>;
// result_memory_limiter, result_memory_accounter and result_memory_tracker
// form an infrastructure for limiting size of query results.
//
// result_memory_limiter is a shard-local object which ensures that all results
// combined do not use more than 10% of the shard memory.
//
// result_memory_accounter is used by result producers, updates the shard-local
// limits as well as keeps track of the individual maximum result size limit
// which is 1 MB.
//
// result_memory_tracker is just an object that makes sure the
// result_memory_limiter is notified when memory is released (but not sooner).
class result_memory_accounter;
class result_memory_limiter {
const size_t _maximum_total_result_memory;
semaphore _memory_limiter;
public:
static constexpr size_t minimum_result_size = 4 * 1024;
static constexpr size_t maximum_result_size = 1 * 1024 * 1024;
static constexpr size_t unlimited_result_size = std::numeric_limits<size_t>::max();
public:
explicit result_memory_limiter(size_t maximum_total_result_memory)
: _maximum_total_result_memory(maximum_total_result_memory)
, _memory_limiter(_maximum_total_result_memory)
{ }
result_memory_limiter(const result_memory_limiter&) = delete;
result_memory_limiter(result_memory_limiter&&) = delete;
ssize_t total_used_memory() const {
return _maximum_total_result_memory - _memory_limiter.available_units();
}
// Reserves minimum_result_size and creates new memory accounter for
// mutation query. Uses the specified maximum result size and may be
// stopped before reaching it due to memory pressure on shard.
future<result_memory_accounter> new_mutation_read(query::max_result_size max_result_size, short_read short_read_allowed);
// Reserves minimum_result_size and creates new memory accounter for
// data query. Uses the specified maximum result size, result will *not*
// be stopped due to on shard memory pressure in order to avoid digest
// mismatches.
future<result_memory_accounter> new_data_read(query::max_result_size max_result_size, short_read short_read_allowed);
// Creates a memory accounter for digest reads. Such accounter doesn't
// contribute to the shard memory usage, but still stops producing the
// result after individual limit has been reached.
future<result_memory_accounter> new_digest_read(query::max_result_size max_result_size, short_read short_read_allowed);
// Checks whether the result can grow any more, takes into account only
// the per shard limit.
stop_iteration check() const {
return stop_iteration(_memory_limiter.current() <= 0);
}
// Consumes n bytes from memory limiter and checks whether the result
// can grow any more (considering just the per-shard limit).
stop_iteration update_and_check(size_t n) {
_memory_limiter.consume(n);
return check();
}
void release(size_t n) noexcept {
_memory_limiter.signal(n);
}
semaphore& sem() noexcept { return _memory_limiter; }
};
class result_memory_tracker {
semaphore_units<> _units;
size_t _used_memory;
private:
static thread_local semaphore _dummy;
public:
result_memory_tracker() noexcept : _units(_dummy, 0), _used_memory(0) { }
result_memory_tracker(semaphore& sem, size_t blocked, size_t used) noexcept
: _units(sem, blocked), _used_memory(used) { }
size_t used_memory() const { return _used_memory; }
};
class result_memory_accounter {
result_memory_limiter* _limiter = nullptr;
size_t _blocked_bytes = 0;
size_t _used_memory = 0;
size_t _total_used_memory = 0;
query::max_result_size _maximum_result_size;
stop_iteration _stop_on_global_limit;
short_read _short_read_allowed;
mutable bool _below_soft_limit = true;
private:
// Mutation query accounter. Uses provided individual result size limit and
// will stop when shard memory pressure grows too high.
struct mutation_query_tag { };
explicit result_memory_accounter(mutation_query_tag, result_memory_limiter& limiter, query::max_result_size max_size, short_read short_read_allowed) noexcept
: _limiter(&limiter)
, _blocked_bytes(result_memory_limiter::minimum_result_size)
, _maximum_result_size(max_size)
, _stop_on_global_limit(true)
, _short_read_allowed(short_read_allowed)
{ }
// Data query accounter. Uses provided individual result size limit and
// will *not* stop even though shard memory pressure grows too high.
struct data_query_tag { };
explicit result_memory_accounter(data_query_tag, result_memory_limiter& limiter, query::max_result_size max_size, short_read short_read_allowed) noexcept
: _limiter(&limiter)
, _blocked_bytes(result_memory_limiter::minimum_result_size)
, _maximum_result_size(max_size)
, _short_read_allowed(short_read_allowed)
{ }
// Digest query accounter. Uses provided individual result size limit and
// will *not* stop even though shard memory pressure grows too high. This
// accounter does not contribute to the shard memory limits.
struct digest_query_tag { };
explicit result_memory_accounter(digest_query_tag, result_memory_limiter&, query::max_result_size max_size, short_read short_read_allowed) noexcept
: _blocked_bytes(0)
, _maximum_result_size(max_size)
, _short_read_allowed(short_read_allowed)
{ }
stop_iteration check_local_limit() const;
friend class result_memory_limiter;
public:
explicit result_memory_accounter(size_t max_size) noexcept
: _blocked_bytes(0)
, _maximum_result_size(max_size) {
}
result_memory_accounter(result_memory_accounter&& other) noexcept
: _limiter(std::exchange(other._limiter, nullptr))
, _blocked_bytes(other._blocked_bytes)
, _used_memory(other._used_memory)
, _total_used_memory(other._total_used_memory)
, _maximum_result_size(other._maximum_result_size)
, _stop_on_global_limit(other._stop_on_global_limit)
, _short_read_allowed(other._short_read_allowed)
, _below_soft_limit(other._below_soft_limit)
{ }
result_memory_accounter& operator=(result_memory_accounter&& other) noexcept {
if (this != &other) {
this->~result_memory_accounter();
new (this) result_memory_accounter(std::move(other));
}
return *this;
}
~result_memory_accounter() {
if (_limiter) {
_limiter->release(_blocked_bytes);
}
}
size_t used_memory() const { return _used_memory; }
// Consume n more bytes for the result. Returns stop_iteration::yes if
// the result cannot grow any more (taking into account both individual
// and per-shard limits).
stop_iteration update_and_check(size_t n) {
_used_memory += n;
_total_used_memory += n;
auto stop = check_local_limit();
if (_limiter && _used_memory > _blocked_bytes) {
auto to_block = std::min(_used_memory - _blocked_bytes, n);
_blocked_bytes += to_block;
stop = (_limiter->update_and_check(to_block) && _stop_on_global_limit) || stop;
if (stop && !_short_read_allowed) {
// If we are here we stopped because of the global limit.
throw std::runtime_error("Maximum amount of memory for building query results is exhausted, unpaged query cannot be finished");
}
}
return stop;
}
// Checks whether the result can grow any more.
stop_iteration check() const {
auto stop = check_local_limit();
if (!stop && _used_memory >= _blocked_bytes && _limiter) {
return _limiter->check() && _stop_on_global_limit;
}
return stop;
}
// Consume n more bytes for the result.
void update(size_t n) {
update_and_check(n);
}
result_memory_tracker done() && {
if (!_limiter) {
return { };
}
auto& sem = std::exchange(_limiter, nullptr)->sem();
return result_memory_tracker(sem, _blocked_bytes, _used_memory);
}
};
inline future<result_memory_accounter> result_memory_limiter::new_mutation_read(query::max_result_size max_size, short_read short_read_allowed) {
return _memory_limiter.wait(minimum_result_size).then([this, max_size, short_read_allowed] {
return result_memory_accounter(result_memory_accounter::mutation_query_tag(), *this, max_size, short_read_allowed);
});
}
inline future<result_memory_accounter> result_memory_limiter::new_data_read(query::max_result_size max_size, short_read short_read_allowed) {
return _memory_limiter.wait(minimum_result_size).then([this, max_size, short_read_allowed] {
return result_memory_accounter(result_memory_accounter::data_query_tag(), *this, max_size, short_read_allowed);
});
}
inline future<result_memory_accounter> result_memory_limiter::new_digest_read(query::max_result_size max_size, short_read short_read_allowed) {
return make_ready_future<result_memory_accounter>(result_memory_accounter(result_memory_accounter::digest_query_tag(), *this, max_size, short_read_allowed));
}
enum class result_request {
only_result,
only_digest,
result_and_digest,
};
struct result_options {
result_request request = result_request::only_result;
digest_algorithm digest_algo = query::digest_algorithm::none;
static result_options only_result() {
return result_options{};
}
static result_options only_digest(digest_algorithm da) {
return {result_request::only_digest, da};
}
};
class result_digest {
public:
using type = std::array<uint8_t, 16>;
private:
type _digest;
public:
result_digest() = default;
result_digest(type&& digest) : _digest(std::move(digest)) {}
const type& get() const { return _digest; }
bool operator==(const result_digest& rh) const = default;
};
//
// The query results are stored in a serialized form. This is in order to
// address the following problems, which a structured format has:
//
// - high level of indirection (vector of vectors of vectors of blobs), which
// is not CPU cache friendly
//
// - high allocation rate due to fine-grained object structure
//
// On replica side, the query results are probably going to be serialized in
// the transport layer anyway, so serializing the results up-front doesn't add
// net work. There is no processing of the query results on replica other than
// concatenation in case of range queries and checksum calculation. If query
// results are collected in serialized form from different cores, we can
// concatenate them without copying by simply appending the fragments into the
// packet.
//
// On coordinator side, the query results would have to be parsed from the
// transport layer buffers anyway, so the fact that iterators parse it also
// doesn't add net work, but again saves allocations and copying. The CQL
// server doesn't need complex data structures to process the results, it just
// goes over it linearly consuming it.
//
// The coordinator side could be optimized even further for CQL queries which
// do not need processing (eg. select * from cf where ...). We could make the
// replica send the query results in the format which is expected by the CQL
// binary protocol client. So in the typical case the coordinator would just
// pass the data using zero-copy to the client, prepending a header.
//
// Users which need more complex structure of query results can convert this
// to query::result_set.
//
// Related headers:
// - query-result-reader.hh
// - query-result-writer.hh
class result {
bytes_ostream _w;
std::optional<result_digest> _digest;
std::optional<uint32_t> _row_count_low_bits;
api::timestamp_type _last_modified = api::missing_timestamp;
short_read _short_read;
query::result_memory_tracker _memory_tracker;
std::optional<uint32_t> _partition_count;
std::optional<uint32_t> _row_count_high_bits;
std::optional<full_position> _last_position;
public:
class builder;
class partition_writer;
friend class result_merger;
result();
result(bytes_ostream&& w, short_read sr, std::optional<uint32_t> c_low_bits, std::optional<uint32_t> pc,
std::optional<uint32_t> c_high_bits, std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
: _w(std::move(w))
, _row_count_low_bits(c_low_bits)
, _short_read(sr)
, _memory_tracker(std::move(memory_tracker))
, _partition_count(pc)
, _row_count_high_bits(c_high_bits)
, _last_position(std::move(last_position))
{
w.reduce_chunk_count();
}
result(bytes_ostream&& w, std::optional<result_digest> d, api::timestamp_type last_modified,
short_read sr, std::optional<uint32_t> c_low_bits, std::optional<uint32_t> pc, std::optional<uint32_t> c_high_bits,
std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
: _w(std::move(w))
, _digest(d)
, _row_count_low_bits(c_low_bits)
, _last_modified(last_modified)
, _short_read(sr)
, _memory_tracker(std::move(memory_tracker))
, _partition_count(pc)
, _row_count_high_bits(c_high_bits)
, _last_position(std::move(last_position))
{
w.reduce_chunk_count();
}
result(bytes_ostream&& w, short_read sr, uint64_t c, std::optional<uint32_t> pc,
std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
: _w(std::move(w))
, _row_count_low_bits(static_cast<uint32_t>(c))
, _short_read(sr)
, _memory_tracker(std::move(memory_tracker))
, _partition_count(pc)
, _row_count_high_bits(static_cast<uint32_t>(c >> 32))
, _last_position(std::move(last_position))
{
w.reduce_chunk_count();
}
result(bytes_ostream&& w, std::optional<result_digest> d, api::timestamp_type last_modified,
short_read sr, uint64_t c, std::optional<uint32_t> pc, std::optional<full_position> last_position, result_memory_tracker memory_tracker = { })
: _w(std::move(w))
, _digest(d)
, _row_count_low_bits(static_cast<uint32_t>(c))
, _last_modified(last_modified)
, _short_read(sr)
, _memory_tracker(std::move(memory_tracker))
, _partition_count(pc)
, _row_count_high_bits(static_cast<uint32_t>(c >> 32))
, _last_position(std::move(last_position))
{
w.reduce_chunk_count();
}
result(result&&) = default;
result& operator=(result&&) = default;
const bytes_ostream& buf() const {
return _w;
}
const std::optional<result_digest>& digest() const {
return _digest;
}
const std::optional<uint32_t> row_count_low_bits() const {
return _row_count_low_bits;
}
const std::optional<uint32_t> row_count_high_bits() const {
return _row_count_high_bits;
}
const std::optional<uint64_t> row_count() const {
if (!_row_count_low_bits) {
return _row_count_low_bits;
}
return (static_cast<uint64_t>(_row_count_high_bits.value_or(0)) << 32) | _row_count_low_bits.value();
}
void set_row_count(std::optional<uint64_t> row_count) {
if (!row_count) {
_row_count_low_bits = std::nullopt;
_row_count_high_bits = std::nullopt;
} else {
_row_count_low_bits = std::make_optional(static_cast<uint32_t>(row_count.value()));
_row_count_high_bits = std::make_optional(static_cast<uint32_t>(row_count.value() >> 32));
}
}
api::timestamp_type last_modified() const {
return _last_modified;
}
short_read is_short_read() const {
return _short_read;
}
const std::optional<uint32_t>& partition_count() const {
return _partition_count;
}
void ensure_counts();
const std::optional<full_position>& last_position() const {
return _last_position;
}
void set_last_position(std::optional<full_position> last_position) {
_last_position = std::move(last_position);
}
// Return _last_position if replica filled it, otherwise calculate it based
// on the content (by looking up the last row in the last partition).
full_position get_or_calculate_last_position() const;
struct printer {
schema_ptr s;
const query::partition_slice& slice;
const query::result& res;
};
sstring pretty_print(schema_ptr, const query::partition_slice&) const;
printer pretty_printer(schema_ptr, const query::partition_slice&) const;
};
std::ostream& operator<<(std::ostream& os, const query::result::printer&);
}
template <> struct fmt::formatter<query::result::printer> : fmt::ostream_formatter {};