Skip to content

Commit

Permalink
test: boost: mulitshard_combining_reader_as_mutation_source_test: all…
Browse files Browse the repository at this point in the history
…ow redundant range tombstone changes

test_range_tombstones_v2 is too strict for this reader -- it expects a
particular sequence of range tombstone changes, but multishard_combining_reader,
when tested with a small buffer, generates -- as expected -- additional
(redundant) range tombstone change pairs (end+start).

We would rather not modify test_range_tombstones_v2, because catching
redundancy is good in general, so to fix this problem, we wrap the
tested reader with a filter that removes these redundant range tombstone
changes, when they are expected.
  • Loading branch information
michoecho authored and kbr-scylla committed Jun 27, 2023
1 parent be5b61b commit 112e02e
Showing 1 changed file with 109 additions and 1 deletion.
110 changes: 109 additions & 1 deletion test/boost/multishard_combining_reader_as_mutation_source_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,100 @@
// It has to be a container that does not invalidate pointers
static std::list<dummy_sharder> keep_alive_sharder;

// Filters out range tombstone changes which are redundant due to being
// followed by another range tombstone change with equal position.
//
// I.e. if the source reader emits multiple consecutive range range tombstone
// changes with equal positions, only the last will be ultimately emitted.
class trivial_rtc_removing_reader : public flat_mutation_reader_v2::impl {
// Source reader.
flat_mutation_reader_v2 _rd;
// Stores the latest mutation fragment, if it was a range tombstone change.
mutation_fragment_v2_opt _just_seen_rtc;
public:
trivial_rtc_removing_reader(flat_mutation_reader_v2 rd)
: impl(rd.schema(), rd.permit())
, _rd(std::move(rd))
{}
virtual future<> fill_buffer() override {
// The logic of this reader, in pseudocode:
// for each mf in _rd:
// if mf is a range tombstone change with equal position to _just_seen_rtc:
// discard _just_seen_rtc
// put mf into _just_seen_rtc
// else if mf is a range tombstone change with different position to _just_seen_rtc (or _just_seen_rtc is empty):
// emit _just_seen_rtc if present
// put mf into _just_seen_rtc
// else:
// emit _just_seen_rtc if present
// emit mf
while (!is_buffer_full()) {
if (!_rd.is_buffer_empty()) {
// for each mf in _rd:
auto mf = _rd.pop_mutation_fragment();
if (mf.is_range_tombstone_change()) {
// if mf is a range_tombstone_change...
if (_just_seen_rtc
&& position_in_partition::equal_compare(*_schema)(
mf.as_range_tombstone_change().position(),
_just_seen_rtc->as_range_tombstone_change().position())
) {
// ...with equal position to _just_seen_rtc:
// discard _just_seen_rtc
// (It will be overwritten by mf later).
} else {
// ...with different position to _just_seen_rtc (or _just_seen_rtc is empty):
// emit _just_seen_rtc if present
if (_just_seen_rtc) {
push_mutation_fragment(std::move(*std::exchange(_just_seen_rtc, {})));
}
}
// put mf into _just_seen_rtc
_just_seen_rtc = std::move(mf);
} else {
// else:
// emit _just_seen_rtc if present
if (_just_seen_rtc) {
push_mutation_fragment(std::move(*std::exchange(_just_seen_rtc, {})));
}
// emit mf
push_mutation_fragment(std::move(mf));
}
} else if (!_rd.is_end_of_stream()) {
co_await _rd.fill_buffer();
} else if (_just_seen_rtc) {
// If _just_seen_rtc was the last element in the stream, emit it.
push_mutation_fragment(std::move(*std::exchange(_just_seen_rtc, {})));
} else {
_end_of_stream = true;
break;
}
co_await coroutine::maybe_yield();
}
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
_just_seen_rtc = {};
_end_of_stream = false;
return _rd.next_partition();
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
clear_buffer();
_just_seen_rtc = {};
_end_of_stream = false;
return _rd.fast_forward_to(pr);
}
virtual future<> fast_forward_to(position_range pr) override {
clear_buffer();
_just_seen_rtc = {};
_end_of_stream = false;
return _rd.fast_forward_to(std::move(pr));
}
virtual future<> close() noexcept override {
return _rd.close();
}
};

static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {
return [evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations, gc_clock::time_point) mutable {
// We need to group mutations that have the same token so they land on the same shard.
Expand Down Expand Up @@ -87,7 +181,21 @@ static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer
auto mr = make_multishard_combining_reader_v2_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s,
std::move(permit), range, slice, trace_state, fwd_mr);
if (fwd_sm == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(mr));
mr = make_forwardable(std::move(mr));
}
if (single_fragment_buffer) {
// Recreating the evictable_reader conservatively closes the active range tombstone
// (because it could have disappeared in the meantime), and -- if the data didn't
// change after all -- reopens it immediately.
//
// The single_fragment_buffer variant of the test causes such recreations,
// adding redundant range tombstone change pairs to the mutation fragment stream.
// Some tests expect a particular stream, so they reject those redundant RTCs.
//
// We could modify the tests to check the semantics of the stream instead of its
// exact form, but catching redundancy is a good thing in general,
// so we opt for removing the redundant pairs only here, where they are expected.
mr = make_flat_mutation_reader_v2<trivial_rtc_removing_reader>(std::move(mr));
}
return mr;
});
Expand Down

0 comments on commit 112e02e

Please sign in to comment.