Skip to content

Commit

Permalink
Remove calculation of GPU cluster recommendation from the Python tool when the cluster argument is passed (#1278)
Browse files Browse the repository at this point in the history

* Deprecate gpu cluster recommendation from python

Signed-off-by: Partho Sarthi <[email protected]>

* Remove 'gpuClusterRecommendation' from conf

Signed-off-by: Partho Sarthi <[email protected]>

* Remove unused imports

Signed-off-by: Partho Sarthi <[email protected]>

* Remove unused methods and configs

Signed-off-by: Partho Sarthi <[email protected]>

---------

Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa authored Aug 12, 2024
1 parent bec93f0 commit 8fab112
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 258 deletions.
201 changes: 13 additions & 188 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,19 @@
import json
import re
from dataclasses import dataclass, field
from math import ceil
from typing import Any, List, Callable, Optional, Dict

import numpy as np
import pandas as pd
from tabulate import tabulate

from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, ClusterBase
from spark_rapids_pytools.cloud_api.sp_types import ClusterBase
from spark_rapids_pytools.common.cluster_inference import ClusterInference, ClusterType
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer, convert_dict_to_camel_case
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
Expand Down Expand Up @@ -127,112 +126,18 @@ def _process_cpu_cluster_args(self, offline_cluster_opts: dict = None):
if cpu_cluster_arg is not None:
cpu_cluster_obj = self._create_migration_cluster('CPU', cpu_cluster_arg)
self.ctxt.set_ctxt('cpuClusterProxy', cpu_cluster_obj)

def _process_gpu_cluster_args(self, offline_cluster_opts: dict = None) -> bool:
    """
    Resolve the GPU cluster used for the qualification run and store it in the context.

    Uses the 'gpuCluster' entry of *offline_cluster_opts* when provided; otherwise, if a
    CPU cluster proxy is already set in the context, derives a GPU cluster by migrating
    the CPU cluster instances to GPU-supported instance types. The result (or None) is
    stored under 'gpuClusterProxy'.

    :param offline_cluster_opts: wrapper options that may hold the 'gpuCluster' property.
    :return: True when a GPU cluster object was set, False otherwise.
    """
    def _process_gpu_cluster_worker_node():
        # Best-effort enrichment of the worker node's HW info; any failure is logged
        # and swallowed so the qualification flow can continue without it.
        try:
            if gpu_cluster_obj:
                worker_node = gpu_cluster_obj.get_worker_node()
                worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli)  # pylint: disable=protected-access
                sys_info = worker_node._pull_sys_info()  # pylint: disable=protected-access
                gpu_info = worker_node._pull_gpu_hw_info()  # pylint: disable=protected-access
                worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info)

        except Exception as e:  # pylint: disable=broad-except
            self.logger.warning(
                'Failed to get the worker node information for the GPU cluster %s:%s',
                type(e).__name__, e)

    gpu_cluster_arg = offline_cluster_opts.get('gpuCluster')
    cpu_cluster = self.ctxt.get_ctxt('cpuClusterProxy')
    if gpu_cluster_arg:
        gpu_cluster_obj = self._create_migration_cluster('GPU', gpu_cluster_arg)
    else:
        gpu_cluster_obj = None
        if cpu_cluster:
            # Convert the CPU instances to support gpu. Otherwise, gpuCluster is not set
            self.logger.info('Creating GPU cluster by converting the CPU cluster instances to GPU supported types')
            gpu_cluster_obj = self.ctxt.platform.migrate_cluster_to_gpu(cpu_cluster)

    self.ctxt.set_ctxt('gpuClusterProxy', gpu_cluster_obj)

    _process_gpu_cluster_worker_node()
    if cpu_cluster and cpu_cluster.is_inferred:
        # If the CPU cluster is inferred, we skip the auto-tuner as it is called after the Qualification tool.
        return gpu_cluster_obj is not None

    if gpu_cluster_obj and self.ctxt.get_rapids_auto_tuner_enabled():
        # Generate Autotuner input file for the Qualification
        # Note that we do not call the `_calculate_spark_settings(worker_node_hw_info)` method here
        # because the Qualification tool does not need to calculate the recommended Spark settings
        # as it will be part of the generated Autotuner output file.
        self._generate_autotuner_input_from_cluster(gpu_cluster_obj)

    return gpu_cluster_obj is not None

# this function is a lot like _process_gpu_cluster_args but handles clusters
# on a per application basis and was explicitly copied to not have to deal with
# changing the cost savings flow at the same time.
def _process_gpu_cluster_args_for_auto_tuner(self, offline_cluster_opts: dict = None) -> dict:
    """
    Build a per-application mapping of app_id -> migrated GPU cluster object.

    Migration only happens when no explicit 'gpuCluster' property is passed; with an
    explicit GPU cluster argument the returned mapping is empty.

    :param offline_cluster_opts: wrapper options that may hold the 'gpuCluster' property.
    :return: dict mapping application id to its GPU cluster object (possibly empty).
    """
    def _process_gpu_cluster_worker_node():
        # Best-effort enrichment of the worker node's HW info; failures are logged
        # and swallowed. Note: unlike the sibling in _process_gpu_cluster_args, the
        # pull helpers here are invoked with an explicit cli argument.
        try:
            worker_node = gpu_cluster_obj.get_worker_node()
            worker_node._pull_and_set_mc_props(cli=self.ctxt.platform.cli)  # pylint: disable=protected-access
            sys_info = worker_node._pull_sys_info(cli=self.ctxt.platform.cli)  # pylint: disable=protected-access
            gpu_info = worker_node._pull_gpu_hw_info(cli=self.ctxt.platform.cli)  # pylint: disable=protected-access
            worker_node.hw_info = NodeHWInfo(sys_info=sys_info, gpu_info=gpu_info)

        except Exception as e:  # pylint: disable=broad-except
            self.logger.warning(
                'Failed to get the worker node information for the GPU cluster %s:%s',
                type(e).__name__, e)

    gpu_cluster_arg = offline_cluster_opts.get('gpuCluster')
    # only do this if no gpu cluster specified
    gpu_cluster_info_dict = {}
    gpu_cluster_obj = None
    if not gpu_cluster_arg:
        cpu_cluster_info_per_app = self.ctxt.get_ctxt('cpuClusterInfoPerApp')
        for app_id in cpu_cluster_info_per_app:
            cpu_cluster_info = cpu_cluster_info_per_app[app_id]
            if cpu_cluster_info:
                # Convert the CPU instances to support gpu. Otherwise, gpuCluster is not set
                self.logger.info(
                    'Creating GPU cluster by converting the CPU cluster instances to GPU supported types')
                gpu_cluster_obj = self.ctxt.platform.migrate_cluster_to_gpu(cpu_cluster_info)
                _process_gpu_cluster_worker_node()
                gpu_cluster_info_dict[app_id] = gpu_cluster_obj

    # NOTE(review): indentation was lost in this source; the return is assumed to sit
    # at function level (returning an empty dict when 'gpuCluster' is given) — confirm.
    return gpu_cluster_info_dict
if cpu_cluster_obj and self.ctxt.get_rapids_auto_tuner_enabled():
# Generate Autotuner input file for the Qualification
# Note that we do not call the `_calculate_spark_settings(worker_node_hw_info)` method here
# because the Qualification tool does not need to calculate the recommended Spark settings
# as it will be part of the generated Autotuner output file.
self._generate_autotuner_input_from_cluster(cpu_cluster_obj)

def _process_offline_cluster_args(self) -> None:
    """Process a single cluster specified by the user.

    Pulls the 'migrationClustersProps' wrapper option defined by the spark_rapids
    command (empty dict when absent) and hands it to the CPU and GPU cluster
    processors, in that order.
    """
    cluster_props = self.wrapper_options.get('migrationClustersProps', {})
    # CPU must be processed first: the GPU step may derive from the CPU cluster proxy.
    for process_step in (self._process_cpu_cluster_args, self._process_gpu_cluster_args):
        process_step(cluster_props)

def _set_savings_calculations_flag(self, enable_flag: bool) -> None:
    """Enable or disable the cost-savings calculations by setting the
    'enableSavingsCalculations' flag in the tool context."""
    self.ctxt.set_ctxt('enableSavingsCalculations', enable_flag)

def __process_gpu_cluster_recommendation(self, arg_val: str) -> None:
    """
    Validate the gpu_cluster_recommendation argument and store the selected reshape
    type in the context under 'gpuClusterShapeRecommendation'.

    Falls back to the platform default (sparkRapids.cli.defaults.gpuClusterRecommendation
    .defaultRecommendation) when the argument is missing or invalid; an invalid value is
    reported with a warning listing the accepted options.

    :param arg_val: raw CLI value for the GPU cluster reshape recommendation, or None.
    """
    available_types = [filter_enum.value for filter_enum in QualGpuClusterReshapeType]
    default_recommendation_txt = self.ctxt.get_value('sparkRapids', 'cli', 'defaults',
                                                     'gpuClusterRecommendation',
                                                     'defaultRecommendation')
    if arg_val:
        try:
            selected_recommendation = QualGpuClusterReshapeType.fromstring(arg_val)
        except Exception:  # pylint: disable=broad-except
            selected_recommendation = QualGpuClusterReshapeType.fromstring(default_recommendation_txt)
            self.logger.warning(
                'Invalid argument gpu_cluster_recommendation=%s.\n\t'
                'Accepted options are: [%s].\n\t'
                'Falling-back to default filter: %s',
                arg_val, Utils.gen_joined_str(' | ', available_types), default_recommendation_txt)
    else:
        # Bug fix: the default must be parsed with QualGpuClusterReshapeType (as in the
        # branches above), not QualFilterApp, which is an unrelated enum and would store
        # the wrong type in the context.
        selected_recommendation = QualGpuClusterReshapeType.fromstring(default_recommendation_txt)
    self.ctxt.set_ctxt('gpuClusterShapeRecommendation', selected_recommendation)

def __process_filter_args(self, arg_val: str) -> None:
selected_filter = QualFilterApp.fromstring(arg_val)
Expand Down Expand Up @@ -279,7 +184,6 @@ def _process_custom_args(self) -> None:
self.ctxt.set_ctxt('gpuDevice', gpu_device)
self.ctxt.set_ctxt('cuda', cuda)
# we need to process each argument to verify it is valid. otherwise, we may crash late
self.__process_gpu_cluster_recommendation(self.wrapper_options.get('gpuClusterRecommendation'))
self.__process_filter_args(self.wrapper_options.get('filterApps'))
self._process_estimation_model_args()
self._process_offline_cluster_args()
Expand Down Expand Up @@ -388,84 +292,6 @@ def __generate_cluster_shape_report(self) -> Optional[str]:
return Utils.gen_multiline_str(self.ctxt.platform.ctxt['notes'].get('clusterShape'))
return None

def __recommendation_is_non_standard(self):
    """Return True when a reshape recommendation is set in the context and it
    differs from the default QualGpuClusterReshapeType; False otherwise."""
    selected_shape = self.ctxt.get_ctxt('gpuClusterShapeRecommendation')
    return bool(selected_shape) and selected_shape != QualGpuClusterReshapeType.get_default()

def __apply_non_standard_gpu_shape(self,
                                   all_apps: pd.DataFrame,
                                   cluster_workers_cnt: int,
                                   cluster_shape_t: QualGpuClusterReshapeType) -> (pd.DataFrame, bool):
    """
    Recommend a reshaped (smaller) GPU cluster per app and rescale the estimated
    GPU duration/speedup columns accordingly.

    The recommended worker count per row is max(min_workers,
    ceil(scale_factor * old_workers / speedup)); for the CLUSTER reshape type a single
    cluster-wide maximum is used instead of per-row values and a note is appended to
    the platform context.

    :param all_apps: per-application qualification results; mutated in place.
    :param cluster_workers_cnt: worker count of the original GPU cluster.
    :param cluster_shape_t: reshape strategy (e.g. per-job vs whole-cluster).
    :return: tuple of (updated DataFrame, flag indicating the speedup columns were
             recalculated).
    """
    min_w_cnt_from_conf = self.ctxt.platform.configs.get_value_silent('clusterSpecs',
                                                                      'minWorkerNodes')
    scale_factor_from_conf = self.ctxt.platform.configs.get_value_silent('clusterSpecs',
                                                                         'gpuScaleFactor')
    # get the min_worker_cnt from the qualification config in case it is not defined for the platform
    default_min_w_cnt = self.ctxt.get_value('local', 'output', 'processDFProps',
                                            'minimumWorkerCount')
    # get the scale factor from the qualification config in case it is not defined for the platform
    default_scale_factor = self.ctxt.get_value('local', 'output', 'processDFProps', 'gpuScaleFactor')
    # As you reduce nodes, performance will be slightly better than linear based on benchmarks
    scale_f = scale_factor_from_conf if scale_factor_from_conf else default_scale_factor
    min_w_cnt = min_w_cnt_from_conf if min_w_cnt_from_conf else default_min_w_cnt
    # calculate the reshape_cluster_column
    reshape_col = self.ctxt.get_value('local', 'output', 'processDFProps',
                                      'clusterShapeCols', 'columnName')
    speedup_col = 'Estimated GPU Speedup'
    gpu_dur_col = 'Estimated GPU Duration'
    cpu_dur_col = 'App Duration'

    def f_cell(x):
        # Round up to 2 decimal places.
        return ceil(x * 100) / 100

    def calc_cluster_shape_col(df_row, min_worker_cnt: int, old_workers_cnt: int) -> pd.Series:
        # Recommended worker count for one app row, never below the configured minimum.
        gpu_speedup = df_row[speedup_col]
        # We should not worry about division by 0 because speedup is BGE 1.0
        cluster_shape = max(min_worker_cnt, ceil(scale_f * old_workers_cnt / gpu_speedup))
        return pd.Series([cluster_shape])

    def update_cols_with_new_shape(apps_df: pd.DataFrame,
                                   old_workers_cnt: int) -> pd.DataFrame:
        # Rescale GPU duration for the smaller cluster, then derive the new speedup.
        # (Annotation fixed: this helper returns only the DataFrame, not a tuple.)
        apps_df[gpu_dur_col] = apps_df.apply(lambda row: f_cell(
            (old_workers_cnt / row[reshape_col]) * scale_f * row[cpu_dur_col] / row[speedup_col]), axis=1)
        apps_df[speedup_col] = apps_df.apply(
            lambda row: f_cell(row[cpu_dur_col] / row[gpu_dur_col]), axis=1
        )
        return apps_df

    all_apps[[reshape_col]] = all_apps.apply(
        lambda row: calc_cluster_shape_col(row, min_w_cnt, cluster_workers_cnt), axis=1)
    recalc_speedups_flag = True
    if cluster_shape_t == QualGpuClusterReshapeType.CLUSTER:
        # the column value should be reset to the maximum of all the rows
        max_workers_cnt = all_apps[reshape_col].max()
        all_apps[reshape_col] = max_workers_cnt
        # Append a node to be part of the summary report
        reshape_msg_plain = self.ctxt.get_value('local', 'output', 'processDFProps',
                                                'clusterShapeCols', 'noteMsg')
        self.ctxt.platform.update_ctxt_notes('clusterShape',
                                             reshape_msg_plain.format(max_workers_cnt))
        # If max_workers_cnt EQ gpu_cluster nodes then no need to recalculate the columns
        recalc_speedups_flag = max_workers_cnt != cluster_workers_cnt
    # check if we need to recalculate the flags
    if not recalc_speedups_flag:
        return all_apps, False
    return update_cols_with_new_shape(all_apps, cluster_workers_cnt), True

def __apply_gpu_cluster_reshape(self, all_apps: pd.DataFrame) -> (pd.DataFrame, bool):
    """Apply the non-standard GPU cluster reshape to the apps DataFrame when one is
    requested and a GPU cluster instance exists.

    :param all_apps: per-application qualification results.
    :return: tuple of (possibly reshaped DataFrame, per-row recalculation flag).
    """
    reshape_type = self.ctxt.get_ctxt('gpuClusterShapeRecommendation')
    reshaper = ClusterReshape(self.ctxt.get_ctxt('gpuClusterProxy'))
    # No cluster instance or a standard recommendation: nothing to reshape.
    if reshaper.cluster_inst is None or not self.__recommendation_is_non_standard():
        return all_apps, False
    return self.__apply_non_standard_gpu_shape(all_apps,
                                               reshaper.get_workers_count(),
                                               reshape_type)

def __build_global_report_summary(self,
all_apps: pd.DataFrame,
total_apps: pd.DataFrame,
Expand Down Expand Up @@ -496,13 +322,12 @@ def __build_global_report_summary(self,
speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))
# Group the applications and recalculate metrics
apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df)
df_final_result = speedup_category_ob.build_category_column(apps_grouped_df)
reshaped_notes = self.__generate_cluster_shape_report()
report_comments = [group_notes] if group_notes else []
if reshaped_notes:
report_comments.append(reshaped_notes)

df_final_result, _ = self.__apply_gpu_cluster_reshape(apps_grouped_df)
csv_out = output_files_info.get_value('summary', 'path')
if not df_final_result.empty:
self.logger.info('Generating GPU Estimated Speedup: as %s', csv_out)
Expand Down Expand Up @@ -623,10 +448,10 @@ def _infer_clusters_for_apps(self, cluster_info_df: pd.DataFrame) -> None:
"""
Infer CPU and GPU clusters for each app in the DataFrame and set the inferred clusters in the context.
"""
# if the user passed in the cpu cluster property, use that but we still want to try to infer the gpu
# cluster to use
if self.ctxt.get_ctxt('cpuClusterProxy') is not None or not self.ctxt.platform.cluster_inference_supported:
self.logger.info('CPU cluster is already set. Skipping cluster inference.')
# if cluster inference is not supported, skip the inference
if not self.ctxt.platform.cluster_inference_supported:
self.logger.info('Cluster inference is not supported for platform: %s',
self.ctxt.platform.get_platform_name())
return
cpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'cpuClusterColumns')
gpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'gpuClusterColumns')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ sparkRapids:
cudaVersion: '11.5'
cli:
defaults:
gpuClusterRecommendation:
defaultRecommendation: 'MATCH'
costSavingsSettings: #temporary settings to control the code behavior until the cost savings feature is fully removed
enabled: false
toolOptions:
Expand Down Expand Up @@ -176,38 +174,6 @@ local:
- 'App Duration'
- 'Estimated GPU Duration'
- 'Estimated GPU Speedup'
processDFProps:
minimumWorkerCount: 2
gpuScaleFactor: 0.80
savingsRecommendationsDefault: 'Not Recommended'
savingRecommendationsRanges:
nonRecommended:
title: 'Not Recommended'
lowerBound: -1000000.0
upperBound: 1.0
recommended:
title: 'Recommended'
lowerBound: 1.0
upperBound: 30.0
stronglyRecommended:
title: 'Strongly Recommended'
lowerBound: 30.0
upperBound: 1000000.0
clusterShapeCols:
columnName: 'Recommended Cluster Shape'
noteMsg: 'The GPU estimations are done with a recommended GPU cluster shape of {} worker nodes'
colsPerShapeType:
CLUSTER:
excludeColumns:
- 'Speedup Based Recommendation'
- 'Estimated GPU Speedup'
JOB:
excludeColumns:
- 'Speedup Based Recommendation'
- 'Estimated GPU Speedup'
appendColumns:
- columnName: 'Recommended Cluster Shape'
index: 4
treeDirectory:
enabled: true
depthLevel: 4
Expand Down
8 changes: 0 additions & 8 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

from spark_rapids_pytools.cloud_api.sp_types import DeployMode
from spark_rapids_pytools.common.utilities import ToolLogging
from spark_rapids_pytools.rapids.qualification import QualGpuClusterReshapeType
from spark_rapids_tools.cloud import ClientCluster
from spark_rapids_tools.utils import AbstractPropContainer, is_http_file
from ..enums import QualFilterApp, CspEnv, QualEstimationModel
Expand Down Expand Up @@ -447,7 +446,6 @@ class QualifyUserArgModel(ToolUserArgModel):
This is used as doing preliminary validation against some of the common pattern
"""
filter_apps: Optional[QualFilterApp] = None
gpu_cluster_recommendation: Optional[QualGpuClusterReshapeType] = None
estimation_model_args: Optional[Dict] = dataclasses.field(default_factory=dict)

def init_tool_args(self) -> None:
Expand All @@ -458,11 +456,6 @@ def init_tool_args(self) -> None:
self.p_args['toolArgs']['filterApps'] = QualFilterApp.get_default()
else:
self.p_args['toolArgs']['filterApps'] = self.filter_apps
# check the reshapeType argument
if self.gpu_cluster_recommendation is None:
self.p_args['toolArgs']['gpuClusterRecommendation'] = QualGpuClusterReshapeType.get_default()
else:
self.p_args['toolArgs']['gpuClusterRecommendation'] = self.gpu_cluster_recommendation
# Check the estimationModel argument
# This assumes that the EstimationModelArgProcessor was used to process the arguments before
# constructing this validator.
Expand Down Expand Up @@ -512,7 +505,6 @@ def build_tools_args(self) -> dict:
'eventlogs': self.eventlogs,
'filterApps': QualFilterApp.fromstring(self.p_args['toolArgs']['filterApps']),
'toolsJar': self.p_args['toolArgs']['toolsJar'],
'gpuClusterRecommendation': self.p_args['toolArgs']['gpuClusterRecommendation'],
'estimationModelArgs': self.p_args['toolArgs']['estimationModelArgs']
}
return wrapped_args
Expand Down
Loading

0 comments on commit 8fab112

Please sign in to comment.