forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
partition_snapshot_row_cursor.hh
822 lines (766 loc) · 35 KB
/
partition_snapshot_row_cursor.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "mutation/partition_version.hh"
#include "row_cache.hh"
#include "utils/assert.hh"
#include "utils/small_vector.hh"
#include <fmt/core.h>
#include <ranges>
class partition_snapshot_row_cursor;
// A non-owning reference to a row inside a partition_snapshot.
// It remembers the row's position, so it can be kept across reference
// invalidation points and re-resolved later with refresh().
class partition_snapshot_row_weakref final {
    mutation_partition::rows_type::iterator _it;
    // A default-constructed change mark means "not pointing at any row".
    partition_snapshot::change_mark _change_mark;
    position_in_partition _pos = position_in_partition::min();
    bool _in_latest = false;
public:
    partition_snapshot_row_weakref() = default;

    // Makes this object point to the row under the given partition_snapshot_row_cursor.
    explicit partition_snapshot_row_weakref(const partition_snapshot_row_cursor&);

    // Creates a reference which does not point at any row.
    explicit partition_snapshot_row_weakref(std::nullptr_t) {}

    // Points at the row designated by `it`; `in_latest` tells whether `it`
    // belongs to the latest version of `snp`.
    partition_snapshot_row_weakref(partition_snapshot& snp, mutation_partition::rows_type::iterator it, bool in_latest)
        : _it(it)
        , _change_mark(snp.get_change_mark())
        , _pos(it->position())
        , _in_latest(in_latest)
    { }

    partition_snapshot_row_weakref& operator=(const partition_snapshot_row_cursor&);

    // Detaches the reference from any row. Only the change mark is reset.
    partition_snapshot_row_weakref& operator=(std::nullptr_t) noexcept {
        _change_mark = partition_snapshot::change_mark();
        return *this;
    }

    // Returns true iff the pointer is pointing at a row.
    explicit operator bool() const {
        return _change_mark != partition_snapshot::change_mark();
    }
public:
    // Sets the iterator in latest version for the current position.
    void set_latest(mutation_partition::rows_type::iterator it) {
        _in_latest = true;
        _it = std::move(it);
    }
public:
    // Returns the position of the row.
    // Call only when pointing at a row.
    const position_in_partition& position() const {
        return _pos;
    }

    // Returns true iff the object is valid, i.e. the snapshot's change mark
    // is still the one recorded by this reference.
    bool valid(partition_snapshot& snp) {
        auto current_mark = snp.get_change_mark();
        return current_mark == _change_mark;
    }

    // Call only when valid.
    bool is_in_latest_version() const {
        return _in_latest;
    }

    // Brings the object back to validity and returns true iff the snapshot contains the row.
    // When not pointing at a row, returns false.
    bool refresh(partition_snapshot& snp) {
        auto current_mark = snp.get_change_mark();
        if (current_mark == _change_mark) {
            // Nothing changed since the iterator was obtained.
            return true;
        }
        if (!_change_mark) {
            return false;
        }
        _change_mark = current_mark;
        rows_entry::tri_compare cmp(*snp.schema());
        // Look the position up in each version, in the order versions() yields them.
        // Only the first one visited counts as "latest".
        _in_latest = true;
        for (auto&& version : snp.versions()) {
            auto version_rows = version.partition().clustered_rows();
            _it = version_rows.find(_pos, cmp);
            if (_it != version_rows.end()) {
                return true;
            }
            _in_latest = false;
        }
        return false;
    }

    rows_entry* operator->() const {
        return &(**this);
    }

    rows_entry& operator*() const {
        return *_it;
    }
};
// Allows iterating over rows of mutation_partition represented by given partition_snapshot.
//
// The cursor initially has a position before all rows and is not pointing at any row.
// To position the cursor, use advance_to().
//
// All methods should be called with the region of the snapshot locked. The cursor is invalidated
// when that lock section is left, or if the snapshot is modified.
//
// When the cursor is invalidated, it still maintains its previous position. It can be brought
// back to validity by calling maybe_refresh(), or advance_to().
//
// Insertion of row entries after cursor's position invalidates the cursor.
// Exceptions thrown from mutators invalidate the cursor.
//
// Range tombstone information is accessible via range_tombstone() and range_tombstone_for_row()
// functions. range_tombstone() returns the tombstone for the interval which strictly precedes
// the current row, and range_tombstone_for_row() returns the information for the row itself.
// If the interval which precedes the row is not continuous, then range_tombstone() is empty.
// If range_tombstone() is not empty then the interval is continuous.
class partition_snapshot_row_cursor final {
friend class partition_snapshot_row_weakref;
// The cursor's position within a single version of the snapshot.
// Instances are built with positional aggregate initialization, so the
// order of the fields must not be changed.
struct position_in_version {
// Entry of this version the cursor is at (table schema order).
mutation_partition::rows_type::iterator it;
// The clustered rows collection `it` belongs to.
utils::immutable_collection<mutation_partition::rows_type> rows;
// Index of the version in iteration order of the snapshot's versions; 0 is the latest one.
int version_no;
// Schema of the version.
const schema* schema;
// When set, advance(false) releases the entry from its version instead of stepping over it.
bool unique_owner = false;
is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain).
// Range tombstone in the direction of lower keys (in cursor schema domain).
// Excludes the row. In the reverse mode, the row may have a different range tombstone.
tombstone rt;
};
const schema& _schema; // query domain
partition_snapshot& _snp;
// _heap contains iterators which are ahead of the cursor.
// _current_row contains iterators which are directly below the cursor.
utils::small_vector<position_in_version, 2> _heap; // query domain order
utils::small_vector<position_in_version, 2> _current_row;
// For !_reversed cursors points to the entry which
// is the lower_bound() of the current position in table schema order.
// For _reversed cursors it can be either lower_bound() in table order
// or lower_bound() in cursor's order, so should not be relied upon.
// If the current entry is in the latest version, then _latest_it points to it,
// also in _reversed mode.
std::optional<mutation_partition::rows_type::iterator> _latest_it;
// Continuity and range tombstone corresponding to ranges which are not represented in _heap because the cursor
// went past all the entries in those versions.
bool _background_continuity = false;
tombstone _background_rt;
bool _continuous{};
bool _dummy{};
const bool _unique_owner;
const bool _reversed;
const bool _digest_requested;
tombstone _range_tombstone;
tombstone _range_tombstone_for_row;
position_in_partition _position; // table domain
partition_snapshot::change_mark _change_mark;
// Translates a position from the query (cursor) schema domain into the table schema domain.
// The position is changed only for reversed cursors.
position_in_partition_view to_table_domain(position_in_partition_view pos) const {
    if (!_reversed) [[likely]] {
        return pos;
    }
    return pos.reversed();
}
// Translates a position from the table schema domain into the query (cursor) schema domain.
// The position is changed only for reversed cursors.
position_in_partition_view to_query_domain(position_in_partition_view pos) const {
    if (!_reversed) [[likely]] {
        return pos;
    }
    return pos.reversed();
}
// Ordering used for _heap. The comparison is inverted with respect to the
// position order (in the query domain), so the standard heap algorithms keep the
// entry with the smallest position at the front. Among entries with equal
// positions the one with the lowest version_no comes first.
struct version_heap_less_compare {
    rows_entry::tri_compare _cmp;
    partition_snapshot_row_cursor& _cur;
public:
    explicit version_heap_less_compare(partition_snapshot_row_cursor& cur)
        : _cmp(cur._schema)
        , _cur(cur)
    { }

    bool operator()(const position_in_version& a, const position_in_version& b) {
        const auto order = _cmp(_cur.to_query_domain(a.it->position()), _cur.to_query_domain(b.it->position()));
        if (order != 0) {
            return order > 0;
        }
        // Same position: order by version.
        return a.version_no > b.version_no;
    }
};
// Removes the next row from _heap and puts it into _current_row.
// All heap entries which share the position of the heap's front entry are moved,
// and the per-row attributes (_dummy, _continuous, _range_tombstone,
// _range_tombstone_for_row, _position) are recomputed.
// Returns false, leaving _current_row empty, when _heap has no entries.
bool recreate_current_row() {
_current_row.clear();
// Start from what was accumulated for versions which are no longer represented in _heap.
_continuous = _background_continuity;
_range_tombstone = _background_rt;
_range_tombstone_for_row = _background_rt;
_dummy = true;
if (_heap.empty()) {
// No more rows: park the cursor at the extreme position in its direction.
if (_reversed) {
_position = position_in_partition::before_all_clustered_rows();
} else {
_position = position_in_partition::after_all_clustered_rows();
}
return false;
}
version_heap_less_compare heap_less(*this);
position_in_partition::equal_compare eq(*_snp.schema());
do {
// pop_heap() moves the front entry to the back of _heap.
std::ranges::pop_heap(_heap, heap_less);
memory::on_alloc_point();
position_in_version& v = _heap.back();
rows_entry& e = *v.it;
if (_digest_requested) {
e.row().cells().prepare_hash(*v.schema, column_kind::regular_column);
}
// The row is a dummy only if it is a dummy in every version which has it.
_dummy &= bool(e.dummy());
_continuous |= bool(v.continuous);
_range_tombstone_for_row.apply(e.range_tombstone());
if (v.continuous) {
_range_tombstone.apply(v.rt);
}
_current_row.push_back(v);
_heap.pop_back();
} while (!_heap.empty() && eq(_current_row[0].it->position(), _heap[0].it->position()));
// FIXME: Optimize by dropping dummy() entries.
// Entries remaining in _heap are ahead of the cursor; their continuity and
// range tombstone also contribute to the current position.
for (position_in_version& v : _heap) {
_continuous |= bool(v.continuous);
if (v.continuous) {
_range_tombstone.apply(v.rt);
_range_tombstone_for_row.apply(v.rt);
}
}
_position = position_in_partition(_current_row[0].it->position());
return true;
}
// Rebuilds _heap from scratch for the given lower bound.
//
// For every version of the snapshot, the entry from which iteration should start
// is located and pushed to _heap together with the continuity and range tombstone
// of the range which precedes it (in cursor order). Versions which have no
// entry to contribute are accounted for in _background_continuity and _background_rt.
// _latest_it is set to the lower_bound() of the position in the first (latest) version,
// and _change_mark is refreshed, which makes the iterators valid.
//
// lower_bound is in the query schema domain
void prepare_heap(position_in_partition_view lower_bound) {
    lower_bound = to_table_domain(lower_bound);
    memory::on_alloc_point();
    rows_entry::tri_compare cmp(*_snp.schema());
    version_heap_less_compare heap_less(*this);
    _heap.clear();
    _latest_it.reset();
    _background_continuity = false;
    _background_rt = {};
    int version_no = 0;
    bool unique_owner = _unique_owner;
    bool first = true;
    for (auto&& v : _snp.versions()) {
        // Unique ownership holds for a version only if it holds for all the
        // preceding ones and the version itself is not referenced.
        unique_owner = unique_owner && (first || !v.is_referenced());
        auto rows = v.partition().clustered_rows();
        auto pos = rows.lower_bound(lower_bound, cmp);
        if (first) {
            _latest_it = pos;
        }
        if (pos) {
            is_continuous cont;
            tombstone rt;
            if (_reversed) [[unlikely]] {
                if (cmp(pos->position(), lower_bound) != 0) {
                    // pos is past lower_bound in table order, so in reverse order
                    // the first entry to visit is its predecessor, if any.
                    cont = pos->continuous();
                    rt = pos->range_tombstone();
                    if (pos != rows.begin()) {
                        --pos;
                    } else {
                        _background_continuity |= bool(cont);
                        if (cont) {
                            // More than one version can end up here, so merge the
                            // tombstone rather than overwrite what earlier versions
                            // contributed, the same way advance() and maybe_refresh() do.
                            _background_rt.apply(rt);
                        }
                        pos = {};
                    }
                } else {
                    auto next_entry = std::next(pos);
                    if (next_entry == rows.end()) {
                        // Positions past last dummy are complete since mutation sources
                        // can't contain any keys which are larger.
                        cont = is_continuous::yes;
                        rt = {};
                    } else {
                        cont = next_entry->continuous();
                        rt = next_entry->range_tombstone();
                    }
                }
            } else {
                cont = pos->continuous();
                rt = pos->range_tombstone();
            }
            if (pos) [[likely]] {
                _heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, cont, rt});
            }
        } else {
            if (_reversed) [[unlikely]] {
                if (!rows.empty()) {
                    // lower_bound is past all entries; reverse iteration starts from the last one.
                    pos = std::prev(rows.end());
                } else {
                    _background_continuity = true;
                }
            } else {
                _background_continuity = true; // Default continuity past the last entry
            }
            if (pos) [[likely]] {
                _heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, is_continuous::yes});
            }
        }
        ++version_no;
        first = false;
    }
    std::ranges::make_heap(_heap, heap_less);
    _change_mark = _snp.get_change_mark();
}
// Advances the cursor to the next row.
// The @keep denotes whether the entries should be kept in partition version.
// If there is no next row, returns false and the cursor is no longer pointing at a row.
// Can be only called on a valid cursor pointing at a row.
// When throws, the cursor is invalidated and its position is not changed.
bool advance(bool keep) {
memory::on_alloc_point();
version_heap_less_compare heap_less(*this);
SCYLLA_ASSERT(iterators_valid());
for (auto&& curr : _current_row) {
if (!keep && curr.unique_owner) {
mutation_partition::rows_type::key_grabber kg(curr.it);
kg.release(current_deleter<rows_entry>());
if (_reversed && curr.it) [[unlikely]] {
if (curr.rows.begin() == curr.it) {
_background_continuity |= bool(curr.it->continuous());
if (curr.it->continuous()) {
_background_rt.apply(curr.it->range_tombstone());
}
curr.it = {};
} else {
curr.continuous = curr.it->continuous();
curr.rt = curr.it->range_tombstone();
--curr.it;
}
}
} else {
if (_reversed) [[unlikely]] {
if (curr.rows.begin() == curr.it) {
_background_continuity |= bool(curr.it->continuous());
if (curr.it->continuous()) {
_background_rt.apply(curr.it->range_tombstone());
}
curr.it = {};
} else {
curr.continuous = curr.it->continuous();
curr.rt = curr.it->range_tombstone();
--curr.it;
}
} else {
++curr.it;
if (curr.it) {
curr.continuous = curr.it->continuous();
curr.rt = curr.it->range_tombstone();
}
}
}
if (curr.it) {
if (curr.version_no == 0) {
_latest_it = curr.it;
}
_heap.push_back(curr);
std::ranges::push_heap(_heap, heap_less);
}
}
return recreate_current_row();
}
// Returns true iff the cursor is pointing at a row which has an entry in the latest version (version_no 0).
bool is_in_latest_version() const noexcept {
    if (!at_a_row()) {
        return false;
    }
    return _current_row[0].version_no == 0;
}
public:
// Creates a cursor over the rows of snp. The cursor is not valid until it is
// positioned, e.g. with advance_to().
//
// When reversed is true then the cursor will operate in reversed direction.
// When reversed, s must be a reversed schema relative to snp->schema()
// Positions and fragments accepted and returned by the cursor are from the domain of s.
// Iterators are from the table's schema domain.
//
// unique_owner: when set, erase_and_advance() releases entries from versions
// which are owned solely by the cursor's user.
// digest_requested: when set, hashes of regular cells are prepared for every
// row entry the cursor stops at.
partition_snapshot_row_cursor(const schema& s, partition_snapshot& snp, bool unique_owner = false, bool reversed = false, bool digest_requested = false)
: _schema(s)
, _snp(snp)
, _unique_owner(unique_owner)
, _reversed(reversed)
, _digest_requested(digest_requested)
, _position(position_in_partition::static_row_tag_t{})
{ }
// Returns the iterator into the latest version recorded by the cursor.
// If is_in_latest_version() then it points to the entry under the cursor.
// Asserts that the iterator has been set.
mutation_partition::rows_type::iterator get_iterator_in_latest_version() const {
    SCYLLA_ASSERT(_latest_it);
    const auto& latest = *_latest_it;
    return latest;
}
// Tells whether the iterators obtained since the cursor was last made valid
// can still be used, i.e. the snapshot's change mark has not moved.
// Note that this doesn't mean that the cursor itself is valid.
bool iterators_valid() const {
    auto current_mark = _snp.get_change_mark();
    return current_mark == _change_mark;
}
// Adopts the snapshot's current change mark, which marks the iterators as valid
// without refreshing them.
// Call only when the iterators are known to be valid.
void force_valid() {
    auto current_mark = _snp.get_change_mark();
    _change_mark = current_mark;
}
// Positions the cursor at the first entry with position >= pos, if such entry exists.
// Otherwise the cursor ends up at the extreme position in its direction
// (min for reversed cursor, max for forward cursor), not pointing at a row
// but still valid.
//
// continuous() is always valid after the call, even if not pointing at a row.
// Returns true iff the cursor is pointing at a row after the call.
bool maybe_advance_to(position_in_partition_view pos) {
    prepare_heap(pos);
    const bool at_row = recreate_current_row();
    return at_row;
}
// Brings back the cursor to validity.
// Can be only called when cursor is pointing at a row.
//
// Semantically equivalent to:
//
// advance_to(position());
//
// but avoids work if not necessary.
//
// Changes to attributes of the current row (e.g. continuity) don't have to be reflected.
//
// When the iterators are invalid, returns the result of advance_to().
// Otherwise only the state related to the latest version is updated and true is returned.
bool maybe_refresh() {
if (!iterators_valid()) {
auto pos = position_in_partition(position()); // advance_to() modifies position() so copy
return advance_to(pos);
}
// Refresh latest version's iterator in case there was an insertion
// before it and after cursor's position. There cannot be any
// insertions for non-latest versions, so we don't have to update them.
if (!is_in_latest_version()) {
rows_entry::tri_compare cmp(*_snp.schema());
version_heap_less_compare heap_less(*this);
auto rows = _snp.version()->partition().clustered_rows();
bool match;
auto it = rows.lower_bound(_position, match, cmp);
_latest_it = it;
// The heap entry of the latest version, if there is one.
auto heap_i = std::ranges::find_if(_heap, [](auto&& v) { return v.version_no == 0; });
is_continuous cont;
tombstone rt;
if (it) {
cont = it->continuous();
rt = it->range_tombstone();
if (_reversed) [[unlikely]] {
if (!match) {
// lower_bound() in reverse order points to predecessor of it unless the keys are equal.
if (it == rows.begin()) {
if (it->continuous()) {
_background_continuity = true;
_background_rt.apply(it->range_tombstone());
}
it = {};
} else {
--it;
}
} else {
// We can put anything in the match case since this continuity will not be used
// when advancing the cursor. Same applies to rt.
cont = is_continuous::no;
rt = {};
}
}
} else {
_background_continuity = true; // Default continuity
}
if (!it) {
// The latest version has nothing ahead of the cursor; drop its heap entry.
if (heap_i != _heap.end()) {
_heap.erase(heap_i);
std::ranges::make_heap(_heap, heap_less);
}
} else if (match) {
// The latest version now has an entry at the cursor's position:
// it becomes the first element of _current_row and leaves the heap.
_current_row.insert(_current_row.begin(), position_in_version{
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
if (heap_i != _heap.end()) {
_heap.erase(heap_i);
std::ranges::make_heap(_heap, heap_less);
}
} else {
// The entry is ahead of the cursor: update, or create, the heap entry of the latest version.
if (heap_i != _heap.end()) {
heap_i->it = it;
heap_i->continuous = cont;
heap_i->rt = rt;
std::ranges::make_heap(_heap, heap_less);
} else {
_heap.push_back(position_in_version{
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
std::ranges::push_heap(_heap, heap_less);
}
}
}
return true;
}
// Restores validity of the cursor, leaving it at the first row whose position is
// not smaller than the current position. Returns false iff no such row exists.
// Nothing is done when the iterators are still valid.
// Assumes that rows are not inserted into the snapshot (static). They can be removed.
bool maybe_refresh_static() {
    return iterators_valid() || maybe_advance_to(position());
}
// Positions the cursor at the first entry with position >= lower_bound.
// If no such entry exists, the cursor is still moved, although
// it won't be pointing at a row. Still, continuous() will be valid.
//
// Returns true iff there can't be any clustering row entries
// between lower_bound (inclusive) and the position to which the cursor
// was advanced.
//
// May be called when cursor is not valid.
// The cursor is valid after the call.
// Must be called under reclaim lock.
// When throws, the cursor is invalidated and its position is not changed.
bool advance_to(position_in_partition_view lower_bound) {
    // Whether the cursor ended up at a row is not relevant here.
    (void)maybe_advance_to(lower_bound);
    const bool adjacent = no_clustering_row_between_weak(_schema, lower_bound, position());
    return adjacent;
}
// Tells whether the cursor is pointing at a row, i.e. _current_row has entries.
// Call only when valid.
bool at_a_row() const {
    const bool no_entries = _current_row.empty();
    return !no_entries;
}
// Moves to the next row, if any, keeping the current entries in their versions.
// If there is no next row, the cursor moves to the extreme position in its direction
// (position_in_partition::before_all_clustered_rows() for reversed cursors,
// position_in_partition::after_all_clustered_rows() otherwise) and does not point at a row.
// Information about the range, continuous() and range_tombstone(), is still valid in this case.
// Call only when valid, not necessarily pointing at a row.
// Returns true iff the cursor is pointing at a row after the call.
bool next() {
    constexpr bool keep_entries = true;
    return advance(keep_entries);
}
// Like next(), but the entries under the cursor are not kept: those which belong to
// uniquely owned versions are released before moving on.
// Returns true iff the cursor is pointing at a row after the call.
bool erase_and_advance() {
    constexpr bool keep_entries = false;
    return advance(keep_entries);
}
// Tells whether the key range adjacent to the cursor's position from the side of
// smaller keys is marked as continuous.
// Can be called when cursor is pointing at a row.
bool continuous() const {
    return _continuous;
}
// Range tombstone for the key range adjacent to the cursor's position from the side of smaller keys.
// The range for the row itself is excluded; see range_tombstone_for_row() for that.
// It's possible that range_tombstone() is empty and range_tombstone_for_row() is not empty.
// Can be called when cursor is valid, not necessarily pointing at a row.
tombstone range_tombstone() const {
    return _range_tombstone;
}
// Range tombstone covering the row under the cursor.
// Can be called when cursor is pointing at a row.
tombstone range_tombstone_for_row() const {
    return _range_tombstone_for_row;
}
// Tells whether the entry under the cursor is a dummy in all versions which have it.
// Can be called when cursor is pointing at a row.
bool dummy() const {
    return _dummy;
}
// Clustering key of the row under the cursor.
// Can be called only when cursor is valid and pointing at a row, and !dummy().
const clustering_key& key() const {
    return _position.key();
}
// Builds the row under the cursor by combining its entries from all versions,
// upgrading each of them to the cursor's schema.
// Can be called only when cursor is valid and pointing at a row.
clustering_row row() const {
    // The precondition guarantees that _current_row is not empty,
    // so accessing its first element is valid.
    const position_in_version& newest = _current_row[0];
    clustering_row merged(key(), deletable_row(_schema, *newest.schema, newest.it->row()));
    for (size_t idx = 1, count = _current_row.size(); idx < count; ++idx) {
        const position_in_version& older = _current_row[idx];
        merged.apply(_schema, *older.schema, older.it->row());
    }
    return merged;
}
// Schema of the first entry in _current_row.
// NOTE(review): indexes _current_row[0], so presumably requires the cursor to be pointing at a row.
const schema& latest_row_schema() const noexcept {
    const position_in_version& newest = _current_row[0];
    return *newest.schema;
}
// The row held by the first entry in _current_row.
// Can be called only when cursor is valid and pointing at a row.
deletable_row& latest_row() const noexcept {
    const position_in_version& newest = _current_row[0];
    return newest.it->row();
}
// Can be called only when cursor is valid and pointing at a row.
void latest_row_prepare_hash() const {
_current_row[0].it->row().cells().prepare_hash(*_current_row[0].schema, column_kind::regular_column);
}
// Feeds the consumer with the row from every version which has an entry under the cursor.
// The row is moved out of the entry when its version is uniquely owned and its
// schema version matches the cursor's; otherwise a copy upgraded to the cursor's
// schema is passed.
// Can be called only when cursor is valid and pointing at a row.
// Monotonic exception guarantees.
template <typename Consumer>
requires std::is_invocable_v<Consumer, deletable_row&&>
void consume_row(Consumer&& consumer) {
    for (position_in_version& entry : _current_row) {
        const bool can_move = entry.unique_owner && (_schema.version() == entry.schema->version());
        if (!can_move) [[unlikely]] {
            consumer(deletable_row(_schema, *entry.schema, entry.it->row()));
            continue;
        }
        consumer(std::move(entry.it->row()));
    }
}
// Returns memory footprint of row entries under the cursor.
// Can be called only when cursor is valid and pointing at a row.
size_t memory_usage() const {
size_t result = 0;
for (const position_in_version& v : _current_row) {
result += v.it->memory_usage(*v.schema);
}
return result;
}
// Result of ensure_entry_in_latest() and ensure_entry_if_complete().
struct ensure_result {
// The entry in the latest version.
rows_entry& row;
// Iterator pointing at `row`.
mutation_partition_v2::rows_type::iterator it;
// True iff the entry was inserted by the call.
bool inserted = false;
};
// Makes sure that a rows_entry for the row under the cursor exists in the latest version.
// Doesn't change logical value or continuity of the snapshot.
// Can be called only when cursor is valid and pointing at a row.
// The cursor remains valid after the call and points at the same row as before.
// Use only with evictable snapshots.
//
// Returns the entry in the latest version, an iterator to it, and whether it
// was inserted by this call.
ensure_result ensure_entry_in_latest() {
    auto&& rows = _snp.version()->partition().mutable_clustered_rows();
    if (is_in_latest_version()) {
        auto latest_i = get_iterator_in_latest_version();
        rows_entry& latest = *latest_i;
        if (_snp.at_latest_version()) {
            _snp.tracker()->touch(latest);
        }
        return {latest, latest_i, false};
    } else {
        // Copy row from older version because rows in evictable versions must
        // hold values which are independently complete to be consistent on eviction.
        auto e = [&] {
            if (!at_a_row()) {
                return alloc_strategy_unique_ptr<rows_entry>(
                    current_allocator().construct<rows_entry>(*_snp.schema(), _position,
                        is_dummy(!_position.is_clustering_row()), is_continuous::no));
            } else {
                return alloc_strategy_unique_ptr<rows_entry>(
                    current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].schema, *_current_row[0].it));
            }
        }();
        rows_entry& re = *e;
        if (_reversed) { // latest_i is not reliably a successor
            re.set_continuous(false);
            e->set_range_tombstone(range_tombstone_for_row());
            rows_entry::tri_compare cmp(*_snp.schema());
            auto res = rows.insert(std::move(e), cmp);
            if (res.second) {
                // `re` may be used only if it was actually linked into the tree.
                // Otherwise ownership was given away with `e` and the entry is
                // not part of `rows`, so it must not be accessed any more.
                if (auto l = std::next(res.first); l != rows.end() && l->continuous()) {
                    // Inherit continuity and range tombstone from the successor.
                    re.set_continuous(true);
                    re.set_range_tombstone(l->range_tombstone());
                }
                _snp.tracker()->insert(re);
            }
            return {*res.first, res.first, res.second};
        } else {
            auto latest_i = get_iterator_in_latest_version();
            if (latest_i && latest_i->continuous()) {
                e->set_continuous(true);
                // See the "information monotonicity" rule.
                e->set_range_tombstone(latest_i->range_tombstone());
            } else {
                e->set_continuous(false);
                e->set_range_tombstone(range_tombstone_for_row());
            }
            auto i = rows.insert_before(latest_i, std::move(e));
            _snp.tracker()->insert(re);
            return {re, i, true};
        }
    }
}
// Returns the rows_entry with given position in latest version, creating a neutral one
// if needed, provided that the position belongs to a continuous range.
// Otherwise returns a disengaged optional.
// Doesn't change logical value of mutation_partition or continuity of the snapshot.
// The cursor doesn't have to be valid.
// The cursor is invalid after the call.
// When returns an engaged optional, the attributes of the cursor: continuous() and range_tombstone()
// are valid, as if the cursor was advanced to the requested position.
// Assumes the snapshot is evictable and not populated by means other than ensure_entry_if_complete().
// Subsequent calls to ensure_entry_if_complete() or advance_to() must be given weakly monotonically increasing
// positions unless iterators are invalidated across the calls.
// The cursor must not be a reversed-order cursor; std::logic_error is thrown otherwise.
// Use only with evictable snapshots.
std::optional<ensure_result> ensure_entry_if_complete(position_in_partition_view pos) {
    if (_reversed) { // latest_i is unreliable
        throw_with_backtrace<std::logic_error>("ensure_entry_if_complete() called on reverse cursor");
    }
    position_in_partition::less_compare less(_schema);
    if (!iterators_valid() || less(position(), pos)) {
        auto has_entry = maybe_advance_to(pos);
        SCYLLA_ASSERT(has_entry); // evictable snapshots must have a dummy after all rows.
    }
    auto&& rows = _snp.version()->partition().mutable_clustered_rows();
    auto latest_i = get_iterator_in_latest_version();
    position_in_partition::equal_compare eq(_schema);

    // Looks for an entry at pos inserted by a previous call to ensure_entry_if_complete().
    // Such an entry directly precedes latest_i.
    auto previously_inserted = [&] () -> std::optional<ensure_result> {
        if (!(latest_i != rows.begin())) {
            return std::nullopt;
        }
        auto prev_i = std::prev(latest_i);
        if (!eq(prev_i->position(), pos)) {
            return std::nullopt;
        }
        return ensure_result{*prev_i, prev_i, false};
    };

    const bool at_requested_pos = eq(position(), pos);
    if (!at_requested_pos && !continuous()) {
        // pos falls into a range which is not continuous.
        return std::nullopt;
    }
    if (auto existing = previously_inserted()) {
        return existing;
    }
    if (at_requested_pos) {
        return ensure_entry_in_latest();
    }

    auto e = alloc_strategy_unique_ptr<rows_entry>(
        current_allocator().construct<rows_entry>(*_snp.version()->get_schema(), pos,
            is_dummy(!pos.is_clustering_row()),
            is_continuous::no));
    if (latest_i && latest_i->continuous()) {
        e->set_continuous(true);
        e->set_range_tombstone(latest_i->range_tombstone()); // See the "information monotonicity" rule.
    } else {
        // Even if the range in the latest version is not continuous, the row itself is assumed to be complete,
        // so it must inherit the current range tombstone.
        e->set_range_tombstone(range_tombstone());
    }
    auto e_i = rows.insert_before(latest_i, std::move(e));
    _snp.tracker()->insert(*e_i);
    return ensure_result{*e_i, e_i, true};
}
// Brings the entry pointed to by the cursor to the front of the LRU.
// Does nothing unless the snapshot is at the latest version and the entry
// is in the latest version.
// Cursor must be valid and pointing at a row.
// Use only with evictable snapshots.
void touch() {
    // We cannot bring entries from non-latest versions to the front because that
    // could violate the ordering invariant of the LRU, which states that older versions
    // must be evicted first. Needed to keep the snapshot consistent.
    if (!_snp.at_latest_version() || !is_in_latest_version()) {
        return;
    }
    _snp.tracker()->touch(*get_iterator_in_latest_version());
}
// The cursor's position expressed in the cursor (query) schema domain.
// Can be called when cursor is pointing at a row, even when invalid, or when valid.
position_in_partition_view position() const {
    position_in_partition_view in_table_domain = _position;
    return to_query_domain(in_table_domain);
}
// The cursor's position expressed in the table schema domain, which is how it is stored.
// Can be called when cursor is pointing at a row, even when invalid, or when valid.
position_in_partition_view table_position() const {
    position_in_partition_view in_table_domain = _position;
    return in_table_domain;
}
friend fmt::formatter<partition_snapshot_row_cursor>;
};
// Formats the state of a cursor for diagnostics.
//
// The output is a single brace-delimited record. It starts with the position,
// continuity and range tombstone(s); when the iterators are valid it is followed
// by the entries in _current_row, the entries in _heap and the iterator
// in the latest version.
template <> struct fmt::formatter<partition_snapshot_row_cursor> : fmt::formatter<string_view> {
    auto format(const partition_snapshot_row_cursor& cur, fmt::format_context& ctx) const {
        auto out = ctx.out();
        // The record is closed by the last format_to() call on each return path,
        // so the opening part must not emit a closing brace of its own.
        out = fmt::format_to(out, "{{cursor: position={}, cont={}, rt={}",
                             cur._position, cur.continuous(), cur.range_tombstone());
        if (cur.range_tombstone() != cur.range_tombstone_for_row()) {
            out = fmt::format_to(out, ", row_rt={}", cur.range_tombstone_for_row());
        }
        out = fmt::format_to(out, ", ");
        if (cur._reversed) {
            out = fmt::format_to(out, "reversed, ");
        }
        if (!cur.iterators_valid()) {
            // Iterators must not be dereferenced in this state, so stop here.
            return fmt::format_to(out, " iterators invalid}}");
        }
        out = fmt::format_to(out, "snp={}, current=[", fmt::ptr(&cur._snp));
        bool first = true;
        for (auto&& v : cur._current_row) {
            if (!first) {
                out = fmt::format_to(out, ", ");
            }
            first = false;
            out = fmt::format_to(out, "{{v={}, pos={}, cont={}, rt={}, row_rt={}}}",
                                 v.version_no, v.it->position(), v.continuous, v.rt, v.it->range_tombstone());
        }
        out = fmt::format_to(out, "], heap[\n ");
        first = true;
        for (auto&& v : cur._heap) {
            if (!first) {
                out = fmt::format_to(out, ",\n ");
            }
            first = false;
            out = fmt::format_to(out, "{{v={}, pos={}, cont={}, rt={}, row_rt={}}}",
                                 v.version_no, v.it->position(), v.continuous, v.rt, v.it->range_tombstone());
        }
        out = fmt::format_to(out, "], latest_iterator=[");
        if (cur._latest_it) {
            mutation_partition::rows_type::iterator i = *cur._latest_it;
            if (!i) {
                out = fmt::format_to(out, "end");
            } else {
                out = fmt::format_to(out, "{}", i->position());
            }
        } else {
            out = fmt::format_to(out, "<none>");
        }
        return fmt::format_to(out, "]}}");
    }
};
// Makes the reference point to the row under cursor c, copying the iterator of the
// first entry in the cursor's current row together with the cursor's change mark
// and position (table domain).
// Indexes c._current_row[0], so the cursor has to be pointing at a row.
inline
partition_snapshot_row_weakref::partition_snapshot_row_weakref(const partition_snapshot_row_cursor& c)
: _it(c._current_row[0].it)
, _change_mark(c._change_mark)
, _pos(c._position)
, _in_latest(c.is_in_latest_version())
{ }
// Makes the reference point to the row under cursor c.
// Has the same requirements as the constructor taking a cursor.
inline
partition_snapshot_row_weakref& partition_snapshot_row_weakref::operator=(const partition_snapshot_row_cursor& c) {
    // Build the new state first, so that a failure leaves *this untouched, then
    // move-assign it. This avoids ending the lifetime of *this with an explicit
    // destructor call followed by placement new.
    *this = partition_snapshot_row_weakref(c);
    return *this;
}