From 1f5f3633ee14522c0962daf18500a972228c2f60 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Thu, 5 Dec 2024 19:57:35 -0500 Subject: [PATCH 1/9] `storage`: check for `may_have_tombstones` in `self_compact_segment` Segments that may have tombstone records in them should still be considered eligible for self compaction. An early return statement was missing a check for this condition. Add it so that tombstones will be properly removed for a segment eligible for removal. Also adjusts a `vassert()` to account for this case. --- src/v/storage/segment_utils.cc | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 7cfcca47b9dd..3d130303518a 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -725,9 +725,10 @@ ss::future self_compact_segment( "Cannot compact an active segment. cfg:{} - segment:{}", cfg, s)); } + const bool may_remove_tombstones = may_have_removable_tombstones(s, cfg); if ( !s->has_compactible_offsets(cfg) - || (s->finished_self_compaction() && !may_have_removable_tombstones(s, cfg))) { + || (s->finished_self_compaction() && !may_remove_tombstones)) { co_return compaction_result{s->size_bytes()}; } @@ -736,7 +737,11 @@ ss::future self_compact_segment( = co_await maybe_rebuild_compaction_index( s, stm_manager, cfg, read_holder, resources, pb); - if (state == compacted_index::recovery_state::already_compacted) { + const bool segment_already_compacted + = (state == compacted_index::recovery_state::already_compacted) + && !may_remove_tombstones; + + if (segment_already_compacted) { vlog( gclog.debug, "detected {} is already compacted", @@ -745,10 +750,10 @@ ss::future self_compact_segment( co_return compaction_result{s->size_bytes()}; } - vassert( - state == compacted_index::recovery_state::index_recovered, - "Unexpected state {}", - state); + const bool is_valid_index_state + = (state == compacted_index::recovery_state::index_recovered) + || (state == compacted_index::recovery_state::already_compacted); + vassert(is_valid_index_state, "Unexpected state {}", state); auto sz_before = s->size_bytes(); auto apply_offset = should_apply_delta_time_offset(feature_table); From 5b50f25e9be30bb58bbffc371323da39d2718b05 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Tue, 26 Nov 2024 13:11:37 -0500 Subject: [PATCH 2/9] `storage`: add `_segment_cleanly_compacted` to `probe` A metric that measures the number of segments that have been cleanly compacted (i.e, had their keys de-duplicated with all previous segments before them to the front of the log). --- src/v/storage/disk_log_impl.cc | 8 ++++---- src/v/storage/probe.cc | 8 ++++++++ src/v/storage/probe.h | 2 ++ src/v/storage/segment_utils.cc | 3 ++- src/v/storage/segment_utils.h | 2 +- 5 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 6a138a938fa4..6c5a9c9d314c 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -616,7 +616,7 @@ ss::future disk_log_impl::sliding_window_compact( // compacted. auto seg = segs.front(); co_await internal::mark_segment_as_finished_window_compaction( - seg, true); + seg, true, *_probe); segs.pop_front(); } if (segs.empty()) { @@ -707,7 +707,7 @@ ss::future disk_log_impl::sliding_window_compact( // entirely comprised of non-data batches. Mark it as compacted so // we can progress through compactions. 
co_await internal::mark_segment_as_finished_window_compaction( - seg, is_clean_compacted); + seg, is_clean_compacted, *_probe); vlog( gclog.debug, @@ -722,7 +722,7 @@ ss::future disk_log_impl::sliding_window_compact( // All data records are already compacted away. Skip to avoid a // needless rewrite. co_await internal::mark_segment_as_finished_window_compaction( - seg, is_clean_compacted); + seg, is_clean_compacted, *_probe); vlog( gclog.trace, @@ -822,7 +822,7 @@ ss::future disk_log_impl::sliding_window_compact( // Mark the segment as completed window compaction, and possibly set the // clean_compact_timestamp in it's index. co_await internal::mark_segment_as_finished_window_compaction( - seg, is_clean_compacted); + seg, is_clean_compacted, *_probe); co_await seg->index().flush(); co_await ss::rename_file( diff --git a/src/v/storage/probe.cc b/src/v/storage/probe.cc index ed650aba358c..ef4a1328401f 100644 --- a/src/v/storage/probe.cc +++ b/src/v/storage/probe.cc @@ -201,6 +201,14 @@ void probe::setup_metrics(const model::ntp& ntp) { sm::description("Number of tombstone records removed by compaction " "due to the delete.retention.ms setting."), labels), + sm::make_counter( + "cleanly_compacted_segment", + [this] { return _segment_cleanly_compacted; }, + sm::description( + "Number of segments cleanly compacted (i.e, had their " + "keys de-duplicated with all previous segments " + "before them to the front of the log)"), + labels), }, {}, {sm::shard_label, partition_label}); diff --git a/src/v/storage/probe.h b/src/v/storage/probe.h index dcc464dd740e..88608ba5c212 100644 --- a/src/v/storage/probe.h +++ b/src/v/storage/probe.h @@ -98,6 +98,7 @@ class probe { } void add_removed_tombstone() { ++_tombstones_removed; } + void add_cleanly_compacted_segment() { ++_segment_cleanly_compacted; } void batch_parse_error() { ++_batch_parse_errors; } @@ -140,6 +141,7 @@ class probe { uint32_t _batch_write_errors = 0; double _compaction_ratio = 1.0; uint64_t _tombstones_removed = 0; + uint64_t _segment_cleanly_compacted = 0; ssize_t _compaction_removed_bytes = 0; diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 3d130303518a..6cba638d9cd2 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -1142,12 +1142,13 @@ offset_delta_time should_apply_delta_time_offset( } ss::future<> mark_segment_as_finished_window_compaction( - ss::lw_shared_ptr seg, bool set_clean_compact_timestamp) { + ss::lw_shared_ptr seg, bool set_clean_compact_timestamp, probe& pb) { seg->mark_as_finished_windowed_compaction(); if (set_clean_compact_timestamp) { bool did_set = seg->index().maybe_set_clean_compact_timestamp( model::timestamp::now()); if (did_set) { + pb.add_cleanly_compacted_segment(); return seg->index().flush(); } } diff --git a/src/v/storage/segment_utils.h b/src/v/storage/segment_utils.h index 51f6e51041ea..df60ec3a07e4 100644 --- a/src/v/storage/segment_utils.h +++ b/src/v/storage/segment_utils.h @@ -284,7 +284,7 @@ bool may_have_removable_tombstones( // Also potentially issues a call to seg->index()->flush(), if the // `clean_compact_timestamp` was set in the index. 
ss::future<> mark_segment_as_finished_window_compaction( - ss::lw_shared_ptr seg, bool set_clean_compact_timestamp); + ss::lw_shared_ptr seg, bool set_clean_compact_timestamp, probe& pb); template auto with_segment_reader_handle(segment_reader_handle handle, Func func) { From 8d66388ecf56ff7e1a10f0df62f4b56312bdf8a6 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Tue, 26 Nov 2024 13:07:19 -0500 Subject: [PATCH 3/9] `storage`: add `_segments_marked_tombstone_free` to `probe` A metric that measures the number of segments that have been verified through the compaction process to be tombstone free. --- src/v/storage/probe.cc | 6 ++++++ src/v/storage/probe.h | 4 ++++ src/v/storage/segment_deduplication_utils.cc | 6 ++++++ src/v/storage/segment_utils.cc | 6 ++++++ 4 files changed, 22 insertions(+) diff --git a/src/v/storage/probe.cc b/src/v/storage/probe.cc index ef4a1328401f..2e50738a4360 100644 --- a/src/v/storage/probe.cc +++ b/src/v/storage/probe.cc @@ -209,6 +209,12 @@ void probe::setup_metrics(const model::ntp& ntp) { "keys de-duplicated with all previous segments " "before them to the front of the log)"), labels), + sm::make_counter( + "segments_marked_tombstone_free", + [this] { return _segments_marked_tombstone_free; }, + sm::description("Number of segments that have been verified through " + "the compaction process to be tombstone free."), + labels), }, {}, {sm::shard_label, partition_label}); diff --git a/src/v/storage/probe.h b/src/v/storage/probe.h index 88608ba5c212..5579995d6bec 100644 --- a/src/v/storage/probe.h +++ b/src/v/storage/probe.h @@ -99,6 +99,9 @@ class probe { void add_removed_tombstone() { ++_tombstones_removed; } void add_cleanly_compacted_segment() { ++_segment_cleanly_compacted; } + void add_segment_marked_tombstone_free() { + ++_segments_marked_tombstone_free; + } void batch_parse_error() { ++_batch_parse_errors; } @@ -142,6 +145,7 @@ class probe { double _compaction_ratio = 1.0; uint64_t _tombstones_removed = 0; uint64_t _segment_cleanly_compacted = 0; + uint64_t _segments_marked_tombstone_free = 0; ssize_t _compaction_removed_bytes = 0; diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index fd4d058e77ad..62511506afdd 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -245,6 +245,12 @@ ss::future deduplicate_segment( // Set may_have_tombstone_records new_idx.may_have_tombstone_records = may_have_tombstone_records; + if ( + seg->index().may_have_tombstone_records() + && !may_have_tombstone_records) { + probe.add_segment_marked_tombstone_free(); + } + co_return new_idx; } diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 6cba638d9cd2..5ab8341a7d56 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -460,6 +460,12 @@ ss::future do_copy_segment_data( // Set may_have_tombstone_records new_index.may_have_tombstone_records = may_have_tombstone_records; + if ( + seg->index().may_have_tombstone_records() + && !may_have_tombstone_records) { + pb.add_segment_marked_tombstone_free(); + } + co_return new_index; } From 87d442bf2a992eae2a139b8f33d122a19094ec1a Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Tue, 26 Nov 2024 13:08:43 -0500 Subject: [PATCH 4/9] `storage`: add `_num_rounds_window_compaction` to `probe` A metric that measures the number of rounds of sliding window compaction that have been driven to completion. 
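For reference, the counters added in this and the two preceding patches are exported through the storage probe, so they appear on the metrics endpoint with the vectorized_storage_log_ prefix and a _total suffix. Below is a minimal sketch of reading them from a ducktape test, mirroring the helpers in the new log_compaction_test.py later in this series; `redpanda` and `topic_name` are placeholders for the test's service handle and topic, not names introduced by these patches:

    from rptest.services.redpanda import MetricsEndpoint

    # Sum each new compaction counter across shards and nodes for one topic.
    rounds_completed = redpanda.metric_sum(
        metric_name="vectorized_storage_log_complete_sliding_window_rounds_total",
        metrics_endpoint=MetricsEndpoint.METRICS,
        topic=topic_name)
    cleanly_compacted = redpanda.metric_sum(
        metric_name="vectorized_storage_log_cleanly_compacted_segment_total",
        metrics_endpoint=MetricsEndpoint.METRICS,
        topic=topic_name)
    tombstone_free = redpanda.metric_sum(
        metric_name="vectorized_storage_log_segments_marked_tombstone_free_total",
        metrics_endpoint=MetricsEndpoint.METRICS,
        topic=topic_name)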
--- src/v/storage/disk_log_impl.cc | 4 ++-- src/v/storage/probe.cc | 6 ++++++ src/v/storage/probe.h | 4 ++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 6c5a9c9d314c..0c68787b89e3 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -522,7 +522,7 @@ segment_set disk_log_impl::find_sliding_range( config().ntp(), _last_compaction_window_start_offset.value(), _segs.front()->offsets().get_base_offset()); - + _probe->add_sliding_window_round_complete(); _last_compaction_window_start_offset.reset(); } @@ -687,7 +687,7 @@ ss::future disk_log_impl::sliding_window_compact( "{}, resetting sliding window start offset", config().ntp(), idx_start_offset); - + _probe->add_sliding_window_round_complete(); next_window_start_offset.reset(); } diff --git a/src/v/storage/probe.cc b/src/v/storage/probe.cc index 2e50738a4360..de7c96109fb6 100644 --- a/src/v/storage/probe.cc +++ b/src/v/storage/probe.cc @@ -215,6 +215,12 @@ void probe::setup_metrics(const model::ntp& ntp) { sm::description("Number of segments that have been verified through " "the compaction process to be tombstone free."), labels), + sm::make_counter( + "complete_sliding_window_rounds", + [this] { return _num_rounds_window_compaction; }, + sm::description("Number of rounds of sliding window compaction that " + "have been driven to completion."), + labels), }, {}, {sm::shard_label, partition_label}); diff --git a/src/v/storage/probe.h b/src/v/storage/probe.h index 5579995d6bec..449e6aefeff2 100644 --- a/src/v/storage/probe.h +++ b/src/v/storage/probe.h @@ -102,6 +102,9 @@ class probe { void add_segment_marked_tombstone_free() { ++_segments_marked_tombstone_free; } + void add_sliding_window_round_complete() { + ++_num_rounds_window_compaction; + } void batch_parse_error() { ++_batch_parse_errors; } @@ -146,6 +149,7 @@ class probe { uint64_t _tombstones_removed = 0; uint64_t _segment_cleanly_compacted = 0; uint64_t _segments_marked_tombstone_free = 0; + uint64_t _num_rounds_window_compaction = 0; ssize_t _compaction_removed_bytes = 0; From 85bea2b00b2ec5035d7f3a679e555f9c227db950 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Wed, 4 Dec 2024 16:42:55 -0500 Subject: [PATCH 5/9] `tests`: bump `kgo-verifier` version and support new features See https://github.com/redpanda-data/kgo-verifier/pull/60, which added `--tombstone-probability`, `--compacted`, `--validate-latest-values` as input parameters. 
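For reference, the new kgo-verifier flags are exposed as constructor arguments on the ducktape wrappers changed below. A minimal usage sketch, mirroring the new log_compaction_test.py later in this series; `test_context`, `redpanda`, the topic name, and the numeric values are illustrative placeholders:

    producer = KgoVerifierProducer(context=test_context,
                                   redpanda=redpanda,
                                   topic="tapioca",
                                   msg_size=1024,
                                   msg_count=10000,
                                   tombstone_probability=0.4,     # --tombstone-probability
                                   validate_latest_values=True)   # --validate-latest-values
    producer.start()
    producer.wait(timeout_sec=180)

    consumer = KgoVerifierSeqConsumer(test_context,
                                      redpanda,
                                      "tapioca",
                                      1024,
                                      compacted=True,               # --compacted
                                      validate_latest_values=True)  # --validate-latest-values
    consumer.start(clean=False)
    consumer.wait(timeout_sec=180)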
--- tests/docker/ducktape-deps/kgo-verifier | 2 +- .../rptest/services/kgo_verifier_services.py | 51 +++++++++++++++---- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/tests/docker/ducktape-deps/kgo-verifier b/tests/docker/ducktape-deps/kgo-verifier index 176b425778ad..75b4be9f8d29 100644 --- a/tests/docker/ducktape-deps/kgo-verifier +++ b/tests/docker/ducktape-deps/kgo-verifier @@ -2,6 +2,6 @@ set -e git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git cd /opt/kgo-verifier -git reset --hard 7bbf8c883d1807cdf297fdb589d92f436604772b +git reset --hard bffac1f1358875ee6e91308229d908f40d5fe18e go mod tidy make diff --git a/tests/rptest/services/kgo_verifier_services.py b/tests/rptest/services/kgo_verifier_services.py index efbd4cab4393..286d904c5383 100644 --- a/tests/rptest/services/kgo_verifier_services.py +++ b/tests/rptest/services/kgo_verifier_services.py @@ -228,6 +228,7 @@ def clean_node(self, node: ClusterNode): self._redpanda.logger.info(f"{self.__class__.__name__}.clean_node") node.account.kill_process("kgo-verifier", clean_shutdown=False) node.account.remove("valid_offsets*json", True) + node.account.remove("latest_value*json", True) node.account.remove(f"/tmp/{self.__class__.__name__}*", True) def _remote(self, node, action, timeout=60): @@ -475,8 +476,8 @@ class ValidatorStatus: def __init__(self, name: str, valid_reads: int, invalid_reads: int, out_of_scope_invalid_reads: int, - max_offsets_consumed: Optional[int], lost_offsets: Dict[str, - int]): + max_offsets_consumed: Optional[int], + lost_offsets: Dict[str, int], tombstones_consumed: int): # Validator name is just a unique name per worker thread in kgo-verifier: useful in logging # but we mostly don't care self.name = name @@ -486,6 +487,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int, self.out_of_scope_invalid_reads = out_of_scope_invalid_reads self.max_offsets_consumed = max_offsets_consumed self.lost_offsets = lost_offsets + self.tombstones_consumed = tombstones_consumed @property def total_reads(self): @@ -510,7 +512,8 @@ def __str__(self): f"valid_reads={self.valid_reads}, " \ f"invalid_reads={self.invalid_reads}, " \ f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \ - f"lost_offsets={self.lost_offsets}>" + f"lost_offsets={self.lost_offsets}, " \ + f"tombstones_consumed={self.tombstones_consumed}>" class ConsumerStatus: @@ -531,7 +534,8 @@ def __init__(self, 'out_of_scope_invalid_reads': 0, 'name': "", 'max_offsets_consumed': dict(), - 'lost_offsets': dict() + 'lost_offsets': dict(), + 'tombstones_consumed': 0 } self.validator = ValidatorStatus(**validator) @@ -571,7 +575,9 @@ def __init__(self, msgs_per_producer_id=None, max_buffered_records=None, tolerate_data_loss=False, - tolerate_failed_produce=False): + tolerate_failed_produce=False, + tombstone_probability=0.0, + validate_latest_values=False): super(KgoVerifierProducer, self).__init__(context, redpanda, topic, msg_size, custom_node, debug_logs, trace_logs, username, password, @@ -590,6 +596,8 @@ def __init__(self, self._max_buffered_records = max_buffered_records self._tolerate_data_loss = tolerate_data_loss self._tolerate_failed_produce = tolerate_failed_produce + self._tombstone_probability = tombstone_probability + self._validate_latest_values = validate_latest_values @property def produce_status(self): @@ -697,6 +705,11 @@ def start_node(self, node, clean=False): if self._tolerate_failed_produce: cmd += " --tolerate-failed-produce" + if self._tombstone_probability is not None: + cmd += 
f" --tombstone-probability {self._tombstone_probability}" + if self._validate_latest_values: + cmd += " --validate-latest-values" + self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ProduceStatus) @@ -745,7 +758,9 @@ def __init__( username: Optional[str] = None, password: Optional[str] = None, enable_tls: Optional[bool] = False, - use_transactions: Optional[bool] = False): + use_transactions: Optional[bool] = False, + compacted: Optional[bool] = False, + validate_latest_values: Optional[bool] = False): super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs, trace_logs, username, password, enable_tls) self._max_msgs = max_msgs @@ -755,6 +770,8 @@ def __init__( self._tolerate_data_loss = tolerate_data_loss self._producer = producer self._use_transactions = use_transactions + self._compacted = compacted + self._validate_latest_values = validate_latest_values def start_node(self, node, clean=False): if clean: @@ -778,6 +795,11 @@ def start_node(self, node, clean=False): cmd += " --tolerate-data-loss" if self._use_transactions: cmd += " --use-transactions" + if self._compacted: + cmd += " --compacted" + if self._validate_latest_values: + cmd += " --validate-latest-values" + self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ConsumerStatus) @@ -877,7 +899,9 @@ def __init__(self, continuous=False, tolerate_data_loss=False, group_name=None, - use_transactions=False): + use_transactions=False, + compacted=False, + validate_latest_values=False): super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs, trace_logs, username, password, enable_tls) @@ -889,6 +913,8 @@ def __init__(self, self._continuous = continuous self._tolerate_data_loss = tolerate_data_loss self._use_transactions = use_transactions + self._compacted = compacted + self._validate_latest_values = validate_latest_values def start_node(self, node, clean=False): if clean: @@ -915,6 +941,11 @@ def start_node(self, node, clean=False): cmd += f" --consumer_group_name {self._group_name}" if self._use_transactions: cmd += " --use-transactions" + if self._compacted: + cmd += " --compacted" + if self._validate_latest_values: + cmd += " --validate-latest-values" + self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ConsumerStatus) @@ -933,7 +964,8 @@ def __init__(self, active=False, failed_transactions=0, aborted_transaction_msgs=0, - fails=0): + fails=0, + tombstones_produced=0): self.topic = topic self.sent = sent self.acked = acked @@ -947,7 +979,8 @@ def __init__(self, self.failed_transactions = failed_transactions self.aborted_transaction_messages = aborted_transaction_msgs self.fails = fails + self.tombstones_produced = tombstones_produced def __str__(self): l = self.latency - return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {l['p50']}/{l['p90']}/{l['p99']}>" + return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {self.tombstones_produced} {l['p50']}/{l['p90']}/{l['p99']}>" From 15ab6a85579871913a9e30280d3052ebc0a3e5c9 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Mon, 18 Nov 2024 09:32:23 -0500 Subject: [PATCH 6/9] `rptest`: add `wait_for_latest_value_map()` to `kgo` --- tests/rptest/services/kgo_verifier_services.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/rptest/services/kgo_verifier_services.py 
b/tests/rptest/services/kgo_verifier_services.py index 286d904c5383..42a283b05c1e 100644 --- a/tests/rptest/services/kgo_verifier_services.py +++ b/tests/rptest/services/kgo_verifier_services.py @@ -652,6 +652,15 @@ def wait_for_offset_map(self): backoff_sec=1) self._status_thread.raise_on_error() + def wait_for_latest_value_map(self): + # Producer worker aims to checkpoint every 5 seconds, so we should see this promptly. + self._redpanda.wait_until(lambda: self._status_thread.errored or all( + node.account.exists(f"latest_value_{self._topic}.json") + for node in self.nodes), + timeout_sec=15, + backoff_sec=1) + self._status_thread.raise_on_error() + def is_complete(self): return self._status.acked >= self._msg_count From ac273cc3213c5512988a880f12324f2b0cd38ebd Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 15 Nov 2024 10:49:06 -0500 Subject: [PATCH 7/9] `rptest`: use `delete.retention.ms` in `KafkaCliTools.create_topic()` --- tests/rptest/clients/kafka_cli_tools.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py index f5b707c8221b..3f6ae9884f80 100644 --- a/tests/rptest/clients/kafka_cli_tools.py +++ b/tests/rptest/clients/kafka_cli_tools.py @@ -152,6 +152,10 @@ def create_topic(self, spec: TopicSpec): args += ["--config", f"retention.ms={spec.retention_ms}"] if spec.max_message_bytes: args += ["--config", f"max.message.bytes={spec.max_message_bytes}"] + if spec.delete_retention_ms: + args += [ + "--config", f"delete.retention.ms={spec.delete_retention_ms}" + ] return self._run("kafka-topics.sh", args, desc="create_topic") def create_topic_partitions(self, topic: str, partitions: int): From 985beffa2034e543d8685e9196a390f5e03772c0 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Thu, 28 Nov 2024 09:56:38 -0500 Subject: [PATCH 8/9] `rptest`: add `_wait_for_file_on_nodes()` to `KgoVerifierProducer` Also add an error message to the `wait_until()` call. --- .../rptest/services/kgo_verifier_services.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/rptest/services/kgo_verifier_services.py b/tests/rptest/services/kgo_verifier_services.py index 42a283b05c1e..dc2c6c2017e5 100644 --- a/tests/rptest/services/kgo_verifier_services.py +++ b/tests/rptest/services/kgo_verifier_services.py @@ -643,23 +643,24 @@ def wait_for_acks(self, count, timeout_sec, backoff_sec): backoff_sec=backoff_sec) self._status_thread.raise_on_error() + def _wait_for_file_on_nodes(self, file_name): + self._redpanda.wait_until( + lambda: self._status_thread.errored or all( + node.account.exists(file_name) for node in self.nodes), + timeout_sec=15, + backoff_sec=1, + err_msg=f"Timed out waiting for {file_name} to be created") + self._status_thread.raise_on_error() + def wait_for_offset_map(self): # Producer worker aims to checkpoint every 5 seconds, so we should see this promptly. - self._redpanda.wait_until(lambda: self._status_thread.errored or all( - node.account.exists(f"valid_offsets_{self._topic}.json") - for node in self.nodes), - timeout_sec=15, - backoff_sec=1) - self._status_thread.raise_on_error() + offset_map_file_name = f"valid_offsets_{self._topic}.json" + self._wait_for_file_on_nodes(offset_map_file_name) def wait_for_latest_value_map(self): # Producer worker aims to checkpoint every 5 seconds, so we should see this promptly. 
- self._redpanda.wait_until(lambda: self._status_thread.errored or all( - node.account.exists(f"latest_value_{self._topic}.json") - for node in self.nodes), - timeout_sec=15, - backoff_sec=1) - self._status_thread.raise_on_error() + value_map_file_name = f"latest_value_{self._topic}.json" + self._wait_for_file_on_nodes(value_map_file_name) def is_complete(self): return self._status.acked >= self._msg_count From d238b1c9b606758520149e2f17aee241a8368a2d Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 15 Nov 2024 11:24:34 -0500 Subject: [PATCH 9/9] `rptest`: add `log_compaction_test.py` --- tests/rptest/tests/log_compaction_test.py | 262 ++++++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 tests/rptest/tests/log_compaction_test.py diff --git a/tests/rptest/tests/log_compaction_test.py b/tests/rptest/tests/log_compaction_test.py new file mode 100644 index 000000000000..61171447ac4c --- /dev/null +++ b/tests/rptest/tests/log_compaction_test.py @@ -0,0 +1,262 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 +import time +import threading + +from ducktape.mark import matrix +from ducktape.utils.util import wait_until +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierConsumerGroupConsumer, KgoVerifierSeqConsumer +from rptest.services.redpanda import MetricsEndpoint +from rptest.tests.partition_movement import PartitionMovementMixin +from rptest.tests.redpanda_test import RedpandaTest +from rptest.utils.mode_checks import skip_debug_mode +from rptest.tests.prealloc_nodes import PreallocNodesTest + + +class LogCompactionTest(PreallocNodesTest, PartitionMovementMixin): + def __init__(self, test_context): + self.test_context = test_context + # Run with small segments, a low retention value and a very frequent compaction interval. + self.extra_rp_conf = { + 'log_compaction_interval_ms': 4000, + 'log_segment_size': 2 * 1024**2, # 2 MiB + 'retention_bytes': 25 * 1024**2, # 25 MiB + 'compacted_log_segment_size': 1024**2 # 1 MiB + } + super().__init__(test_context=test_context, + num_brokers=3, + node_prealloc_count=1, + extra_rp_conf=self.extra_rp_conf) + + def topic_setup(self, cleanup_policy, key_set_cardinality): + """ + Sets variables and creates topic. 
+ """ + self.msg_size = 1024 # 1 KiB + self.rate_limit = 50 * 1024**2 # 50 MiBps + self.total_data = 100 * 1024**2 # 100 MiB + self.msg_count = int(self.total_data / self.msg_size) + self.tombstone_probability = 0.4 + self.partition_count = 10 + self.cleanup_policy = cleanup_policy + self.key_set_cardinality = key_set_cardinality + + # A value below log_compaction_interval_ms (therefore, tombstones that would be compacted away during deduplication will be visibly removed instead) + self.delete_retention_ms = 3000 + self.topic_spec = TopicSpec( + name="tapioca", + delete_retention_ms=self.delete_retention_ms, + partition_count=self.partition_count, + cleanup_policy=self.cleanup_policy) + self.client().create_topic(self.topic_spec) + + def get_removed_tombstones(self): + return self.redpanda.metric_sum( + metric_name="vectorized_storage_log_tombstones_removed_total", + metrics_endpoint=MetricsEndpoint.METRICS, + topic=self.topic_spec.name) + + def get_cleanly_compacted_segments(self): + return self.redpanda.metric_sum( + metric_name= + "vectorized_storage_log_cleanly_compacted_segment_total", + metrics_endpoint=MetricsEndpoint.METRICS, + topic=self.topic_spec.name) + + def get_segments_marked_tombstone_free(self): + return self.redpanda.metric_sum( + metric_name= + "vectorized_storage_log_segments_marked_tombstone_free_total", + metrics_endpoint=MetricsEndpoint.METRICS, + topic=self.topic_spec.name) + + def get_complete_sliding_window_rounds(self): + return self.redpanda.metric_sum( + metric_name= + "vectorized_storage_log_complete_sliding_window_rounds_total", + metrics_endpoint=MetricsEndpoint.METRICS, + topic=self.topic_spec.name) + + def produce_and_consume(self): + """ + Creates producer and consumer. Asserts that tombstones are seen + in the consumer log. + """ + + producer = KgoVerifierProducer( + context=self.test_context, + redpanda=self.redpanda, + topic=self.topic_spec.name, + debug_logs=True, + trace_logs=True, + msg_size=self.msg_size, + msg_count=self.msg_count, + rate_limit_bps=self.rate_limit, + key_set_cardinality=self.key_set_cardinality, + tolerate_data_loss=False, + tombstone_probability=self.tombstone_probability, + validate_latest_values=True, + custom_node=self.preallocated_nodes) + + # Produce and wait + producer.start() + producer.wait_for_latest_value_map() + producer.wait(timeout_sec=180) + + assert producer.produce_status.tombstones_produced > 0 + assert producer.produce_status.bad_offsets == 0 + + consumer = KgoVerifierSeqConsumer(self.test_context, + self.redpanda, + self.topic_spec.name, + self.msg_size, + debug_logs=True, + trace_logs=True, + compacted=True, + loop=False, + nodes=self.preallocated_nodes) + + # Consume and wait. clean=False to not accidentally remove latest value map. + consumer.start(clean=False) + consumer.wait(timeout_sec=180) + + # Clean up + producer.stop() + consumer.stop() + + assert consumer.consumer_status.validator.tombstones_consumed > 0 + assert consumer.consumer_status.validator.invalid_reads == 0 + + def validate_log(self): + """ + After several rounds of compaction, restart the brokers, + create a consumer, and assert that no tombstones are consumed. + Latest key-value pairs in the log are verified in KgoVerifier. + """ + + # Restart each redpanda broker to force roll segments + self.redpanda.restart_nodes(self.redpanda.nodes) + + # Sleep until the log has been fully compacted. 
+        self.prev_sliding_window_rounds = -1
+        self.prev_tombstones_removed = -1
+
+        def compaction_has_completed():
+            # To be confident that compaction has settled, check that the
+            # number of completed compaction rounds and the number of
+            # tombstone records removed have both stabilized over a period
+            # longer than log_compaction_interval_ms (plus the expected time
+            # for compaction to complete).
+            new_sliding_window_rounds = self.get_complete_sliding_window_rounds()
+            new_tombstones_removed = self.get_removed_tombstones()
+            res = self.prev_sliding_window_rounds == new_sliding_window_rounds and self.prev_tombstones_removed == new_tombstones_removed
+            self.prev_sliding_window_rounds = new_sliding_window_rounds
+            self.prev_tombstones_removed = new_tombstones_removed
+            return res
+
+        wait_until(
+            compaction_has_completed,
+            timeout_sec=120,
+            backoff_sec=self.extra_rp_conf['log_compaction_interval_ms'] /
+            1000 * 4,
+            err_msg="Compaction did not stabilize.")
+
+        assert self.get_complete_sliding_window_rounds() > 0
+        assert self.get_cleanly_compacted_segments() > 0
+        assert self.get_segments_marked_tombstone_free() > 0
+
+        consumer = KgoVerifierSeqConsumer(self.test_context,
+                                          self.redpanda,
+                                          self.topic_spec.name,
+                                          self.msg_size,
+                                          debug_logs=True,
+                                          trace_logs=True,
+                                          compacted=True,
+                                          loop=False,
+                                          validate_latest_values=True,
+                                          nodes=self.preallocated_nodes)
+
+        # Consume and wait. clean=False to not accidentally remove latest value map.
+        consumer.start(clean=False)
+        consumer.wait(timeout_sec=180)
+
+        consumer.stop()
+
+        # Expect to see 0 tombstones consumed
+        assert consumer.consumer_status.validator.tombstones_consumed == 0
+        assert consumer.consumer_status.validator.invalid_reads == 0
+
+    @skip_debug_mode
+    @cluster(num_nodes=4)
+    @matrix(
+        cleanup_policy=[
+            TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_COMPACT_DELETE
+        ],
+        key_set_cardinality=[100, 1000],
+    )
+    def compaction_stress_test(self, cleanup_policy, key_set_cardinality):
+        """
+        Uses partition movement and frequent compaction/garbage collection to
+        validate tombstone removal and general compaction behavior.
+        """
+        self.topic_setup(cleanup_policy, key_set_cardinality)
+
+        class PartitionMoveExceptionReporter:
+            exc = None
+
+        def background_test_loop(reporter,
+                                 fn,
+                                 iterations=10,
+                                 sleep_sec=1,
+                                 allowable_retries=3):
+            try:
+                while iterations > 0:
+                    try:
+                        fn()
+                    except Exception as e:
+                        if allowable_retries == 0:
+                            raise e
+                    time.sleep(sleep_sec)
+                    iterations -= 1
+                    allowable_retries -= 1
+            except Exception as e:
+                reporter.exc = e
+
+        def issue_partition_move():
+            try:
+                self._dispatch_random_partition_move(self.topic_spec.name, 0)
+                self._wait_for_move_in_progress(self.topic_spec.name,
+                                                0,
+                                                timeout=5)
+            except Exception as e:
+                PartitionMoveExceptionReporter.exc = e
+
+        partition_move_thread = threading.Thread(
+            target=background_test_loop,
+            args=(PartitionMoveExceptionReporter, issue_partition_move),
+            kwargs={
+                'iterations': 5,
+                'sleep_sec': 1
+            })
+
+        # Start partition movement thread
+        partition_move_thread.start()
+
+        self.produce_and_consume()
+
+        self.validate_log()
+
+        # Clean up partition movement thread
+        partition_move_thread.join()
+
+        if PartitionMoveExceptionReporter.exc is not None:
+            raise PartitionMoveExceptionReporter.exc
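Postscript: the delete.retention.ms plumbing added to KafkaCliTools.create_topic() earlier in this series is what lets topic_setup() above choose a tombstone retention shorter than log_compaction_interval_ms. A minimal sketch of that call path with illustrative values; `kafka_tools` is a placeholder for a KafkaCliTools instance, not a name introduced by these patches:

    from rptest.clients.types import TopicSpec

    # Values are illustrative and mirror topic_setup() in the test above.
    spec = TopicSpec(name="tapioca",
                     partition_count=10,
                     cleanup_policy=TopicSpec.CLEANUP_COMPACT_DELETE,
                     delete_retention_ms=3000)

    # With this series applied, create_topic() appends
    #   --config delete.retention.ms=3000
    # to the kafka-topics.sh invocation for this spec.
    kafka_tools.create_topic(spec)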