storage: add log lines about removed data #24659

Merged: 2 commits, Jan 8, 2025
10 changes: 10 additions & 0 deletions src/v/storage/compaction_reducers.cc
@@ -276,12 +276,22 @@ copy_data_segment_reducer::filter(model::record_batch batch) {

ss::future<ss::stop_iteration> copy_data_segment_reducer::filter_and_append(
model::compression original, model::record_batch b) {
++_stats.batches_processed;
using stop_t = ss::stop_iteration;
const auto record_count_before = b.record_count();
auto to_copy = co_await filter(std::move(b));
if (to_copy == std::nullopt) {
++_stats.batches_discarded;
_stats.records_discarded += record_count_before;
co_return stop_t::no;
}
const auto records_to_remove = record_count_before
- to_copy->record_count();
_stats.records_discarded += records_to_remove;
bool compactible_batch = is_compactible(to_copy.value());
if (!compactible_batch) {
++_stats.non_compactible_batches;
}
if (_compacted_idx && compactible_batch) {
co_await model::for_each_record(
to_copy.value(),
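For reference, a minimal standalone sketch of the accounting that filter_and_append now performs (hypothetical types and a toy filter; only the counter logic mirrors the diff): records_discarded covers both the records of batches that are dropped entirely and the records trimmed from batches that survive filtering.

```cpp
// Sketch only: not the Redpanda code. A "batch" is a vector of ints and the
// filter keeps even keys, but the stats bookkeeping follows the diff above.
#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

struct stats {
    size_t batches_processed{0};
    size_t batches_discarded{0};
    size_t records_discarded{0};
};

// Stand-in for copy_data_segment_reducer::filter(): returns std::nullopt when
// nothing survives, i.e. the whole batch is discarded.
std::optional<std::vector<int>> filter(std::vector<int> batch) {
    std::vector<int> kept;
    for (int key : batch) {
        if (key % 2 == 0) {
            kept.push_back(key);
        }
    }
    if (kept.empty()) {
        return std::nullopt;
    }
    return kept;
}

int main() {
    stats s;
    std::vector<std::vector<int>> batches{{1, 2, 3, 4}, {1, 3, 5}, {2, 4}};
    for (auto& batch : batches) {
        ++s.batches_processed;
        const auto records_before = batch.size();
        auto to_copy = filter(std::move(batch));
        if (!to_copy) {
            // Entire batch removed: every record counts as discarded.
            ++s.batches_discarded;
            s.records_discarded += records_before;
            continue;
        }
        // Partially filtered batch: only the trimmed records count.
        s.records_discarded += records_before - to_copy->size();
    }
    std::cout << "batches_processed=" << s.batches_processed
              << " batches_discarded=" << s.batches_discarded
              << " records_discarded=" << s.records_discarded << '\n';
    // Prints: batches_processed=3 batches_discarded=1 records_discarded=5
}
```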
38 changes: 37 additions & 1 deletion src/v/storage/compaction_reducers.h
@@ -121,6 +121,40 @@ class copy_data_segment_reducer : public compaction_reducer {
public:
using filter_t = ss::noncopyable_function<ss::future<bool>(
const model::record_batch&, const model::record&, bool)>;
struct stats {
// Total number of batches passed to this reducer.
size_t batches_processed{0};
// Number of batches that were completely removed.
size_t batches_discarded{0};
// Number of records removed by this reducer, including batches that
// were entirely removed.
size_t records_discarded{0};
// Number of batches that were ignored because they are not
// of a compactible type.
size_t non_compactible_batches{0};

// Returns whether any data was removed by this reducer.
bool has_removed_data() const {
return batches_discarded > 0 || records_discarded > 0;
}

friend std::ostream& operator<<(std::ostream& os, const stats& s) {
fmt::print(
os,
"{{ batches_processed: {}, batches_discarded: {}, "
"records_discarded: {}, non_compactible_batches: {} }}",
s.batches_processed,
s.batches_discarded,
s.records_discarded,
s.non_compactible_batches);
return os;
}
};
struct idx_and_stats {
index_state new_idx;
stats reducer_stats;
};

copy_data_segment_reducer(
filter_t f,
segment_appender* a,
@@ -140,7 +174,7 @@
, _as(as) {}

ss::future<ss::stop_iteration> operator()(model::record_batch);
storage::index_state end_of_stream() { return std::move(_idx); }
idx_and_stats end_of_stream() { return {std::move(_idx), _stats}; }

private:
ss::future<ss::stop_iteration>
@@ -180,6 +214,8 @@ class copy_data_segment_reducer : public compaction_reducer {
/// Allows the reducer to stop early, e.g. in case the partition is being
/// shut down.
ss::abort_source* _as;

stats _stats;
};

class index_rebuilder_reducer : public compaction_reducer {
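A rough standalone approximation of the new stats struct and its formatter, using plain iostream instead of the fmt and Seastar machinery in the tree; the field names follow the diff, while the sample values are made up. It shows roughly what the payload of the new log lines looks like and how has_removed_data() is meant to gate them.

```cpp
// Approximation of copy_data_segment_reducer::stats for illustration only.
#include <cstddef>
#include <iostream>

struct stats {
    size_t batches_processed{0};
    size_t batches_discarded{0};
    size_t records_discarded{0};
    size_t non_compactible_batches{0};

    bool has_removed_data() const {
        return batches_discarded > 0 || records_discarded > 0;
    }

    friend std::ostream& operator<<(std::ostream& os, const stats& s) {
        return os << "{ batches_processed: " << s.batches_processed
                  << ", batches_discarded: " << s.batches_discarded
                  << ", records_discarded: " << s.records_discarded
                  << ", non_compactible_batches: " << s.non_compactible_batches
                  << " }";
    }
};

int main() {
    // Made-up numbers for a segment where some data was removed.
    stats s{
      .batches_processed = 120,
      .batches_discarded = 7,
      .records_discarded = 451,
      .non_compactible_batches = 2};
    std::cout << "removed data: " << std::boolalpha << s.has_removed_data()
              << ", stats: " << s << '\n';
}
```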
19 changes: 17 additions & 2 deletions src/v/storage/segment_deduplication_utils.cc
@@ -235,10 +235,25 @@ ss::future<index_state> deduplicate_segment(
inject_reader_failure,
cfg.asrc);

auto new_idx = co_await std::move(rdr).consume(
auto res = co_await std::move(rdr).consume(
std::move(copy_reducer), model::no_timeout);
const auto& stats = res.reducer_stats;
if (stats.has_removed_data()) {
vlog(
gclog.info,
"Windowed compaction filtering removing data from {}: {}",
seg->filename(),
stats);
} else {
vlog(
gclog.debug,
"Windowed compaction filtering not removing any records from {}: {}",
seg->filename(),
stats);
}

// restore broker timestamp and clean compact timestamp
auto& new_idx = res.new_idx;
new_idx.broker_timestamp = seg->index().broker_timestamp();
new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp();

@@ -251,7 +266,7 @@ ss::future<index_state> deduplicate_segment(
probe.add_segment_marked_tombstone_free();
}

co_return new_idx;
co_return std::move(new_idx);
}

} // namespace storage
39 changes: 27 additions & 12 deletions src/v/storage/segment_utils.cc
@@ -440,18 +440,33 @@ ss::future<storage::index_state> do_copy_segment_data(
cfg.asrc);

// create the segment, get the in-memory index for the new segment
auto new_index = co_await create_segment_full_reader(
seg, cfg, pb, std::move(rw_lock_holder))
.consume(std::move(copy_reducer), model::no_timeout)
.finally([&] {
return appender->close().handle_exception(
[](std::exception_ptr e) {
vlog(
gclog.error,
"Error copying index to new segment:{}",
e);
});
auto res = co_await create_segment_full_reader(
seg, cfg, pb, std::move(rw_lock_holder))
.consume(std::move(copy_reducer), model::no_timeout)
.finally([&] {
return appender->close().handle_exception(
[](std::exception_ptr e) {
vlog(
gclog.error,
"Error copying index to new segment:{}",
e);
});
});
const auto& stats = res.reducer_stats;
if (stats.has_removed_data()) {
vlog(
gclog.info,
"Self compaction filtering removing data from {}: {}",
seg->filename(),
stats);
} else {
vlog(
gclog.debug,
"Self compaction filtering not removing any records from {}: {}",
seg->filename(),
stats);
}

Review thread on the gclog.info line above:

Contributor:

I don't have strong feelings that this shouldn't be at the info level, but I can imagine that it would make compaction output for a log with a high number of segments much noisier.

What do you think?

Member:

Definitely more noisy, but we already put up with an info-level message when we create a segment, so maybe not too bad. Are there situations where we re-compact a segment many times, or is it usually <= 2x?

Contributor (PR author):

Yeah, I'm trying to be cognizant about noise by making this conditionally a debug log.

It's certainly possible we'll compact a segment multiple times (it's not hard to imagine with high-cardinality segments). But generally I'd argue that, despite the noise, if we're removing data the log line is worth it. There was one recent instance where tracing back a record's removal was impossible because of a lack of observability in this area.

auto& new_index = res.new_idx;

// restore broker timestamp and clean compact timestamp
new_index.broker_timestamp = old_broker_timestamp;
@@ -466,7 +481,7 @@
pb.add_segment_marked_tombstone_free();
}

co_return new_index;
co_return std::move(new_index);
}

model::record_batch_reader create_segment_full_reader(
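A small sketch of the severity-gating pattern discussed in the review thread above: surface the per-segment line at info only when data was actually removed, and keep the routine no-op case at debug. vlog/gclog are stubbed out with iostream here, and the segment paths are hypothetical.

```cpp
// Sketch of conditional log severity driven by reducer stats; not Redpanda code.
#include <cstddef>
#include <iostream>
#include <string>

enum class level { debug, info };

struct stats {
    size_t batches_discarded{0};
    size_t records_discarded{0};
    bool has_removed_data() const {
        return batches_discarded > 0 || records_discarded > 0;
    }
};

// Stand-in for vlog(): prints the severity followed by the message.
void log(level lvl, const std::string& msg) {
    std::cout << (lvl == level::info ? "INFO  " : "DEBUG ") << msg << '\n';
}

void report(const std::string& segment, const stats& s) {
    // Data actually removed -> info, so operators can trace record removal.
    // Nothing removed -> debug, to keep routine compactions quiet.
    const auto lvl = s.has_removed_data() ? level::info : level::debug;
    const auto verb = s.has_removed_data()
                        ? "removing data from "
                        : "not removing any records from ";
    log(lvl, "Self compaction filtering " + std::string(verb) + segment);
}

int main() {
    // Hypothetical segment paths.
    report(
      "kafka/topic/0_1/100-200.log",
      stats{.batches_discarded = 3, .records_discarded = 17});
    report("kafka/topic/0_1/200-300.log", stats{});
}
```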