diff --git a/tests/rptest/tests/read_replica_e2e_test.py b/tests/rptest/tests/read_replica_e2e_test.py index 1a635bc9320d0..19f02a42e8952 100644 --- a/tests/rptest/tests/read_replica_e2e_test.py +++ b/tests/rptest/tests/read_replica_e2e_test.py @@ -11,6 +11,7 @@ from rptest.services.cluster import cluster from rptest.clients.default import DefaultClient +from rptest.services.admin import Admin from rptest.services.redpanda import SISettings from rptest.clients.rpk import RpkTool, RpkException from rptest.clients.types import TopicSpec @@ -159,12 +160,12 @@ def __init__(self, test_context: TestContext): cloud_storage_housekeeping_interval_ms=10) self.second_cluster = None - def start_second_cluster(self) -> None: + def start_second_cluster(self, num_brokers=3) -> None: # NOTE: the RRR cluster won't have a bucket, so don't upload. extra_rp_conf = dict(enable_cluster_metadata_upload_loop=False) self.second_cluster = make_redpanda_service( self.test_context, - num_brokers=3, + num_brokers=num_brokers, si_settings=self.rr_settings, extra_rp_conf=extra_rp_conf) self.second_cluster.start(start_si=False) @@ -210,7 +211,9 @@ def create_read_replica_topic_success(self) -> bool: def _setup_read_replica(self, num_messages=0, partition_count=3, - producer_timeout=None) -> None: + producer_timeout=None, + num_source_brokers=3, + num_rrr_brokers=3) -> None: if producer_timeout is None: producer_timeout = 30 @@ -218,7 +221,7 @@ def _setup_read_replica(self, f"{num_messages} msg, {partition_count} " "partitions.") # Create original topic - self.start_redpanda(3, si_settings=self.si_settings) + self.start_redpanda(num_source_brokers, si_settings=self.si_settings) spec = TopicSpec(name=self.topic_name, partition_count=partition_count, replication_factor=3) @@ -237,7 +240,7 @@ def _setup_read_replica(self, str(self.producer.last_acked_offsets)) self.producer.stop() - self.start_second_cluster() + self.start_second_cluster(num_rrr_brokers) # wait until the read replica topic creation succeeds wait_until( @@ -457,6 +460,82 @@ def test_simple_end_to_end( m = f"S3 Bucket usage changed during read replica test: {delta}" assert False, m + def _get_node_assignments(self, admin, topic, partition): + def try_get_partitions(): + try: + res = admin.get_partitions(topic, partition) + return True, res + except: + return False, None + + res = wait_until_result(try_get_partitions, + timeout_sec=30, + backoff_sec=1) + + return [dict(node_id=a["node_id"]) for a in res["replicas"]] + + def _set_partition_assignments(self, topic, partition, assignments, + admin: Admin): + self.logger.info( + f"setting assignments for {topic}/{partition} to {assignments}") + + admin.set_partition_replicas(topic, partition, + [{ + "core": 0, + "node_id": a["node_id"], + } for a in assignments]) + + # fips on S3 is not compatible with path-style urls. TODO remove this once get_cloud_storage_type_and_url_style is fips aware + @skip_fips_mode + @cluster(num_nodes=10, log_allow_list=READ_REPLICA_LOG_ALLOW_LIST) + @matrix(partition_count=[10]) + def test_partition_movement(self, partition_count: int) -> None: + data_timeout = 300 + num_messages = 100000 + self._setup_read_replica(num_messages=num_messages, + partition_count=partition_count, + producer_timeout=300, + num_rrr_brokers=4) + + # Consume from read replica topic and validate + self.start_consumer() + self.run_validation( + min_records=num_messages, + consumer_timeout_sec=data_timeout) # calls self.consumer.stop() + + # Initiate partition movement in RRR cluster + admin = Admin(self.second_cluster) + brokers = admin.get_brokers() + for part_id in range(0, partition_count): + assignments = self._get_node_assignments(admin, self.topic_name, + part_id) + self.logger.info( + f"initial assignments for {self.topic_name}/{part_id}: {assignments}" + ) + replicas = set([r['node_id'] for r in assignments]) + for b in brokers: + if b['node_id'] not in replicas: + assignments[0] = {"node_id": b['node_id']} + break + self.logger.info( + f"new assignments for {self.topic_name}/{part_id}: {assignments}" + ) + self._set_partition_assignments(self.topic_name, part_id, + assignments, admin) + + # Wait until reconfigurations are started and then completed + wait_until(lambda: len(admin.list_reconfigurations()) > 0, + 30, + err_msg="Reconfigurations are not started") + wait_until(lambda: len(admin.list_reconfigurations()) == 0, + 30, + err_msg="Reconfiguration are not completed in time") + + # Consume all messages + self.start_consumer() + self.run_validation(min_records=num_messages, + consumer_timeout_sec=data_timeout) + class ReadReplicasUpgradeTest(EndToEndTest): log_segment_size = 1024 * 1024