From 8fab112471b0a021fde3f65ce31464def8d0b1fa Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 12 Aug 2024 13:36:37 -0700 Subject: [PATCH] Remove calculation of gpu cluster recommendation from python tool when cluster argument is passed (#1278) * Deprecate gpu cluster recommendation from python Signed-off-by: Partho Sarthi * Remove 'gpuClusterRecommendation' from conf Signed-off-by: Partho Sarthi * Remove unused imports Signed-off-by: Partho Sarthi * Remove unused methods and configs Signed-off-by: Partho Sarthi --------- Signed-off-by: Partho Sarthi --- .../rapids/qualification.py | 201 ++---------------- .../resources/qualification-conf.yaml | 34 --- .../spark_rapids_tools/cmdli/argprocessor.py | 8 - .../src/spark_rapids_tools/cmdli/tools_cli.py | 5 +- user_tools/src/spark_rapids_tools/enums.py | 11 - .../tools/cluster_config_recommender.py | 15 +- 6 files changed, 16 insertions(+), 258 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index d2ae44174..f90c0e9f3 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -16,20 +16,19 @@ import json import re from dataclasses import dataclass, field -from math import ceil from typing import Any, List, Callable, Optional, Dict import numpy as np import pandas as pd from tabulate import tabulate -from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, ClusterBase +from spark_rapids_pytools.cloud_api.sp_types import ClusterBase from spark_rapids_pytools.common.cluster_inference import ClusterInference, ClusterType from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer, convert_dict_to_camel_case from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool -from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel +from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender from spark_rapids_tools.tools.qualx.qualx_main import predict @@ -127,112 +126,18 @@ def _process_cpu_cluster_args(self, offline_cluster_opts: dict = None): if cpu_cluster_arg is not None: cpu_cluster_obj = self._create_migration_cluster('CPU', cpu_cluster_arg) self.ctxt.set_ctxt('cpuClusterProxy', cpu_cluster_obj) - - def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool: - def _process_gpu_cluster_worker_node(): - try: - if gpu_cluster_obj: - worker_node = gpu_cluster_obj.get_worker_node() - worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - sys_info = worker_node._pull_sys_info() # pylint: disable=protected-access - gpu_info = worker_node._pull_gpu_hw_info() # pylint: disable=protected-access - worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info) - - except Exception as e: # pylint: disable=broad-except - self.logger.warning( - 'Failed to get the worker node information for the GPU cluster %s:%s', - type(e).__name__, e) - - gpu_cluster_arg = offline_cluster_opts.get('gpuCluster') - cpu_cluster = self.ctxt.get_ctxt('cpuClusterProxy') - if gpu_cluster_arg: - gpu_cluster_obj = self._create_migration_cluster('GPU', gpu_cluster_arg) - else: - gpu_cluster_obj = None - if cpu_cluster: - # Convert the CPU instances to support gpu. Otherwise, gpuCluster is not set - self.logger.info('Creating GPU cluster by converting the CPU cluster instances to GPU supported types') - gpu_cluster_obj = self.ctxt.platform.migrate_cluster_to_gpu(cpu_cluster) - - self.ctxt.set_ctxt('gpuClusterProxy', gpu_cluster_obj) - - _process_gpu_cluster_worker_node() - if cpu_cluster and cpu_cluster.is_inferred: - # If the CPU cluster is inferred, we skip the auto-tuner as it is called after the Qualification tool. - return gpu_cluster_obj is not None - - if gpu_cluster_obj and self.ctxt.get_rapids_auto_tuner_enabled(): - # Generate Autotuner input file for the Qualification - # Note that we do not call the `_calculate_spark_settings(worker_node_hw_info)` method here - # because the Qualification tool does not need to calculate the recommended Spark settings - # as it will be part of the generated Autotuner output file. - self._generate_autotuner_input_from_cluster(gpu_cluster_obj) - - return gpu_cluster_obj is not None - - # this function is a lot like _process_gpu_cluster_args but handles clusters - # on a per application basis and was explicitly copied to not have to deal with - # changing the cost savings flow at the same time. - def _process_gpu_cluster_args_for_auto_tuner(self, offline_cluster_opts: dict = None) -> dict: - def _process_gpu_cluster_worker_node(): - try: - worker_node = gpu_cluster_obj.get_worker_node() - worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli) # pylint: disable=protected-access - worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info) - - except Exception as e: # pylint: disable=broad-except - self.logger.warning( - 'Failed to get the worker node information for the GPU cluster %s:%s', - type(e).__name__, e) - - gpu_cluster_arg = offline_cluster_opts.get('gpuCluster') - # only do this if no gpu cluster specified - gpu_cluster_info_dict = {} - gpu_cluster_obj = None - if not gpu_cluster_arg: - cpu_cluster_info_per_app = self.ctxt.get_ctxt('cpuClusterInfoPerApp') - for app_id in cpu_cluster_info_per_app: - cpu_cluster_info = cpu_cluster_info_per_app[app_id] - if cpu_cluster_info: - # Convert the CPU instances to support gpu. Otherwise, gpuCluster is not set - self.logger.info( - 'Creating GPU cluster by converting the CPU cluster instances to GPU supported types') - gpu_cluster_obj = self.ctxt.platform.migrate_cluster_to_gpu(cpu_cluster_info) - _process_gpu_cluster_worker_node() - gpu_cluster_info_dict[app_id] = gpu_cluster_obj - - return gpu_cluster_info_dict + if cpu_cluster_obj and self.ctxt.get_rapids_auto_tuner_enabled(): + # Generate Autotuner input file for the Qualification + # Note that we do not call the `_calculate_spark_settings(worker_node_hw_info)` method here + # because the Qualification tool does not need to calculate the recommended Spark settings + # as it will be part of the generated Autotuner output file. + self._generate_autotuner_input_from_cluster(cpu_cluster_obj) # process a single cluster specified by the user def _process_offline_cluster_args(self) -> None: # read the wrapper option defined by the spark_rapids cmd if any. offline_cluster_opts = self.wrapper_options.get('migrationClustersProps', {}) self._process_cpu_cluster_args(offline_cluster_opts) - self._process_gpu_cluster_args(offline_cluster_opts) - - def _set_savings_calculations_flag(self, enable_flag: bool) -> None: - self.ctxt.set_ctxt('enableSavingsCalculations', enable_flag) - - def __process_gpu_cluster_recommendation(self, arg_val: str) -> None: - available_types = [filter_enum.value for filter_enum in QualGpuClusterReshapeType] - default_recommendation_txt = self.ctxt.get_value('sparkRapids', 'cli', 'defaults', - 'gpuClusterRecommendation', - 'defaultRecommendation') - if arg_val: - try: - selected_recommendation = QualGpuClusterReshapeType.fromstring(arg_val) - except Exception: # pylint: disable=broad-except - selected_recommendation = QualGpuClusterReshapeType.fromstring(default_recommendation_txt) - self.logger.warning( - 'Invalid argument gpu_cluster_recommendation=%s.\n\t' - 'Accepted options are: [%s].\n\t' - 'Falling-back to default filter: %s', - arg_val, Utils.gen_joined_str(' | ', available_types), default_recommendation_txt) - else: - selected_recommendation = QualFilterApp.fromstring(default_recommendation_txt) - self.ctxt.set_ctxt('gpuClusterShapeRecommendation', selected_recommendation) def __process_filter_args(self, arg_val: str) -> None: selected_filter = QualFilterApp.fromstring(arg_val) @@ -279,7 +184,6 @@ def _process_custom_args(self) -> None: self.ctxt.set_ctxt('gpuDevice', gpu_device) self.ctxt.set_ctxt('cuda', cuda) # we need to process each argument to verify it is valid. otherwise, we may crash late - self.__process_gpu_cluster_recommendation(self.wrapper_options.get('gpuClusterRecommendation')) self.__process_filter_args(self.wrapper_options.get('filterApps')) self._process_estimation_model_args() self._process_offline_cluster_args() @@ -388,84 +292,6 @@ def __generate_cluster_shape_report(self) -> Optional[str]: return Utils.gen_multiline_str(self.ctxt.platform.ctxt['notes'].get('clusterShape')) return None - def __recommendation_is_non_standard(self): - cluster_shape_type = self.ctxt.get_ctxt('gpuClusterShapeRecommendation') - if cluster_shape_type: - return cluster_shape_type != QualGpuClusterReshapeType.get_default() - return False - - def __apply_non_standard_gpu_shape(self, - all_apps: pd.DataFrame, - cluster_workers_cnt: int, - cluster_shape_t: QualGpuClusterReshapeType): - min_w_cnt_from_conf = self.ctxt.platform.configs.get_value_silent('clusterSpecs', - 'minWorkerNodes') - scale_factor_from_conf = self.ctxt.platform.configs.get_value_silent('clusterSpecs', - 'gpuScaleFactor') - # get the min_worker_cnt from the qualification config in case it is not defined for the platform - default_min_w_cnt = self.ctxt.get_value('local', 'output', 'processDFProps', - 'minimumWorkerCount') - # get the scale factor from the qualification config in case it is not defined for the platform - default_scale_factor = self.ctxt.get_value('local', 'output', 'processDFProps', 'gpuScaleFactor') - # As you reduce nodes, performance will be slightly better than linear based on benchmarks - scale_f = scale_factor_from_conf if scale_factor_from_conf else default_scale_factor - min_w_cnt = min_w_cnt_from_conf if min_w_cnt_from_conf else default_min_w_cnt - # calculate the reshape_cluster_column - reshape_col = self.ctxt.get_value('local', 'output', 'processDFProps', - 'clusterShapeCols', 'columnName') - speedup_col = 'Estimated GPU Speedup' - gpu_dur_col = 'Estimated GPU Duration' - cpu_dur_col = 'App Duration' - - def f_cell(x): - return ceil(x * 100) / 100 - - def calc_cluster_shape_col(df_row, min_worker_cnt: int, old_workers_cnt: int) -> pd.Series: - gpu_speedup = df_row[speedup_col] - # We should not worry about division by 0 because speedup is BGE 1.0 - cluster_shape = max(min_worker_cnt, ceil(scale_f * old_workers_cnt / gpu_speedup)) - return pd.Series([cluster_shape]) - - def update_cols_with_new_shape(apps_df: pd.DataFrame, - old_workers_cnt: int) -> (pd.DataFrame, bool): - apps_df[gpu_dur_col] = apps_df.apply(lambda row: f_cell( - (old_workers_cnt / row[reshape_col]) * scale_f * row[cpu_dur_col] / row[speedup_col]), axis=1) - apps_df[speedup_col] = apps_df.apply( - lambda row: f_cell(row[cpu_dur_col] / row[gpu_dur_col]), axis=1 - ) - return apps_df - - all_apps[[reshape_col]] = all_apps.apply( - lambda row: calc_cluster_shape_col(row, min_w_cnt, cluster_workers_cnt), axis=1) - recalc_speedups_flag = True - if cluster_shape_t == QualGpuClusterReshapeType.CLUSTER: - # the column value should be reset to the maximum of all the rows - max_workers_cnt = all_apps[reshape_col].max() - all_apps[reshape_col] = max_workers_cnt - # Append a node to be part of the summary report - reshape_msg_plain = self.ctxt.get_value('local', 'output', 'processDFProps', - 'clusterShapeCols', 'noteMsg') - self.ctxt.platform.update_ctxt_notes('clusterShape', - reshape_msg_plain.format(max_workers_cnt)) - # If max_workers_cnt EQ gpu_cluster nodes then no need to recalculate the columns - recalc_speedups_flag = max_workers_cnt != cluster_workers_cnt - # check if we need to recalculate the flags - if not recalc_speedups_flag: - return all_apps, False - return update_cols_with_new_shape(all_apps, cluster_workers_cnt), True - - def __apply_gpu_cluster_reshape(self, all_apps: pd.DataFrame) -> (pd.DataFrame, bool): - gpu_reshape_type = self.ctxt.get_ctxt('gpuClusterShapeRecommendation') - gpu_cluster = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy')) - per_row_flag = False - if gpu_cluster.cluster_inst is not None and self.__recommendation_is_non_standard(): - apps_df, per_row_flag = self.__apply_non_standard_gpu_shape(all_apps, - gpu_cluster.get_workers_count(), - gpu_reshape_type) - else: - apps_df = all_apps - return apps_df, per_row_flag - def __build_global_report_summary(self, all_apps: pd.DataFrame, total_apps: pd.DataFrame, @@ -496,13 +322,12 @@ def __build_global_report_summary(self, speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories')) # Group the applications and recalculate metrics apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df) - apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df) + df_final_result = speedup_category_ob.build_category_column(apps_grouped_df) reshaped_notes = self.__generate_cluster_shape_report() report_comments = [group_notes] if group_notes else [] if reshaped_notes: report_comments.append(reshaped_notes) - df_final_result, _ = self.__apply_gpu_cluster_reshape(apps_grouped_df) csv_out = output_files_info.get_value('summary', 'path') if not df_final_result.empty: self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out) @@ -623,10 +448,10 @@ def _infer_clusters_for_apps(self, cluster_info_df: pd.DataFrame) -> None: """ Infer CPU and GPU clusters for each app in the DataFrame and set the inferred clusters in the context. """ - # if the user passed in the cpu cluster property, use that but we still want to try to infer the gpu - # cluster to use - if self.ctxt.get_ctxt('cpuClusterProxy') is not None or not self.ctxt.platform.cluster_inference_supported: - self.logger.info('CPU cluster is already set. Skipping cluster inference.') + # if cluster inference is not supported, skip the inference + if not self.ctxt.platform.cluster_inference_supported: + self.logger.info('Cluster inference is not supported for platform: %s', + self.ctxt.platform.get_platform_name()) return cpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'cpuClusterColumns') gpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'gpuClusterColumns') diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml index dc71d36ef..9aef64b52 100644 --- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml +++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml @@ -71,8 +71,6 @@ sparkRapids: cudaVersion: '11.5' cli: defaults: - gpuClusterRecommendation: - defaultRecommendation: 'MATCH' costSavingsSettings: #temporrary settings to control the code behavior until the cost savings feature is fully removed enabled: false toolOptions: @@ -176,38 +174,6 @@ local: - 'App Duration' - 'Estimated GPU Duration' - 'Estimated GPU Speedup' - processDFProps: - minimumWorkerCount: 2 - gpuScaleFactor: 0.80 - savingsRecommendationsDefault: 'Not Recommended' - savingRecommendationsRanges: - nonRecommended: - title: 'Not Recommended' - lowerBound: -1000000.0 - upperBound: 1.0 - recommended: - title: 'Recommended' - lowerBound: 1.0 - upperBound: 30.0 - stronglyRecommended: - title: 'Strongly Recommended' - lowerBound: 30.0 - upperBound: 1000000.0 - clusterShapeCols: - columnName: 'Recommended Cluster Shape' - noteMsg: 'The GPU estimations are done with a recommended GPU cluster shape of {} worker nodes' - colsPerShapeType: - CLUSTER: - excludeColumns: - - 'Speedup Based Recommendation' - - 'Estimated GPU Speedup' - JOB: - excludeColumns: - - 'Speedup Based Recommendation' - - 'Estimated GPU Speedup' - appendColumns: - - columnName: 'Recommended Cluster Shape' - index: 4 treeDirectory: enabled: true depthLevel: 4 diff --git a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py index 7d53bfd0e..eb963e7a2 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py +++ b/user_tools/src/spark_rapids_tools/cmdli/argprocessor.py @@ -27,7 +27,6 @@ from spark_rapids_pytools.cloud_api.sp_types import DeployMode from spark_rapids_pytools.common.utilities import ToolLogging -from spark_rapids_pytools.rapids.qualification import QualGpuClusterReshapeType from spark_rapids_tools.cloud import ClientCluster from spark_rapids_tools.utils import AbstractPropContainer, is_http_file from ..enums import QualFilterApp, CspEnv, QualEstimationModel @@ -447,7 +446,6 @@ class QualifyUserArgModel(ToolUserArgModel): This is used as doing preliminary validation against some of the common pattern """ filter_apps: Optional[QualFilterApp] = None - gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None estimation_model_args: Optional[Dict] = dataclasses.field(default_factory=dict) def init_tool_args(self) -> None: @@ -458,11 +456,6 @@ def init_tool_args(self) -> None: self.p_args['toolArgs']['filterApps'] = QualFilterApp.get_default() else: self.p_args['toolArgs']['filterApps'] = self.filter_apps - # check the reshapeType argument - if self.gpu_cluster_recommendation is None: - self.p_args['toolArgs']['gpuClusterRecommendation'] = QualGpuClusterReshapeType.get_default() - else: - self.p_args['toolArgs']['gpuClusterRecommendation'] = self.gpu_cluster_recommendation # Check the estimationModel argument # This assumes that the EstimationModelArgProcessor was used to process the arguments before # constructing this validator. @@ -512,7 +505,6 @@ def build_tools_args(self) -> dict: 'eventlogs': self.eventlogs, 'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']), 'toolsJar': self.p_args['toolArgs']['toolsJar'], - 'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'], 'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs'] } return wrapped_args diff --git a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py index 5780ef54d..894dd4b9f 100644 --- a/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py +++ b/user_tools/src/spark_rapids_tools/cmdli/tools_cli.py @@ -18,7 +18,7 @@ import fire from spark_rapids_tools.cmdli.argprocessor import AbsToolUserArgModel -from spark_rapids_tools.enums import QualGpuClusterReshapeType, CspEnv, QualEstimationModel +from spark_rapids_tools.enums import CspEnv, QualEstimationModel from spark_rapids_tools.utils.util import gen_app_banner, init_environment from spark_rapids_pytools.common.utilities import Utils, ToolLogging from spark_rapids_pytools.rapids.qualx.prediction import Prediction @@ -115,8 +115,7 @@ def qualification(self, jvm_heap_size=jvm_heap_size, jvm_threads=jvm_threads, filter_apps=filter_apps, - estimation_model_args=estimation_model_args, - gpu_cluster_shape=QualGpuClusterReshapeType.get_default()) + estimation_model_args=estimation_model_args) if qual_args: tool_obj = QualificationAsLocal(platform_type=qual_args['runtimePlatform'], output_folder=qual_args['outputFolder'], diff --git a/user_tools/src/spark_rapids_tools/enums.py b/user_tools/src/spark_rapids_tools/enums.py index dd5417404..1351e3efc 100644 --- a/user_tools/src/spark_rapids_tools/enums.py +++ b/user_tools/src/spark_rapids_tools/enums.py @@ -119,17 +119,6 @@ def get_default(cls): return cls.TOP_CANDIDATES -class QualGpuClusterReshapeType(EnumeratedType): - """Values used to filter out the applications in the qualification report""" - MATCH = 'match' - CLUSTER = 'cluster' - JOB = 'job' - - @classmethod - def get_default(cls): - return cls.MATCH - - class ConditionOperator(EnumeratedType): """Enum representing comparison operators for conditions.""" EQUAL = auto() diff --git a/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py b/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py index f5d5494c9..ab2f39245 100644 --- a/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py +++ b/user_tools/src/spark_rapids_tools/tools/cluster_config_recommender.py @@ -122,14 +122,6 @@ def _get_cluster_conversion_summary(self) -> Dict[str, ClusterRecommendationInfo ii. '' -> `ClusterRecommendationInfo` for each app """ cluster_conversion_summary = {} - # Summary for all instances - cpu_cluster_info = self.ctxt.get_ctxt('cpuClusterProxy') - gpu_cluster_info = self.ctxt.get_ctxt('gpuClusterProxy') - conversion_summary_all = self._get_instance_type_conversion(cpu_cluster_info, gpu_cluster_info) - if conversion_summary_all: - cluster_conversion_summary['all'] = conversion_summary_all - - # Summary for each app cpu_cluster_info_per_app = self.ctxt.get_ctxt('cpuClusterInfoPerApp') gpu_cluster_info_per_app = self.ctxt.get_ctxt('gpuClusterInfoPerApp') if cpu_cluster_info_per_app and gpu_cluster_info_per_app: @@ -181,12 +173,7 @@ def add_cluster_and_tuning_recommendations(self, tools_processed_apps: pd.DataFr cluster_conversion_summary = self._get_cluster_conversion_summary() # 'all' is a special indication that all the applications need to use this same node # recommendation vs the recommendations being per application - if 'all' in cluster_conversion_summary: - # Add cluster conversion columns to all apps - conversion_summary_flattened = cluster_conversion_summary['all'].to_dict() - for col, conversion_val in conversion_summary_flattened.items(): # type: str, Union[dict, str] - result_df[col] = [conversion_val] * result_df.shape[0] - elif len(cluster_conversion_summary) > 0: + if len(cluster_conversion_summary) > 0: # Add the per-app node conversions conversion_summary_flattened = {k: v.to_dict() for k, v in cluster_conversion_summary.items()} conversion_df = pd.DataFrame.from_dict(conversion_summary_flattened, orient='index').reset_index()