Skip to content

Commit

Permalink
storage_service: merge_topology_snapshot: freeze_gently
Browse files Browse the repository at this point in the history
Freezing large mutations synchronously may cause
reactor stalls, as seen in the test_add_many_nodes_under_load
dtest:
```
  ++[1#1/37 5%] addr=0x15b0bf total=99 count=2 avg=50: ?? ??:0
  | ++[2#1/2 67%] addr=0x15a331f total=66 count=1 avg=66:
  | |             bytes_ostream::write at ././bytes_ostream.hh:248
  | |             (inlined by) bytes_ostream::write at ././bytes_ostream.hh:263
  | |             (inlined by) ser::serialize_integral<unsigned int, bytes_ostream> at ././serializer.hh:203
  | |             (inlined by) ser::integral_serializer<unsigned int>::write<bytes_ostream> at ././serializer.hh:217
  | |             (inlined by) ser::serialize<unsigned int, bytes_ostream> at ././serializer.hh:254
  | |             (inlined by) ser::writer_of_column<bytes_ostream>::write_id at ./build/dev/gen/idl/mutation.dist.impl.hh:4680
  | | ++[3#1/1 100%] addr=0x159df71 total=132 count=2 avg=66:
  | | |              (anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}::operator() at ./mutation/mutation_partition_serializer.cc:99
  | | |              (inlined by) row::maybe_invoke_with_hash<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1} const, cell_and_hash const> at ./mutation/mutation_partition.hh:133
  | | |              (inlined by) row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}::operator() at ./mutation/mutation_partition.hh:152
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>::operator() at ././utils/compact-radix-tree.hh:1888
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::visit_slot<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&> at ././utils/compact-radix-tree.hh:1560
  | | ++           - addr=0x159d84d:
  | | |              compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&> at ././utils/compact-radix-tree.hh:1364
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::node_base<cell_and_hash, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> >::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> > at ././utils/compact-radix-tree.hh:799
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::node_base<cell_and_hash, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> >::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&> at ././utils/compact-radix-tree.hh:807
  | |   ++[4#1/1 100%] addr=0x1596f4a total=329 count=5 avg=66:
  | |   |              compact_radix_tree::tree<cell_and_hash, unsigned int>::node_head::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true> > at ././utils/compact-radix-tree.hh:473
  | |   |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true> > at ././utils/compact-radix-tree.hh:1626
  | |   |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::walk<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}> at ././utils/compact-radix-tree.hh:1909
  | |   |              (inlined by) row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}> at ./mutation/mutation_partition.hh:151
  | |   |              (inlined by) (anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:97
  | |   |              (inlined by) write_row<ser::writer_of_deletable_row<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:168
  | |     ++[5#1/2 80%] addr=0x15a310c total=263 count=4 avg=66:
  | |     |             mutation_partition_serializer::write_serialized<ser::writer_of_mutation_partition<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:180
  | |     | ++[6#1/2 62%] addr=0x14eb60a total=428 count=7 avg=61:
  | |     | |             frozen_mutation::frozen_mutation(mutation const&)::$_0::operator()<ser::writer_of_mutation_partition<bytes_ostream> > at ./mutation/frozen_mutation.cc:85
  | |     | |             (inlined by) ser::after_mutation__key<bytes_ostream>::partition<frozen_mutation::frozen_mutation(mutation const&)::$_0> at ./build/dev/gen/idl/mutation.dist.impl.hh:7058
  | |     | |             (inlined by) frozen_mutation::frozen_mutation at ./mutation/frozen_mutation.cc:84
  | |     | | ++[7#1/1 100%] addr=0x14ed388 total=532 count=9 avg=59:
  | |     | | |              freeze at ./mutation/frozen_mutation.cc:143
  | |     | |   ++[8#1/2 74%] addr=0x252cf55 total=394 count=6 avg=66:
  | |     | |   |             service::storage_service::merge_topology_snapshot at ./service/storage_service.cc:763
```

This change uses freeze_gently to freeze
the cdc_generations_v2 mutations one at a time
to prevent the stalls reported above.

Signed-off-by: Benny Halevy <[email protected]>
  • Loading branch information
bhalevy committed May 2, 2024
1 parent a016e1d commit bc1985b
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -782,19 +782,21 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
// Split big mutations into smaller ones, prepare frozen_muts_to_apply
std::vector<frozen_mutation> frozen_muts_to_apply;
{
std::vector<mutation> muts_to_apply;
muts_to_apply.reserve(std::distance(it, snp.mutations.end()));
frozen_muts_to_apply.reserve(std::distance(it, snp.mutations.end()));
const auto max_size = _db.local().schema_commitlog()->max_record_size() / 2;
for (auto i = it; i != snp.mutations.end(); i++) {
const auto& m = *i;
auto mut = co_await to_mutation_gently(m, s);
if (m.representation().size() <= max_size) {
muts_to_apply.push_back(std::move(mut));
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
} else {
co_await split_mutation(std::move(mut), muts_to_apply, max_size);
std::vector<mutation> split_muts;
co_await split_mutation(std::move(mut), split_muts, max_size);
for (auto& mut : split_muts) {
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
}
}
}
frozen_muts_to_apply = freeze(muts_to_apply);
}

// Apply non-atomically so as not to hit the commitlog size limit.
Expand Down

0 comments on commit bc1985b

Please sign in to comment.