Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix cluster recommendation when CPU cluster cannot be determined #13

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,12 @@ def _init_nodes(self):
# construct worker nodes info when cluster is inactive
executors_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0
if num_workers != executors_cnt:
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using generated names.', executors_cnt,
num_workers)
if not self.is_inferred:
# this warning should be raised only when the cluster is not inferred, i.e. user has provided the
# cluster configuration with num_workers explicitly set
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using the `num_workers` value.', executors_cnt,
num_workers)
worker_nodes_from_conf = self.generate_node_configurations(num_workers)
if num_workers == 0 and self.props.get_value('node_type_id') is None:
# if there are no worker nodes and no node_type_id, then we cannot proceed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,12 @@ def _init_nodes(self):
# construct worker nodes info when cluster is inactive
executors_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0
if num_workers != executors_cnt:
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using generated names.', executors_cnt,
num_workers)
if not self.is_inferred:
# this warning should be raised only when the cluster is not inferred, i.e. user has provided the
# cluster configuration with num_workers explicitly set
self.logger.warning('Cluster configuration: `executors` count %d does not match the '
'`num_workers` value %d. Using generated names.', executors_cnt,
num_workers)
worker_nodes_from_conf = self.generate_node_configurations(num_workers)
if num_workers == 0 and self.props.get_value('node_type_id') is None:
# if there are no worker nodes and no node_type_id, then we cannot proceed
Expand Down
9 changes: 6 additions & 3 deletions user_tools/src/spark_rapids_pytools/cloud_api/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,12 @@ def _init_nodes(self):
worker_nodes_from_conf = self.props.get_value_silent('config', 'workerConfig', 'instanceNames')
instance_names_cnt = len(worker_nodes_from_conf) if worker_nodes_from_conf else 0
if worker_cnt != instance_names_cnt:
self.logger.warning('Cluster configuration: `instanceNames` count %d does not '
'match the `numInstances` value %d. Using generated names.',
instance_names_cnt, worker_cnt)
if not self.is_inferred:
# this warning should be raised only when the cluster is not inferred, i.e. user has provided the
# cluster configuration with num_workers explicitly set
self.logger.warning('Cluster configuration: `instanceNames` count %d does not '
'match the `numInstances` value %d. Using generated names.',
instance_names_cnt, worker_cnt)
worker_nodes_from_conf = self.generate_node_configurations(worker_cnt)
# create workers array
for worker_node in worker_nodes_from_conf:
Expand Down
9 changes: 9 additions & 0 deletions user_tools/src/spark_rapids_pytools/cloud_api/sp_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,15 @@ def generate_node_configurations(self, num_executors: int, render_args: dict = N
node_config = self._generate_node_configuration(render_args)
return [node_config for _ in range(num_executors)]

def get_cluster_shape_str(self) -> str:
    """
    Build a human-readable summary of the cluster shape, i.e. the driver
    instance type plus the count and instance type of the executor nodes.
    """
    driver_type = self.get_master_node().instance_type
    worker_type = self.get_worker_node(0).instance_type
    workers_cnt = self.get_nodes_cnt(SparkNodeType.WORKER)
    return '<Driver: {}, Executor: {} X {}>'.format(driver_type, workers_cnt, worker_type)


@dataclass
class ClusterReshape(ClusterGetAccessor):
Expand Down
58 changes: 40 additions & 18 deletions user_tools/src/spark_rapids_pytools/common/cluster_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""This module provides functionality for cluster inference"""

from dataclasses import dataclass, field

from enum import Enum
from typing import Optional
from logging import Logger

Expand All @@ -26,17 +26,29 @@
from spark_rapids_pytools.common.utilities import ToolLogging


class ClusterType(Enum):
    """
    Enumeration of the cluster kinds handled by cluster inference.
    """
    # Enum value doubles as the display label used in log messages.
    CPU = 'CPU'
    GPU = 'GPU'

    def __str__(self) -> str:
        # Render the member as its plain label (e.g. 'CPU') for logging.
        return f'{self.value}'


@dataclass
class ClusterInference:
"""
Class for inferring cluster information and constructing CPU clusters.
Class for inferring cluster information and constructing CPU or GPU clusters.

:param platform: The platform on which the cluster inference is performed.
"""
platform: PlatformBase = field(default=None, init=True)
cluster_type: ClusterType = field(default=ClusterType.CPU, init=True)
logger: Logger = field(default=ToolLogging.get_and_setup_logger('rapids.tools.cluster_inference'), init=False)

def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict]:
def _get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict]:
"""
Extract information about drivers and executors from input json
"""
Expand All @@ -53,10 +65,15 @@ def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict
cores_per_executor = cluster_info_df.get('Cores Per Executor')
execs_per_node = cluster_info_df.get('Num Executors Per Node')
total_cores_per_node = execs_per_node * cores_per_executor
if pd.isna(total_cores_per_node):
self.logger.info('For App ID: %s, Unable to infer %s cluster. Reason - Total cores per node cannot'
' be determined.', cluster_info_df['App ID'], self.cluster_type)
return None
# TODO - need to account for number of GPUs per executor
executor_instance = self.platform.get_matching_executor_instance(total_cores_per_node)
if pd.isna(executor_instance):
self.logger.info('Unable to infer CPU cluster. No matching executor instance found for vCPUs = %s',
self.logger.info('For App ID: %s, Unable to infer %s cluster. Reason - No matching executor instance '
'found for num cores = %d', cluster_info_df['App ID'], self.cluster_type,
total_cores_per_node)
return None
return {
Expand All @@ -66,21 +83,26 @@ def get_cluster_template_args(self, cluster_info_df: pd.Series) -> Optional[dict
'NUM_EXECUTOR_NODES': int(num_executor_nodes)
}

def infer_cpu_cluster(self, cluster_info_df: pd.DataFrame) -> Optional[ClusterBase]:
def infer_cluster(self, cluster_info_df: pd.DataFrame) -> Optional[ClusterBase]:
    """
    Infer CPU or GPU cluster configuration based on the input cluster df and return
    the constructed cluster object.

    :param cluster_info_df: DataFrame expected to contain exactly one row describing
                            a single cluster parsed from the event logs.
    :return: The inferred cluster object, or None if inference is not possible.
    """
    try:
        if len(cluster_info_df) != 1:
            # multi-cluster event logs are not supported for inference
            self.logger.info('Cannot infer %s cluster from event logs. Only single cluster is supported.',
                             self.cluster_type)
            return None

        # Extract cluster information from parsed logs. Above check ensures df contains single row.
        cluster_template_args = self._get_cluster_template_args(cluster_info_df.iloc[0])
        if cluster_template_args is None:
            return None
        # Construct cluster configuration using platform-specific logic
        cluster_conf = self.platform.generate_cluster_configuration(cluster_template_args)
        if cluster_conf is None:
            return None
        cluster_props_new = JSONPropertiesContainer(cluster_conf, file_load=False)
        # mark the cluster as inferred so downstream consumers can suppress user-facing warnings
        return self.platform.load_cluster_by_prop(cluster_props_new, is_inferred=True)
    except Exception as e:  # pylint: disable=broad-except
        # inference is best-effort: log and return None rather than failing the tool run
        self.logger.error('Error while inferring cluster: %s', str(e))
        return None
99 changes: 38 additions & 61 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import re
from dataclasses import dataclass, field
from math import ceil
from typing import Any, List, Callable, Optional
from typing import Any, List, Callable, Optional, Dict

import numpy as np
import pandas as pd
from tabulate import tabulate

from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, SparkNodeType
from spark_rapids_pytools.common.cluster_inference import ClusterInference
from spark_rapids_pytools.cloud_api.sp_types import ClusterReshape, NodeHWInfo, ClusterBase
from spark_rapids_pytools.common.cluster_inference import ClusterInference, ClusterType
from spark_rapids_pytools.common.prop_manager import JSONPropertiesContainer, convert_dict_to_camel_case
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
Expand Down Expand Up @@ -137,7 +137,7 @@ def generate_report(self,
report_content.append(f'{app_name} tool found no records to show.')

if self.filter_apps_count > 0:
self.comments.append('\'Estimated GPU Speedup Category\' assumes the user is using the node type '
self.comments.append('**Estimated GPU Speedup Category assumes the user is using the node type '
'recommended and config recommendations with the same size cluster as was used '
'with the CPU side.')

Expand Down Expand Up @@ -713,8 +713,7 @@ def process_df_for_stdout(raw_df):
# info columns, even if `cluster_info_df` is empty.
df = pd.merge(df, cluster_info_df, on=['App Name', 'App ID'], how='left')
if len(cluster_info_df) > 0:
self.__infer_cluster_and_update_savings(cluster_info_df)
self.__infer_cluster_for_auto_tuning(cluster_info_df)
self._infer_clusters_for_apps(cluster_info_df)
except Exception as e: # pylint: disable=broad-except
self.logger.error('Unable to process cluster information. Cost savings will be disabled. '
'Reason - %s:%s', type(e).__name__, e)
Expand Down Expand Up @@ -764,68 +763,46 @@ def _init_rapids_arg_list_for_qual(self) -> List[str]:
rapids_threads_args = self._get_rapids_threads_count(self.name)
return ['--per-sql'] + rapids_threads_args + self._create_autotuner_rapids_args()

def __infer_cluster_and_update_savings(self, cluster_info_df: pd.DataFrame):
def _infer_cluster_per_app(self, cluster_info_df: pd.DataFrame,
                           cluster_type: ClusterType) -> Dict[str, Optional[ClusterBase]]:
    """
    Infers clusters for each app in the DataFrame and returns a dictionary of Cluster objects.

    :param cluster_info_df: DataFrame containing cluster information for each app.
    :param cluster_type: The type of cluster to infer.
    :return: A dictionary where the key is the app ID and the value is the inferred
             Cluster object (None when inference failed for that app).
    """
    # single inference helper reused across apps; each app is inferred from its own row
    cluster_inference_obj = ClusterInference(platform=self.ctxt.platform, cluster_type=cluster_type)
    return {
        # iloc[[index]] keeps a one-row DataFrame (not a Series) as infer_cluster expects
        row['App ID']: cluster_inference_obj.infer_cluster(cluster_info_df.iloc[[index]])
        for index, row in cluster_info_df.iterrows()
    }

# this function is a lot like __infer_cluster_and_update_savings but handles clusters
# on a per application basis and was explicitly copied to not have to deal with
# changing the cost savings flow at the same time. Ideally in the future they
# get combined back together.
def __infer_cluster_for_auto_tuning(self, cluster_info_df: pd.DataFrame):
def _infer_clusters_for_apps(self, cluster_info_df: pd.DataFrame) -> None:
    """
    Infer CPU and GPU clusters for each app in the DataFrame and set the inferred
    clusters in the context ('cpuClusterInfoPerApp' / 'gpuClusterInfoPerApp').

    :param cluster_info_df: DataFrame with one row of parsed cluster info per app.
    """
    # if the user passed in the cpu cluster property, use that but we still want to try to infer the gpu
    # cluster to use
    if self.ctxt.get_ctxt('cpuClusterProxy') is not None or not self.ctxt.platform.cluster_inference_supported:
        self.logger.info('CPU cluster is already set. Skipping cluster inference.')
        return
    cpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'cpuClusterColumns')
    gpu_cluster_cols = self.ctxt.get_value('local', 'output', 'clusterInference', 'gpuClusterColumns')
    # == Infer CPU clusters per app ==
    # Drop GPU/Recommended columns to infer the CPU cluster information
    cpu_cluster_df = cluster_info_df.drop(columns=gpu_cluster_cols, errors='ignore')
    cpu_clusters_per_app = self._infer_cluster_per_app(cpu_cluster_df, ClusterType.CPU)
    self.ctxt.set_ctxt('cpuClusterInfoPerApp', cpu_clusters_per_app)
    # == Infer GPU clusters per app ==
    # Drop CPU columns to infer the GPU cluster information
    gpu_cluster_df = cluster_info_df.drop(columns=cpu_cluster_cols, errors='ignore')
    # Rename GPU columns to drop the 'Recommended' prefix
    gpu_cluster_df.rename(columns=dict(zip(gpu_cluster_cols, cpu_cluster_cols)), inplace=True)
    # Assumption: num executors per node will be same as num gpus per node
    gpu_cluster_df['Num Executors Per Node'] = cluster_info_df['Recommended Num GPUs Per Node']
    gpu_clusters_per_app = self._infer_cluster_per_app(gpu_cluster_df, ClusterType.GPU)
    self.ctxt.set_ctxt('gpuClusterInfoPerApp', gpu_clusters_per_app)

def __build_output_files_info(self) -> dict:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ local:
outputComment: "Heuristics information"
configRecommendations:
name: 'rapids_4_spark_qualification_output/tuning'
outputComment: "Cluster config recommendations"
outputComment: "*Cluster config recommendations"
columns:
- 'Full Cluster Config Recommendations*'
- 'GPU Config Recommendation Breakdown*'
Expand Down Expand Up @@ -341,6 +341,16 @@ local:
dstCol: 'Estimated GPU Speedup'
- srcCol: 'appDuration_pred'
dstCol: 'Estimated GPU Duration'
clusterInference:
cpuClusterColumns:
- 'Num Executor Nodes'
- 'Executor Instance'
- 'Cores Per Executor'
gpuClusterColumns:
- 'Recommended Num Executor Nodes'
- 'Recommended Executor Instance'
- 'Recommended Cores Per Executor'

platform:
shortName: 'qual'
outputDir: qual-tool-output
Expand Down
Loading