Skip to content

Commit

Permalink
rptest: add log_compaction_test.py
Browse files Browse the repository at this point in the history
  • Loading branch information
WillemKauf committed Nov 26, 2024
1 parent bbb4dbf commit 6df7e4b
Showing 1 changed file with 292 additions and 0 deletions.
292 changes: 292 additions & 0 deletions tests/rptest/tests/log_compaction_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import time
import threading

from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierConsumerGroupConsumer, KgoVerifierSeqConsumer
from rptest.services.redpanda import MetricsEndpoint
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.mode_checks import skip_debug_mode
from rptest.tests.prealloc_nodes import PreallocNodesTest


class LogCompactionTest(PreallocNodesTest, PartitionMovementMixin):
def __init__(self, test_context):
self.test_context = test_context
# Run with small segments, a low retention value and a very frequent compaction interval.
self.extra_rp_conf = {
'log_compaction_interval_ms': 4000,
'log_segment_size': 2 * 1024**2, # 2 MiB
'retention_bytes': 25 * 1024**2, # 25 MiB
'compacted_log_segment_size': 1024**2 # 1 MiB
}
super().__init__(test_context=test_context,
num_brokers=3,
node_prealloc_count=1,
extra_rp_conf=self.extra_rp_conf)

def topic_setup(self, cleanup_policy, key_set_cardinality):
"""
Sets variables and creates topic.
"""
self.msg_size = 1024 # 1 KiB
self.rate_limit = 50 * 1024**2 # 50 MiBps
self.total_data = 100 * 1024**2 # 100 MiB
self.msg_count = int(self.total_data / self.msg_size)
self.tombstone_probability = 0.4
self.partition_count = 10
self.consumer_count = 1
self.cleanup_policy = cleanup_policy
self.key_set_cardinality = key_set_cardinality

# A value below log_compaction_interval_ms (therefore, tombstones that would be compacted away during deduplication will be visibly removed instead)
self.delete_retention_ms = 3000
self.topic_spec = TopicSpec(
name="tapioca",
delete_retention_ms=self.delete_retention_ms,
partition_count=self.partition_count,
cleanup_policy=self.cleanup_policy)
self.client().create_topic(self.topic_spec)

def _start_stress_fibers(self):
"""
Start the stress fibers across the redpanda nodes,
retrying until successful or timeout is reached.
"""
def try_start_stress_fiber():
res = list(
map(
lambda node: self.redpanda._admin.stress_fiber_start(
node,
num_fibers=10,
min_spins_per_scheduling_point=10,
max_spins_per_scheduling_point=100),
self.redpanda.nodes))
return all([r.status_code == 200 for r in res])

wait_until(try_start_stress_fiber,
timeout_sec=30,
backoff_sec=2,
err_msg="Unable to start stress fibers")

def _stop_stress_fibers(self):
"""
Attempt to stop the stress fibers.
"""
try:
for node in self.redpanda.nodes:
self.redpanda._admin.stress_fiber_stop(node)
except:
return

def get_cleanly_compacted_segments(self):
return self.redpanda.metric_sum(
metric_name=
"vectorized_storage_log_cleanly_compacted_segment_total",
metrics_endpoint=MetricsEndpoint.METRICS,
topic=self.topic_spec.name)

def get_segments_marked_tombstone_free(self):
return self.redpanda.metric_sum(
metric_name=
"vectorized_storage_log_segments_marked_tombstone_free_total",
metrics_endpoint=MetricsEndpoint.METRICS,
topic=self.topic_spec.name)

def get_complete_sliding_window_rounds(self):
return self.redpanda.metric_sum(
metric_name=
"vectorized_storage_log_complete_sliding_window_rounds_total",
metrics_endpoint=MetricsEndpoint.METRICS,
topic=self.topic_spec.name)

def get_num_segments(self):
storage = self.redpanda.storage(all_nodes=True)
topic_partitions = storage.partitions("kafka", self.topic_spec.name)
segment_count = 0
for partition in topic_partitions:
segment_count += len(partition.segments)
return segment_count

def produce_and_consume(self):
"""
Creates producer and consumer, a partition movement thread, and triggers
stress fibers on brokers. After producing and consuming, asserts on
latest key-value pairs produced and consumed.
"""

producer = KgoVerifierProducer(
context=self.test_context,
redpanda=self.redpanda,
topic=self.topic_spec.name,
msg_size=self.msg_size,
msg_count=self.msg_count,
rate_limit_bps=self.rate_limit,
key_set_cardinality=self.key_set_cardinality,
tolerate_data_loss=False,
tombstone_probability=self.tombstone_probability,
validate_latest_values=True,
custom_node=self.preallocated_nodes)

consumer = KgoVerifierConsumerGroupConsumer(
self.test_context,
self.redpanda,
self.topic_spec.name,
self.msg_size,
readers=self.consumer_count,
compacted=True,
nodes=self.preallocated_nodes)

# Produce and wait
producer.start()
producer.wait_for_latest_value_map()
producer.wait(timeout_sec=60)

assert producer.produce_status.tombstones_produced > 0
assert producer.produce_status.bad_offsets == 0

# Consume and wait. clean=False to not accidentally remove latest value map.
consumer.start(clean=False)
consumer.wait(timeout_sec=60)

# Clean up
producer.stop()
consumer.stop()

assert consumer.consumer_status.validator.tombstones_consumed > 0
assert consumer.consumer_status.validator.invalid_reads == 0

def validate_log(self):
"""
After several rounds of compaction, restart the brokers,
create a consumer, and assert that no tombstones are consumed.
"""

# Restart each redpanda broker to force roll segments
self.redpanda.restart_nodes(self.redpanda.nodes)

# Sleep until the log has been fully compacted.
self.prev_sliding_window_rounds = -1
self.prev_tombstones_removed = -1

def compaction_has_complete():
# In order to be confident that compaction has settled,
# we check that the number of compaction rounds that
# have occured as well as the number of tombstones records
# removed have stabilized over the period of log_compaction_interval_ms.
new_sliding_window_rounds = self.get_complete_sliding_window_rounds(
)
new_tombstones_removed = self.get_removed_tombstones()
res = self.prev_sliding_window_rounds == new_sliding_window_rounds and self.prev_tombstones_removed == new_tombstones_removed
self.prev_sliding_window_rounds = new_sliding_window_rounds
self.prev_tombstones_removed = new_tombstones_removed
return res

wait_until(
compaction_has_complete,
timeout_sec=120,
backoff_sec=self.extra_rp_conf['log_compaction_interval_ms'] /
1000 * 2,
err_msg="Compaction did not stabilize.")

assert self.get_complete_sliding_window_rounds() > 0
assert self.get_cleanly_compacted_segments() > 0
assert self.get_segments_marked_tombstone_free() > 0

consumer = KgoVerifierSeqConsumer(self.test_context,
self.redpanda,
self.topic_spec.name,
self.msg_size,
compacted=True,
loop=False,
validate_latest_values=True,
nodes=self.preallocated_nodes)

# Consume and wait. clean=False to not accidentally remove latest value map.
consumer.start(clean=False)
consumer.wait(timeout_sec=60)

consumer.stop()

# Expect to see 0 tombstones consumed
assert consumer.consumer_status.validator.tombstones_consumed == 0
assert consumer.consumer_status.validator.invalid_reads == 0

@skip_debug_mode
@cluster(num_nodes=4)
@matrix(
cleanup_policy=[
TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_COMPACT_DELETE
],
key_set_cardinality=[100, 1000],
)
def compaction_stress_test(self, cleanup_policy, key_set_cardinality):
"""
Uses partition movement, stress fibers, and frequent compaction/garbage collecting to
validate tombstone removal and general compaction behavior.
"""
self.topic_setup(cleanup_policy, key_set_cardinality)

class PartitionMoveExceptionReporter:
exc = None

def background_test_loop(reporter,
fn,
iterations=10,
sleep_sec=1,
allowable_retries=3):
try:
while iterations > 0:
try:
fn()
except Exception as e:
if allowable_retries == 0:
raise e
time.sleep(sleep_sec)
iterations -= 1
allowable_retries -= 1
except Exception as e:
reporter.exc = e

def issue_partition_move():
try:
self._dispatch_random_partition_move(self.topic_spec.name, 0)
self._wait_for_move_in_progress(self.topic_spec.name,
0,
timeout=5)
except Exception as e:
reporter.exc = e

partition_move_thread = threading.Thread(
target=background_test_loop,
args=(PartitionMoveExceptionReporter, issue_partition_move),
kwargs={
'iterations': 5,
'sleep_sec': 1
})

# Start partition movement thread and stress fibers
partition_move_thread.start()
self._start_stress_fibers()

self.produce_and_consume()

self.validate_log()

# Clean up partition movement thread and stress fibers
partition_move_thread.join()
self._stop_stress_fibers()

if PartitionMoveExceptionReporter.exc is not None:
raise PartitionMoveExceptionReporter.exc

0 comments on commit 6df7e4b

Please sign in to comment.