diff --git a/tests/rptest/clients/kubectl.py b/tests/rptest/clients/kubectl.py index 314647caa3687..adfa771bf9535 100644 --- a/tests/rptest/clients/kubectl.py +++ b/tests/rptest/clients/kubectl.py @@ -147,6 +147,25 @@ def _install(self): self._kubectl_installed = True return + def _run(self, cmd): + # Log and run + self._redpanda.logger.info(cmd) + res = subprocess.check_output(cmd) + return res + + def run_kube_command(self, kcmd): + # prepare + self._install() + _ssh_prefix = self._ssh_prefix() + _kubectl = ["kubectl", '-n', self._namespace] + + # Make it universal for str/list + _kcmd = kcmd if isinstance(kcmd, list) else kcmd.split() + # Format command + cmd = _ssh_prefix + _kubectl + _kcmd + # Log and run + return self._run(cmd) + def exec(self, remote_cmd): self._install() ssh_prefix = self._ssh_prefix() @@ -154,9 +173,7 @@ def exec(self, remote_cmd): 'kubectl', 'exec', '-n', self._namespace, '-c', 'redpanda', f'rp-{self._cluster_id}-0', '--', 'bash', '-c' ] + ['"' + remote_cmd + '"'] - self._redpanda.logger.info(cmd) - res = subprocess.check_output(cmd) - return res + return self._run(cmd) def exists(self, remote_path): self._install() diff --git a/tests/rptest/redpanda_cloud_tests/high_throughput_test.py b/tests/rptest/redpanda_cloud_tests/high_throughput_test.py index b549a0ff5d52f..a9298afce3d5c 100644 --- a/tests/rptest/redpanda_cloud_tests/high_throughput_test.py +++ b/tests/rptest/redpanda_cloud_tests/high_throughput_test.py @@ -444,6 +444,10 @@ def get_node(self): idx = random.randrange(len(self.cluster.nodes)) return self.cluster.nodes[idx] + def get_broker_pod(self): + idx = random.randrange(len(self.redpanda.pods)) + return self.redpanda.pods[idx] + @cluster(num_nodes=2) def test_throughput_simple(self): # create default topics @@ -822,15 +826,17 @@ def test_decommission_and_add(self): self.free_preallocated_nodes() def stage_decommission_and_add(self): - node, node_id, node_str = self.get_node() + pod = self.get_broker_pod() + pod_id = pod.slot_id + pod_str = pod.name - def topic_partitions_on_node(): + def topic_partitions_on_pod(): try: parts = self.redpanda.partitions(self.topic) except StopIteration: return 0 n = sum([ - 1 if r.account == node.account else 0 for p in parts + 1 if r.account == pod.account else 0 for p in parts for r in p.replicas ]) self.logger.debug(f"Partitions in the node-topic: {n}") @@ -839,44 +845,44 @@ def topic_partitions_on_node(): # create default topics self._create_default_topics() - nt_partitions_before = topic_partitions_on_node() + nt_partitions_before = topic_partitions_on_pod() self.logger.info( - f"Decommissioning node {node_str}, partitions: {nt_partitions_before}" + f"Decommissioning node {pod_str}, partitions: {nt_partitions_before}" ) decomm_time = time.monotonic() admin = self.redpanda._admin - admin.decommission_broker(node_id) + admin.decommission_broker(pod_id) waiter = NodeDecommissionWaiter(self.redpanda, - node_id, + pod_id, self.logger, progress_timeout=120) waiter.wait_for_removal() - self.redpanda.stop_node(node) - assert topic_partitions_on_node() == 0 + self.redpanda.stop_node(pod) + assert topic_partitions_on_pod() == 0 decomm_time = time.monotonic() - decomm_time self.logger.info(f"Adding a node") - self.redpanda.clean_node(node, + self.redpanda.clean_node(pod, preserve_logs=True, preserve_current_install=True) - self.redpanda.start_node(node, + self.redpanda.start_node(pod, auto_assign_node_id=False, omit_seeds_on_idx_one=False) wait_until(self.redpanda.healthy, timeout_sec=600, backoff_sec=1) - new_node_id = self.redpanda.node_id(node, force_refresh=True) + new_node_id = self.redpanda.node_id(pod, force_refresh=True) self.logger.info( f"Node added, new node_id: {new_node_id}, waiting for {int(nt_partitions_before/2)} partitions to move there in {int(decomm_time*2)} s" ) wait_until( - lambda: topic_partitions_on_node() > nt_partitions_before / 2, + lambda: topic_partitions_on_pod() > nt_partitions_before / 2, timeout_sec=max(120, decomm_time * 2), backoff_sec=2, err_msg= f"{int(nt_partitions_before/2)} partitions failed to move to node {new_node_id} in {max(60, decomm_time*2)} s" ) - self.logger.info(f"{topic_partitions_on_node()} partitions moved") + self.logger.info(f"{topic_partitions_on_pod()} partitions moved") @cluster(num_nodes=3, log_allow_list=NOS3_LOG_ALLOW_LIST) def test_cloud_cache_thrash(self): diff --git a/tests/rptest/services/cloud_broker.py b/tests/rptest/services/cloud_broker.py new file mode 100644 index 0000000000000..d2cc8a3a37c2b --- /dev/null +++ b/tests/rptest/services/cloud_broker.py @@ -0,0 +1,27 @@ +class CloudBroker(): + def __init__(self, pod, kubectl, logger) -> None: + self.logger = logger + # Validate + if not isinstance(pod, dict) or pod['kind'] != 'Pod': + self.logger.error("Invalid pod data provided") + # Metadata + self.operating_system = 'k8s' + self._meta = pod['metadata'] + self.name = self._meta['name'] + self.slot_id = int( + self._meta['labels']['operator.redpanda.com/node-id']) + self.uuid = self._meta['uid'] + + # Save other data + self._spec = pod['spec'] + self._status = pod['status'] + + # save client + self._kubeclient = kubectl + + # Backward compatibility + self.account = self._meta + + # Backward compatibility + def ssh_output(self, cmd): + return self._kubeclient.exec(cmd) diff --git a/tests/rptest/services/redpanda.py b/tests/rptest/services/redpanda.py index e3810b91f8c24..53da466513adc 100644 --- a/tests/rptest/services/redpanda.py +++ b/tests/rptest/services/redpanda.py @@ -55,6 +55,7 @@ from rptest.services.admin import Admin from rptest.services.redpanda_installer import RedpandaInstaller, VERSION_RE as RI_VERSION_RE, int_tuple as ri_int_tuple from rptest.services.redpanda_cloud import CloudCluster, CloudTierName, get_config_profile_name +from rptest.services.cloud_broker import CloudBroker from rptest.services.rolling_restarter import RollingRestarter from rptest.services.storage import ClusterStorage, NodeStorage, NodeCacheStorage from rptest.services.storage_failure_injection import FailureInjectionConfig @@ -1414,26 +1415,6 @@ class RedpandaServiceCloud(RedpandaServiceK8s): use `RedpandaServiceCloud`. """ - # This parameter used in make_redpanda_service - # and multiple other services for detection - # TODO: Update it to GLOBAL_CLOUD_CLUSTER_CONFIG - GLOBAL_CLOUD_OAUTH_URL = 'cloud_oauth_url' - # Deprecated. Left for future reference - # GLOBAL_CLOUD_OAUTH_CLIENT_ID = 'cloud_oauth_client_id' - # GLOBAL_CLOUD_OAUTH_CLIENT_SECRET = 'cloud_oauth_client_secret' - # GLOBAL_CLOUD_OAUTH_AUDIENCE = 'cloud_oauth_audience' - # GLOBAL_CLOUD_API_URL = 'cloud_api_url' - # GLOBAL_CLOUD_CLUSTER_ID = 'cloud_cluster_id' - # GLOBAL_CLOUD_DELETE_CLUSTER = 'cloud_delete_cluster' - # GLOBAL_TELEPORT_AUTH_SERVER = 'cloud_teleport_auth_server' - # GLOBAL_TELEPORT_BOT_TOKEN = 'cloud_teleport_bot_token' - # GLOBAL_CLOUD_CLUSTER_REGION = 'cloud_cluster_region' - # GLOBAL_CLOUD_CLUSTER_PROVIDER = 'cloud_provider' - # GLOBAL_CLOUD_CLUSTER_TYPE = 'cloud_cluster_type' - # GLOBAL_CLOUD_CLUSTER_NETWORK = 'cloud_cluster_network' - # GLOBAL_CLOUD_PEER_VPC_ID = 'cloud_cluster_peer_vpc_id' - # GLOBAL_CLOUD_PEER_OWNER_ID = 'cloud_cluster_peer_owner_id' - GLOBAL_CLOUD_CLUSTER_CONFIG = 'cloud_cluster' def __init__(self, @@ -1479,47 +1460,19 @@ def __init__(self, # log cloud cluster id self.logger.debug(f"initial cluster_id: {self._cc_config['id']}") - # Removed in favor to serialization - # self._cloud_oauth_url = context.globals.get( - # self.GLOBAL_CLOUD_OAUTH_URL, None) - # self._cloud_oauth_client_id = context.globals.get( - # self.GLOBAL_CLOUD_OAUTH_CLIENT_ID, None) - # self._cloud_oauth_client_secret = context.globals.get( - # self.GLOBAL_CLOUD_OAUTH_CLIENT_SECRET, None) - # self._cloud_oauth_audience = context.globals.get( - # self.GLOBAL_CLOUD_OAUTH_AUDIENCE, None) - # self._cloud_teleport_proxy = context.globals.get( - # self.GLOBAL_TELEPORT_AUTH_SERVER, None) - # self._cloud_teleport_bot_token = context.globals.get( - # self.GLOBAL_TELEPORT_BOT_TOKEN, None) - # self._cloud_api_url = context.globals.get(self.GLOBAL_CLOUD_API_URL, - # None) - # self._cloud_cluster_id = context.globals.get( - # self.GLOBAL_CLOUD_CLUSTER_ID, '') - # self._cloud_delete_cluster = context.globals.get( - # self.GLOBAL_CLOUD_DELETE_CLUSTER, True) - # self._cloud_cluster_provider = context.globals.get( - # self.GLOBAL_CLOUD_CLUSTER_PROVIDER, "AWS").upper() - # self._cloud_cluster_region = context.globals.get( - # self.GLOBAL_CLOUD_CLUSTER_REGION, "us-west-2") - # self._cloud_cluster_type = context.globals.get( - # self.GLOBAL_CLOUD_CLUSTER_TYPE, "FMC").upper() - - # self._cloud_cluster_network = context.globals.get( - # self.GLOBAL_CLOUD_CLUSTER_NETWORK, "public").lower() - # self._cloud_peer_vpc_id = context.globals.get( - # self.GLOBAL_CLOUD_PEER_VPC_ID, None) - # self._cloud_peer_owner_id = context.globals.get( - # self.GLOBAL_CLOUD_PEER_OWNER_ID, None) - + # Create cluster class self._cloud_cluster = CloudCluster( context, self.logger, self._cc_config, provider_config=self._provider_config) - + # Prepare kubectl self._kubectl = None + # Backward compatibility with RedpandaService + # Fake out sasl_enabled callable + self.sasl_enabled = lambda: True + # Always true for Cloud Cluster self._dedicated_nodes = True self.logger.info( 'ResourceSettings: setting dedicated_nodes=True because serving from redpanda cloud' @@ -1551,6 +1504,24 @@ def start(self, **kwargs): tp_proxy=self._cloud_cluster.config.teleport_auth_server, tp_token=self._cloud_cluster.config.teleport_bot_token) + # Get pods and form node list + self.pods = [] + _r = self._kubectl.run_kube_command("get pods -o json") + _pods = json.loads(_r.decode()) + for p in _pods['items']: + if not p['metadata']['name'].startswith( + f"rp-{self._cloud_cluster.config.id}"): + continue + else: + _node = CloudBroker(p, self._kubectl, self.logger) + self.pods.append(_node) + + def get_node_by_id(self, id): + for p in self.pods: + if p.slot_id == id: + return p + return None + def stop_node(self, node, **kwargs): pass