From 47b0c7ecada9a5c518dcc082762c14c4dc0e6b10 Mon Sep 17 00:00:00 2001
From: Kamil Braun
Date: Fri, 23 Jun 2023 17:02:09 +0200
Subject: [PATCH 1/2] readers: evictable_reader: don't accidentally consume
 the entire partition

The evictable reader must ensure that each buffer fill makes forward
progress, i.e. the last fragment in the buffer has a position larger
than the last fragment from the previous buffer-fill. Otherwise, the
reader could get stuck in an infinite loop between buffer fills, if the
reader is evicted in-between.

The code guaranteeing this forward progress had a bug: the comparison
between the position after the last buffer-fill and the current last
fragment position was done in the wrong direction. So if the condition
that we wanted to achieve was already true, we would continue filling
the buffer until the end of the partition, which may lead to OOMs such
as in #13491.

There was already a fix in this area to handle `partition_start`
fragments correctly - #13563 - but it missed that the position
comparison was done in the wrong order.

Fix the comparison and adjust one of the tests (added in #13563) to
detect this case.

Fixes #13491
---
 readers/multishard.cc              | 2 +-
 test/boost/mutation_reader_test.cc | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/readers/multishard.cc b/readers/multishard.cc
index 4c65b71e014a..3abfecc6318b 100644
--- a/readers/multishard.cc
+++ b/readers/multishard.cc
@@ -607,7 +607,7 @@ future<> evictable_reader_v2::fill_buffer() {
     // First make sure we've made progress w.r.t. _next_position_in_partition.
     // This loop becomes infinite when next pos is a partition start.
     // In that case progress is guaranteed anyway, so skip this loop entirely.
-    while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) {
+    while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(buffer().back().position(), _next_position_in_partition) <= 0) {
         push_mutation_fragment(_reader->pop_mutation_fragment());
         next_mf = co_await _reader->peek();
     }
diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc
index 1773392d814d..be9b7cb736b8 100644
--- a/test/boost/mutation_reader_test.cc
+++ b/test/boost/mutation_reader_test.cc
@@ -3648,9 +3648,13 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) {
     auto stop_rd = deferred_close(rd);
     rd.set_max_buffer_size(max_buf_size);
 
+    // #13491 - the reader must not consume the entire partition, only a small batch of fragments bounded by the buffer size.
+    rd.fill_buffer().get();
     rd.fill_buffer().get();
     auto buf1 = rd.detach_buffer();
-    BOOST_REQUIRE_EQUAL(buf1.size(), 3);
+    // There should be 6-7 fragments, but to avoid computing the exact number of fragments that should fit in `max_buf_size`,
+    // just ensure that there are <= 10 (consuming the whole partition would give ~1000 fragments).
+    BOOST_REQUIRE_LE(buf1.size(), 10);
 }
 
 struct mutation_bounds {

From a4ba68ac8629977973fdfb7b4a13c7a6b201c8f1 Mon Sep 17 00:00:00 2001
From: Michał Chojnowski
Date: Tue, 27 Jun 2023 07:35:38 +0200
Subject: [PATCH 2/2] test: boost:
 multishard_combining_reader_as_mutation_source_test: allow redundant range
 tombstone changes

test_range_tombstones_v2 is too strict for this reader -- it expects a
particular sequence of range tombstone changes, but
multishard_combining_reader, when tested with a small buffer, generates
-- as expected -- additional (redundant) range tombstone change pairs
(end+start).

We would rather not relax test_range_tombstones_v2, because catching
redundancy is good in general. So, to fix this problem, we wrap the
tested reader with a filter that removes these redundant range
tombstone changes where they are expected.
---
 ...ombining_reader_as_mutation_source_test.cc | 110 +++++++++++++++++-
 1 file changed, 109 insertions(+), 1 deletion(-)

diff --git a/test/boost/multishard_combining_reader_as_mutation_source_test.cc b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
index 3c8c45c67839..108018fda358 100644
--- a/test/boost/multishard_combining_reader_as_mutation_source_test.cc
+++ b/test/boost/multishard_combining_reader_as_mutation_source_test.cc
@@ -30,6 +30,100 @@
 // It has to be a container that does not invalidate pointers
 static std::list<dummy_sharder> keep_alive_sharder;
 
+// Filters out range tombstone changes that are redundant because they are
+// immediately followed by another range tombstone change with an equal
+// position.
+//
+// I.e. if the source reader emits multiple consecutive range tombstone
+// changes with equal positions, only the last one will ultimately be emitted.
+class trivial_rtc_removing_reader : public flat_mutation_reader_v2::impl {
+    // Source reader.
+    flat_mutation_reader_v2 _rd;
+    // Stores the latest mutation fragment, if it was a range tombstone change.
+    mutation_fragment_v2_opt _just_seen_rtc;
+public:
+    trivial_rtc_removing_reader(flat_mutation_reader_v2 rd)
+        : impl(rd.schema(), rd.permit())
+        , _rd(std::move(rd))
+    {}
+    virtual future<> fill_buffer() override {
+        // The logic of this reader, in pseudocode:
+        // for each mf in _rd:
+        //     if mf is a range tombstone change with position equal to _just_seen_rtc's:
+        //         discard _just_seen_rtc
+        //         put mf into _just_seen_rtc
+        //     else if mf is a range tombstone change with position different from _just_seen_rtc's (or _just_seen_rtc is empty):
+        //         emit _just_seen_rtc if present
+        //         put mf into _just_seen_rtc
+        //     else:
+        //         emit _just_seen_rtc if present
+        //         emit mf
+        while (!is_buffer_full()) {
+            if (!_rd.is_buffer_empty()) {
+                // for each mf in _rd:
+                auto mf = _rd.pop_mutation_fragment();
+                if (mf.is_range_tombstone_change()) {
+                    // if mf is a range_tombstone_change...
+                    if (_just_seen_rtc
+                            && position_in_partition::equal_compare(*_schema)(
+                                mf.as_range_tombstone_change().position(),
+                                _just_seen_rtc->as_range_tombstone_change().position())
+                    ) {
+                        // ...with position equal to _just_seen_rtc's:
+                        // discard _just_seen_rtc
+                        // (It will be overwritten by mf later).
+                    } else {
+                        // ...with position different from _just_seen_rtc's (or _just_seen_rtc is empty):
+                        // emit _just_seen_rtc if present
+                        if (_just_seen_rtc) {
+                            push_mutation_fragment(std::move(*std::exchange(_just_seen_rtc, {})));
+                        }
+                    }
+                    // put mf into _just_seen_rtc
+                    _just_seen_rtc = std::move(mf);
+                } else {
+                    // else:
+                    // emit _just_seen_rtc if present
+                    if (_just_seen_rtc) {
+                        push_mutation_fragment(std::move(*std::exchange(_just_seen_rtc, {})));
+                    }
+                    // emit mf
+                    push_mutation_fragment(std::move(mf));
+                }
+            } else if (!_rd.is_end_of_stream()) {
+                co_await _rd.fill_buffer();
+            } else if (_just_seen_rtc) {
+                // If _just_seen_rtc was the last element in the stream, emit it.
+                push_mutation_fragment(std::move(*std::exchange(_just_seen_rtc, {})));
+            } else {
+                _end_of_stream = true;
+                break;
+            }
+            co_await coroutine::maybe_yield();
+        }
+    }
+    virtual future<> next_partition() override {
+        clear_buffer_to_next_partition();
+        _just_seen_rtc = {};
+        _end_of_stream = false;
+        return _rd.next_partition();
+    }
+    virtual future<> fast_forward_to(const dht::partition_range& pr) override {
+        clear_buffer();
+        _just_seen_rtc = {};
+        _end_of_stream = false;
+        return _rd.fast_forward_to(pr);
+    }
+    virtual future<> fast_forward_to(position_range pr) override {
+        clear_buffer();
+        _just_seen_rtc = {};
+        _end_of_stream = false;
+        return _rd.fast_forward_to(std::move(pr));
+    }
+    virtual future<> close() noexcept override {
+        return _rd.close();
+    }
+};
+
 static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer) {
     return [evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations, gc_clock::time_point) mutable {
         // We need to group mutations that have the same token so they land on the same shard.
@@ -87,7 +181,21 @@ static auto make_populate(bool evict_paused_readers, bool single_fragment_buffer
         auto mr = make_multishard_combining_reader_v2_for_tests(keep_alive_sharder.back(), std::move(lifecycle_policy), s, std::move(permit), range, slice, trace_state, fwd_mr);
         if (fwd_sm == streamed_mutation::forwarding::yes) {
-            return make_forwardable(std::move(mr));
+            mr = make_forwardable(std::move(mr));
+        }
+        if (single_fragment_buffer) {
+            // Recreating the evictable_reader conservatively closes the active range tombstone
+            // (because it could have disappeared in the meantime), and -- if the data didn't
+            // change after all -- reopens it immediately.
+            //
+            // The single_fragment_buffer variant of the test causes such recreations,
+            // adding redundant range tombstone change pairs to the mutation fragment stream.
+            // Some tests expect a particular stream, so they reject those redundant RTCs.
+            //
+            // We could modify the tests to check the semantics of the stream instead of its
+            // exact form, but catching redundancy is a good thing in general,
+            // so we opt for removing the redundant pairs only here, where they are expected.
+            mr = make_flat_mutation_reader_v2<trivial_rtc_removing_reader>(std::move(mr));
         }
         return mr;
     });
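
Illustration (not part of the patches): the forward-progress invariant behind
the first patch can be shown with a minimal, self-contained sketch. All names
below (integer positions, tri_cmp, fill_buffer) are illustrative stand-ins for
the real ScyllaDB types (position_in_partition, _tri_cmp,
evictable_reader_v2::fill_buffer), kept only to demonstrate why the direction
of the comparison matters:

#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the reader's three-way comparator: negative if a sorts before
// b, zero if equal, positive if a sorts after b. The real comparator works on
// position_in_partition, not int.
int tri_cmp(int a, int b) { return a - b; }

// Models one buffer fill. next_pos plays the role of
// _next_position_in_partition: the position reached by the previous fill.
// The loop keeps consuming only until the buffer's last fragment moves past
// next_pos, which is exactly the forward-progress guarantee.
std::vector<int> fill_buffer(const std::vector<int>& partition, int next_pos) {
    std::vector<int> buffer;
    std::size_t i = 0;
    buffer.push_back(partition[i++]); // a fill takes at least one fragment
    // Fixed comparison direction: stop as soon as buffer.back() > next_pos.
    // The buggy direction, tri_cmp(next_pos, buffer.back()) <= 0, is already
    // true at this point and stays true, so the loop would consume the whole
    // partition, which is the OOM scenario of #13491.
    while (i < partition.size() && tri_cmp(buffer.back(), next_pos) <= 0) {
        buffer.push_back(partition[i++]);
    }
    return buffer;
}

int main() {
    std::vector<int> partition(1000);
    for (int i = 0; i < 1000; ++i) {
        partition[i] = i; // fragment positions 0..999
    }
    auto buf = fill_buffer(partition, 5);
    assert(buf.size() == 7);  // positions 0..6, not all 1000 fragments
    assert(buf.back() > 5);   // forward progress past next_pos
}

The deduplication rule of trivial_rtc_removing_reader from the second patch
can be modeled the same way. This is a toy model under the assumption that a
fragment can be reduced to (position, is_range_tombstone_change); the real
reader operates on mutation_fragment_v2 and fills a bounded buffer instead of
returning a vector:

#include <cassert>
#include <optional>
#include <utility>
#include <vector>

// Toy fragment: (position, is_range_tombstone_change).
using frag = std::pair<int, bool>;

// Of several consecutive range tombstone changes with equal positions, only
// the last one survives; everything else passes through unchanged.
std::vector<frag> remove_trivial_rtcs(const std::vector<frag>& in) {
    std::vector<frag> out;
    std::optional<frag> just_seen_rtc; // plays the role of _just_seen_rtc
    for (const auto& f : in) {
        if (f.second) { // a range tombstone change
            if (just_seen_rtc && just_seen_rtc->first != f.first) {
                // Different position: the held RTC is not redundant, emit it.
                out.push_back(*just_seen_rtc);
            }
            // Equal position: the held RTC is silently discarded.
            just_seen_rtc = f; // hold the newest RTC
        } else {
            // Any other fragment flushes the held RTC before being emitted.
            if (just_seen_rtc) {
                out.push_back(*std::exchange(just_seen_rtc, std::nullopt));
            }
            out.push_back(f);
        }
    }
    if (just_seen_rtc) {
        out.push_back(*just_seen_rtc); // flush at end of stream
    }
    return out;
}

int main() {
    // A redundant end+start pair at position 7 collapses into the single
    // final change at 7; fragments at other positions pass through.
    std::vector<frag> in{{5, true}, {7, true}, {7, true}, {9, false}};
    auto out = remove_trivial_rtcs(in);
    assert((out == std::vector<frag>{{5, true}, {7, true}, {9, false}}));
}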