diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
index c605281c2..180edbfd2 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -63,7 +63,8 @@ abstract class Platform(var gpuDevice: Option[GpuDevice]) {
   /**
    * Recommendations to be included in the final list of recommendations.
    * These properties should be specific to the platform and not general Spark properties.
-   * For example: "spark.databricks.optimizer.dynamicFilePruning" for the Databricks platform.
+   * For example: we used to set "spark.databricks.optimizer.dynamicFilePruning" to false for the
+   * Databricks platform.
    *
    * Represented as a tuple of (propertyKey, propertyValue).
    */
@@ -135,9 +136,6 @@ abstract class DatabricksPlatform(gpuDevice: Option[GpuDevice]) extends Platform
     "spark.executor.memory",
     "spark.executor.memoryOverhead"
   )
-  override val recommendationsToInclude: Seq[(String, String)] = Seq(
-    ("spark.databricks.optimizer.dynamicFilePruning", "false")
-  )
   override def createClusterInfo(coresPerExecutor: Int, numExecutorNodes: Int,
       sparkProperties: Map[String, String],
       systemProperties: Map[String, String]): ClusterInfo = {
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
index 431b0ce41..4c1db168f 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala
@@ -1115,7 +1115,7 @@ object AutoTuner extends Logging {
     "spark.executor.instances" ->
       "'spark.executor.instances' should be set to (gpuCount * numWorkers).",
     "spark.task.resource.gpu.amount" ->
-      "'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).",
+      "'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).",
     "spark.rapids.sql.concurrentGpuTasks" ->
       s"'spark.rapids.sql.concurrentGpuTasks' should be set to Min(4, (gpuMemory / 7.5G)).",
     "spark.rapids.memory.pinnedPool.size" ->
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
index 169b28901..7b4c2c2d0 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AutoTunerSuite.scala
@@ -235,7 +235,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
          |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
          |- 'spark.sql.files.maxPartitionBytes' was not set.
          |- 'spark.sql.shuffle.partitions' was not set.
-         |- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
+         |- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
          |- RAPIDS Accelerator for Apache Spark plugin jar is missing
          |  from the classpath entries.
          |  If the Spark RAPIDS jar is being bundled with your
@@ -273,7 +273,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
          |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
          |- 'spark.sql.files.maxPartitionBytes' was not set.
          |- 'spark.sql.shuffle.partitions' was not set.
-         |- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
+         |- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
          |- Incorrect values in worker system information: {numCores: 0, memory: 122880MiB, numWorkers: 4}.
          |- RAPIDS Accelerator for Apache Spark plugin jar is missing
          |  from the classpath entries.
@@ -312,7 +312,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
          |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
          |- 'spark.sql.files.maxPartitionBytes' was not set.
          |- 'spark.sql.shuffle.partitions' was not set.
-         |- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
+         |- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
          |- Incorrect values in worker system information: {numCores: 32, memory: , numWorkers: 4}.
          |- RAPIDS Accelerator for Apache Spark plugin jar is missing
          |  from the classpath entries.
@@ -351,7 +351,7 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
          |- 'spark.sql.adaptive.enabled' should be enabled for better performance.
          |- 'spark.sql.files.maxPartitionBytes' was not set.
          |- 'spark.sql.shuffle.partitions' was not set.
-         |- 'spark.task.resource.gpu.amount' should be set to Max(1, (numCores / gpuCount)).
+         |- 'spark.task.resource.gpu.amount' should be set to Min(1, (gpuCount / numCores)).
          |- Incorrect values in worker system information: {numCores: 32, memory: 0m, numWorkers: 4}.
          |- RAPIDS Accelerator for Apache Spark plugin jar is missing
          |  from the classpath entries.
diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
index 3eecddf27..99cd6a41e 100644
--- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py
+++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -14,7 +14,6 @@
 
 """Implementation class representing wrapper around the RAPIDS acceleration Qualification tool."""
 
-import textwrap
 from dataclasses import dataclass, field
 from math import ceil
 from typing import Any, List, Callable
@@ -32,6 +31,7 @@
 from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
 from spark_rapids_tools.enums import QualFilterApp, QualGpuClusterReshapeType, QualEstimationModel
 from spark_rapids_tools.tools.model_xgboost import predict
+from spark_rapids_tools.tools.speedup_category import SpeedupCategory
 from spark_rapids_tools.tools.top_candidates import TopCandidates
 from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
 from spark_rapids_tools.utils.util import Utilities
@@ -694,8 +694,13 @@ def __build_global_report_summary(self,
         # Calculate unsupported operators stage duration before grouping
         all_apps = unsupported_ops_obj.prepare_apps_with_unsupported_stages(all_apps, unsupported_ops_df)
         apps_pruned_df = self.__remap_columns_and_prune(all_apps)
+        speedup_category_ob = SpeedupCategory(self.ctxt.get_value('local', 'output', 'speedupCategories'))
+        # Calculate the speedup category column
+        apps_pruned_df = speedup_category_ob.build_category_column(apps_pruned_df)
         apps_pruned_df.to_csv(output_files_info['full']['path'], float_format='%.2f')
         apps_grouped_df, group_notes = self.__group_apps_by_name(apps_pruned_df)
+        # Recalculate the speedup category column after grouping
+        apps_grouped_df = speedup_category_ob.build_category_column(apps_grouped_df)
         recommended_apps = self.__get_recommended_apps(apps_grouped_df)
         # if the gpu_reshape_type is set to JOB then, then we should ignore recommended apps
         speedups_irrelevant_flag = self.__recommendation_is_non_standard()
@@ -763,13 +768,17 @@ def process_df_for_stdout(raw_df):
             filter_recommendation_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.SPEEDUPS
             filter_pos_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.SAVINGS
             filter_top_candidate_enabled = self.ctxt.get_ctxt('filterApps') == QualFilterApp.TOP_CANDIDATES
+            squeeze_header_enabled = self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'compactWidth')
+            header_width = self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'columnWidth')
 
             if filter_top_candidate_enabled:
                 # TODO: Ideally we should create instance of TopCandidates as class variable using the filter apps flag.
                 # This should be refactored along with entire filter apps logic to use more object-oriented design.
                 top_candidates_obj = TopCandidates(self.ctxt.get_value('local', 'output', 'topCandidates'))
                 filtered_apps = top_candidates_obj.filter_apps(raw_df)
-                return top_candidates_obj.prepare_output(filtered_apps)
+                result_df = top_candidates_obj.prepare_output(filtered_apps)
+                # squeeze the header titles if enabled
+                return Utilities.squeeze_df_header(result_df, header_width) if squeeze_header_enabled else result_df
 
             if self.__recommendation_is_non_standard():
                 # During processing of arguments phase, we verified that the filter does not conflict
@@ -809,15 +818,7 @@ def process_df_for_stdout(raw_df):
             df_row.columns = df_row.columns.str.replace('Duration', f'Duration{time_unit}', regex=False)
 
             # squeeze the header titles if enabled
-            if self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'compactWidth'):
-                col_w_conf = self.ctxt.get_value('toolOutput', 'stdout', 'summaryReport', 'columnWidth')
-                for column in df_row.columns:
-                    if len(column) > col_w_conf:
-                        new_column_name = textwrap.fill(column, col_w_conf, break_long_words=False)
-                        if new_column_name != column:
-                            df_row.columns = df_row.columns.str.replace(column,
-                                                                        new_column_name, regex=False)
-            return df_row
+            return Utilities.squeeze_df_header(df_row, header_width) if squeeze_header_enabled else df_row
 
         if not self._evaluate_rapids_jar_tool_output_exist():
             return
diff --git a/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json b/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json
index bfdd42914..882dfc1b3 100644
--- a/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json
+++ b/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json
@@ -12,11 +12,11 @@
       },
       {
         "name": "Hadoop Azure",
-        "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.6/hadoop-azure-3.3.6.jar",
+        "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar",
         "type": "jar",
-        "md5": "0fb8b8e565fd920fb809220cb2cc5ee7",
-        "sha1": "24425e7fad3a302715cefd570a0f4bdf3f50bc8e",
-        "size": 609646
+        "md5": "1ec4cbd59548412010fe1515070eef73",
+        "sha1": "a23f621bca9b2100554150f6b0b521f94b8b419e",
+        "size": 574116
       }
     ],
     "SPARK333-LOCAL": [
diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
index 9e3fac40f..797d5e7e8 100644
--- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
+++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
@@ -213,12 +213,15 @@ local:
       resultColumnName: 'Unsupported Operators Stage Duration'
       percentResultColumnName: 'Unsupported Operators Stage Duration Percent'
     topCandidates:
+      categoryColumnName: 'Estimated GPU Speedup Category'
       outputColumns:
         - 'App ID'
        - 'App Name'
-        - 'App Duration'
-        - 'Estimated GPU Speedup'
-        - 'Unsupported Operators Stage Duration Percent'
+        - 'Estimated GPU Speedup Category'
+      eligibleCategories:
+        - 'Small'
+        - 'Medium'
+        - 'Large'
       sortingColumns:
         - 'Estimated GPU Speedup'
         - 'Unsupported Operators Stage Duration Percent'
@@ -226,30 +229,30 @@ local:
         - 'App ID'
         - 'App Name'
         - 'App Duration'
-      ranges:
-        - columnName: 'Estimated GPU Speedup'
-          lowerBound: 1.3
-          upperBound: 1000000.0
-        - columnName: 'Unsupported Operators Stage Duration Percent'
-          lowerBound: 0.0
-          upperBound: 25.0
-      output:  # Configs related to output
-        columns:
-          - 'App ID'
-          - 'App Name'
-          - 'Estimated GPU Speedup'
-        remap:
-          - columnName: 'Estimated GPU Speedup'
-            recommendationRanges:
-              - title: 'Small'
-                lowerBound: 1.3
-                upperBound: 2.0
-              - title: 'Medium'
-                lowerBound: 2.0
-                upperBound: 3.0
-              - title: 'Large'
-                lowerBound: 3.0
-                upperBound: 1000000.0
+    speedupCategories:
+      speedupColumnName: 'Estimated GPU Speedup'
+      categoryColumnName: 'Estimated GPU Speedup Category'
+      categories:
+        - title: 'Not Applicable'
+          lowerBound: -1000000.0
+          upperBound: 1.3
+        - title: 'Small'
+          lowerBound: 1.3
+          upperBound: 2.0
+        - title: 'Medium'
+          lowerBound: 2.0
+          upperBound: 3.0
+        - title: 'Large'
+          lowerBound: 3.0
+          upperBound: 1000000.0
+      eligibilityConditions:
+        - columnName: 'Estimated GPU Speedup'
+          lowerBound: 1.3
+          upperBound: 1000000.0
+        - columnName: 'Unsupported Operators Stage Duration Percent'
+          lowerBound: 0.0
+          upperBound: 25.0
+      defaultCategory: 'Not Recommended'
     predictionModel:
       outputDirectory: 'xgboost_predictions'
       files:
diff --git a/user_tools/src/spark_rapids_tools/tools/speedup_category.py b/user_tools/src/spark_rapids_tools/tools/speedup_category.py
new file mode 100644
index 000000000..40dc6928d
--- /dev/null
+++ b/user_tools/src/spark_rapids_tools/tools/speedup_category.py
@@ -0,0 +1,93 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Implementation class for Speedup Category logic."""
+
+from dataclasses import dataclass, field
+from typing import Optional
+
+import pandas as pd
+
+
+@dataclass
+class SpeedupCategory:
+    """
+    Encapsulates the logic to categorize the speedup values based on the range values.
+    """
+    props: dict = field(default=None, init=True)
+
+    def __build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
+        """
+        Build the category column based on the range values of the speedup column.
+        Example:
+        props['categories'] = [
+            {'title': 'Not Recommended', 'lowerBound': -100000, 'upperBound': 1.3},
+            {'title': 'Small', 'lowerBound': 1.3, 'upperBound': 2},
+            {'title': 'Medium', 'lowerBound': 2, 'upperBound': 3},
+            {'title': 'Large', 'lowerBound': 3, 'upperBound': 100000}
+        ]
+        1. input: row_1 = pd.Series({'speedup': 1.8})
+           output: row_1 = pd.Series({'speedup': 1.8, 'speedup category': 'Small'})
+           reason: Speedup Category will be 'Small' because the speedup is within the range (1.3-2).
+        2. input: row_2 = pd.Series({'speedup': 3.5})
+           output: row_2 = pd.Series({'speedup': 3.5, 'speedup category': 'Large'})
+           reason: Speedup Category will be 'Large' because the speedup is within the range (3-100000).
+        """
+        categories = self.props.get('categories')
+        category_col_name = self.props.get('categoryColumnName')
+        speedup_col_name = self.props.get('speedupColumnName')
+
+        # Calculate the category based on the speedup value
+        def calculate_category(col_value) -> Optional[str]:
+            for category in categories:
+                if category.get('lowerBound') <= col_value < category.get('upperBound'):
+                    return category.get('title')
+            return None
+        all_apps[category_col_name] = all_apps[speedup_col_name].apply(calculate_category)
+        return all_apps
+
+    def __process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
+        """
+        Process the speedup category column based on the eligibility criteria. If the row does not match
+        the criteria, the category column will be set to the `Not Recommended` category.
+        Example:
+        self.props['eligibilityConditions'] = [
+            {'columnName': 'criteriaCol1', 'lowerBound': 18, 'upperBound': 30},
+            {'columnName': 'criteriaCol2', 'lowerBound': 70, 'upperBound': 100}
+        ]
+        1. input: row_1 = pd.Series({'criteriaCol1': 25, 'criteriaCol2': 85, 'speedup category': 'Large'})
+           output: row_1 = pd.Series({'criteriaCol1': 25, 'criteriaCol2': 85, 'speedup category': 'Large'})
+           reason: Category will remain 'Large' because the criteriaCol1 is within the range (18-30) and
+                   the criteriaCol2 (85) is within the range (70-100).
+        2. input: row_2 = pd.Series({'criteriaCol1': 15, 'criteriaCol2': 85, 'speedup category': 'Medium'})
+           output: row_2 = pd.Series({'criteriaCol1': 15, 'criteriaCol2': 85, 'speedup category': 'Not Recommended'})
+           reason: Category will be set to 'Not Recommended' because the criteriaCol1 is not within the range (18-30)
+        """
+        category_col_name = self.props.get('categoryColumnName')
+
+        def process_row(single_row: pd.Series) -> str:
+            for entry in self.props.get('eligibilityConditions'):
+                col_value = single_row[entry.get('columnName')]
+                # If the value is not within the range, set the category to default category (Not Recommended)
+                if not entry.get('lowerBound') <= col_value <= entry.get('upperBound'):
+                    return self.props.get('defaultCategory')
+            return single_row.get(category_col_name)
+
+        all_apps[category_col_name] = all_apps.apply(process_row, axis=1)
+        return all_apps
+
+    def build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
+        apps_with_category = self.__build_category_column(all_apps)
+        processed_apps = self.__process_category(apps_with_category)
+        return processed_apps
diff --git a/user_tools/src/spark_rapids_tools/tools/top_candidates.py b/user_tools/src/spark_rapids_tools/tools/top_candidates.py
index a22001c64..f6ee5b167 100644
--- a/user_tools/src/spark_rapids_tools/tools/top_candidates.py
+++ b/user_tools/src/spark_rapids_tools/tools/top_candidates.py
@@ -15,8 +15,6 @@
 """Implementation class for Top Candidates logic."""
 
 from dataclasses import dataclass, field
-from typing import Optional
-from functools import partial
 
 import pandas as pd
 
@@ -32,47 +30,16 @@ def filter_apps(self, all_apps: pd.DataFrame) -> pd.DataFrame:
         """
         Generic method to filter applications based on criteria
         """
-        filtered_apps = all_apps[all_apps.apply(self.__filter_single_row, axis=1)]
-        # Select output columns and sort
-        output_columns = self.props.get('outputColumns')
-        sorting_columns = self.props.get('sortingColumns')
-        return filtered_apps[output_columns].sort_values(by=sorting_columns, ascending=False)
-
-    def __filter_single_row(self, single_row: pd.Series) -> bool:
-        """
-        Used to create a filter for based on specified ranges.
-        Example:
-        self.props['ranges'] = [
-            {'columnName': 'colA', 'lowerBound': 18, 'upperBound': 30},
-            {'columnName': 'colB', 'lowerBound': 70, 'upperBound': 100}
-        ]
-        single_row = pd.Series({'colA': 25, 'colB': 85})
-        The function will return True because the colA (25) is within the range (18-30)
-        and the colB (85) is within the range (70-100).
-        """
-        for criteria in self.props.get('ranges'):
-            col_value = single_row[criteria.get('columnName')]
-            if not criteria.get('lowerBound') <= col_value <= criteria.get('upperBound'):
-                return False
-        return True
+        category_col_name = self.props.get('categoryColumnName')
+        eligible_categories = self.props.get('eligibleCategories')
+        # Filter applications based on categories
+        return all_apps[all_apps[category_col_name].isin(eligible_categories)]
 
     def prepare_output(self, all_apps: pd.DataFrame) -> pd.DataFrame:
         """
         Generic method to transform applications for the output
         """
-        output_props = self.props.get('output')
-
-        # Function to remap column values based on recommended ranges
-        def remap_column(col_value, recommended_ranges: dict) -> Optional[str]:
-            for s_range in recommended_ranges:
-                if s_range['lowerBound'] <= col_value < s_range['upperBound']:
-                    return s_range['title']
-            return None
-
-        # Iterate over each entry and apply remapping to respective columns
-        for remap_entry in output_props.get('remap', []):
-            column_name = remap_entry.get('columnName')
-            recommendation_ranges = remap_entry.get('recommendationRanges')
-            remap_func = partial(remap_column, recommended_ranges=recommendation_ranges)
-            all_apps[column_name] = all_apps[column_name].apply(remap_func)
-        return all_apps[output_props.get('columns', [])]
+        output_columns = self.props.get('outputColumns')
+        sorting_columns = self.props.get('sortingColumns')
+        # Sort columns and select output columns
+        return all_apps.sort_values(by=sorting_columns, ascending=False)[output_columns]
diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py
index 7218a9496..8361aa764 100644
--- a/user_tools/src/spark_rapids_tools/utils/util.py
+++ b/user_tools/src/spark_rapids_tools/utils/util.py
@@ -19,6 +19,7 @@
 import re
 import ssl
 import sys
+import textwrap
 import urllib
 import xml.etree.ElementTree as elem_tree
 from functools import reduce
@@ -298,3 +299,13 @@ def adjust_tools_resources(cls,
                 'rapidsThreads': prof_threads
             }
         }
+
+    @classmethod
+    def squeeze_df_header(cls, df_row: pd.DataFrame, header_width: int) -> pd.DataFrame:
+        for column in df_row.columns:
+            if len(column) > header_width:
+                new_column_name = textwrap.fill(column, header_width, break_long_words=False)
+                if new_column_name != column:
+                    df_row.columns = df_row.columns.str.replace(column,
+                                                                new_column_name, regex=False)
+        return df_row
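
As a quick sanity check for reviewers, the snippet below is a minimal, standalone sketch (not part of the patch) of how the new SpeedupCategory class is exercised. The props dict mirrors the speedupCategories section added to qualification-conf.yaml, and the sample DataFrame values are invented for illustration only.

# Standalone usage sketch; assumes the patched spark_rapids_tools package is installed.
import pandas as pd

from spark_rapids_tools.tools.speedup_category import SpeedupCategory

props = {
    'speedupColumnName': 'Estimated GPU Speedup',
    'categoryColumnName': 'Estimated GPU Speedup Category',
    'defaultCategory': 'Not Recommended',
    'categories': [
        {'title': 'Not Applicable', 'lowerBound': -1000000.0, 'upperBound': 1.3},
        {'title': 'Small', 'lowerBound': 1.3, 'upperBound': 2.0},
        {'title': 'Medium', 'lowerBound': 2.0, 'upperBound': 3.0},
        {'title': 'Large', 'lowerBound': 3.0, 'upperBound': 1000000.0},
    ],
    'eligibilityConditions': [
        {'columnName': 'Estimated GPU Speedup', 'lowerBound': 1.3, 'upperBound': 1000000.0},
        {'columnName': 'Unsupported Operators Stage Duration Percent', 'lowerBound': 0.0, 'upperBound': 25.0},
    ],
}

# Invented sample data for illustration only.
apps = pd.DataFrame({
    'Estimated GPU Speedup': [1.1, 1.8, 2.5, 3.4],
    'Unsupported Operators Stage Duration Percent': [10.0, 5.0, 40.0, 0.0],
})

# build_category_column() first maps each speedup to a category by range, then
# overrides any row that fails an eligibility condition with the default category.
apps = SpeedupCategory(props).build_category_column(apps)
# Expected 'Estimated GPU Speedup Category' values, row by row:
# 'Not Recommended' (speedup below 1.3), 'Small',
# 'Not Recommended' (unsupported stage duration above 25%), 'Large'.
print(apps)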