Skip to content

Commit

Permalink
rptest: add log_compaction_test.py
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemKauf committed Nov 20, 2024
1 parent 5189757 commit 79a40c6
Showing 1 changed file with 249 additions and 0 deletions.
249 changes: 249 additions & 0 deletions tests/rptest/tests/log_compaction_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import time
import threading

from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierConsumerGroupConsumer
from rptest.services.redpanda import MetricsEndpoint
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.mode_checks import skip_debug_mode


class LogCompactionTest(RedpandaTest, PartitionMovementMixin):
def __init__(self, test_context):
self.test_context = test_context
# Run with small segments, a very frequent compaction interval,
# and a short retention time for segments.
# log_retention_ms should be > 2*log_compaction_interval_ms to
# allow tombstones to be visibly removed during compaction process.
self.extra_rp_conf = {
'log_compaction_interval_ms': 4000,
'log_segment_size': 2 * 1024**2, # 2 MiB
'log_retention_ms': 15000,
'compacted_log_segment_size': 1024**2 # 1 MiB
}
super().__init__(test_context=test_context,
num_brokers=3,
extra_rp_conf=self.extra_rp_conf)

def prologue(self, cleanup_policy, key_set_cardinality):
"""
Sets variables and creates topic.
"""
self.msg_size = 1024 # 1 KiB
self.rate_limit = 100 * 1024**2 # 100 MiBps
self.total_data = 500 * 1024**2 # 500 MiB
self.msg_count = int(self.total_data / self.msg_size)
self.tombstone_probability = 0.4
self.partition_count = 10
self.consumer_count = 1
self.cleanup_policy = cleanup_policy
self.key_set_cardinality = key_set_cardinality

# A value below log_compaction_interval_ms (therefore, tombstones that would be compacted away during deduplication will be visibly removed instead)
self.delete_retention_ms = 3000
self.topic_spec = TopicSpec(
name="tapioca",
delete_retention_ms=self.delete_retention_ms,
partition_count=self.partition_count,
cleanup_policy=self.cleanup_policy)
self.client().create_topic(self.topic_spec)

def _start_stress_fibers(self):
"""
Start the stress fibers across the redpanda nodes,
retrying until successful or timeout is reached.
"""
def try_start_stress_fiber():
res = list(
map(
lambda node: self.redpanda._admin.stress_fiber_start(
node,
num_fibers=10,
min_spins_per_scheduling_point=10,
max_spins_per_scheduling_point=100),
self.redpanda.nodes))
return all([r.status_code == 200 for r in res])

wait_until(try_start_stress_fiber,
timeout_sec=30,
backoff_sec=2,
err_msg="Unable to start stress fibers")

def _stop_stress_fibers(self):
"""
Attempt to stop the stress fibers.
"""
try:
for node in self.redpanda.nodes:
self.redpanda._admin.stress_fiber_stop(node)
except:
return

def get_removed_tombstones(self):
return self.redpanda.metric_sum(
metric_name="vectorized_storage_log_tombstones_removed_total",
metrics_endpoint=MetricsEndpoint.METRICS)

def part_one(self):
"""
Creates producer and consumer, a partition movement thread, and triggers
stress fibers on brokers. After producing and consuming, asserts on
latest key-value pairs produced and consumed.
"""
class PartitionMoveExceptionReporter:
exc = None

def background_test_loop(reporter,
fn,
iterations=10,
sleep_sec=1,
allowable_retries=3):
try:
while iterations > 0:
try:
fn()
except Exception as e:
if allowable_retries == 0:
raise e
time.sleep(sleep_sec)
iterations -= 1
allowable_retries -= 1
except Exception as e:
reporter.exc = e

def issue_partition_move():
try:
self._dispatch_random_partition_move(self.topic_spec.name, 0)
self._wait_for_move_in_progress(self.topic_spec.name,
0,
timeout=5)
except Exception as e:
reporter.exc = e

partition_move_thread = threading.Thread(
target=background_test_loop,
args=(PartitionMoveExceptionReporter, issue_partition_move),
kwargs={
'iterations': 5,
'sleep_sec': 1
})

producer = KgoVerifierProducer(
context=self.test_context,
redpanda=self.redpanda,
topic=self.topic_spec.name,
msg_size=self.msg_size,
msg_count=self.msg_count,
rate_limit_bps=self.rate_limit,
key_set_cardinality=self.key_set_cardinality,
tolerate_data_loss=False,
tombstone_probability=self.tombstone_probability,
validate_latest_values=True)

consumer = KgoVerifierConsumerGroupConsumer(
self.test_context,
self.redpanda,
self.topic_spec.name,
self.msg_size,
readers=self.consumer_count,
compacted=True)

# Start partition movement thread
partition_move_thread.start()

# Start stress fibers
self._start_stress_fibers()

# Produce and wait
producer.start()
producer.wait(timeout_sec=60)

assert producer.produce_status.tombstones_produced > 0
assert producer.produce_status.bad_offsets == 0

# KgoVerifier seems to have some issues consuming with stress fibers/partition move fibers running.
partition_move_thread.join()
self._stop_stress_fibers()

# Consume and wait
consumer.start()
consumer.wait(timeout_sec=180)

# Clean up
producer.stop()
consumer.stop()

if PartitionMoveExceptionReporter.exc is not None:
raise PartitionMoveExceptionReporter.exc

assert consumer.consumer_status.validator.tombstones_consumed > 0
assert consumer.consumer_status.validator.invalid_reads == 0

producer.free()
consumer.free()

def part_two(self):
"""
After several rounds of compaction, restart the brokers,
create a consumer, and assert that no tombstones are consumed.
"""

# By this point, a non-zero amount of tombstones should have been removed due to
# delete.retention.ms
assert self.get_removed_tombstones() > 0

# Restart each redpanda broker to force roll segments
self.redpanda.restart_nodes(self.redpanda.nodes)

# Sleep until all tombstones are definitely eligible for removal
timeout = self.extra_rp_conf['log_compaction_interval_ms'] / 1000 * 2
time.sleep(timeout)

consumer = KgoVerifierConsumerGroupConsumer(
self.test_context,
self.redpanda,
self.topic_spec.name,
self.msg_size,
readers=self.consumer_count,
compacted=True,
validate_latest_values=True)

# Consume and wait
consumer.start()
consumer.wait(timeout_sec=180)

# Expect to see 0 tombstones consumed
assert consumer.consumer_status.validator.tombstones_consumed == 0

consumer.stop()

#@skip_debug_mode
@cluster(num_nodes=5)
@matrix(
cleanup_policy=[
TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_COMPACT_DELETE
],
key_set_cardinality=[100, 1000],
)
def compaction_stress_test(self, cleanup_policy, key_set_cardinality):
"""
Uses partition movement, stress fibers, and frequent compaction/garbage collecting to
validate tombstone removal and general compaction behavior.
"""
self.prologue(cleanup_policy, key_set_cardinality)

self.part_one()

self.part_two()

0 comments on commit 79a40c6

Please sign in to comment.