Skip to content

Commit

Permalink
table: Optimize creation of reader excluding staging for view building
Browse files Browse the repository at this point in the history
View building from staging creates a reader from scratch (memtable
+ sstables - staging) for every partition, in order to calculate
the diff between new staging data and data in base sstable set,
and then pushes the result into the view replicas.

perf shows that the reader creation is very expensive:
+   12.15%    10.75%  reactor-3        scylla             [.] lexicographical_tri_compare<compound_type<(allow_prefixes)0>::iterator, compound_type<(allow_prefixes)0>::iterator, legacy_compound_view<compound_type<(allow_prefixes)0> >::tri_comparator::operator()(managed_bytes_basic_view<(mutable_view)0>, managed_bytes
+   10.01%     9.99%  reactor-3        scylla             [.] boost::icl::is_empty<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    8.95%     8.94%  reactor-3        scylla             [.] legacy_compound_view<compound_type<(allow_prefixes)0> >::tri_comparator::operator()
+    7.29%     7.28%  reactor-3        scylla             [.] dht::ring_position_tri_compare
+    6.28%     6.27%  reactor-3        scylla             [.] dht::tri_compare
+    4.11%     3.52%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst+    4.09%     4.07%  reactor-3        scylla             [.] sstables::index_consume_entry_context<sstables::index_consumer>::process_state
+    3.46%     0.93%  reactor-3        scylla             [.] sstables::sstable_run::will_introduce_overlapping
+    2.53%     2.53%  reactor-3        libstdc++.so.6     [.] std::_Rb_tree_increment
+    2.45%     2.45%  reactor-3        scylla             [.] boost::icl::non_empty::exclusive_less<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    2.14%     2.13%  reactor-3        scylla             [.] boost::icl::exclusive_less<boost::icl::continuous_interval<compatible_ring_position_or_view, std::less> >
+    2.07%     2.07%  reactor-3        scylla             [.] logalloc::region_impl::free
+    2.06%     1.91%  reactor-3        scylla             [.] sstables::index_consumer::consume_entry(sstables::parsed_partition_index_entry&&)::{lambda()#1}::operator()() const::{lambda()#1}::operator()
+    2.04%     2.04%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst+    1.87%     0.00%  reactor-3        [kernel.kallsyms]  [k] entry_SYSCALL_64_after_hwframe
+    1.86%     0.00%  reactor-3        [kernel.kallsyms]  [k] do_syscall_64
+    1.39%     1.38%  reactor-3        libc.so.6          [.] __memcmp_avx2_movbe
+    1.37%     0.92%  reactor-3        scylla             [.] boost::icl::segmental::join_left<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::
+    1.34%     1.33%  reactor-3        scylla             [.] logalloc::region_impl::alloc_small
+    1.33%     1.33%  reactor-3        scylla             [.] seastar::memory::small_pool::add_more_objects
+    1.30%     0.35%  reactor-3        scylla             [.] seastar::reactor::do_run
+    1.29%     1.29%  reactor-3        scylla             [.] seastar::memory::allocate
+    1.19%     0.05%  reactor-3        libc.so.6          [.] syscall
+    1.16%     1.04%  reactor-3        scylla             [.] boost::icl::interval_base_map<boost::icl::interval_map<compatible_ring_position_or_view, std::unordered_set<seastar::lw_shared_ptr<sstables::sstable>, std::hash<seastar::lw_shared_ptr<sstables::sstable> >, std::equal_to<seastar::lw_shared_ptr<sstables::sst
+    1.07%     0.79%  reactor-3        scylla             [.] sstables::partitioned_sstable_set::insert

That shows some significant amount of work for inserting sstables
into the interval map and maintaining the sstable run (which sorts
fragments by first key and checks for overlapping).

The interval map is known for having issues with L0 sstables, as
it will have to be replicated almost to every single interval
stored by the map, causing terrible space and time complexity.
With enough L0 sstables, it can fall into quadratic behavior.

This overhead is fixed by not building a new fresh sstable set
when recreating the reader, but rather supplying a predicate
to sstable set that will filter out staging sstables when
creating either a single-key or range scan reader.

This could have another benefit over today's approach which
may incorrectly consider a staging sstable as non-staging, if
the staging sst wasn't included in the current batch for view
building.

With this improvement, view building was measured to be 3x faster.

from
INFO  2023-06-16 12:36:40,014 [shard 0] view_update_generator - Processed keyspace1.standard1: 5 sstables in 963957ms = 50kB/s

to
INFO  2023-06-16 14:47:12,129 [shard 0] view_update_generator - Processed keyspace1.standard1: 5 sstables in 319899ms = 150kB/s

Refs scylladb#14089.
Fixes scylladb#14244.

Signed-off-by: Raphael S. Carvalho <[email protected]>
  • Loading branch information
raphaelsc committed Jun 27, 2023
1 parent 1ff8645 commit 1d8cb32
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 43 deletions.
8 changes: 4 additions & 4 deletions replica/database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ private:
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const;
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& = sstables::default_sstable_predicate()) const;

lw_shared_ptr<sstables::sstable_set> make_maintenance_sstable_set() const;
lw_shared_ptr<sstables::sstable_set> make_compound_sstable_set();
Expand Down Expand Up @@ -667,9 +668,8 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
flat_mutation_reader_v2 make_reader_v2_excluding_sstables(schema_ptr schema,
flat_mutation_reader_v2 make_reader_v2_excluding_staging(schema_ptr schema,
reader_permit permit,
std::vector<sstables::shared_sstable>& sst,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state = nullptr,
Expand Down Expand Up @@ -706,7 +706,7 @@ public:
sstables::shared_sstable make_streaming_staging_sstable();

mutation_source as_mutation_source() const;
mutation_source as_mutation_source_excluding(std::vector<sstables::shared_sstable>& sst) const;
mutation_source as_mutation_source_excluding_staging() const;

void set_virtual_reader(mutation_source virtual_reader) {
_virtual_reader = std::move(virtual_reader);
Expand Down
31 changes: 13 additions & 18 deletions replica/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ table::make_sstable_reader(schema_ptr s,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstables::sstable_predicate& predicate) const {
// CAVEAT: if make_sstable_reader() is called on a single partition
// we want to optimize and read exactly this partition. As a
// consequence, fast_forward_to() will *NOT* work on the result,
Expand All @@ -109,10 +110,10 @@ table::make_sstable_reader(schema_ptr s,
}

return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr);
_stats.estimated_sstable_per_read, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate);
} else {
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice,
std::move(trace_state), fwd, fwd_mr);
std::move(trace_state), fwd, fwd_mr, default_read_monitor_generator(), predicate);
}
}

Expand Down Expand Up @@ -2551,9 +2552,8 @@ void table::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
}

flat_mutation_reader_v2
table::make_reader_v2_excluding_sstables(schema_ptr s,
table::make_reader_v2_excluding_staging(schema_ptr s,
reader_permit permit,
std::vector<sstables::shared_sstable>& excluded,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
Expand All @@ -2565,16 +2565,11 @@ table::make_reader_v2_excluding_sstables(schema_ptr s,
readers.reserve(memtable_count + 1);
});

auto excluded_ssts = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(excluded);
auto effective_sstables = make_lw_shared(_compaction_strategy.make_sstable_set(_schema));
_sstables->for_each_sstable([&excluded_ssts, &effective_sstables] (const sstables::shared_sstable& sst) mutable {
if (excluded_ssts.contains(sst)) {
return;
}
effective_sstables->insert(sst);
});
static std::predicate<const sstable&> auto excl_staging_predicate = [] (const sstable& sst) {
return !sst.requires_view_building();
};

readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, std::move(trace_state), fwd, fwd_mr));
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, std::move(trace_state), fwd, fwd_mr, excl_staging_predicate));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
}

Expand Down Expand Up @@ -2714,22 +2709,22 @@ table::stream_view_replica_updates(shared_ptr<db::view::view_update_generator> g
s,
std::move(m),
timeout,
as_mutation_source_excluding(excluded_sstables),
as_mutation_source_excluding_staging(),
tracing::trace_state_ptr(),
*_config.streaming_read_concurrency_semaphore,
query::partition_slice::option_set::of<query::partition_slice::option::bypass_cache>());
}

mutation_source
table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts) const {
return mutation_source([this, &ssts] (schema_ptr s,
table::as_mutation_source_excluding_staging() const {
return mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return this->make_reader_v2_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, std::move(trace_state), fwd, fwd_mr);
return this->make_reader_v2_excluding_staging(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
});
}

Expand Down
53 changes: 37 additions & 16 deletions sstables/sstable_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -830,10 +830,22 @@ make_pk_filter(const dht::ring_position& pos, const schema& schema) {
};
}

// Filter out sstables for reader using bloom filter
const sstable_predicate& default_sstable_predicate() {
static const sstable_predicate predicate = [] (const sstable&) { return true; };
return predicate;
}

static std::predicate<const sstable&> auto
make_sstable_filter(const dht::ring_position& pos, const schema& schema, const sstable_predicate& predicate) {
return [pk_filter = make_pk_filter(pos, schema), &predicate] (const sstable& sst) {
return predicate(sst) && pk_filter(sst);
};
}

// Filter out sstables for reader using bloom filter and supplied predicate
static std::vector<shared_sstable>
filter_sstable_for_reader_by_pk(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos) {
auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); };
filter_sstable_for_reader(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos, const sstable_predicate& predicate) {
auto filter = [_filter = make_sstable_filter(pos, schema, predicate)] (const shared_sstable& sst) { return !_filter(*sst); };
sstables.erase(boost::remove_if(sstables, filter), sstables.end());
return std::move(sstables);
}
Expand Down Expand Up @@ -887,10 +899,11 @@ sstable_set_impl::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const
{
const auto& pos = pr.start()->value();
auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos);
auto selected_sstables = filter_sstable_for_reader(select(pr), *schema, pos, predicate);
auto num_sstables = selected_sstables.size();
if (!num_sstables) {
return make_empty_flat_reader_v2(schema, permit);
Expand Down Expand Up @@ -929,7 +942,8 @@ time_series_sstable_set::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
const auto& pos = pr.start()->value();
// First check if the optimized algorithm for TWCS single partition queries can be applied.
// Multiple conditions must be satisfied:
Expand All @@ -951,11 +965,11 @@ time_series_sstable_set::create_single_key_sstable_reader(
// Some of the conditions were not satisfied so we use the standard query path.
return sstable_set_impl::create_single_key_sstable_reader(
cf, std::move(schema), std::move(permit), sstable_histogram,
pr, slice, std::move(trace_state), fwd_sm, fwd_mr);
pr, slice, std::move(trace_state), fwd_sm, fwd_mr, predicate);
}

auto pk_filter = make_pk_filter(pos, *schema);
auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); });
auto sst_filter = make_sstable_filter(pos, *schema, predicate);
auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return sst_filter(*e.second); });
if (it == _sstables->end()) {
// No sstables contain data for the queried partition.
return make_empty_flat_reader_v2(std::move(schema), std::move(permit));
Expand All @@ -968,6 +982,7 @@ time_series_sstable_set::create_single_key_sstable_reader(
return sst.make_reader(schema, permit, pr, slice, trace_state, fwd_sm);
};

auto pk_filter = make_pk_filter(pos, *schema);
auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); };

// We're going to pass this filter into sstable_position_reader_queue. The queue guarantees that
Expand Down Expand Up @@ -1168,7 +1183,8 @@ compound_sstable_set::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
auto sets = _sets;
auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->size() > 0; });
auto non_empty_set_count = std::distance(sets.begin(), it);
Expand All @@ -1179,13 +1195,13 @@ compound_sstable_set::create_single_key_sstable_reader(
// optimize for common case where only 1 set is populated, avoiding the expensive combined reader
if (non_empty_set_count == 1) {
const auto& non_empty_set = *std::begin(sets);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr);
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate);
}

auto readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(
boost::make_iterator_range(sets.begin(), it)
| boost::adaptors::transformed([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr);
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, trace_state, fwd, fwd_mr, predicate);
})
);
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
Expand All @@ -1201,10 +1217,11 @@ sstable_set::create_single_key_sstable_reader(
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate) const {
assert(pr.is_singular() && pr.start()->value().has_key());
return _impl->create_single_key_sstable_reader(cf, std::move(schema),
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr);
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate);
}

flat_mutation_reader_v2
Expand Down Expand Up @@ -1240,11 +1257,15 @@ sstable_set::make_local_shard_sstable_reader(
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor_generator& monitor_generator) const
read_monitor_generator& monitor_generator,
const sstable_predicate& predicate) const
{
auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator]
auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, &predicate]
(shared_sstable& sst, const dht::partition_range& pr) mutable {
assert(!sst->is_shared());
if (!predicate(*sst)) {
return make_empty_flat_reader_v2(s, permit);
}
return sst->make_reader(s, permit, pr, slice, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
if (_impl->size() == 1) [[unlikely]] {
Expand Down
13 changes: 10 additions & 3 deletions sstables/sstable_set.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public:
virtual std::tuple<dht::partition_range, std::vector<shared_sstable>, dht::ring_position_ext> select(const dht::ring_position_view&) = 0;
};

using sstable_predicate = noncopyable_function<bool(const sstable&)>;
// Default predicate includes everything
const sstable_predicate& default_sstable_predicate();

class sstable_set_impl {
public:
virtual ~sstable_set_impl() {}
Expand All @@ -78,7 +82,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const;
mutation_reader::forwarding,
const sstable_predicate&) const;
};

class sstable_set : public enable_lw_shared_from_this<sstable_set> {
Expand Down Expand Up @@ -167,7 +172,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const;
mutation_reader::forwarding,
const sstable_predicate& p = default_sstable_predicate()) const;

/// Read a range from the sstable set.
///
Expand All @@ -192,7 +198,8 @@ public:
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator()) const;
read_monitor_generator& rmg = default_read_monitor_generator(),
const sstable_predicate& p = default_sstable_predicate()) const;

flat_mutation_reader_v2 make_crawling_reader(
schema_ptr,
Expand Down
6 changes: 4 additions & 2 deletions sstables/sstable_set_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const override;
mutation_reader::forwarding,
const sstable_predicate&) const override;

friend class sstable_position_reader_queue;
};
Expand Down Expand Up @@ -147,7 +148,8 @@ public:
const query::partition_slice&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const override;
mutation_reader::forwarding,
const sstable_predicate&) const override;

class incremental_selector;
};
Expand Down
Loading

0 comments on commit 1d8cb32

Please sign in to comment.