diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc
index 5b6313e0ace8..dcd923411df7 100644
--- a/compaction/compaction_manager.cc
+++ b/compaction/compaction_manager.cc
@@ -1431,14 +1431,20 @@ class compaction_manager::cleanup_sstables_compaction_task : public compaction_m
         co_return std::nullopt;
     }
 private:
-    // Releases reference to cleaned files such that respective used disk space can be freed.
-    void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
-        _compacting.release_compacting(exhausted_sstables);
-    }
-
     future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
         co_await coroutine::switch_to(_cm.compaction_sg().cpu);
 
+        // Releases reference to cleaned files such that respective used disk space can be freed.
+        // The exhausted sstables are also erased from descriptor.sstables, so that the
+        // descriptor used by this job stops referencing them as well.
+        auto release_exhausted = [this, &descriptor] (std::vector<sstables::shared_sstable> exhausted_sstables) mutable {
+            auto exhausted = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(exhausted_sstables);
+            std::erase_if(descriptor.sstables, [&] (const sstables::shared_sstable& sst) {
+                return exhausted.contains(sst);
+            });
+            _compacting.release_compacting(exhausted_sstables);
+        };
+
         for (;;) {
             compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm.available_memory()));
             _cm.register_backlog_tracker(user_initiated);
@@ -1446,8 +1452,7 @@ class compaction_manager::cleanup_sstables_compaction_task : public compaction_m
             std::exception_ptr ex;
             try {
                 setup_new_compaction(descriptor.run_identifier);
-                co_await compact_sstables_and_update_history(descriptor, _compaction_data,
-                                      std::bind(&cleanup_sstables_compaction_task::release_exhausted, this, std::placeholders::_1));
+                co_await compact_sstables_and_update_history(descriptor, _compaction_data, release_exhausted);
                 finish_compaction();
                 _cm.reevaluate_postponed_compactions();
                 co_return; // done with current job
diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc
index 8654b6bb6a27..ed4fd88e0edc 100644
--- a/test/boost/sstable_compaction_test.cc
+++ b/test/boost/sstable_compaction_test.cc
@@ -5100,3 +5100,111 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
         run_cleanup_strategy_test(sstables::compaction_strategy_type::leveled, 64, empty_opts, 0ms, 1);
     });
 }
+
+// Checks that cleanup compaction of a multi-fragment sstable run lets go of its
+// input sstables incrementally. Each input sstable gets an on-closed handler;
+// a close counts as "during cleanup" when fewer input sstables are found in the
+// table's sstable set than at the previously counted close. At least half of the
+// inputs are required to be closed that way.
+SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
+    return test_env::do_with_async([] (test_env& env) {
+        auto builder = schema_builder("tests", "test")
+                .with_column("id", utf8_type, column_kind::partition_key)
+                .with_column("value", int32_type);
+        builder.set_gc_grace_seconds(10000);
+        builder.set_compaction_strategy(sstables::compaction_strategy_type::leveled);
+        std::map<sstring, sstring> opts = {
+            { "sstable_size_in_mb", "0" }, // makes sure that every mutation produces one fragment, to trigger incremental compaction
+        };
+        builder.set_compaction_strategy_options(std::move(opts));
+        auto s = builder.build();
+        auto tmp = tmpdir();
+        auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)] () {
+            return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::get_highest_sstable_version(), big);
+        };
+
+        auto make_insert = [&] (partition_key key) {
+            mutation m(s, key);
+            m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), api::new_timestamp());
+            return m;
+        };
+
+        std::vector<utils::observer<sstable&>> observers;
+        std::vector<shared_sstable> ssts;
+        size_t sstables_closed = 0;
+        size_t sstables_closed_during_cleanup = 0;
+        static constexpr size_t sstables_nr = 10;
+
+        dht::token_range_vector owned_token_ranges;
+
+        std::set<mutation, mutation_decorated_key_less_comparator> merged;
+        for (size_t i = 0; i < sstables_nr * 2; i++) {
+            merged.insert(make_insert(partition_key::from_exploded(*s, {to_bytes(to_sstring(i))})));
+        }
+
+        std::unordered_set<sstables::generation_type> gens; // input sstable generations
+        utils::UUID run_identifier = utils::make_random_uuid();
+        auto merged_it = merged.begin();
+        for (size_t i = 0; i < sstables_nr; i++) {
+            // std::set elements are const, so each mutation is copied out of the set.
+            auto mut1 = *merged_it;
+            ++merged_it;
+            auto mut2 = *merged_it;
+            ++merged_it;
+            auto sst = make_sstable_containing(sst_gen, {
+                    std::move(mut1),
+                    std::move(mut2)
+            });
+            sstables::test(sst).set_run_identifier(run_identifier); // in order to produce multi-fragment run.
+            sst->set_sstable_level(1);
+
+            // every sstable will be eligible for cleanup, by having both an owned and unowned token.
+            owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));
+
+            gens.insert(sst->generation());
+            ssts.push_back(std::move(sst));
+        }
+
+        size_t last_input_sstable_count = sstables_nr;
+        {
+            column_family_for_tests t(env.manager(), s, tmp.path().string());
+            auto stop = deferred_stop(t);
+            t->disable_auto_compaction().get();
+            for (auto&& sst : ssts) {
+                testlog.info("run id {}", sst->run_identifier());
+                column_family_test(t).add_sstable(sst);
+                column_family_test::update_sstables_known_generation(*t, sst->generation().value());
+                // NOTE(review): the handler captures `t` by reference, while `observers`
+                // is declared outside the scope of `t`. Presumably every input sstable is
+                // closed before `t` is destroyed - confirm, since a later close would
+                // dereference a dead object.
+                observers.push_back(sst->add_on_closed_handler([&] (sstable& sst) mutable {
+                    auto sstables = t->get_sstables();
+                    auto input_sstable_count = std::count_if(sstables->begin(), sstables->end(), [&] (const shared_sstable& sst) {
+                        return gens.count(sst->generation());
+                    });
+
+                    testlog.info("Closing sstable of generation {}, table set size: {}", sst.generation(), input_sstable_count);
+                    sstables_closed++;
+                    if (input_sstable_count < last_input_sstable_count) {
+                        sstables_closed_during_cleanup++;
+                        last_input_sstable_count = input_sstable_count;
+                    }
+                }));
+            }
+            ssts = {}; // releases references
+            auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
+            t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get();
+            testlog.info("Cleanup has finished");
+        }
+
+        while (sstables_closed != sstables_nr) {
+            yield().get();
+        }
+
+        testlog.info("Closed sstables {}, Closed during cleanup {}", sstables_closed, sstables_closed_during_cleanup);
+
+        BOOST_REQUIRE(sstables_closed == sstables_nr);
+        BOOST_REQUIRE(sstables_closed_during_cleanup >= sstables_nr / 2);
+    });
+}