Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rptest: add log_compaction_test.py #24187

Merged
merged 9 commits into from
Dec 9, 2024
12 changes: 6 additions & 6 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ segment_set disk_log_impl::find_sliding_range(
config().ntp(),
_last_compaction_window_start_offset.value(),
_segs.front()->offsets().get_base_offset());

_probe->add_sliding_window_round_complete();
_last_compaction_window_start_offset.reset();
}

Expand Down Expand Up @@ -616,7 +616,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
// compacted.
auto seg = segs.front();
co_await internal::mark_segment_as_finished_window_compaction(
seg, true);
seg, true, *_probe);
segs.pop_front();
}
if (segs.empty()) {
Expand Down Expand Up @@ -687,7 +687,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
"{}, resetting sliding window start offset",
config().ntp(),
idx_start_offset);

_probe->add_sliding_window_round_complete();
next_window_start_offset.reset();
}

Expand All @@ -707,7 +707,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
// entirely comprised of non-data batches. Mark it as compacted so
// we can progress through compactions.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted);
seg, is_clean_compacted, *_probe);

vlog(
gclog.debug,
Expand All @@ -722,7 +722,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
// All data records are already compacted away. Skip to avoid a
// needless rewrite.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted);
seg, is_clean_compacted, *_probe);

vlog(
gclog.trace,
Expand Down Expand Up @@ -822,7 +822,7 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
// Mark the segment as completed window compaction, and possibly set the
// clean_compact_timestamp in its index.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted);
seg, is_clean_compacted, *_probe);

co_await seg->index().flush();
co_await ss::rename_file(
Expand Down
20 changes: 20 additions & 0 deletions src/v/storage/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,26 @@ void probe::setup_metrics(const model::ntp& ntp) {
sm::description("Number of tombstone records removed by compaction "
"due to the delete.retention.ms setting."),
labels),
sm::make_counter(
"cleanly_compacted_segment",
[this] { return _segment_cleanly_compacted; },
sm::description(
"Number of segments cleanly compacted (i.e, had their "
"keys de-duplicated with all previous segments "
"before them to the front of the log)"),
labels),
sm::make_counter(
"segments_marked_tombstone_free",
[this] { return _segments_marked_tombstone_free; },
sm::description("Number of segments that have been verified through "
"the compaction process to be tombstone free."),
labels),
sm::make_counter(
"complete_sliding_window_rounds",
[this] { return _num_rounds_window_compaction; },
sm::description("Number of rounds of sliding window compaction that "
"have been driven to completion."),
labels),
},
{},
{sm::shard_label, partition_label});
Expand Down
10 changes: 10 additions & 0 deletions src/v/storage/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ class probe {
}

// Increments the `_tombstones_removed` counter by one.
void add_removed_tombstone() { ++_tombstones_removed; }
// Records one more cleanly compacted segment by incrementing
// `_segment_cleanly_compacted`, the value exported through the
// "cleanly_compacted_segment" counter metric.
void add_cleanly_compacted_segment() { ++_segment_cleanly_compacted; }
// Records one more segment that compaction has verified to be tombstone
// free by incrementing `_segments_marked_tombstone_free`, the value exported
// through the "segments_marked_tombstone_free" counter metric.
void add_segment_marked_tombstone_free() {
++_segments_marked_tombstone_free;
}
// Records one more completed round of sliding window compaction by
// incrementing `_num_rounds_window_compaction`, the value exported through
// the "complete_sliding_window_rounds" counter metric.
void add_sliding_window_round_complete() {
++_num_rounds_window_compaction;
}

// Increments the `_batch_parse_errors` counter by one.
void batch_parse_error() { ++_batch_parse_errors; }

Expand Down Expand Up @@ -140,6 +147,9 @@ class probe {
uint32_t _batch_write_errors = 0;
double _compaction_ratio = 1.0;
uint64_t _tombstones_removed = 0;
uint64_t _segment_cleanly_compacted = 0;
uint64_t _segments_marked_tombstone_free = 0;
uint64_t _num_rounds_window_compaction = 0;

ssize_t _compaction_removed_bytes = 0;

Expand Down
6 changes: 6 additions & 0 deletions src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ ss::future<index_state> deduplicate_segment(
// Set may_have_tombstone_records
new_idx.may_have_tombstone_records = may_have_tombstone_records;

if (
seg->index().may_have_tombstone_records()
&& !may_have_tombstone_records) {
probe.add_segment_marked_tombstone_free();
}

co_return new_idx;
}

Expand Down
26 changes: 19 additions & 7 deletions src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,12 @@ ss::future<storage::index_state> do_copy_segment_data(
// Set may_have_tombstone_records
new_index.may_have_tombstone_records = may_have_tombstone_records;

if (
seg->index().may_have_tombstone_records()
&& !may_have_tombstone_records) {
pb.add_segment_marked_tombstone_free();
}

co_return new_index;
}

Expand Down Expand Up @@ -725,9 +731,10 @@ ss::future<compaction_result> self_compact_segment(
"Cannot compact an active segment. cfg:{} - segment:{}", cfg, s));
}

const bool may_remove_tombstones = may_have_removable_tombstones(s, cfg);
if (
!s->has_compactible_offsets(cfg)
|| (s->finished_self_compaction() && !may_have_removable_tombstones(s, cfg))) {
|| (s->finished_self_compaction() && !may_remove_tombstones)) {
co_return compaction_result{s->size_bytes()};
}

Expand All @@ -736,7 +743,11 @@ ss::future<compaction_result> self_compact_segment(
= co_await maybe_rebuild_compaction_index(
s, stm_manager, cfg, read_holder, resources, pb);

if (state == compacted_index::recovery_state::already_compacted) {
const bool segment_already_compacted
= (state == compacted_index::recovery_state::already_compacted)
&& !may_remove_tombstones;

if (segment_already_compacted) {
vlog(
gclog.debug,
"detected {} is already compacted",
Expand All @@ -745,10 +756,10 @@ ss::future<compaction_result> self_compact_segment(
co_return compaction_result{s->size_bytes()};
}

vassert(
state == compacted_index::recovery_state::index_recovered,
"Unexpected state {}",
state);
const bool is_valid_index_state
= (state == compacted_index::recovery_state::index_recovered)
|| (state == compacted_index::recovery_state::already_compacted);
vassert(is_valid_index_state, "Unexpected state {}", state);

auto sz_before = s->size_bytes();
auto apply_offset = should_apply_delta_time_offset(feature_table);
Expand Down Expand Up @@ -1137,12 +1148,13 @@ offset_delta_time should_apply_delta_time_offset(
}

ss::future<> mark_segment_as_finished_window_compaction(
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp) {
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb) {
seg->mark_as_finished_windowed_compaction();
if (set_clean_compact_timestamp) {
bool did_set = seg->index().maybe_set_clean_compact_timestamp(
model::timestamp::now());
if (did_set) {
pb.add_cleanly_compacted_segment();
return seg->index().flush();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/segment_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ bool may_have_removable_tombstones(
// Also potentially issues a call to seg->index().flush(), if the
// `clean_compact_timestamp` was set in the index.
ss::future<> mark_segment_as_finished_window_compaction(
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp);
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb);

template<typename Func>
auto with_segment_reader_handle(segment_reader_handle handle, Func func) {
Expand Down
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/kgo-verifier
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
set -e
git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git
cd /opt/kgo-verifier
git reset --hard 7bbf8c883d1807cdf297fdb589d92f436604772b
git reset --hard bffac1f1358875ee6e91308229d908f40d5fe18e
go mod tidy
make
4 changes: 4 additions & 0 deletions tests/rptest/clients/kafka_cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ def create_topic(self, spec: TopicSpec):
args += ["--config", f"retention.ms={spec.retention_ms}"]
if spec.max_message_bytes:
args += ["--config", f"max.message.bytes={spec.max_message_bytes}"]
if spec.delete_retention_ms:
args += [
"--config", f"delete.retention.ms={spec.delete_retention_ms}"
]
return self._run("kafka-topics.sh", args, desc="create_topic")

def create_topic_partitions(self, topic: str, partitions: int):
Expand Down
Loading
Loading