From 52e13839277001dcf2271116a2048e4b0c74219e Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Thu, 25 Jul 2024 17:30:38 -0700 Subject: [PATCH 1/2] Fix node recommendation when CPU cluster cannot be determined Signed-off-by: Partho Sarthi --- .../cloud_api/databricks_aws.py | 9 +- .../cloud_api/databricks_azure.py | 9 +- .../cloud_api/dataproc.py | 9 +- .../cloud_api/sp_types.py | 9 ++ .../common/cluster_inference.py | 58 ++++++---- .../rapids/qualification.py | 100 +++++++----------- .../resources/qualification-conf.yaml | 2 +- .../tools/cluster_config_recommender.py | 49 ++++++--- 8 files changed, 142 insertions(+), 103 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py index 9aac70668..467ec05b0 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_aws.py @@ -248,9 +248,12 @@ def _init_nodes(self): # construct worker nodes info when cluster is inactive executors_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0 if num_workers != executors_cnt: - self.logger.warning('Cluster configuration: `executors` count %d does not match the ' - '`num_workers` value %d. Using generated names.', executors_cnt, - num_workers) + if not self.is_inferred: + # this warning should be raised only when the cluster is not inferred, i.e. user has provided the + # cluster configuration with num_workers explicitly set + self.logger.warning('Cluster configuration: `executors` count %d does not match the ' + '`num_workers` value %d. Using the `num_workers` value.', executors_cnt, + num_workers) worker_nodes_from_conf = self.generate_node_configurations(num_workers) if num_workers == 0 and self.props.get_value('node_type_id') is None: # if there are no worker nodes and no node_type_id, then we cannot proceed diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py index e96d961a4..78edee7a7 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/databricks_azure.py @@ -307,9 +307,12 @@ def _init_nodes(self): # construct worker nodes info when cluster is inactive executors_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0 if num_workers != executors_cnt: - self.logger.warning('Cluster configuration: `executors` count %d does not match the ' - '`num_workers` value %d. Using generated names.', executors_cnt, - num_workers) + if not self.is_inferred: + # this warning should be raised only when the cluster is not inferred, i.e. user has provided the + # cluster configuration with num_workers explicitly set + self.logger.warning('Cluster configuration: `executors` count %d does not match the ' + '`num_workers` value %d. Using generated names.', executors_cnt, + num_workers) worker_nodes_from_conf = self.generate_node_configurations(num_workers) if num_workers == 0 and self.props.get_value('node_type_id') is None: # if there are no worker nodes and no node_type_id, then we cannot proceed diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py index 847f96115..9fe1525ec 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py @@ -505,9 +505,12 @@ def _init_nodes(self): worker_nodes_from_conf = self.props.get_value_silent('config', 'workerConfig', 'instanceNames') instance_names_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0 if worker_cnt != instance_names_cnt: - self.logger.warning('Cluster configuration: `instanceNames` count %d does not ' - 'match the `numInstances` value %d. Using generated names.', - instance_names_cnt, worker_cnt) + if not self.is_inferred: + # this warning should be raised only when the cluster is not inferred, i.e. user has provided the + # cluster configuration with num_workers explicitly set + self.logger.warning('Cluster configuration: `instanceNames` count %d does not ' + 'match the `numInstances` value %d. Using generated names.', + instance_names_cnt, worker_cnt) worker_nodes_from_conf = self.generate_node_configurations(worker_cnt) # create workers array for worker_node in worker_nodes_from_conf: diff --git a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py index ad296c859..f1abc746e 100644 --- a/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py +++ b/user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py @@ -1259,6 +1259,15 @@ def generate_node_configurations(self, num_executors: int, render_args: dict = N node_config = self._generate_node_configuration(render_args) return [node_config for _ in range(num_executors)] + def get_cluster_shape_str(self) -> str: + """ + Returns a string representation of the cluster shape. + """ + master_node = self.get_master_node().instance_type + executor_node = self.get_worker_node(0).instance_type + num_executors = self.get_nodes_cnt(SparkNodeType.WORKER) + return f'' + @dataclass class ClusterReshape(ClusterGetAccessor): diff --git a/user_tools/src/spark_rapids_pytools/common/cluster_inference.py b/user_tools/src/spark_rapids_pytools/common/cluster_inference.py index 17a8824cf..5d821cbb1 100644 --- a/user_tools/src/spark_rapids_pytools/common/cluster_inference.py +++ b/user_tools/src/spark_rapids_pytools/common/cluster_inference.py @@ -15,7 +15,7 @@ """This module provides functionality for cluster inference""" from dataclasses import dataclass, field - +from enum import Enum from typing import Optional from logging import Logger @@ -26,17 +26,29 @@ from spark_rapids_pytools.common.utilities import ToolLogging +class ClusterType(Enum): + """ + Enum for cluster types + """ + CPU = 'CPU' + GPU = 'GPU' + + def __str__(self): + return self.value + + @dataclass class ClusterInference: """ - Class for inferring cluster information and constructing CPU clusters. + Class for inferring cluster information and constructing CPU or GPU clusters. :param platform: The platform on which the cluster inference is performed. """ platform: PlatformBase = field(default=None, init=True) + cluster_type: ClusterType = field(default=ClusterType.CPU, init=True) logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster_inference'), init=False) - def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict]: + def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict]: """ Extract information about drivers and executors from input json """ @@ -53,10 +65,15 @@ def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict cores_per_executor = cluster_info_df.get('Cores Per Executor') execs_per_node = cluster_info_df.get('Num Executors Per Node') total_cores_per_node = execs_per_node * cores_per_executor + if pd.isna(total_cores_per_node): + self.logger.info('For App ID: %s, Unable to infer %s cluster. Reason - Total cores per node cannot' + ' be determined.', cluster_info_df['App ID'], self.cluster_type) + return None # TODO - need to account for number of GPUs per executor executor_instance = self.platform.get_matching_executor_instance(total_cores_per_node) if pd.isna(executor_instance): - self.logger.info('Unable to infer CPU cluster. No matching executor instance found for vCPUs = %s', + self.logger.info('For App ID: %s, Unable to infer %s cluster. Reason - No matching executor instance ' + 'found for num cores = %d', cluster_info_df['App ID'], self.cluster_type, total_cores_per_node) return None return { @@ -66,21 +83,26 @@ def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict 'NUM_EXECUTOR_NODES': int(num_executor_nodes) } - def infer_cpu_cluster(self, cluster_info_df: pd.DataFrame) -> Optional[ClusterBase]: + def infer_cluster(self, cluster_info_df: pd.DataFrame) -> Optional[ClusterBase]: """ - Infer CPU cluster configuration based on json input and return the constructed cluster object. + Infer CPU or GPU cluster configuration based input cluster df and return the constructed cluster object. """ - if len(cluster_info_df) != 1: - self.logger.info('Cannot infer CPU cluster from event logs. Only single cluster is supported.') - return None + try: + if len(cluster_info_df) != 1: + self.logger.info('Cannot infer %s cluster from event logs. Only single cluster is supported.', + self.cluster_type) + return None - # Extract cluster information from parsed logs. Above check ensures df contains single row. - cluster_template_args = self.get_cluster_template_args(cluster_info_df.iloc[0]) - if cluster_template_args is None: - return None - # Construct cluster configuration using platform-specific logic - cluster_conf = self.platform.generate_cluster_configuration(cluster_template_args) - if cluster_conf is None: + # Extract cluster information from parsed logs. Above check ensures df contains single row. + cluster_template_args = self._get_cluster_template_args(cluster_info_df.iloc[0]) + if cluster_template_args is None: + return None + # Construct cluster configuration using platform-specific logic + cluster_conf = self.platform.generate_cluster_configuration(cluster_template_args) + if cluster_conf is None: + return None + cluster_props_new = JSONPropertiesContainer(cluster_conf, file_load=False) + return self.platform.load_cluster_by_prop(cluster_props_new, is_inferred=True) + except Exception as e: # pylint: disable=broad-except + self.logger.error('Error while inferring cluster: %s', str(e)) return None - cluster_props_new = JSONPropertiesContainer(cluster_conf, file_load=False) - return self.platform.load_cluster_by_prop(cluster_props_new, is_inferred=True) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 63475d67b..1e7f1dd89 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -17,14 +17,14 @@ import re from dataclasses import dataclass, field from math import ceil -from typing import Any, List, Callable, Optional +from typing import Any, List, Callable, Optional, Dict import numpy as np import pandas as pd from tabulate import tabulate -from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, SparkNodeType -from spark_rapids_pytools.common.cluster_inference import ClusterInference +from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, ClusterBase +from spark_rapids_pytools.common.cluster_inference import ClusterInference, ClusterType from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer, convert_dict_to_camel_case from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator @@ -137,7 +137,7 @@ def generate_report(self, report_content.append(f'{app_name} tool found no records to show.') if self.filter_apps_count > 0: - self.comments.append('\'Estimated GPU Speedup Category\' assumes the user is using the node type ' + self.comments.append('**Estimated GPU Speedup Category assumes the user is using the node type ' 'recommended and config recommendations with the same size cluster as was used ' 'with the CPU side.') @@ -713,8 +713,7 @@ def process_df_for_stdout(raw_df): # info columns, even if `cluster_info_df` is empty. df = pd.merge(df, cluster_info_df, on=['App Name', 'App ID'], how='left') if len(cluster_info_df) > 0: - self.__infer_cluster_and_update_savings(cluster_info_df) - self.__infer_cluster_for_auto_tuning(cluster_info_df) + self._infer_clusters_for_apps(cluster_info_df) except Exception as e: # pylint: disable=broad-except self.logger.error('Unable to process cluster information. Cost savings will be disabled. ' 'Reason - %s:%s', type(e).__name__, e) @@ -764,68 +763,47 @@ def _init_rapids_arg_list_for_qual(self) -> List[str]: rapids_threads_args = self._get_rapids_threads_count(self.name) return ['--per-sql'] + rapids_threads_args + self._create_autotuner_rapids_args() - def __infer_cluster_and_update_savings(self, cluster_info_df: pd.DataFrame): + def _infer_cluster_per_app(self, cluster_info_df: pd.DataFrame, + cluster_type: ClusterType) -> Dict[str, Optional[ClusterBase]]: """ - Update savings if CPU cluster can be inferred and corresponding GPU cluster can be defined. - :param cluster_info_df: Parsed cluster information. - """ - # we actually want to use the inferred version over what user passed if possible - if self.ctxt.get_ctxt('cpuClusterProxy') is not None or not self.ctxt.platform.cluster_inference_supported: - self.logger.info('Inferred Cluster but cpu node was already set') - return - - # Infer the CPU cluster from the cluster information - cpu_cluster_obj = ClusterInference(platform=self.ctxt.platform).infer_cpu_cluster(cluster_info_df) - if cpu_cluster_obj is None: - return - - # Log the inferred cluster information and set the context - self._log_inferred_cluster_info(cpu_cluster_obj) - self.ctxt.set_ctxt('cpuClusterProxy', cpu_cluster_obj) + Infers clusters for each app in the DataFrame and returns a dictionary of Cluster objects. - # Process gpu cluster arguments and update savings calculations flag - offline_cluster_opts = self.wrapper_options.get('migrationClustersProps', {}) - enable_savings_flag = self._process_gpu_cluster_args(offline_cluster_opts) - self._set_savings_calculations_flag(enable_savings_flag) + :param cluster_info_df: DataFrame containing cluster information for each app. + :param cluster_type: The type of cluster to infer. + :return: A dictionary where the key is the app ID and the value is the inferred Cluster object. + """ + cluster_inference_obj = ClusterInference(platform=self.ctxt.platform, cluster_type=cluster_type) + return { + row['App ID']: cluster_inference_obj.infer_cluster(cluster_info_df.iloc[[index]]) + for index, row in cluster_info_df.iterrows() + } - # this function is a lot like __infer_cluster_and_update_savings but handles clusters - # on a per application basis and was explicitly copied to not have to deal with - # changing the cost savings flow at the same time. Ideally in the future they - # get combined back together. - def __infer_cluster_for_auto_tuning(self, cluster_info_df: pd.DataFrame): + def _infer_clusters_for_apps(self, cluster_info_df: pd.DataFrame) -> None: + """ + Infer CPU and GPU clusters for each app in the DataFrame and set the inferred clusters in the context. + """ # if the user passed in the cpu cluster property, use that but we still want to try to infer the gpu # cluster to use if self.ctxt.get_ctxt('cpuClusterProxy') is not None or not self.ctxt.platform.cluster_inference_supported: - self.logger.info('auto tuning inferred Cluster but cpu node was already set') + self.logger.info('CPU cluster is already set. Skipping cluster inference.') return - cpu_cluster_dict = {} - offline_cluster_opts = self.wrapper_options.get('migrationClustersProps', {}) - for index, row in cluster_info_df.iterrows(): - single_cluster_df = cluster_info_df.iloc[[index]] - - # TODO - test executor instance picked up if there - # Infer the CPU cluster from the cluster information - cpu_cluster_obj = ClusterInference(platform=self.ctxt.platform).infer_cpu_cluster(single_cluster_df) - # Continue cluster inference for next app - if cpu_cluster_obj is None: - continue - cpu_cluster_dict[row['App ID']] = cpu_cluster_obj - # Log the inferred cluster information and set the context - self._log_inferred_cluster_info(cpu_cluster_obj) - - self.ctxt.set_ctxt('cpuClusterInfoPerApp', cpu_cluster_dict) - # Process gpu cluster arguments and update savings calculations flag - gpu_cluster_dict = self._process_gpu_cluster_args_for_auto_tuner(offline_cluster_opts) - self.ctxt.set_ctxt('gpuClusterInfoPerApp', gpu_cluster_dict) - - def _log_inferred_cluster_info(self, cpu_cluster_obj): - master_node = cpu_cluster_obj.get_master_node() - executor_node = cpu_cluster_obj.get_worker_node(0) - num_executors = cpu_cluster_obj.get_nodes_cnt(SparkNodeType.WORKER) - self.logger.info('Inferred Cluster => Driver: %s, Executor: %s X %s', - master_node.instance_type, - num_executors, - executor_node.instance_type) + cpu_cluster_cols = ['Num Executor Nodes', 'Executor Instance', 'Cores Per Executor'] + gpu_cluster_cols = ['Recommended Num Executor Nodes', 'Recommended Executor Instance', + 'Recommended Cores Per Executor'] + # == Infer CPU clusters per app == + # Drop GPU/Recommended columns to infer the CPU cluster information + cpu_cluster_df = cluster_info_df.drop(columns=gpu_cluster_cols, errors='ignore') + cpu_clusters_per_app = self._infer_cluster_per_app(cpu_cluster_df, ClusterType.CPU) + self.ctxt.set_ctxt('cpuClusterInfoPerApp', cpu_clusters_per_app) + # == Infer GPU clusters per app == + # Drop CPU columns to infer the GPU cluster information + gpu_cluster_df = cluster_info_df.drop(columns=cpu_cluster_cols, errors='ignore') + # Rename GPU columns to drop the 'Recommended' prefix + gpu_cluster_df.rename(columns=dict(zip(gpu_cluster_cols, cpu_cluster_cols)), inplace=True) + # Assumption: num executors per node will be same as num gpus per node + gpu_cluster_df['Num Executors Per Node'] = cluster_info_df['Recommended Num GPUs Per Node'] + gpu_clusters_per_app = self._infer_cluster_per_app(gpu_cluster_df, ClusterType.GPU) + self.ctxt.set_ctxt('gpuClusterInfoPerApp', gpu_clusters_per_app) def __build_output_files_info(self) -> dict: """ diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 399cba950..49ee8fe18 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -135,7 +135,7 @@ local: outputComment: "Heuristics information" configRecommendations: name: 'rapids_4_spark_qualification_output/tuning' - outputComment: "Cluster config recommendations" + outputComment: "*Cluster config recommendations" columns: - 'Full Cluster Config Recommendations*' - 'GPU Config Recommendation Breakdown*' diff --git a/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py b/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py index 559b931b7..a28defb11 100644 --- a/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py +++ b/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py @@ -17,12 +17,14 @@ from functools import partial from dataclasses import dataclass, field from typing import Optional +from logging import Logger import pandas as pd from spark_rapids_pytools.cloud_api.sp_types import ClusterBase from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.rapids.tool_ctxt import ToolContext +from spark_rapids_pytools.common.utilities import ToolLogging @dataclass @@ -72,10 +74,11 @@ class ClusterConfigRecommender: Class for recommending cluster shape and tuning configurations for processed apps. """ ctxt: ToolContext = field(default=None, init=True) + logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster_recommender'), init=False) @classmethod - def _get_instance_type_conversion(cls, cpu_cluster: ClusterBase, - gpu_cluster: ClusterBase) -> Optional[ClusterRecommendationInfo]: + def _get_instance_type_conversion(cls, cpu_cluster: Optional[ClusterBase], + gpu_cluster: Optional[ClusterBase]) -> Optional[ClusterRecommendationInfo]: """ Helper method to determine the conversion summary between CPU and GPU instance types. Generate the cluster shape recommendation as: @@ -85,21 +88,22 @@ def _get_instance_type_conversion(cls, cpu_cluster: ClusterBase, 'Qualified Node Recommendation': 'm6.xlarge to g5.2xlarge' } """ - if not cpu_cluster: + # Return None if no GPU cluster is available. + # If no CPU cluster is available, we can still recommend based on the inferred GPU cluster in the Scala tool. + if not gpu_cluster: return None - cpu_instance_type = cpu_cluster.get_worker_node().instance_type - recommended_cluster = cpu_cluster - conversion_str = cpu_instance_type - if gpu_cluster: - gpu_instance_type = gpu_cluster.get_worker_node().instance_type + gpu_instance_type = gpu_cluster.get_worker_node().instance_type + recommended_cluster_config = gpu_cluster.get_cluster_configuration() + conversion_str = gpu_instance_type + source_cluster_config = {} + + if cpu_cluster: + source_cluster_config = cpu_cluster.get_cluster_configuration() + cpu_instance_type = cpu_cluster.get_worker_node().instance_type if cpu_instance_type != gpu_instance_type: - recommended_cluster = gpu_cluster conversion_str = f'{cpu_instance_type} to {gpu_instance_type}' - return ClusterRecommendationInfo( - cpu_cluster.get_cluster_configuration(), - recommended_cluster.get_cluster_configuration(), - conversion_str) + return ClusterRecommendationInfo(source_cluster_config, recommended_cluster_config, conversion_str) def _get_cluster_conversion_summary(self) -> dict: """ @@ -114,6 +118,7 @@ def _get_cluster_conversion_summary(self) -> dict: gpu_cluster_info = self.ctxt.get_ctxt('gpuClusterProxy') conversion_summary_all = self._get_instance_type_conversion(cpu_cluster_info, gpu_cluster_info) if conversion_summary_all: + self._log_cluster_conversion(cpu_cluster_info, gpu_cluster_info) cluster_conversion_summary['all'] = conversion_summary_all.to_dict() # Summary for each app @@ -125,6 +130,7 @@ def _get_cluster_conversion_summary(self) -> dict: gpu_info = gpu_cluster_info_per_app.get(app_id) conversion_summary = self._get_instance_type_conversion(cpu_info, gpu_info) if conversion_summary: + self._log_cluster_conversion(cpu_info, gpu_info, app_id) cluster_conversion_summary[app_id] = conversion_summary.to_dict() return cluster_conversion_summary @@ -181,10 +187,25 @@ def add_cluster_and_tuning_recommendations(self, tools_processed_apps: pd.DataFr for col, replacement in ClusterRecommendationInfo.get_default_dict().items(): # Using partial to avoid closure issues with lambda in apply() fill_na_fn = partial(lambda x, def_val: def_val if pd.isna(x) else x, def_val=replacement) - result_df[col] = result_df.get(col, pd.Series()).apply(fill_na_fn) + col_dtype = 'str' if isinstance(replacement, str) else 'object' + result_df[col] = result_df.get(col, pd.Series(dtype=col_dtype)).apply(fill_na_fn) # 2. Add tuning configuration recommendations to all apps tuning_recommendation_summary = self._get_tuning_summary(result_df) for col, val in tuning_recommendation_summary.to_dict().items(): result_df[col] = val return result_df + + def _log_cluster_conversion(self, cpu_cluster: Optional[ClusterBase], gpu_cluster: Optional[ClusterBase], + app_id: Optional[str] = None) -> None: + """ + Log the cluster conversion summary + """ + cpu_cluster_str = cpu_cluster.get_cluster_shape_str() if cpu_cluster else 'N/A' + gpu_cluster_str = gpu_cluster.get_cluster_shape_str() if gpu_cluster else 'N/A' + conversion_log_msg = f'CPU cluster: {cpu_cluster_str}; Recommended GPU cluster: {gpu_cluster_str}' + if cpu_cluster and cpu_cluster.is_inferred: # If cluster is inferred, add it to the log message + conversion_log_msg = f'Inferred {conversion_log_msg}' + if app_id: # If app_id is provided, add it to the log message + conversion_log_msg = f'For App ID: {app_id}, {conversion_log_msg}' + self.logger.info(conversion_log_msg) From aa1149d3f59c563c7b972f82d8b9eefea7e0b6d3 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 26 Jul 2024 10:35:31 -0700 Subject: [PATCH 2/2] Move cluster cols to config file Signed-off-by: Partho Sarthi --- .../src/spark_rapids_pytools/rapids/qualification.py | 5 ++--- .../resources/qualification-conf.yaml | 10 ++++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 1e7f1dd89..c9a910955 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -787,9 +787,8 @@ def _infer_clusters_for_apps(self, cluster_info_df: pd.DataFrame) -> None: if self.ctxt.get_ctxt('cpuClusterProxy') is not None or not self.ctxt.platform.cluster_inference_supported: self.logger.info('CPU cluster is already set. Skipping cluster inference.') return - cpu_cluster_cols = ['Num Executor Nodes', 'Executor Instance', 'Cores Per Executor'] - gpu_cluster_cols = ['Recommended Num Executor Nodes', 'Recommended Executor Instance', - 'Recommended Cores Per Executor'] + cpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'cpuClusterColumns') + gpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'gpuClusterColumns') # == Infer CPU clusters per app == # Drop GPU/Recommended columns to infer the CPU cluster information cpu_cluster_df = cluster_info_df.drop(columns=gpu_cluster_cols, errors='ignore') diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index 49ee8fe18..0a65f486c 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -341,6 +341,16 @@ local: dstCol: 'Estimated GPU Speedup' - srcCol: 'appDuration_pred' dstCol: 'Estimated GPU Duration' + clusterInference: + cpuClusterColumns: + - 'Num Executor Nodes' + - 'Executor Instance' + - 'Cores Per Executor' + gpuClusterColumns: + - 'Recommended Num Executor Nodes' + - 'Recommended Executor Instance' + - 'Recommended Cores Per Executor' + platform: shortName: 'qual' outputDir: qual-tool-output