From 33c3aa492e89a9ee7a44c6c5ef2863a22ef5495f Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 24 Dec 2024 16:36:07 -0800 Subject: [PATCH 1/2] storage: collect stats from copy_data_segment_reducer This adds some accounting to the copy_data_segment_reducer to keep track of records and batches that are removed. The goal here is ultimately to provide more visibility into removed records, though this accounting isn't yet plugged into anything. A later change will expose numbers (likely with logging). --- src/v/storage/compaction_reducers.cc | 10 ++++++++++ src/v/storage/compaction_reducers.h | 27 +++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/src/v/storage/compaction_reducers.cc b/src/v/storage/compaction_reducers.cc index 9422b5d0051ce..6e6b1002b8627 100644 --- a/src/v/storage/compaction_reducers.cc +++ b/src/v/storage/compaction_reducers.cc @@ -276,12 +276,22 @@ copy_data_segment_reducer::filter(model::record_batch batch) { ss::future copy_data_segment_reducer::filter_and_append( model::compression original, model::record_batch b) { + ++_stats.batches_processed; using stop_t = ss::stop_iteration; + const auto record_count_before = b.record_count(); auto to_copy = co_await filter(std::move(b)); if (to_copy == std::nullopt) { + ++_stats.batches_discarded; + _stats.records_discarded += record_count_before; co_return stop_t::no; } + const auto records_to_remove = record_count_before + - to_copy->record_count(); + _stats.records_discarded += records_to_remove; bool compactible_batch = is_compactible(to_copy.value()); + if (!compactible_batch) { + ++_stats.non_compactible_batches; + } if (_compacted_idx && compactible_batch) { co_await model::for_each_record( to_copy.value(), diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index a4a935954dda4..8cfe224abcba8 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -121,6 +121,31 @@ class copy_data_segment_reducer : public compaction_reducer { public: using filter_t = ss::noncopyable_function( const model::record_batch&, const model::record&, bool)>; + struct stats { + // Total number of batches passed to this reducer. + size_t batches_processed{0}; + // Number of batches that were completely removed. + size_t batches_discarded{0}; + // Number of records removed by this reducer, including batches that + // were entirely removed. + size_t records_discarded{0}; + // Number of batches that were ignored because they are not + // of a compactible type. + size_t non_compactible_batches{0}; + + friend std::ostream& operator<<(std::ostream& os, const stats& s) { + fmt::print( + os, + "{{ batches_processed: {}, batches_discarded: {}, " + "records_discarded: {}, non_compactible_batches: {} }}", + s.batches_processed, + s.batches_discarded, + s.records_discarded, + s.non_compactible_batches); + return os; + } + }; + copy_data_segment_reducer( filter_t f, segment_appender* a, @@ -180,6 +205,8 @@ class copy_data_segment_reducer : public compaction_reducer { /// Allows the reducer to stop early, e.g. in case the partition is being /// shut down. ss::abort_source* _as; + + stats _stats; }; class index_rebuilder_reducer : public compaction_reducer { From 8c84ffdabf04c0beb15954ac1ab766865aca6c69 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Tue, 24 Dec 2024 17:08:12 -0800 Subject: [PATCH 2/2] storage: add log lines about removed records Removing records from the storage layer is currently very silent, making it difficult to debug. This commit exposes the new copy_data_segment_reducer stats in as a log message, at info level when data is removed, and debug level when not (presumably we mostly care about these stats when data is actually being removed). --- src/v/storage/compaction_reducers.h | 11 +++++- src/v/storage/segment_deduplication_utils.cc | 19 +++++++++- src/v/storage/segment_utils.cc | 39 ++++++++++++++------ 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index 8cfe224abcba8..fbb72f5702fad 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -133,6 +133,11 @@ class copy_data_segment_reducer : public compaction_reducer { // of a compactible type. size_t non_compactible_batches{0}; + // Returns whether any data was removed by this reducer. + bool has_removed_data() const { + return batches_discarded > 0 || records_discarded > 0; + } + friend std::ostream& operator<<(std::ostream& os, const stats& s) { fmt::print( os, @@ -145,6 +150,10 @@ class copy_data_segment_reducer : public compaction_reducer { return os; } }; + struct idx_and_stats { + index_state new_idx; + stats reducer_stats; + }; copy_data_segment_reducer( filter_t f, @@ -165,7 +174,7 @@ class copy_data_segment_reducer : public compaction_reducer { , _as(as) {} ss::future operator()(model::record_batch); - storage::index_state end_of_stream() { return std::move(_idx); } + idx_and_stats end_of_stream() { return {std::move(_idx), _stats}; } private: ss::future diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index 62511506afdd9..5b299e00e3b61 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -235,10 +235,25 @@ ss::future deduplicate_segment( inject_reader_failure, cfg.asrc); - auto new_idx = co_await std::move(rdr).consume( + auto res = co_await std::move(rdr).consume( std::move(copy_reducer), model::no_timeout); + const auto& stats = res.reducer_stats; + if (stats.has_removed_data()) { + vlog( + gclog.info, + "Windowed compaction filtering removing data from {}: {}", + seg->filename(), + stats); + } else { + vlog( + gclog.debug, + "Windowed compaction filtering not removing any records from {}: {}", + seg->filename(), + stats); + } // restore broker timestamp and clean compact timestamp + auto& new_idx = res.new_idx; new_idx.broker_timestamp = seg->index().broker_timestamp(); new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp(); @@ -251,7 +266,7 @@ ss::future deduplicate_segment( probe.add_segment_marked_tombstone_free(); } - co_return new_idx; + co_return std::move(new_idx); } } // namespace storage diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 5ab8341a7d565..b23edfa8d2945 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -440,18 +440,33 @@ ss::future do_copy_segment_data( cfg.asrc); // create the segment, get the in-memory index for the new segment - auto new_index = co_await create_segment_full_reader( - seg, cfg, pb, std::move(rw_lock_holder)) - .consume(std::move(copy_reducer), model::no_timeout) - .finally([&] { - return appender->close().handle_exception( - [](std::exception_ptr e) { - vlog( - gclog.error, - "Error copying index to new segment:{}", - e); - }); + auto res = co_await create_segment_full_reader( + seg, cfg, pb, std::move(rw_lock_holder)) + .consume(std::move(copy_reducer), model::no_timeout) + .finally([&] { + return appender->close().handle_exception( + [](std::exception_ptr e) { + vlog( + gclog.error, + "Error copying index to new segment:{}", + e); }); + }); + const auto& stats = res.reducer_stats; + if (stats.has_removed_data()) { + vlog( + gclog.info, + "Self compaction filtering removing data from {}: {}", + seg->filename(), + stats); + } else { + vlog( + gclog.debug, + "Self compaction filtering not removing any records from {}: {}", + seg->filename(), + stats); + } + auto& new_index = res.new_idx; // restore broker timestamp and clean compact timestamp new_index.broker_timestamp = old_broker_timestamp; @@ -466,7 +481,7 @@ ss::future do_copy_segment_data( pb.add_segment_marked_tombstone_free(); } - co_return new_index; + co_return std::move(new_index); } model::record_batch_reader create_segment_full_reader(