forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultishard_mutation_query.cc
1012 lines (878 loc) · 42.3 KB
/
multishard_mutation_query.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "schema/schema_registry.hh"
#include "multishard_mutation_query.hh"
#include "mutation_query.hh"
#include "replica/database.hh"
#include "db/config.hh"
#include "query-result-writer.hh"
#include "query_result_merger.hh"
#include "readers/multishard.hh"
#include <fmt/core.h>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/as_future.hh>
#include <boost/range/adaptor/reversed.hpp>
#include <fmt/ostream.h>
// Logger for the multishard mutation query machinery.
logging::logger mmq_log("multishard_mutation_query");
// Convenience alias: a unique_ptr that must be dereferenced and destroyed on
// the shard it was created on (enforced by seastar::foreign_ptr).
template <typename T>
using foreign_unique_ptr = foreign_ptr<std::unique_ptr<T>>;
/// Context object for a multishard read.
///
/// Handles logic related to looking up, creating, saving and cleaning up remote
/// (shard) readers for the `multishard_mutation_reader`.
/// Has a state machine for each of the shard readers. See the state transition
/// diagram below, above the declaration of `reader_state`.
/// The `read_context` is a short-lived object that is only kept around for the
/// duration of a single page. A new `read_context` is created on each page and
/// is discarded at the end of the page, after the readers are either saved
/// or the process of their safe disposal was started in the background.
/// Intended usage:
/// * Create the `read_context`.
/// * Call `read_context::lookup_readers()` to find any saved readers from the
/// previous page.
/// * Create the `multishard_mutation_reader`.
/// * Fill the page.
/// * Destroy the `multishard_mutation_reader` to trigger the disposal of the
/// shard readers.
/// * Call `read_context::save_readers()` if the read didn't finish yet, that is
/// more pages are expected.
/// * Call `read_context::stop()` to initiate the cleanup of any unsaved readers
/// and their dependencies.
/// * Destroy the `read_context`.
///
/// Note:
/// 1) Each step can only be started when the previous phase has finished.
/// 2) This usage is implemented in the `do_query()` function below.
/// 3) Both `read_context::lookup_readers()` and `read_context::save_readers()`
/// know to do nothing when the query is not stateful and just short
/// circuit.
class read_context : public reader_lifecycle_policy_v2 {
    // State transition diagram for the per-shard readers. The numbered
    // transitions are listed below the diagram.
    // ( ) (O)
    // | ^
    // | |
    // +--- inexistent ---+
    // | |
    // (1) | (3) |
    // | |
    // successful_lookup |
    // | | |
    // | | |
    // | | (3) |
    // | +---------> used
    // (2) | |
    // | (4) |
    // | |
    // +---------------> saving_state
    // |
    // (O)
    //
    // 1) lookup_readers()
    // 2) save_readers()
    // 3) create_reader()
    // 4) destroy_reader()

    // Lifecycle state of a single shard reader; see the diagram above.
    enum class reader_state {
        inexistent,
        successful_lookup,
        used,
        saving,
    };

    // Per-shard bookkeeping for one shard reader.
    struct reader_meta {
        // The parts of the reader that live on its home shard. The
        // coordinating shard only holds them through the foreign_ptr below,
        // so they are created and destroyed on the owning shard.
        struct remote_parts {
            reader_permit permit;
            lw_shared_ptr<const dht::partition_range> range;
            std::unique_ptr<const query::partition_slice> slice;
            // Keeps the table's phased barrier open for the duration of the read.
            utils::phased_barrier::operation read_operation;
            // Handle to the reader while it is parked in the concurrency
            // semaphore as an inactive (evictable) read.
            std::optional<reader_concurrency_semaphore::inactive_read_handle> handle;
            // Fragments the stopped reader had fetched but the page did not consume.
            std::optional<mutation_reader::tracked_buffer> buffer;
            remote_parts(
                    reader_permit permit,
                    lw_shared_ptr<const dht::partition_range> range = nullptr,
                    std::unique_ptr<const query::partition_slice> slice = nullptr,
                    utils::phased_barrier::operation read_operation = {},
                    std::optional<reader_concurrency_semaphore::inactive_read_handle> handle = {})
                : permit(std::move(permit))
                , range(std::move(range))
                , slice(std::move(slice))
                , read_operation(std::move(read_operation))
                , handle(std::move(handle)) {
            }
        };
        reader_state state = reader_state::inexistent;
        foreign_unique_ptr<remote_parts> rparts;
        // Fragments belonging to this shard, salvaged from the combined
        // buffer and/or the detached compaction state when saving readers at
        // the end of a page. Lives on the coordinating shard.
        std::optional<mutation_reader::tracked_buffer> dismantled_buffer;
        reader_meta() = default;
        // Remote constructor.
        reader_meta(reader_state s, std::optional<remote_parts> rp = {})
            : state(s) {
            if (rp) {
                rparts = make_foreign(std::make_unique<remote_parts>(std::move(*rp)));
            }
        }
        // Lazily creates the dismantled buffer on first use.
        mutation_reader::tracked_buffer& get_dismantled_buffer(const reader_permit& permit) {
            if (!dismantled_buffer) {
                dismantled_buffer.emplace(permit);
            }
            return *dismantled_buffer;
        }
    };

    // Counters describing how much of a dismantled buffer was kept for
    // saving vs. discarded; used for tracing only.
    struct dismantle_buffer_stats {
        size_t partitions = 0;
        size_t fragments = 0;
        size_t bytes = 0;
        size_t discarded_partitions = 0;
        size_t discarded_fragments = 0;
        size_t discarded_bytes = 0;
        void add(const mutation_fragment_v2& mf) {
            partitions += unsigned(mf.is_partition_start());
            ++fragments;
            bytes += mf.memory_usage();
        }
        void add(const schema& s, const range_tombstone_change& rtc) {
            ++fragments;
            bytes += rtc.memory_usage(s);
        }
        void add(const schema& s, const static_row& sr) {
            ++fragments;
            bytes += sr.memory_usage(s);
        }
        void add(const schema& s, const partition_start& ps) {
            ++partitions;
            ++fragments;
            bytes += ps.memory_usage(s);
        }
        void add_discarded(const mutation_fragment_v2& mf) {
            discarded_partitions += unsigned(mf.is_partition_start());
            ++discarded_fragments;
            discarded_bytes += mf.memory_usage();
        }
        void add_discarded(const schema& s, const range_tombstone_change& rtc) {
            ++discarded_fragments;
            discarded_bytes += rtc.memory_usage(s);
        }
        void add_discarded(const schema& s, const static_row& sr) {
            ++discarded_fragments;
            discarded_bytes += sr.memory_usage(s);
        }
        void add_discarded(const schema& s, const partition_start& ps) {
            ++discarded_partitions;
            ++discarded_fragments;
            discarded_bytes += ps.memory_usage(s);
        }
    };

    distributed<replica::database>& _db;
    schema_ptr _schema;
    locator::effective_replication_map_ptr _erm;
    reader_permit _permit;
    const query::read_command& _cmd;
    const dht::partition_range_vector& _ranges;
    tracing::trace_state_ptr _trace_state;
    // One for each shard. Index is shard id.
    std::vector<reader_meta> _readers;
    // Cached per-shard semaphore pointers, filled lazily by semaphore().
    std::vector<reader_concurrency_semaphore*> _semaphores;

    static std::string_view reader_state_to_string(reader_state rs);

    dismantle_buffer_stats dismantle_combined_buffer(mutation_reader::tracked_buffer combined_buffer, const dht::decorated_key& pkey);
    dismantle_buffer_stats dismantle_compaction_state(detached_compaction_state compaction_state);
    future<> save_reader(shard_id shard, full_position_view last_pos);

    friend fmt::formatter<dismantle_buffer_stats>;

public:
    read_context(distributed<replica::database>& db, schema_ptr s, locator::effective_replication_map_ptr erm,
            const query::read_command& cmd, const dht::partition_range_vector& ranges,
            tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout)
        : _db(db)
        , _schema(std::move(s))
        , _erm(std::move(erm))
        , _permit(_db.local().get_reader_concurrency_semaphore().make_tracking_only_permit(_schema, "multishard-mutation-query", timeout, trace_state))
        , _cmd(cmd)
        , _ranges(ranges)
        , _trace_state(std::move(trace_state))
        , _semaphores(smp::count, nullptr) {
        _readers.resize(smp::count);
        _permit.set_max_result_size(get_max_result_size());
        if (!_erm->get_replication_strategy().is_vnode_based()) {
            // The algorithm is full of assumptions about shard assignment being round-robin, static, and full.
            // This does not hold for tablets. We chose to avoid this algorithm rather than adapting it.
            // The coordinator should split ranges accordingly.
            on_internal_error(mmq_log, format("multishard reader cannot be used on tables with non-static sharding: {}.{}",
                    _schema->ks_name(), _schema->cf_name()));
        }
    }

    // Not movable/copyable: shard readers hold references back into this object.
    read_context(read_context&&) = delete;
    read_context(const read_context&) = delete;
    read_context& operator=(read_context&&) = delete;
    read_context& operator=(const read_context&) = delete;

    distributed<replica::database>& db() {
        return _db;
    }
    reader_permit permit() const {
        return _permit;
    }
    const locator::effective_replication_map_ptr& erm() const {
        return _erm;
    }
    // Per-command maximum result size, falling back to the database default.
    query::max_result_size get_max_result_size() {
        return _cmd.max_result_size ? *_cmd.max_result_size : _db.local().get_query_max_result_size();
    }

    virtual mutation_reader create_reader(
            schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding fwd_mr) override;
    virtual const dht::partition_range* get_read_range() const override;
    virtual void update_read_range(lw_shared_ptr<const dht::partition_range> range) override;
    virtual future<> destroy_reader(stopped_reader reader) noexcept override;
    // Returns the reader concurrency semaphore of the calling shard,
    // caching the pointer on first use.
    virtual reader_concurrency_semaphore& semaphore() override {
        const auto shard = this_shard_id();
        if (!_semaphores[shard]) {
            _semaphores[shard] = &_db.local().get_reader_concurrency_semaphore();
        }
        return *_semaphores[shard];
    }
    // Re-uses the permit of a successfully looked-up saved reader, otherwise
    // obtains a fresh one from the local database.
    virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr) override {
        const auto shard = this_shard_id();
        auto& rm = _readers[shard];
        if (rm.state == reader_state::successful_lookup) {
            rm.rparts->permit.set_max_result_size(get_max_result_size());
            co_return rm.rparts->permit;
        }
        auto permit = co_await _db.local().obtain_reader_permit(std::move(schema), description, timeout, std::move(trace_ptr));
        permit.set_max_result_size(get_max_result_size());
        co_return permit;
    }

    future<> lookup_readers(db::timeout_clock::time_point timeout) noexcept;
    future<> save_readers(mutation_reader::tracked_buffer unconsumed_buffer, std::optional<detached_compaction_state> compaction_state,
            full_position last_pos) noexcept;
    future<> stop();
};
// Formats dismantle-buffer statistics for tracing: kept vs. discarded
// partition/fragment/byte counts.
template <> struct fmt::formatter<read_context::dismantle_buffer_stats> : fmt::formatter<string_view> {
    auto format(const read_context::dismantle_buffer_stats& stats, fmt::format_context& ctx) const {
        auto out = fmt::format_to(ctx.out(), "kept {} partitions/{} fragments/{} bytes",
                stats.partitions, stats.fragments, stats.bytes);
        return fmt::format_to(out, ", discarded {} partitions/{} fragments/{} bytes",
                stats.discarded_partitions, stats.discarded_fragments, stats.discarded_bytes);
    }
};
// Human-readable name for a reader_state, used in error/warning messages.
std::string_view read_context::reader_state_to_string(reader_state rs) {
    if (rs == reader_state::inexistent) {
        return "inexistent";
    }
    if (rs == reader_state::successful_lookup) {
        return "successful_lookup";
    }
    if (rs == reader_state::used) {
        return "used";
    }
    if (rs == reader_state::saving) {
        return "saving";
    }
    // If we got here, we are logging an error anyway, so the above layers
    // (should) have detected the invalid state.
    return "invalid";
}
// Creates (or revives) the shard reader for the calling shard.
//
// If a saved reader was found for this shard (`successful_lookup`), it is
// unregistered from the semaphore and returned directly. Otherwise a fresh
// reader is created from the table's mutation source, and its remote parts
// (permit, range, slice, read-operation guard) are stowed in the shard's
// reader_meta so they can be saved or cleaned up later.
mutation_reader read_context::create_reader(
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& pr,
        const query::partition_slice& ps,
        tracing::trace_state_ptr trace_state,
        mutation_reader::forwarding fwd_mr) {
    const auto shard = this_shard_id();
    auto& rm = _readers[shard];
    if (rm.state != reader_state::used && rm.state != reader_state::successful_lookup && rm.state != reader_state::inexistent) {
        auto msg = seastar::format("Unexpected request to create reader for shard {}."
                " The reader is expected to be in either `used`, `successful_lookup` or `inexistent` state,"
                " but is in `{}` state instead.", shard, reader_state_to_string(rm.state));
        mmq_log.warn("{}", msg);
        throw std::logic_error(msg.c_str());
    }
    // The reader is either in inexistent or successful lookup state.
    if (rm.state == reader_state::successful_lookup) {
        // The reader may have been evicted since the lookup, in which case
        // unregister_inactive_read() returns nullopt and we fall through to
        // creating a new reader below.
        if (auto reader_opt = semaphore().unregister_inactive_read(std::move(*rm.rparts->handle))) {
            rm.state = reader_state::used;
            // The saved reader permit is expected to be the same one passed to create_reader,
            // as returned from obtain_reader_permit()
            if (reader_opt->permit() != permit) {
                on_internal_error(mmq_log, "read_context::create_reader(): passed-in permit is different than saved reader's permit");
            }
            permit.set_trace_state(trace_state);
            return std::move(*reader_opt);
        }
    }
    auto& table = _db.local().find_column_family(schema);
    // Keep copies of the range and slice alive on this shard for the
    // lifetime of the reader (the caller's references may not outlive it).
    auto remote_parts = reader_meta::remote_parts(
            std::move(permit),
            make_lw_shared<const dht::partition_range>(pr),
            std::make_unique<const query::partition_slice>(ps),
            table.read_in_progress());
    if (!rm.rparts) {
        rm.rparts = make_foreign(std::make_unique<reader_meta::remote_parts>(std::move(remote_parts)));
    } else {
        *rm.rparts = std::move(remote_parts);
    }
    rm.state = reader_state::used;
    return table.as_mutation_source().make_reader_v2(std::move(schema), rm.rparts->permit, *rm.rparts->range, *rm.rparts->slice,
            std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
}
// Returns the calling shard's current read range, or nullptr when the shard
// has no remote parts (no reader was created or looked up on it).
const dht::partition_range* read_context::get_read_range() const {
    const auto& meta = _readers[this_shard_id()];
    return meta.rparts ? meta.rparts->range.get() : nullptr;
}
// Replaces the calling shard's read range with the given one.
void read_context::update_read_range(lw_shared_ptr<const dht::partition_range> range) {
    _readers[this_shard_id()].rparts->range = std::move(range);
}
// Called on the reader's home shard when the multishard reader is being torn
// down. Transitions a `used` reader into `saving` state, stashing its
// inactive-read handle and unconsumed fragments for a possible later save.
future<> read_context::destroy_reader(stopped_reader reader) noexcept {
    auto& meta = _readers[this_shard_id()];
    if (meta.state != reader_state::used) {
        // Nothing to stash; this is unexpected, so leave a trail in the log.
        mmq_log.warn(
                "Unexpected request to dismantle reader in state `{}`."
                " Reader was not created nor is in the process of being created.",
                reader_state_to_string(meta.state));
        return make_ready_future<>();
    }
    meta.state = reader_state::saving;
    meta.rparts->handle = std::move(reader.handle);
    meta.rparts->buffer = std::move(reader.unconsumed_fragments);
    return make_ready_future<>();
}
// Releases any shard readers that were not saved into the querier cache.
//
// For each shard that still owns remote parts, hops to that shard, releases
// the foreign_ptr there (so the parts are destroyed on their home shard) and,
// if the reader is still parked in the semaphore, unregisters and closes it.
future<> read_context::stop() {
    return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) {
        if (_readers[shard].rparts) {
            return _db.invoke_on(shard, [&rparts_fptr = _readers[shard].rparts] (replica::database& db) mutable {
                // Release on the owning shard to avoid a cross-shard destroy.
                auto rparts = rparts_fptr.release();
                if (rparts->handle) {
                    // The reader may have been evicted already, in which case
                    // there is nothing left to close.
                    auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rparts->handle));
                    if (reader_opt) {
                        // Keep rparts alive until the close completes.
                        return reader_opt->close().then([rparts = std::move(rparts)] { });
                    }
                }
                return make_ready_future<>();
            });
        }
        return make_ready_future<>();
    });
}
// Splits the combined (multishard) buffer of unconsumed fragments back into
// per-shard buffers, so each shard's fragments can be pushed back into its
// saved reader.
//
// The buffer is walked in reverse: fragments are collected into `tmp_buffer`
// until the partition-start they belong to is seen, at which point the whole
// partition is routed to the owning shard (determined from the partition
// key's token). Fragments whose shard reader is not in `saving` state are
// discarded. Any leading fragments without a partition start in the buffer
// belong to `pkey`'s partition (the partition the page ended in).
// Returns stats about kept vs. discarded fragments, for tracing.
read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(mutation_reader::tracked_buffer combined_buffer,
        const dht::decorated_key& pkey) {
    auto& sharder = _erm->get_sharder(*_schema);
    std::vector<mutation_fragment_v2> tmp_buffer;
    dismantle_buffer_stats stats;
    auto rit = std::reverse_iterator(combined_buffer.end());
    const auto rend = std::reverse_iterator(combined_buffer.begin());
    for (;rit != rend; ++rit) {
        if (rit->is_partition_start()) {
            const auto shard = sharder.shard_for_reads(rit->as_partition_start().key().token());
            // It is possible that the reader this partition originates from
            // does not exist anymore. Either because we failed stopping it or
            // because it was evicted.
            if (_readers[shard].state != reader_state::saving) {
                for (auto& smf : tmp_buffer) {
                    stats.add_discarded(smf);
                }
                stats.add_discarded(*rit);
                tmp_buffer.clear();
                continue;
            }
            auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit);
            // emplace_front preserves the original (forward) order, since we
            // iterate in reverse.
            for (auto& smf : tmp_buffer) {
                stats.add(smf);
                shard_buffer.emplace_front(std::move(smf));
            }
            stats.add(*rit);
            shard_buffer.emplace_front(std::move(*rit));
            tmp_buffer.clear();
        } else {
            tmp_buffer.emplace_back(std::move(*rit));
        }
    }
    // Whatever is left belongs to the last (unfinished) partition, keyed by pkey.
    const auto shard = sharder.shard_for_reads(pkey.token());
    auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit);
    for (auto& smf : tmp_buffer) {
        stats.add(smf);
        shard_buffer.emplace_front(std::move(smf));
    }
    return stats;
}
// Routes the detached compaction state (partition start, optional static row
// and optional active range tombstone of the unfinished partition) to the
// dismantled buffer of the shard owning that partition, so it can be pushed
// back into the saved reader. Discards everything if that shard's reader is
// not in `saving` state. Returns stats for tracing.
read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(detached_compaction_state compaction_state) {
    auto stats = dismantle_buffer_stats();
    auto& sharder = _erm->get_sharder(*_schema);
    const auto shard = sharder.shard_for_reads(compaction_state.partition_start.key().token());
    auto& rtc_opt = compaction_state.current_tombstone;
    // It is possible that the reader this partition originates from does not
    // exist anymore. Either because we failed stopping it or because it was
    // evicted.
    if (_readers[shard].state != reader_state::saving) {
        if (rtc_opt) {
            stats.add_discarded(*_schema, *rtc_opt);
        }
        if (compaction_state.static_row) {
            stats.add_discarded(*_schema, *compaction_state.static_row);
        }
        stats.add_discarded(*_schema, compaction_state.partition_start);
        return stats;
    }
    auto& shard_buffer = _readers[shard].get_dismantled_buffer(_permit);
    // Push in reverse stream order (tombstone, static row, partition start)
    // with emplace_front, so the buffer ends up in forward stream order.
    if (rtc_opt) {
        stats.add(*_schema, *rtc_opt);
        shard_buffer.emplace_front(*_schema, _permit, std::move(*rtc_opt));
    }
    if (compaction_state.static_row) {
        stats.add(*_schema, *compaction_state.static_row);
        shard_buffer.emplace_front(*_schema, _permit, std::move(*compaction_state.static_row));
    }
    stats.add(*_schema, compaction_state.partition_start);
    shard_buffer.emplace_front(*_schema, _permit, std::move(compaction_state.partition_start));
    return stats;
}
// Saves one shard's reader into that shard's querier cache so the next page
// can resume it.
//
// On the reader's home shard: revives the parked reader, pushes back (unpops)
// first its own unconsumed fragments, then the fragments dismantled from the
// combined buffer (copied, since that buffer lives on the coordinator shard),
// wraps everything in a shard_mutation_querier and inserts it into the
// querier cache. Failures are swallowed and only counted/logged: a failed
// save must not fail the read.
future<> read_context::save_reader(shard_id shard, full_position_view last_pos) {
    // Move the reader_meta out of _readers so its lifetime is tied to this
    // operation regardless of what happens to the read_context.
    return do_with(std::exchange(_readers[shard], {}), [this, shard, last_pos] (reader_meta& rm) mutable {
        return _db.invoke_on(shard, [query_uuid = _cmd.query_uuid, query_ranges = _ranges, &rm,
                last_pos, gts = tracing::global_trace_state_ptr(_trace_state)] (replica::database& db) mutable {
            try {
                auto rparts = rm.rparts.release(); // avoid another round-trip when destroying rparts
                auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rparts->handle));
                if (!reader_opt) {
                    // Reader was evicted in the meantime; nothing to save.
                    return make_ready_future<>();
                }
                mutation_reader_opt reader = std::move(*reader_opt);
                size_t fragments = 0;
                const auto size_before = reader->buffer_size();
                const auto& schema = *reader->schema();
                // Unpop in reverse so the reader's buffer ends up in the
                // original forward order.
                if (rparts->buffer) {
                    fragments += rparts->buffer->size();
                    auto rit = std::reverse_iterator(rparts->buffer->end());
                    auto rend = std::reverse_iterator(rparts->buffer->begin());
                    for (; rit != rend; ++rit) {
                        reader->unpop_mutation_fragment(std::move(*rit));
                    }
                }
                if (rm.dismantled_buffer) {
                    fragments += rm.dismantled_buffer->size();
                    auto rit = std::reverse_iterator(rm.dismantled_buffer->cend());
                    auto rend = std::reverse_iterator(rm.dismantled_buffer->cbegin());
                    for (; rit != rend; ++rit) {
                        // Copy the fragment, the buffer is on another shard.
                        reader->unpop_mutation_fragment(mutation_fragment_v2(schema, rparts->permit, *rit));
                    }
                }
                const auto size_after = reader->buffer_size();
                auto querier = query::shard_mutation_querier(
                        std::move(query_ranges),
                        std::move(rparts->range),
                        std::move(rparts->slice),
                        std::move(*reader),
                        std::move(rparts->permit),
                        last_pos);
                db.get_querier_cache().insert_shard_querier(query_uuid, std::move(querier), gts.get());
                db.get_stats().multishard_query_unpopped_fragments += fragments;
                db.get_stats().multishard_query_unpopped_bytes += (size_after - size_before);
                return make_ready_future<>();
            } catch (...) {
                // We don't want to fail a read just because of a failure to
                // save any of the readers.
                mmq_log.debug("Failed to save reader: {}", std::current_exception());
                ++db.get_stats().multishard_query_failed_reader_saves;
                return make_ready_future<>();
            }
        }).handle_exception([this, shard] (std::exception_ptr e) {
            // We don't want to fail a read just because of a failure to
            // save any of the readers.
            mmq_log.debug("Failed to save reader on shard {}: {}", shard, e);
            // This will account the failure on the local shard but we don't
            // know where exactly the failure happened anyway.
            ++_db.local().get_stats().multishard_query_failed_reader_saves;
        });
    });
}
// Looks up readers saved by the previous page on all shards.
//
// No-op for non-stateful queries (no query_uuid) and for the first page
// (nothing could have been saved yet). On each shard, a cache hit parks the
// revived reader in the semaphore as an inactive read and records it as
// `successful_lookup`; a miss records the shard as `inexistent`.
future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noexcept {
    if (!_cmd.query_uuid || _cmd.is_first_page) {
        return make_ready_future<>();
    }
    try {
        return _db.invoke_on_all([this, cmd = &_cmd, ranges = &_ranges, gs = global_schema_ptr(_schema),
                gts = tracing::global_trace_state_ptr(_trace_state), timeout] (replica::database& db) mutable -> future<> {
            auto schema = gs.get();
            auto& table = db.find_column_family(schema);
            auto& semaphore = this->semaphore();
            auto shard = this_shard_id();
            auto querier_opt = db.get_querier_cache().lookup_shard_mutation_querier(cmd->query_uuid, *schema, *ranges, cmd->slice, semaphore, gts.get(), timeout);
            if (!querier_opt) {
                _readers[shard] = reader_meta(reader_state::inexistent);
                co_return;
            }
            auto& q = *querier_opt;
            // Keep the reader evictable until create_reader() actually uses it.
            auto handle = semaphore.register_inactive_read(std::move(q).reader());
            _readers[shard] = reader_meta(
                    reader_state::successful_lookup,
                    reader_meta::remote_parts(q.permit(), std::move(q).reader_range(), std::move(q).reader_slice(), table.read_in_progress(),
                            std::move(handle)));
        });
    } catch (...) {
        // The function is noexcept: convert a synchronous throw from
        // invoke_on_all() into a failed future.
        return current_exception_as_future();
    }
}
// Saves all shard readers for the next page of a stateful query.
//
// First dismantles the combined unconsumed buffer (and the detached
// compaction state, if any) back into per-shard buffers, then saves each
// shard reader that is in `successful_lookup` or `saving` state via
// save_reader(). No-op for non-stateful queries (no query_uuid).
future<> read_context::save_readers(mutation_reader::tracked_buffer unconsumed_buffer, std::optional<detached_compaction_state> compaction_state,
        full_position last_pos) noexcept {
    if (!_cmd.query_uuid) {
        co_return;
    }
    const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), dht::decorate_key(*_schema, last_pos.partition));
    tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats);
    if (compaction_state) {
        const auto cs_stats = dismantle_compaction_state(std::move(*compaction_state));
        tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
    } else {
        // No stats to report here: the format string has no placeholder, so
        // the previously-passed (empty) stats argument was never rendered.
        tracing::trace(_trace_state, "No compaction state to dismantle, partition exhausted");
    }
    co_await parallel_for_each(std::views::iota(0u, smp::count), [this, &last_pos] (shard_id shard) {
        auto& rm = _readers[shard];
        if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) {
            return save_reader(shard, last_pos);
        }
        return make_ready_future<>();
    });
}
namespace {
// Bundles everything query::consume_page() produces for one page: the built
// result, the fragments fetched but not consumed by the page, and the
// compaction state needed to resume/save the readers.
template <typename ResultBuilder>
requires std::is_nothrow_move_constructible_v<typename ResultBuilder::result_type>
struct page_consume_result {
    typename ResultBuilder::result_type result;
    mutation_reader::tracked_buffer unconsumed_fragments;
    lw_shared_ptr<compact_for_query_state_v2> compaction_state;
    // noexcept (backed by the constraint above) so read_page() can construct
    // it after closing the reader without risking a throw.
    page_consume_result(typename ResultBuilder::result_type&& result, mutation_reader::tracked_buffer&& unconsumed_fragments,
            lw_shared_ptr<compact_for_query_state_v2>&& compaction_state) noexcept
        : result(std::move(result))
        , unconsumed_fragments(std::move(unconsumed_fragments))
        , compaction_state(std::move(compaction_state)) {
    }
};
// A special-purpose multi-range reader for multishard reads
//
// It is different from the "stock" multi-range reader
// (make_flat_multi_range_reader()) in the following ways:
// * It guarantees that a buffer never crosses two ranges.
// * It guarantees that after calling `fill_buffer()` the underlying reader's
// buffer's *entire* content is moved into its own buffer. In other words,
/// calling detach_buffer() after fill_buffer() is guaranteed to get all
// fragments fetched in that call, none will be left in the underlying
// reader's one.
// Wraps a single-range reader and drives it over a vector of ranges, moving
// the wrapped reader's entire buffer into its own on every fill (see the
// guarantees in the comment above the class).
class multi_range_reader : public mutation_reader::impl {
    mutation_reader _reader;
    // Current and one-past-last range. _it starts at the first range, which
    // the wrapped reader is assumed to have been created with.
    dht::partition_range_vector::const_iterator _it;
    const dht::partition_range_vector::const_iterator _end;
public:
    // NOTE(review): `ranges` must outlive this reader -- only iterators into
    // it are stored.
    multi_range_reader(schema_ptr s, reader_permit permit, mutation_reader rd, const dht::partition_range_vector& ranges)
        : impl(std::move(s), std::move(permit)) , _reader(std::move(rd)) , _it(ranges.begin()) , _end(ranges.end()) { }
    virtual future<> fill_buffer() override {
        if (is_end_of_stream()) {
            co_return;
        }
        while (is_buffer_empty()) {
            // Wrapped reader exhausted its current range: advance to the
            // next range or signal end-of-stream.
            if (_reader.is_buffer_empty() && _reader.is_end_of_stream()) {
                if (++_it == _end) {
                    _end_of_stream = true;
                    break;
                } else {
                    co_await _reader.fast_forward_to(*_it);
                }
            }
            if (_reader.is_buffer_empty()) {
                co_await _reader.fill_buffer();
            }
            // Take *everything* the wrapped reader buffered; nothing may be
            // left behind (see class guarantee). Also ensures a buffer never
            // mixes fragments from two ranges.
            _reader.move_buffer_content_to(*this);
        }
    }
    // External fast-forwarding is not supported; ranges are driven internally.
    virtual future<> fast_forward_to(const dht::partition_range&) override {
        return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
    }
    virtual future<> fast_forward_to(position_range) override {
        return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
    }
    virtual future<> next_partition() override {
        clear_buffer_to_next_partition();
        // Only forward to the wrapped reader if our own buffer had nothing
        // from a following partition.
        if (is_buffer_empty() && !is_end_of_stream()) {
            return _reader.next_partition();
        }
        return make_ready_future<>();
    }
    virtual future<> close() noexcept override {
        return _reader.close();
    }
};
} // anonymous namespace
// Fills a single page: builds the multishard reader over the requested
// ranges, consumes one page's worth of data into the ResultBuilder's result
// and returns it together with the unconsumed fragments and compaction state
// (needed by save_readers()). The reader is always closed, also on failure.
template <typename ResultBuilder>
future<page_consume_result<ResultBuilder>> read_page(
        shared_ptr<read_context> ctx,
        schema_ptr s,
        const query::read_command& cmd,
        const dht::partition_range_vector& ranges,
        tracing::trace_state_ptr trace_state,
        noncopyable_function<ResultBuilder()> result_builder_factory) {
    auto compaction_state = make_lw_shared<compact_for_query_state_v2>(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(),
            cmd.partition_limit);
    // Only enable forwarding when there is more than one range to read.
    auto reader = make_multishard_combining_reader_v2(ctx, s, ctx->erm(), ctx->permit(), ranges.front(), cmd.slice,
            trace_state, mutation_reader::forwarding(ranges.size() > 1));
    if (ranges.size() > 1) {
        reader = make_mutation_reader<multi_range_reader>(s, ctx->permit(), std::move(reader), ranges);
    }
    // Use coroutine::as_future to prevent an exception escaping before the
    // reader is closed (e.g. on timeout).
    auto f = co_await coroutine::as_future(query::consume_page(reader, compaction_state, cmd.slice, result_builder_factory(), cmd.get_row_limit(),
            cmd.partition_limit, cmd.timestamp));
    if (!f.failed()) {
        // no exceptions are thrown in this block
        auto result = std::move(f).get();
        // Record the resume position only if the read is not done yet.
        if (compaction_state->are_limits_reached() || result.is_short_read()) {
            ResultBuilder::maybe_set_last_position(result, compaction_state->current_full_position());
        }
        const auto& cstats = compaction_state->stats();
        tracing::trace(trace_state, "Page stats: {} partition(s), {} static row(s) ({} live, {} dead), {} clustering row(s) ({} live, {} dead) and {} range tombstone(s)",
                cstats.partitions,
                cstats.static_rows.total(),
                cstats.static_rows.live,
                cstats.static_rows.dead,
                cstats.clustering_rows.total(),
                cstats.clustering_rows.live,
                cstats.clustering_rows.dead,
                cstats.range_tombstones);
        // Grab the unconsumed fragments before closing the reader.
        auto buffer = reader.detach_buffer();
        co_await reader.close();
        // page_consume_result cannot fail so there's no risk of double-closing reader.
        co_return page_consume_result<ResultBuilder>(std::move(result), std::move(buffer), std::move(compaction_state));
    }
    co_await reader.close();
    co_return coroutine::exception(f.get_exception());
}
// Runs a multishard query on a vnode-based (non-tablet) table, implementing
// the read_context usage protocol described above: look up saved readers,
// read one page, save the readers if more pages are expected, and always
// stop the context before returning the result or the error.
template <typename ResultBuilder>
future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> do_query_vnodes(
        distributed<replica::database>& db,
        schema_ptr s,
        const query::read_command& cmd,
        const dht::partition_range_vector& ranges,
        tracing::trace_state_ptr trace_state,
        db::timeout_clock::time_point timeout,
        noncopyable_function<ResultBuilder()> result_builder_factory) {
    auto& table = db.local().find_column_family(s);
    auto erm = table.get_effective_replication_map();
    auto ctx = seastar::make_shared<read_context>(db, s, erm, cmd, ranges, trace_state, timeout);
    // Use coroutine::as_future so ctx->stop() runs even when the chain fails
    // (e.g. on timeout) before the exception is propagated.
    auto f = co_await coroutine::as_future(ctx->lookup_readers(timeout).then([&, result_builder_factory = std::move(result_builder_factory)] () mutable {
        return read_page<ResultBuilder>(ctx, s, cmd, ranges, trace_state, std::move(result_builder_factory));
    }).then([&] (page_consume_result<ResultBuilder> r) -> future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> {
        // Only save readers when more pages are expected.
        if (r.compaction_state->are_limits_reached() || r.result.is_short_read()) {
            // Must call before calling `detach_state()`.
            auto last_pos = *r.compaction_state->current_full_position();
            co_await ctx->save_readers(std::move(r.unconsumed_fragments), std::move(*r.compaction_state).detach_state(), std::move(last_pos));
        }
        co_return make_foreign(make_lw_shared<typename ResultBuilder::result_type>(std::move(r.result)));
    }));
    co_await ctx->stop();
    if (f.failed()) {
        co_return coroutine::exception(f.get_exception());
    }
    co_return f.get();
}
// Runs a multishard query on a tablet-based table.
//
// Instead of the multishard combining reader (which assumes static,
// round-robin sharding), the ranges are split per tablet by
// tablet_range_splitter and each sub-range is queried directly on its owning
// shard, sequentially, adjusting the row/partition limits after each
// sub-query. The per-shard results are merged at the end.
template <typename ResultBuilder>
future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> do_query_tablets(
        distributed<replica::database>& db,
        schema_ptr s,
        const query::read_command& cmd,
        const dht::partition_range_vector& ranges,
        tracing::trace_state_ptr trace_state,
        db::timeout_clock::time_point timeout,
        noncopyable_function<ResultBuilder()> result_builder_factory) {
    auto& table = db.local().find_column_family(s);
    auto erm = table.get_effective_replication_map();
    const auto& token_metadata = erm->get_token_metadata();
    const auto& tablets = token_metadata.tablets().get_tablet_map(s->id());
    const auto this_node_id = token_metadata.get_topology().this_node()->host_id();
    // Local mutable copy: the limits are decremented as sub-ranges complete.
    auto query_cmd = cmd;
    auto result_builder = result_builder_factory();
    std::vector<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> results;
    locator::tablet_range_splitter range_splitter{s, tablets, this_node_id, ranges};
    while (auto range_opt = range_splitter()) {
        auto& r = *results.emplace_back(co_await db.invoke_on(range_opt->shard,
                [&result_builder, gs = global_schema_ptr(s), &query_cmd, &range_opt, gts = tracing::global_trace_state_ptr(trace_state), timeout] (replica::database& db) {
            return result_builder.query(db, gs, query_cmd, range_opt->range, gts, timeout);
        }));
        // Subtract result from limit, watch for underflow
        query_cmd.partition_limit -= std::min(query_cmd.partition_limit, ResultBuilder::get_partition_count(r));
        query_cmd.set_row_limit(query_cmd.get_row_limit() - std::min(query_cmd.get_row_limit(), ResultBuilder::get_row_count(r)));
        // Stop early once a limit is exhausted or a sub-query was cut short.
        if (!query_cmd.partition_limit || !query_cmd.get_row_limit() || r.is_short_read()) {
            break;
        }
    }
    co_return result_builder.merge(std::move(results));
}
// Common driver for both data- and mutation-form multishard queries.
//
// Dispatches to the vnode- or the tablet-based read path depending on the
// replication strategy of the keyspace. Before running the query, memory is
// reserved from the result memory limiter and handed to the result builder
// created via `result_builder_factory`. Read statistics of the local database
// are updated and the table's cache temperature is returned alongside the
// result.
template <typename ResultBuilder>
static future<std::tuple<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>, cache_temperature>> do_query_on_all_shards(
        distributed<replica::database>& db,
        schema_ptr s,
        const query::read_command& cmd,
        const dht::partition_range_vector& ranges,
        tracing::trace_state_ptr trace_state,
        db::timeout_clock::time_point timeout,
        std::function<ResultBuilder(query::result_memory_accounter&&)> result_builder_factory) {
    // Any limit being 0 means an empty result regardless of the data, so
    // don't bother reading anything.
    if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
        co_return std::tuple(
                make_foreign(make_lw_shared<typename ResultBuilder::result_type>()),
                db.local().find_column_family(s).get_global_cache_hit_rate());
    }
    auto& local_db = db.local();
    auto& stats = local_db.get_stats();
    const auto short_read_allowed = query::short_read(cmd.slice.options.contains<query::partition_slice::option::allow_short_read>());
    const auto& keyspace = local_db.find_keyspace(s->ks_name());
    auto query_method = keyspace.get_replication_strategy().uses_tablets() ? do_query_tablets<ResultBuilder> : do_query_vnodes<ResultBuilder>;
    try {
        // NOTE(review): `*cmd.max_result_size` assumes the optional is
        // engaged for multishard reads -- confirm against callers.
        auto accounter = co_await local_db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed);
        auto result = co_await query_method(db, s, cmd, ranges, std::move(trace_state), timeout,
                [result_builder_factory, accounter = std::move(accounter)] () mutable {
            return result_builder_factory(std::move(accounter));
        });
        // Only successful reads are counted here; failures are accounted for
        // in the catch block below.
        ++stats.total_reads;
        stats.short_mutation_queries += bool(result->is_short_read());
        auto hit_rate = local_db.find_column_family(s).get_global_cache_hit_rate();
        co_return std::tuple(std::move(result), hit_rate);
    } catch (...) {
        ++stats.total_reads_failed;
        throw;
    }
}
namespace {
// Consumes a mutation-fragment stream into a reconcilable_result.
// Also implements the per-shard query and cross-shard merge hooks used by
// the multishard read paths (do_query_vnodes()/do_query_tablets()).
class mutation_query_result_builder {
public:
    using result_type = reconcilable_result;

private:
    reconcilable_result_builder _builder;
    schema_ptr _s;

public:
    mutation_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_memory_accounter&& accounter)
        : _builder(s, slice, std::move(accounter))
        , _s(s.shared_from_this()) {
    }

    // The consume_*()/consume() methods below simply forward to the wrapped
    // reconcilable_result_builder.
    void consume_new_partition(const dht::decorated_key& dk) {
        _builder.consume_new_partition(dk);
    }
    void consume(tombstone t) {
        _builder.consume(t);
    }
    stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) {
        return _builder.consume(std::move(sr), t, is_alive);
    }
    stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) {
        return _builder.consume(std::move(cr), t, is_alive);
    }
    stop_iteration consume(range_tombstone_change&& rtc) {
        return _builder.consume(std::move(rtc));
    }
    stop_iteration consume_end_of_partition() {
        return _builder.consume_end_of_partition();
    }
    result_type consume_end_of_stream() {
        return _builder.consume_end_of_stream();
    }

    // Runs the read on a single shard, in mutation form.
    future<foreign_ptr<lw_shared_ptr<result_type>>> query(
            replica::database& db,
            schema_ptr schema,
            const query::read_command& cmd,
            const dht::partition_range& range,
            tracing::trace_state_ptr trace_state,
            db::timeout_clock::time_point timeout) {
        auto res = co_await db.query_mutations(std::move(schema), cmd, range, std::move(trace_state), timeout);
        co_return make_foreign(make_lw_shared<result_type>(std::get<0>(std::move(res))));
    }

    // Folds the per-range results (via merge_disjoint()) into a single one.
    foreign_ptr<lw_shared_ptr<result_type>> merge(std::vector<foreign_ptr<lw_shared_ptr<result_type>>> results) {
        if (results.empty()) {
            return make_foreign(make_lw_shared<result_type>());
        }
        auto merged = std::move(results.front());
        for (size_t i = 1; i != results.size(); ++i) {
            merged->merge_disjoint(_s, *results[i]);
        }
        return merged;
    }

    // A reconcilable_result doesn't carry a last position, nothing to do.
    static void maybe_set_last_position(result_type& r, std::optional<full_position> full_position) {
    }
    static uint32_t get_partition_count(result_type& r) {
        return r.partitions().size();
    }
    static uint64_t get_row_count(result_type& r) {
        return r.row_count();
    }
};
// Consumes a mutation-fragment stream into a query::result.
// Also implements the per-shard query and cross-shard merge hooks used by
// the multishard read paths (do_query_vnodes()/do_query_tablets()).
class data_query_result_builder {
public:
    using result_type = query::result;

private:
    std::unique_ptr<query::result::builder> _res_builder;
    query_result_builder _builder;
    query::result_options _opts;

public:
    data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts,
            query::result_memory_accounter&& accounter, uint64_t tombstone_limit)
        : _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter), tombstone_limit))
        , _builder(s, *_res_builder)
        , _opts(opts) {
    }

    // The consume_*()/consume() methods below simply forward to the wrapped
    // query_result_builder.
    void consume_new_partition(const dht::decorated_key& dk) {
        _builder.consume_new_partition(dk);
    }
    void consume(tombstone t) {
        _builder.consume(t);
    }
    stop_iteration consume(static_row&& sr, tombstone t, bool is_alive) {
        return _builder.consume(std::move(sr), t, is_alive);
    }
    stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) {
        return _builder.consume(std::move(cr), t, is_alive);
    }
    stop_iteration consume(range_tombstone_change&& rtc) {
        return _builder.consume(std::move(rtc));
    }
    stop_iteration consume_end_of_partition() {
        return _builder.consume_end_of_partition();
    }
    result_type consume_end_of_stream() {
        _builder.consume_end_of_stream();
        return _res_builder->build();
    }

    // Runs the read on a single shard, in data (result-set) form.
    future<foreign_ptr<lw_shared_ptr<result_type>>> query(
            replica::database& db,
            schema_ptr schema,
            const query::read_command& cmd,
            const dht::partition_range& range,
            tracing::trace_state_ptr trace_state,
            db::timeout_clock::time_point timeout) {
        dht::partition_range_vector range_vector;
        range_vector.emplace_back(range);
        auto res = co_await db.query(std::move(schema), cmd, _opts, range_vector, std::move(trace_state), timeout);
        co_return std::get<0>(std::move(res));
    }

    // Concatenates the per-range results into a single one.
    foreign_ptr<lw_shared_ptr<result_type>> merge(std::vector<foreign_ptr<lw_shared_ptr<result_type>>> results) {
        if (results.empty()) {
            return make_foreign(make_lw_shared<result_type>());
        }
        query::result_merger merger(query::max_rows, query::max_partitions);
        merger.reserve(results.size());
        for (auto& res : results) {
            merger(std::move(res));
        }
        return merger.get();
    }

    static void maybe_set_last_position(result_type& r, std::optional<full_position> full_position) {
        r.set_last_position(std::move(full_position));
    }
    static uint32_t get_partition_count(result_type& r) {
        // Counts are computed lazily, make sure they are available first.
        r.ensure_counts();
        return *r.partition_count();
    }
    static uint64_t get_row_count(result_type& r) {
        r.ensure_counts();
        return *r.row_count();
    }
};
} // anonymous namespace
// Entry point for running a multishard query returning the result in
// mutation form (a reconcilable_result), together with the table's cache
// temperature.
future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>> query_mutations_on_all_shards(
        distributed<replica::database>& db,
        schema_ptr query_schema,
        const query::read_command& cmd,
        const dht::partition_range_vector& ranges,
        tracing::trace_state_ptr trace_state,
        db::timeout_clock::time_point timeout) {
    // The factory receives the memory accounter obtained by
    // do_query_on_all_shards() and wraps it in a fresh builder.
    auto make_builder = [query_schema, &cmd] (query::result_memory_accounter&& accounter) {
        return mutation_query_result_builder(*query_schema, cmd.slice, std::move(accounter));
    };
    return do_query_on_all_shards<mutation_query_result_builder>(db, query_schema, cmd, ranges, std::move(trace_state), timeout, std::move(make_builder));
}
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> query_data_on_all_shards(