From 79a40c61f854e3341b73b3fcb6f3da4aa09159f7 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Fri, 15 Nov 2024 11:24:34 -0500
Subject: [PATCH] `rptest`: add `log_compaction_test.py`

---
 tests/rptest/tests/log_compaction_test.py | 249 ++++++++++++++++++++++
 1 file changed, 249 insertions(+)
 create mode 100644 tests/rptest/tests/log_compaction_test.py

diff --git a/tests/rptest/tests/log_compaction_test.py b/tests/rptest/tests/log_compaction_test.py
new file mode 100644
index 0000000000000..fe4e8653dc1a1
--- /dev/null
+++ b/tests/rptest/tests/log_compaction_test.py
@@ -0,0 +1,249 @@
+# Copyright 2024 Redpanda Data, Inc.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+import time
+import threading
+
+from ducktape.mark import matrix
+from ducktape.utils.util import wait_until
+from rptest.clients.types import TopicSpec
+from rptest.services.cluster import cluster
+from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierConsumerGroupConsumer
+from rptest.services.redpanda import MetricsEndpoint
+from rptest.tests.partition_movement import PartitionMovementMixin
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.utils.mode_checks import skip_debug_mode
+
+
+class LogCompactionTest(RedpandaTest, PartitionMovementMixin):
+    def __init__(self, test_context):
+        self.test_context = test_context
+        # Run with small segments, a very frequent compaction interval,
+        # and a short retention time for segments.
+        # log_retention_ms should be > 2*log_compaction_interval_ms to
+        # allow tombstones to be visibly removed during the compaction process.
+        self.extra_rp_conf = {
+            'log_compaction_interval_ms': 4000,
+            'log_segment_size': 2 * 1024**2,  # 2 MiB
+            'log_retention_ms': 15000,
+            'compacted_log_segment_size': 1024**2  # 1 MiB
+        }
+        super().__init__(test_context=test_context,
+                         num_brokers=3,
+                         extra_rp_conf=self.extra_rp_conf)
+
+    def prologue(self, cleanup_policy, key_set_cardinality):
+        """
+        Sets test variables and creates the topic.
+        """
+        self.msg_size = 1024  # 1 KiB
+        self.rate_limit = 100 * 1024**2  # 100 MiBps
+        self.total_data = 500 * 1024**2  # 500 MiB
+        self.msg_count = int(self.total_data / self.msg_size)
+        self.tombstone_probability = 0.4
+        self.partition_count = 10
+        self.consumer_count = 1
+        self.cleanup_policy = cleanup_policy
+        self.key_set_cardinality = key_set_cardinality
+
+        # A value below log_compaction_interval_ms, so that tombstones which would otherwise only be deduplicated are visibly removed once delete.retention.ms expires.
+        self.delete_retention_ms = 3000
+        self.topic_spec = TopicSpec(
+            name="tapioca",
+            delete_retention_ms=self.delete_retention_ms,
+            partition_count=self.partition_count,
+            cleanup_policy=self.cleanup_policy)
+        self.client().create_topic(self.topic_spec)
+
+    def _start_stress_fibers(self):
+        """
+        Start stress fibers across the redpanda nodes,
+        retrying until successful or a timeout is reached.
+        """
+        def try_start_stress_fiber():
+            res = [
+                self.redpanda._admin.stress_fiber_start(
+                    node,
+                    num_fibers=10,
+                    min_spins_per_scheduling_point=10,
+                    max_spins_per_scheduling_point=100)
+                for node in self.redpanda.nodes
+            ]
+            return all(r.status_code == 200 for r in res)
+
+        wait_until(try_start_stress_fiber,
+                   timeout_sec=30,
+                   backoff_sec=2,
+                   err_msg="Unable to start stress fibers")
+
+    def _stop_stress_fibers(self):
+        """
+        Best-effort attempt to stop the stress fibers.
+        """
+        try:
+            for node in self.redpanda.nodes:
+                self.redpanda._admin.stress_fiber_stop(node)
+        except Exception:
+            pass
+
+    def get_removed_tombstones(self):
+        return self.redpanda.metric_sum(
+            metric_name="vectorized_storage_log_tombstones_removed_total",
+            metrics_endpoint=MetricsEndpoint.METRICS)
+
+    def part_one(self):
+        """
+        Creates a producer and consumer, a partition movement thread, and
+        stress fibers on the brokers. After producing and consuming, asserts
+        that tombstones were produced and consumed without any invalid reads.
+        """
+        class PartitionMoveExceptionReporter:
+            exc = None
+
+        def background_test_loop(reporter,
+                                 fn,
+                                 iterations=10,
+                                 sleep_sec=1,
+                                 allowable_retries=3):
+            try:
+                while iterations > 0:
+                    try:
+                        fn()
+                    except Exception as e:
+                        if allowable_retries == 0:
+                            raise e
+                        allowable_retries -= 1
+                    time.sleep(sleep_sec)
+                    iterations -= 1
+            except Exception as e:
+                reporter.exc = e
+
+        def issue_partition_move():
+            try:
+                self._dispatch_random_partition_move(self.topic_spec.name, 0)
+                self._wait_for_move_in_progress(self.topic_spec.name,
+                                                0,
+                                                timeout=5)
+            except Exception as e:
+                PartitionMoveExceptionReporter.exc = e
+
+        partition_move_thread = threading.Thread(
+            target=background_test_loop,
+            args=(PartitionMoveExceptionReporter, issue_partition_move),
+            kwargs={
+                'iterations': 5,
+                'sleep_sec': 1
+            })
+
+        producer = KgoVerifierProducer(
+            context=self.test_context,
+            redpanda=self.redpanda,
+            topic=self.topic_spec.name,
+            msg_size=self.msg_size,
+            msg_count=self.msg_count,
+            rate_limit_bps=self.rate_limit,
+            key_set_cardinality=self.key_set_cardinality,
+            tolerate_data_loss=False,
+            tombstone_probability=self.tombstone_probability,
+            validate_latest_values=True)
+
+        consumer = KgoVerifierConsumerGroupConsumer(
+            self.test_context,
+            self.redpanda,
+            self.topic_spec.name,
+            self.msg_size,
+            readers=self.consumer_count,
+            compacted=True)
+
+        # Start partition movement thread
+        partition_move_thread.start()
+
+        # Start stress fibers
+        self._start_stress_fibers()
+
+        # Produce and wait
+        producer.start()
+        producer.wait(timeout_sec=60)
+
+        assert producer.produce_status.tombstones_produced > 0
+        assert producer.produce_status.bad_offsets == 0
+
+        # KgoVerifier struggles to consume while stress fibers and partition moves are still running, so stop the background churn first.
+        partition_move_thread.join()
+        self._stop_stress_fibers()
+
+        # Consume and wait
+        consumer.start()
+        consumer.wait(timeout_sec=180)
+
+        # Clean up
+        producer.stop()
+        consumer.stop()
+
+        if PartitionMoveExceptionReporter.exc is not None:
+            raise PartitionMoveExceptionReporter.exc
+
+        assert consumer.consumer_status.validator.tombstones_consumed > 0
+        assert consumer.consumer_status.validator.invalid_reads == 0
+
+        producer.free()
+        consumer.free()
+
+    def part_two(self):
+        """
+        After several rounds of compaction, restart the brokers,
+        create a consumer, and assert that no tombstones are consumed.
+        """
+
+        # By this point, a non-zero number of tombstones should have been
+        # removed due to delete.retention.ms
+        assert self.get_removed_tombstones() > 0
+
+        # Restart each redpanda broker to force segments to roll
+        self.redpanda.restart_nodes(self.redpanda.nodes)
+
+        # Sleep until all tombstones are definitely eligible for removal
+        sleep_sec = self.extra_rp_conf['log_compaction_interval_ms'] / 1000 * 2
+        time.sleep(sleep_sec)
+
+        consumer = KgoVerifierConsumerGroupConsumer(
+            self.test_context,
+            self.redpanda,
+            self.topic_spec.name,
+            self.msg_size,
+            readers=self.consumer_count,
+            compacted=True,
+            validate_latest_values=True)
+
+        # Consume and wait
+        consumer.start()
+        consumer.wait(timeout_sec=180)
+
+        # Expect to see 0 tombstones consumed
+        assert consumer.consumer_status.validator.tombstones_consumed == 0
+
+        consumer.stop()
+
+    @skip_debug_mode
+    @cluster(num_nodes=5)
+    @matrix(
+        cleanup_policy=[
+            TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_COMPACT_DELETE
+        ],
+        key_set_cardinality=[100, 1000],
+    )
+    def compaction_stress_test(self, cleanup_policy, key_set_cardinality):
+        """
+        Uses partition movement, stress fibers, and frequent compaction and garbage
+        collection to validate tombstone removal and general compaction behavior.
+        """
+        self.prologue(cleanup_policy, key_set_cardinality)
+
+        self.part_one()
+
+        self.part_two()
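
Reviewer note: a hypothetical local invocation of a single parametrization of
this test, assuming a configured ducktape/rptest environment (the exact
wrapper script and paths vary by setup and are not part of this patch):

    ducktape tests/rptest/tests/log_compaction_test.py::LogCompactionTest.compaction_stress_test \
        --parameters '{"cleanup_policy": "compact", "key_set_cardinality": 100}'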