Skip to content

Commit

Permalink
rptest: Add ducktape test for partition movement in RRR cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Lazin <[email protected]>
  • Loading branch information
Lazin committed Nov 27, 2024
1 parent ef17c12 commit aece37d
Showing 1 changed file with 84 additions and 5 deletions.
89 changes: 84 additions & 5 deletions tests/rptest/tests/read_replica_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rptest.services.cluster import cluster

from rptest.clients.default import DefaultClient
from rptest.services.admin import Admin
from rptest.services.redpanda import SISettings
from rptest.clients.rpk import RpkTool, RpkException
from rptest.clients.types import TopicSpec
Expand Down Expand Up @@ -159,12 +160,12 @@ def __init__(self, test_context: TestContext):
cloud_storage_housekeeping_interval_ms=10)
self.second_cluster = None

def start_second_cluster(self) -> None:
def start_second_cluster(self, num_brokers=3) -> None:
# NOTE: the RRR cluster won't have a bucket, so don't upload.
extra_rp_conf = dict(enable_cluster_metadata_upload_loop=False)
self.second_cluster = make_redpanda_service(
self.test_context,
num_brokers=3,
num_brokers=num_brokers,
si_settings=self.rr_settings,
extra_rp_conf=extra_rp_conf)
self.second_cluster.start(start_si=False)
Expand Down Expand Up @@ -210,15 +211,17 @@ def create_read_replica_topic_success(self) -> bool:
def _setup_read_replica(self,
num_messages=0,
partition_count=3,
producer_timeout=None) -> None:
producer_timeout=None,
num_source_brokers=3,
num_rrr_brokers=3) -> None:
if producer_timeout is None:
producer_timeout = 30

self.logger.info(f"Setup read replica \"{self.topic_name}\", : "
f"{num_messages} msg, {partition_count} "
"partitions.")
# Create original topic
self.start_redpanda(3, si_settings=self.si_settings)
self.start_redpanda(num_source_brokers, si_settings=self.si_settings)
spec = TopicSpec(name=self.topic_name,
partition_count=partition_count,
replication_factor=3)
Expand All @@ -237,7 +240,7 @@ def _setup_read_replica(self,
str(self.producer.last_acked_offsets))
self.producer.stop()

self.start_second_cluster()
self.start_second_cluster(num_rrr_brokers)

# wait until the read replica topic creation succeeds
wait_until(
Expand Down Expand Up @@ -457,6 +460,82 @@ def test_simple_end_to_end(
m = f"S3 Bucket usage changed during read replica test: {delta}"
assert False, m

def _get_node_assignments(self, admin, topic, partition):
def try_get_partitions():
try:
res = admin.get_partitions(topic, partition)
return True, res
except:
return False, None

res = wait_until_result(try_get_partitions,
timeout_sec=30,
backoff_sec=1)

return [dict(node_id=a["node_id"]) for a in res["replicas"]]

def _set_partition_assignments(self, topic, partition, assignments,
admin: Admin):
self.logger.info(
f"setting assignments for {topic}/{partition} to {assignments}")

admin.set_partition_replicas(topic, partition,
[{
"core": 0,
"node_id": a["node_id"],
} for a in assignments])

# fips on S3 is not compatible with path-style urls. TODO remove this once get_cloud_storage_type_and_url_style is fips aware
@skip_fips_mode
@cluster(num_nodes=10, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST)
@matrix(partition_count=[10])
def test_partition_movement(self, partition_count: int) -> None:
data_timeout = 300
num_messages = 100000
self._setup_read_replica(num_messages=num_messages,
partition_count=partition_count,
producer_timeout=300,
num_rrr_brokers=4)

# Consume from read replica topic and validate
self.start_consumer()
self.run_validation(
min_records=num_messages,
consumer_timeout_sec=data_timeout) # calls self.consumer.stop()

# Initiate partition movement in RRR cluster
admin = Admin(self.second_cluster)
brokers = admin.get_brokers()
for part_id in range(0, partition_count):
assignments = self._get_node_assignments(admin, self.topic_name,
part_id)
self.logger.info(
f"initial assignments for {self.topic_name}/{part_id}: {assignments}"
)
replicas = set([r['node_id'] for r in assignments])
for b in brokers:
if b['node_id'] not in replicas:
assignments[0] = {"node_id": b['node_id']}
break
self.logger.info(
f"new assignments for {self.topic_name}/{part_id}: {assignments}"
)
self._set_partition_assignments(self.topic_name, part_id,
assignments, admin)

# Wait until reconfigurations are started and then completed
wait_until(lambda: len(admin.list_reconfigurations()) > 0,
30,
err_msg="Reconfigurations are not started")
wait_until(lambda: len(admin.list_reconfigurations()) == 0,
30,
err_msg="Reconfiguration are not completed in time")

# Consume all messages
self.start_consumer()
self.run_validation(min_records=num_messages,
consumer_timeout_sec=data_timeout)


class ReadReplicasUpgradeTest(EndToEndTest):
log_segment_size = 1024 * 1024
Expand Down

0 comments on commit aece37d

Please sign in to comment.