From 1d8cb32a5d467d483ae3b84af70b94eccfc3f847 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 19 Jun 2023 17:56:15 -0300 Subject: [PATCH] table: Optimize creation of reader excluding staging for view building View building from staging creates a reader from scratch (memtable + sstables - staging) for every partition, in order to calculate the diff between new staging data and data in base sstable set, and then pushes the result into the view replicas. perf shows that the reader creation is very expensive: + 12.15% 10.75% reactor-3 scylla [.] lexicographical_tri_compare::iterator, compound_type<(allow_prefixes)0>::iterator, legacy_compound_view >::tri_comparator::operator()(managed_bytes_basic_view<(mutable_view)0>, managed_bytes + 10.01% 9.99% reactor-3 scylla [.] boost::icl::is_empty > + 8.95% 8.94% reactor-3 scylla [.] legacy_compound_view >::tri_comparator::operator() + 7.29% 7.28% reactor-3 scylla [.] dht::ring_position_tri_compare + 6.28% 6.27% reactor-3 scylla [.] dht::tri_compare + 4.11% 3.52% reactor-3 scylla [.] boost::icl::interval_base_map, std::hash >, std::equal_to::process_state + 3.46% 0.93% reactor-3 scylla [.] sstables::sstable_run::will_introduce_overlapping + 2.53% 2.53% reactor-3 libstdc++.so.6 [.] std::_Rb_tree_increment + 2.45% 2.45% reactor-3 scylla [.] boost::icl::non_empty::exclusive_less > + 2.14% 2.13% reactor-3 scylla [.] boost::icl::exclusive_less > + 2.07% 2.07% reactor-3 scylla [.] logalloc::region_impl::free + 2.06% 1.91% reactor-3 scylla [.] sstables::index_consumer::consume_entry(sstables::parsed_partition_index_entry&&)::{lambda()#1}::operator()() const::{lambda()#1}::operator() + 2.04% 2.04% reactor-3 scylla [.] boost::icl::interval_base_map, std::hash >, std::equal_to, std::hash >, std::equal_to, std::hash >, std::equal_to --- replica/database.hh | 8 +-- replica/table.cc | 31 +++++------ sstables/sstable_set.cc | 53 ++++++++++++------ sstables/sstable_set.hh | 13 +++-- sstables/sstable_set_impl.hh | 6 ++- test/boost/sstable_datafile_test.cc | 84 +++++++++++++++++++++++++++++ 6 files changed, 152 insertions(+), 43 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index d8f29a9ac47b..4ee69903a03a 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -590,7 +590,8 @@ private: const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const; + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const; lw_shared_ptr make_maintenance_sstable_set() const; lw_shared_ptr make_compound_sstable_set(); @@ -667,9 +668,8 @@ public: tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; - flat_mutation_reader_v2 make_reader_v2_excluding_sstables(schema_ptr schema, + flat_mutation_reader_v2 make_reader_v2_excluding_staging(schema_ptr schema, reader_permit permit, - std::vector& sst, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state = nullptr, @@ -706,7 +706,7 @@ public: sstables::shared_sstable make_streaming_staging_sstable(); mutation_source as_mutation_source() const; - mutation_source as_mutation_source_excluding(std::vector& sst) const; + mutation_source as_mutation_source_excluding_staging() const; void set_virtual_reader(mutation_source virtual_reader) { _virtual_reader = std::move(virtual_reader); diff --git a/replica/table.cc b/replica/table.cc index 3cbd8b9930fe..ab78a28f1fc3 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -97,7 +97,8 @@ table::make_sstable_reader(schema_ptr s, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstables::sstable_predicate& predicate) const { // CAVEAT: if make_sstable_reader() is called on a single partition // we want to optimize and read exactly this partition. As a // consequence, fast_forward_to() will *NOT* work on the result, @@ -109,10 +110,10 @@ table::make_sstable_reader(schema_ptr s, } return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), - _stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr); + _stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate); } else { return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, - std::move(trace_state), fwd, fwd_mr); + std::move(trace_state), fwd, fwd_mr, default_read_monitor_generator(), predicate); } } @@ -2551,9 +2552,8 @@ void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept { } flat_mutation_reader_v2 -table::make_reader_v2_excluding_sstables(schema_ptr s, +table::make_reader_v2_excluding_staging(schema_ptr s, reader_permit permit, - std::vector& excluded, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, @@ -2565,16 +2565,11 @@ table::make_reader_v2_excluding_sstables(schema_ptr s, readers.reserve(memtable_count + 1); }); - auto excluded_ssts = boost::copy_range>(excluded); - auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema)); - _sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable { - if (excluded_ssts.contains(sst)) { - return; - } - effective_sstables->insert(sst); - }); + static std::predicate auto excl_staging_predicate = [] (const sstable& sst) { + return !sst.requires_view_building(); + }; - readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, std::move(trace_state), fwd, fwd_mr)); + readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate)); return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr); } @@ -2714,22 +2709,22 @@ table::stream_view_replica_updates(shared_ptr g s, std::move(m), timeout, - as_mutation_source_excluding(excluded_sstables), + as_mutation_source_excluding_staging(), tracing::trace_state_ptr(), *_config.streaming_read_concurrency_semaphore, query::partition_slice::option_set::of()); } mutation_source -table::as_mutation_source_excluding(std::vector& ssts) const { - return mutation_source([this, &ssts] (schema_ptr s, +table::as_mutation_source_excluding_staging() const { + return mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader_v2_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_v2_excluding_staging(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr); }); } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 2b07bacb712f..bacd457ae70f 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -830,10 +830,22 @@ make_pk_filter(const dht::ring_position& pos, const schema& schema) { }; } -// Filter out sstables for reader using bloom filter +const sstable_predicate& default_sstable_predicate() { + static const sstable_predicate predicate = [] (const sstable&) { return true; }; + return predicate; +} + +static std::predicate auto +make_sstable_filter(const dht::ring_position& pos, const schema& schema, const sstable_predicate& predicate) { + return [pk_filter = make_pk_filter(pos, schema), &predicate] (const sstable& sst) { + return predicate(sst) && pk_filter(sst); + }; +} + +// Filter out sstables for reader using bloom filter and supplied predicate static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, const schema& schema, const dht::ring_position& pos) { - auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); }; +filter_sstable_for_reader(std::vector&& sstables, const schema& schema, const dht::ring_position& pos, const sstable_predicate& predicate) { + auto filter = [_filter = make_sstable_filter(pos, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); }; sstables.erase(boost::remove_if(sstables, filter), sstables.end()); return std::move(sstables); } @@ -887,10 +899,11 @@ sstable_set_impl::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); - auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos); + auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, predicate); auto num_sstables = selected_sstables.size(); if (!num_sstables) { return make_empty_flat_reader_v2(schema, permit); @@ -929,7 +942,8 @@ time_series_sstable_set::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd_sm, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { const auto& pos = pr.start()->value(); // First check if the optimized algorithm for TWCS single partition queries can be applied. // Multiple conditions must be satisfied: @@ -951,11 +965,11 @@ time_series_sstable_set::create_single_key_sstable_reader( // Some of the conditions were not satisfied so we use the standard query path. return sstable_set_impl::create_single_key_sstable_reader( cf, std::move(schema), std::move(permit), sstable_histogram, - pr, slice, std::move(trace_state), fwd_sm, fwd_mr); + pr, slice, std::move(trace_state), fwd_sm, fwd_mr, predicate); } - auto pk_filter = make_pk_filter(pos, *schema); - auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); }); + auto sst_filter = make_sstable_filter(pos, *schema, predicate); + auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); }); if (it == _sstables->end()) { // No sstables contain data for the queried partition. return make_empty_flat_reader_v2(std::move(schema), std::move(permit)); @@ -968,6 +982,7 @@ time_series_sstable_set::create_single_key_sstable_reader( return sst.make_reader(schema, permit, pr, slice, trace_state, fwd_sm); }; + auto pk_filter = make_pk_filter(pos, *schema); auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); }; // We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that @@ -1168,7 +1183,8 @@ compound_sstable_set::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { auto sets = _sets; auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->size() > 0; }); auto non_empty_set_count = std::distance(sets.begin(), it); @@ -1179,13 +1195,13 @@ compound_sstable_set::create_single_key_sstable_reader( // optimize for common case where only 1 set is populated, avoiding the expensive combined reader if (non_empty_set_count == 1) { const auto& non_empty_set = *std::begin(sets); - return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate); } auto readers = boost::copy_range>( boost::make_iterator_range(sets.begin(), it) | boost::adaptors::transformed([&] (const lw_shared_ptr& non_empty_set) { - return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr); + return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate); }) ); return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr); @@ -1201,10 +1217,11 @@ sstable_set::create_single_key_sstable_reader( const query::partition_slice& slice, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const { + mutation_reader::forwarding fwd_mr, + const sstable_predicate& predicate) const { assert(pr.is_singular() && pr.start()->value().has_key()); return _impl->create_single_key_sstable_reader(cf, std::move(schema), - std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr); + std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate); } flat_mutation_reader_v2 @@ -1240,11 +1257,15 @@ sstable_set::make_local_shard_sstable_reader( tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr, - read_monitor_generator& monitor_generator) const + read_monitor_generator& monitor_generator, + const sstable_predicate& predicate) const { - auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator] + auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, &predicate] (shared_sstable& sst, const dht::partition_range& pr) mutable { assert(!sst->is_shared()); + if (!predicate(*sst)) { + return make_empty_flat_reader_v2(s, permit); + } return sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst)); }; if (_impl->size() == 1) [[unlikely]] { diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 33b23319db46..e3176fba2885 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -55,6 +55,10 @@ public: virtual std::tuple, dht::ring_position_ext> select(const dht::ring_position_view&) = 0; }; +using sstable_predicate = noncopyable_function; +// Default predicate includes everything +const sstable_predicate& default_sstable_predicate(); + class sstable_set_impl { public: virtual ~sstable_set_impl() {} @@ -78,7 +82,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate&) const; }; class sstable_set : public enable_lw_shared_from_this { @@ -167,7 +172,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const; + mutation_reader::forwarding, + const sstable_predicate& p = default_sstable_predicate()) const; /// Read a range from the sstable set. /// @@ -192,7 +198,8 @@ public: tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding, - read_monitor_generator& rmg = default_read_monitor_generator()) const; + read_monitor_generator& rmg = default_read_monitor_generator(), + const sstable_predicate& p = default_sstable_predicate()) const; flat_mutation_reader_v2 make_crawling_reader( schema_ptr, diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh index e236567c7800..b52f285f8d24 100644 --- a/sstables/sstable_set_impl.hh +++ b/sstables/sstable_set_impl.hh @@ -114,7 +114,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; friend class sstable_position_reader_queue; }; @@ -147,7 +148,8 @@ public: const query::partition_slice&, tracing::trace_state_ptr, streamed_mutation::forwarding, - mutation_reader::forwarding) const override; + mutation_reader::forwarding, + const sstable_predicate&) const override; class incremental_selector; }; diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index f96ee272fa96..81ae7928f9b1 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -3108,3 +3108,87 @@ SEASTAR_TEST_CASE(test_sstable_bytes_on_disk_correctness) { SEASTAR_TEST_CASE(test_sstable_bytes_on_s3_correctness) { return test_sstable_bytes_correctness(get_name() + "_s3", test_env_config{ .storage = make_test_object_storage_options() }); } + +SEASTAR_TEST_CASE(test_sstable_set_predicate) { + return test_env::do_with_async([] (test_env& env) { + auto random_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 4), + std::uniform_int_distribution(2, 4), + std::uniform_int_distribution(2, 8), + std::uniform_int_distribution(2, 8)); + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + auto s = random_schema.schema(); + + testlog.info("Random schema:\n{}", random_schema.cql()); + + const auto muts = tests::generate_random_mutations(random_schema, 20).get(); + + auto sst = make_sstable_containing(env.make_sstable(s), muts); + + auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); + sstable_set set = cs.make_sstable_set(s); + set.insert(sst); + + auto first_key_pr = dht::partition_range::make_singular(sst->get_first_decorated_key()); + + auto make_point_query_reader = [&] (std::predicate auto& pred) { + auto t = env.make_table_for_tests(s); + auto close_t = deferred_stop(t); + utils::estimated_histogram eh; + return set.create_single_key_sstable_reader(&*t, s, env.make_reader_permit(), eh, + first_key_pr, + s->full_slice(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + pred); + }; + + auto make_full_scan_reader = [&] (std::predicate auto& pred) { + return set.make_local_shard_sstable_reader(s, env.make_reader_permit(), + query::full_partition_range, + s->full_slice(), + tracing::trace_state_ptr(), + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no, + default_read_monitor_generator(), + pred); + }; + + auto verify_reader_result = [&] (flat_mutation_reader_v2 sst_mr, bool expect_eos) { + auto close_mr = deferred_close(sst_mr); + auto sst_mut = read_mutation_from_flat_mutation_reader(sst_mr).get0(); + + if (expect_eos) { + BOOST_REQUIRE(sst_mr.is_buffer_empty()); + BOOST_REQUIRE(sst_mr.is_end_of_stream()); + BOOST_REQUIRE(!sst_mut); + } else { + BOOST_REQUIRE(sst_mut); + } + }; + + { + static std::predicate auto excluding_pred = [] (const sstable&) { + return false; + }; + + testlog.info("excluding_pred: point query"); + verify_reader_result(make_point_query_reader(excluding_pred), true); + testlog.info("excluding_pred: range query"); + verify_reader_result(make_full_scan_reader(excluding_pred), true); + } + + { + static std::predicate auto inclusive_pred = [] (const sstable&) { + return true; + }; + + testlog.info("inclusive_pred: point query"); + verify_reader_result(make_point_query_reader(inclusive_pred), false); + testlog.info("inclusive_pred: range query"); + verify_reader_result(make_full_scan_reader(inclusive_pred), false); + } + }); +}