
Commit

Merge branch 'dev' into spark-rapids-tools-1420
parthosa committed Dec 17, 2024
2 parents 4379878 + 4143ccc commit 46c75e4
Showing 23 changed files with 69 additions and 54 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -23,7 +23,7 @@
<artifactId>rapids-4-spark-tools_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark tools</name>
<description>RAPIDS Accelerator for Apache Spark tools</description>
<version>24.10.3-SNAPSHOT</version>
<version>24.10.4-SNAPSHOT</version>
<packaging>jar</packaging>
<url>http://github.com/NVIDIA/spark-rapids-tools</url>

@@ -305,3 +305,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
@@ -305,3 +305,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-l4.csv
@@ -287,3 +287,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-gke-t4.csv
@@ -287,3 +287,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-l4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
@@ -287,3 +287,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-dataproc-t4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-a10.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-a10G.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-emr-t4.csv
@@ -293,3 +293,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/operatorsScore-onprem-a100.csv
@@ -305,3 +305,4 @@ DecimalSum,1.5
MaxBy,1.5
MinBy,1.5
ArrayJoin,1.5
RunningWindowFunctionExec,1.5
1 change: 1 addition & 0 deletions core/src/main/resources/supportedExecs.csv
@@ -57,3 +57,4 @@ WriteFilesExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S,S,S
CustomShuffleReaderExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
WindowGroupLimitExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
MapInArrowExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,PS,NS,NS,NS
RunningWindowFunctionExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS,NS,NS
@@ -35,12 +35,13 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
def getAggRawMetrics(app: AppBase, index: Int, sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None):
AggRawMetricsResult = {
val analysisObj = new AppSparkMetricsAnalyzer(app)
val sqlMetricsAgg = analysisObj.aggregateSparkMetricsBySql(index)
AggRawMetricsResult(
analysisObj.aggregateSparkMetricsByJob(index),
analysisObj.aggregateSparkMetricsByStage(index),
analysisObj.shuffleSkewCheck(index),
analysisObj.aggregateSparkMetricsBySql(index),
analysisObj.aggregateIOMetricsBySql(analysisObj.aggregateSparkMetricsBySql(index)),
sqlMetricsAgg,
analysisObj.aggregateIOMetricsBySql(sqlMetricsAgg),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)),
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer))
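The hunk above computes the per-SQL aggregation once into `sqlMetricsAgg` and passes it to the IO aggregation instead of calling `aggregateSparkMetricsBySql(index)` twice. A minimal Python sketch of the same compute-once-and-reuse pattern, with hypothetical method names standing in for the Scala ones:

```python
# Hypothetical sketch: cache an expensive aggregation in a local variable
# and hand the result to its consumer instead of recomputing it.
def build_agg_result(analyzer, index):
    sql_metrics_agg = analyzer.aggregate_by_sql(index)       # assumed method name
    io_metrics_agg = analyzer.aggregate_io(sql_metrics_agg)  # assumed method name
    return {'sql': sql_metrics_agg, 'io': io_metrics_agg}
```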
@@ -91,7 +91,8 @@ class GenericExecParser(
ExecInfo(
node,
sqlID,
node.name,
// Remove trailing spaces from node name if any
node.name.trim,
"",
speedupFactor,
duration,
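The `node.name.trim` change strips trailing whitespace from plan-node names before they are recorded in `ExecInfo`. A small, hypothetical Python sketch of why this matters when names are later matched exactly against a lookup table such as the operator-score CSVs:

```python
# Hypothetical illustration: a trailing space makes an exact-match lookup miss.
supported_scores = {'RunningWindowFunctionExec': 1.5, 'ArrayJoin': 1.5}

raw_node_name = 'RunningWindowFunctionExec '   # name as it might appear in the plan
print(supported_scores.get(raw_node_name))          # None - lookup misses
print(supported_scores.get(raw_node_name.strip()))  # 1.5 - normalized name matches
```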
@@ -483,8 +483,8 @@ object SQLPlanParser extends Logging {
case "AggregateInPandas" | "ArrowEvalPython" | "AQEShuffleRead" | "CartesianProduct"
| "Coalesce" | "CollectLimit" | "CustomShuffleReader" | "FlatMapGroupsInPandas"
| "GlobalLimit" | "LocalLimit" | "InMemoryTableScan" | "MapInPandas"
| "PythonMapInArrow" | "MapInArrow" | "Range" | "Sample" | "Union"
| "WindowInPandas" =>
| "PythonMapInArrow" | "MapInArrow" | "Range" | "RunningWindowFunction"
| "Sample" | "Union" | "WindowInPandas" =>
GenericExecParser(node, checker, sqlID, app = Some(app)).parse
case "BatchScan" =>
BatchScanExecParser(node, checker, sqlID, app).parse
@@ -760,6 +760,7 @@ class AutoTuner(
}
} else if (sparkVersion.contains("amzn")) {
sparkVersion match {
case ver if ver.contains("3.5.2") => "352"
case ver if ver.contains("3.5.1") => "351"
case ver if ver.contains("3.5.0") => "350"
case ver if ver.contains("3.4.1") => "341"
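The AutoTuner change adds a case for EMR's Spark 3.5.2 builds. A hedged Python sketch of the version-string-to-buildver mapping it extends (the helper name and error handling are assumptions for illustration):

```python
# Hypothetical sketch mirroring the Scala match above: map an Amazon EMR
# Spark version string (e.g. "3.5.2-amzn-0") to a short buildver code.
def amzn_buildver(spark_version: str) -> str:
    for needle, buildver in (('3.5.2', '352'), ('3.5.1', '351'),
                             ('3.5.0', '350'), ('3.4.1', '341')):
        if needle in spark_version:
            return buildver
    raise ValueError(f'unsupported EMR Spark version: {spark_version}')

print(amzn_buildver('3.5.2-amzn-0'))  # '352'
```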
2 changes: 1 addition & 1 deletion user_tools/src/spark_rapids_pytools/__init__.py
@@ -16,7 +16,7 @@

from spark_rapids_pytools.build import get_version, get_spark_dep_version

VERSION = '24.10.3'
VERSION = '24.10.4'
# defines the default runtime build version for the user tools environment
SPARK_DEP_VERSION = '350'
__version__ = get_version(VERSION)
9 changes: 9 additions & 0 deletions user_tools/src/spark_rapids_tools/cmdli/argprocessor.py
@@ -400,6 +400,15 @@ def init_extra_arg_cases(self) -> list:
def define_invalid_arg_cases(self) -> None:
super().define_invalid_arg_cases()
self.define_rejected_missing_eventlogs()
self.rejected['Missing Platform argument'] = {
'valid': False,
'callable': partial(self.raise_validation_exception,
'Cannot run tool cmd without platform argument. Re-run the command '
'providing the platform argument.'),
'cases': [
[ArgValueCase.UNDEFINED, ArgValueCase.IGNORE, ArgValueCase.IGNORE]
]
}
self.rejected['Cluster By Name Without Platform Hints'] = {
'valid': False,
'callable': partial(self.raise_validation_exception,
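The new 'Missing Platform argument' rejection case makes the platform argument mandatory: an undefined platform now fails validation regardless of the other arguments, which is what the updated unit tests below assert. A hedged usage sketch of what a caller can now expect (the import path is taken from the file shown above, and the exact exception type raised by `raise_validation_exception` is an assumption):

```python
# Hypothetical usage sketch: building tool args without a platform is rejected.
from spark_rapids_tools.cmdli.argprocessor import AbsToolUserArgModel

try:
    AbsToolUserArgModel.create_tool_args('qualification',
                                         eventlogs='/path/to/eventlogs')
except Exception as err:  # assumed: a validation error is raised for rejected cases
    print(f'rejected as expected: {err}')
```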
@@ -75,7 +75,10 @@ def _read_csv_files(self) -> None:
'toolOutput', 'csv', 'unsupportedOperatorsReport', 'fileName')
rapids_unsupported_operators_file = FSUtil.build_path(
qual_output_dir, unsupported_operator_report_file)
self.unsupported_operators_df = pd.read_csv(rapids_unsupported_operators_file)
# load the unsupported operators and drop operators that have no names.
self.unsupported_operators_df = (
pd.read_csv(rapids_unsupported_operators_file,
dtype={'Unsupported Operator': str})).dropna(subset=['Unsupported Operator'])

stages_report_file = self.ctxt.get_value('toolOutput', 'csv', 'stagesInformation',
'fileName')
@@ -84,7 +87,14 @@ def _read_csv_files(self) -> None:

rapids_execs_file = self.ctxt.get_value('toolOutput', 'csv', 'execsInformation',
'fileName')
self.execs_df = pd.read_csv(FSUtil.build_path(qual_output_dir, rapids_execs_file))
# Load the execs CSV file and drop execs that have no stages or name
self.execs_df = (
pd.read_csv(FSUtil.build_path(qual_output_dir, rapids_execs_file),
dtype={'Exec Name': str,
'Exec Stages': str,
'Exec Children': str,
'Exec Children Node Ids': str})
.dropna(subset=['Exec Stages', 'Exec Name']))
self.logger.info('Reading CSV files completed.')

def _convert_durations(self) -> None:
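The two `read_csv` calls above now force the name and stage columns to string dtype and drop rows with missing values up front, so later string handling never sees floats or NaN. A small self-contained pandas sketch of the failure mode being avoided (the rows are made up; column names follow the report schema shown above):

```python
import io
import pandas as pd

# One row has an empty 'Exec Stages' cell; read naively it becomes NaN (a float),
# which survives later string splitting and eventually breaks the integer cast.
csv_text = 'Exec Name,Exec Stages\nFilter,1:2\nWholeStageCodegen (3),\n'

raw = pd.read_csv(io.StringIO(csv_text))
print(raw['Exec Stages'].tolist())        # ['1:2', nan]

clean = (pd.read_csv(io.StringIO(csv_text),
                     dtype={'Exec Name': str, 'Exec Stages': str})
         .dropna(subset=['Exec Stages', 'Exec Name']))
print(clean['Exec Stages'].tolist())      # ['1:2'] - safe for later string handling
```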
@@ -103,7 +113,6 @@ def _preprocess_dataframes(self) -> None:
# from this dataframe can be matched with the stageID of stages dataframe
self.execs_df['Exec Stages'] = self.execs_df['Exec Stages'].str.split(':')
self.execs_df = (self.execs_df.explode('Exec Stages').
dropna(subset=['Exec Stages']).
rename(columns={'Exec Stages': 'Stage ID'}))
self.execs_df['Stage ID'] = self.execs_df['Stage ID'].astype(int)

4 changes: 2 additions & 2 deletions user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py
@@ -461,7 +461,7 @@ def combine_tables(table_name: str) -> pd.DataFrame:

# normalize WholeStageCodegen labels
ops_tbl.loc[
ops_tbl['nodeName'].str.startswith('WholeStageCodegen'), 'nodeName'
ops_tbl['nodeName'].astype(str).str.startswith('WholeStageCodegen'), 'nodeName'
] = 'WholeStageCodegen'

# format WholeStageCodegen for merging
@@ -1140,7 +1140,7 @@ def _is_ignore_no_perf(action: str) -> bool:
node_level_supp['Exec Is Supported'] = (
node_level_supp['Exec Is Supported']
| node_level_supp['Action'].apply(_is_ignore_no_perf)
| node_level_supp['Exec Name'].apply(
| node_level_supp['Exec Name'].astype(str).apply(
lambda x: x.startswith('WholeStageCodegen')
)
)
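Both preprocess.py changes wrap the name column in `astype(str)` before applying a string predicate, so NaN entries cannot leak into the boolean logic. A small pandas sketch, on made-up rows, of why that guard matters:

```python
import numpy as np
import pandas as pd

names = pd.Series(['WholeStageCodegen (5)', np.nan, 'ProjectExec'])

# Without the cast, the NaN entry yields NaN instead of a boolean,
# which then poisons masks combined with | and & operations.
print(names.str.startswith('WholeStageCodegen').tolist())              # [True, nan, False]
print(names.astype(str).str.startswith('WholeStageCodegen').tolist())  # [True, False, False]
```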
65 changes: 23 additions & 42 deletions user_tools/tests/spark_rapids_tools_ut/test_tool_argprocessor.py
@@ -131,13 +131,9 @@ def test_with_platform_with_eventlogs(self, get_ut_data_dir, tool_name, csp):
cost_savings_enabled=False,
expected_platform=csp)

# should pass: platform not provided; event logs are provided
tool_args = self.create_tool_args_should_pass(tool_name,
eventlogs=f'{get_ut_data_dir}/eventlogs')
# for qualification, cost savings should be disabled because cluster is not provided
self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
cost_savings_enabled=False,
expected_platform=CspEnv.ONPREM)
# should fail: platform must be provided
self.create_tool_args_should_fail(tool_name,
eventlogs=f'{get_ut_data_dir}/eventlogs')

@pytest.mark.parametrize('tool_name', ['qualification', 'profiling'])
@pytest.mark.parametrize('csp', all_csps)
@@ -150,17 +146,19 @@ def test_with_platform_with_eventlogs_with_jar_files(self, get_ut_data_dir, tool
tools_jar=f'{get_ut_data_dir}/tools_mock.jar')
assert tool_args['toolsJar'] == f'{get_ut_data_dir}/tools_mock.jar'

# should pass: tools_jar is correct
tool_args = self.create_tool_args_should_pass(tool_name, eventlogs=f'{get_ut_data_dir}/eventlogs',
tools_jar=f'{get_ut_data_dir}/tools_mock.jar')
assert tool_args['toolsJar'] == f'{get_ut_data_dir}/tools_mock.jar'
# should fail: platform must be provided
self.create_tool_args_should_fail(tool_name,
eventlogs=f'{get_ut_data_dir}/eventlogs',
tools_jar=f'{get_ut_data_dir}/tools_mock.jar')

# should fail: tools_jar does not exist
self.create_tool_args_should_fail(tool_name, eventlogs=f'{get_ut_data_dir}/eventlogs',
self.create_tool_args_should_fail(tool_name, platform=csp,
eventlogs=f'{get_ut_data_dir}/eventlogs',
tools_jar=f'{get_ut_data_dir}/tools_mock.txt')

# should fail: tools_jar is not .jar extension
self.create_tool_args_should_fail(tool_name, eventlogs=f'{get_ut_data_dir}/eventlogs',
self.create_tool_args_should_fail(tool_name, platform=csp,
eventlogs=f'{get_ut_data_dir}/eventlogs',
tools_jar=f'{get_ut_data_dir}/worker_info.yaml')

@pytest.mark.parametrize('tool_name', ['qualification', 'profiling'])
@@ -230,25 +228,15 @@ def test_with_platform_with_cluster_props(self, get_ut_data_dir, tool_name, csp,
self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
cost_savings_enabled=True,
expected_platform=csp)

# should pass: platform not provided; missing eventlogs should be accepted for all CSPs (except onPrem)
# because the eventlogs can be retrieved from the cluster properties
tool_args = self.create_tool_args_should_pass(tool_name,
cluster=cluster_prop_file)
# for qualification, cost savings should be enabled because cluster is provided
self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
cost_savings_enabled=True,
expected_platform=csp)
else:
# should fail: onprem platform cannot retrieve eventlogs from cluster properties
self.create_tool_args_should_fail(tool_name,
platform=csp,
cluster=cluster_prop_file)

# should fail: platform not provided; defaults platform to onprem, cannot retrieve eventlogs from
# cluster properties
self.create_tool_args_should_fail(tool_name,
cluster=cluster_prop_file)
# should fail: platform must be provided for all CSPs as well as onprem
self.create_tool_args_should_fail(tool_name,
cluster=cluster_prop_file)

@pytest.mark.parametrize('tool_name', ['qualification', 'profiling'])
@pytest.mark.parametrize('csp,prop_path', all_cpu_cluster_props)
@@ -266,14 +254,10 @@ def test_with_platform_with_cluster_props_with_eventlogs(self, get_ut_data_dir,
cost_savings_enabled=CspEnv(csp) != CspEnv.ONPREM,
expected_platform=csp)

# should pass: platform not provided; cluster properties and eventlogs are provided
tool_args = self.create_tool_args_should_pass(tool_name,
cluster=cluster_prop_file,
eventlogs=f'{get_ut_data_dir}/eventlogs')
# for qualification, cost savings should be enabled because cluster is provided (except for onprem)
self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
cost_savings_enabled=CspEnv(csp) != CspEnv.ONPREM,
expected_platform=csp)
# should fail: platform must be provided
self.create_tool_args_should_fail(tool_name,
cluster=cluster_prop_file,
eventlogs=f'{get_ut_data_dir}/eventlogs')

@pytest.mark.parametrize('tool_name', ['profiling'])
@pytest.mark.parametrize('csp', all_csps)
@@ -308,18 +292,15 @@ def test_with_platform_with_autotuner_with_eventlogs(self, get_ut_data_dir, tool
cost_savings_enabled=False,
expected_platform=csp)

# should pass: platform not provided; autotuner properties and eventlogs are provided
tool_args = self.create_tool_args_should_pass(tool_name,
cluster=autotuner_prop_file,
eventlogs=f'{get_ut_data_dir}/eventlogs')
# cost savings should be disabled for profiling
self.validate_tool_args(tool_name=tool_name, tool_args=tool_args,
cost_savings_enabled=False,
expected_platform=CspEnv.ONPREM)
# should fail: platform must be provided
self.create_tool_args_should_fail(tool_name,
cluster=autotuner_prop_file,
eventlogs=f'{get_ut_data_dir}/eventlogs')

@pytest.mark.parametrize('prop_path', [autotuner_prop_path])
def test_profiler_with_driverlog(self, get_ut_data_dir, prop_path):
prof_args = AbsToolUserArgModel.create_tool_args('profiling',
platform=CspEnv.get_default(),
driverlog=f'{get_ut_data_dir}/{prop_path}')
assert not prof_args['requiresEventlogs']
assert prof_args['rapidOptions']['driverlog'] == f'{get_ut_data_dir}/{prop_path}'
