Merge branch 'dev' of github.com:NVIDIA/spark-rapids-tools into cpu_gpu_transition_time
nartal1 committed Aug 22, 2023
2 parents 29f0e8c + 85b564e commit a52803a
Showing 15 changed files with 609 additions and 62 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/mvn-verify-check.yml
@@ -22,7 +22,9 @@ on:
jobs:
build:
runs-on: ubuntu-latest

strategy:
matrix:
spark-version: ['311', '320', '330', '341']
steps:
- uses: actions/checkout@v3

@@ -32,5 +34,5 @@ jobs:
distribution: adopt
java-version: 8

- name: Run mvn verify
run: cd core && mvn verify
- name: Run mvn verify with Spark ${{ matrix.spark-version }}
run: cd core && mvn -Dbuildver=${{ matrix.spark-version }} verify
27 changes: 15 additions & 12 deletions .github/workflows/python-unit-test.yml
@@ -22,20 +22,23 @@ on:
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10']

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.8'
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- name: Install tox
run: |
python -m pip install --upgrade pip
python -m pip install tox
python -m pip install --pre tox-gh-actions
- name: Install tox
run: |
python -m pip install --upgrade pip
python -m pip install tox
python -m pip install --pre tox-gh-actions
- name: Run tox test
run: cd user_tools && tox -e pylint,flake8,python3.8 -- tests/test_diagnostic.py
- name: Run tox test
run: cd user_tools && tox
14 changes: 8 additions & 6 deletions .github/workflows/release.yml
@@ -16,7 +16,9 @@ jobs:

steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
token: ${{ secrets.NVAUTO_TOKEN }}

- name: Build Changelog
id: build_changelog
@@ -25,7 +27,7 @@
configuration: ".github/workflows/configuration.json" # Configuration file for the changelog builder (optional)
outputFile: "CHANGELOG_BODY.md"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_TOKEN: ${{ secrets.NVAUTO_TOKEN }}

- name: Commit and Push Changelog
if: steps.build_changelog.outputs.changes > 0
@@ -38,15 +40,15 @@
cat CURRENT_CHANGELOG.md CHANGELOG_BODY.md >> TEMP_CHANGELOG.md
cat TEMP_CHANGELOG.md CHANGELOG.md > NEW_CHANGELOG.md
git config user.name "GitHub Actions"
git config user.email "[email protected]"
git config user.name ${{ secrets.NVAUTO_USER }}
git config user.email ${{ secrets.NVAUTO_EMAIL }}
git fetch origin main
git checkout main
mv NEW_CHANGELOG.md CHANGELOG.md
git add CHANGELOG.md
git commit -m "Update changelogs"
git push origin main
git commit -s -m "Update changelogs"
git push -f https://nvauto:${{ secrets.NVAUTO_TOKEN }}@github.com/${GITHUB_REPOSITORY}.git main
- name: Set Version Number
id: set_version
6 changes: 3 additions & 3 deletions core/pom.xml
@@ -23,7 +23,7 @@
<artifactId>rapids-4-spark-tools_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark tools</name>
<description>RAPIDS Accelerator for Apache Spark tools</description>
<version>23.06.4-SNAPSHOT</version>
<version>23.08.0-SNAPSHOT</version>
<packaging>jar</packaging>
<url>http://github.com/NVIDIA/spark-rapids-tools</url>

@@ -327,7 +327,7 @@
<spark332.version>3.3.2</spark332.version>
<spark333.version>3.3.3-SNAPSHOT</spark333.version>
<spark340.version>3.4.0</spark340.version>
<spark341.version>3.4.1-SNAPSHOT</spark341.version>
<spark341.version>3.4.1</spark341.version>
<spark350.version>3.5.0-SNAPSHOT</spark350.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.plugin.version>4.3.0</scala.plugin.version>
@@ -361,7 +361,7 @@
<delta21x.version>2.1.1</delta21x.version>
<delta22x.version>2.2.0</delta22x.version>
<delta23x.version>2.3.0</delta23x.version>
<delta24x.version>2.4.0rc1</delta24x.version>
<delta24x.version>2.4.0</delta24x.version>
<delta.core.version>${delta10x.version}</delta.core.version>
<java.version>1.8</java.version>
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
@@ -763,8 +763,6 @@ class AutoTuner(
&& appInfoProvider.getRedundantReadSize > DEF_READ_SIZE_THRESHOLD) {
appendRecommendation("spark.rapids.filecache.enabled", "true")
appendComment("Enable file cache only if Spark local disks bandwidth is > 1 GB/s")
} else {
null
}
}

@@ -1154,7 +1154,16 @@ class QualificationSuite extends BaseTestSuite {
try {
val lines = inputSource.getLines.toSeq
// 1 for header, 1 for values
assert(lines.size == 6)

val expLinesSize =
if (ToolUtils.isSpark340OrLater()) {
8
} else if (!ToolUtils.isSpark320OrLater()) {
6
} else {
7
}
assert(lines.size == expLinesSize)
assert(lines.head.contains("App ID,Unsupported Type,"))
assert(lines(1).contains("\"Read\",\"JSON\",\"Types not supported - bigint:int\""))
} finally {
8 changes: 4 additions & 4 deletions user_tools/custom_speedup_factors/README.md
@@ -24,16 +24,16 @@ spark_rapids_user_tools onprem profiling --csv --eventlogs CPU-3k --local_folder
spark_rapids_user_tools onprem profiling --csv --eventlogs GPU-3k --local_folder GPU-3k-profile
```
3. Speedup factor generation
1. Run the speedup factor generation script, passing the CPU and GPU profiler output.
1. Run the speedup factor generation script, passing the CPU and GPU profiler output along with a CSV output filename.
```
python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile
python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile --output newScores.csv
```

The output will showcase what operators were detected in the benchmarks to be used as custom speedups. You can then update values from the default [operatorsScore.csv](https://github.com/NVIDIA/spark-rapids-tools/blob/dev/core/src/main/resources/operatorsScore.csv) file to create your own version with the custom speedup factors generated by the output.
The script will generate the new scores in the output specified by the `--output` argument.

## Running Workload Qualification with Custom Speedup Factors

Now that you have a custom *operatorsScore.csv* file, you can run the Spark RAPIDS qualification tool using it to get estimations applicable for your environment. Here is the command to run with a custom speedup factor file:
```
spark_rapids_user_tools onprem qualification --speedup-factor-file operatorsScore.csv --eventlogs <CPU-event-logs>
spark_rapids_user_tools onprem qualification --speedup-factor-file newScores.csv --eventlogs <CPU-event-logs>
```
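Not part of this commit: a quick sanity check, assuming pandas is installed and using the hypothetical file name newScores.csv from the README example above, to confirm the generated file keeps the two-column CPUOperator,Score layout before passing it to `--speedup-factor-file`:
```
import pandas as pd

# newScores.csv is the hypothetical output name used in the README example above.
scores = pd.read_csv("newScores.csv")
assert list(scores.columns) == ["CPUOperator", "Score"], scores.columns
print(scores.head())
```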
18 changes: 18 additions & 0 deletions user_tools/custom_speedup_factors/defaultScores.csv
@@ -0,0 +1,18 @@
CPUOperator,Score
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
MapInPandasExec,1.2
WindowInPandasExec,1.2
KMeans-pyspark,8.86
KMeans-scala,1
PCA-pyspark,2.24
PCA-scala,2.69
LinearRegression-pyspark,2
LinearRegression-scala,1
RandomForestClassifier-pyspark,6.31
RandomForestClassifier-scala,1
RandomForestRegressor-pyspark,3.66
RandomForestRegressor-scala,1
XGBoost-pyspark,1
XGBoost-scala,3.31
104 changes: 75 additions & 29 deletions user_tools/custom_speedup_factors/generate_speedup_factors.py
@@ -27,12 +27,14 @@
parser = argparse.ArgumentParser(description="Speedup Factor Analysis")
parser.add_argument("--cpu", type=str, help="Directory of CPU profiler logs", required=True)
parser.add_argument("--gpu", type=str, help="Directory of GPU profiler logs", required=True)
parser.add_argument("--output", type=str, help="Filename for custom speedup factors", required=True)
parser.add_argument("--verbose", action="store_true", help="flag to generate full verbose output for logging raw node results")
parser.add_argument("--chdir", action="store_true", help="flag to change to work dir that's the script located")
args = parser.parse_args()

cpu_dir = args.cpu
gpu_dir = args.gpu
output = args.output
verbose = args.verbose

cpu_stage_log = {}
@@ -55,13 +57,26 @@
mapping_info = mapping_info.groupby(['SQL Node'])['Child Node'].apply(','.join).reset_index()

# - process sql_plan_metrics_for_application.csv
# - load in "duration" (CPU) or "op time" (GPU)
# - load in "duration" (CPU)
# - replace WholeStageCodegen (CPU only) with list of operators from mapping lookup file
# - mapping_info.parent = sql_times.nodeName
cpu_sql_info = pd.read_csv(cpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv")
cpu_sql_times = cpu_sql_info[cpu_sql_info["name"] == "duration"]
cpu_sql_combined = cpu_sql_times.set_index('nodeName').join(mapping_info.set_index('SQL Node'), how='left')

# - parse WholeStageCodegen durations with child node mapping
cpu_sql_times_df = cpu_sql_combined[['Child Node', 'max_value']]

for index, row in cpu_sql_times_df.iterrows():
operators = str(row['Child Node']).split(',')
duration = row['max_value']/len(operators)/1000.0
for operator in operators:
if operator in cpu_stage_log[app_name]:
cpu_stage_log[app_name][operator] = cpu_stage_log[app_name][operator] + duration
else:
cpu_stage_log[app_name][operator] = duration

# - parse top-level execs from sql_to_stage_information.csv
cpu_stage_info = pd.read_csv(cpu_dir + "/" + app + "/sql_to_stage_information.csv")
cpu_stage_times = cpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']]

@@ -92,12 +107,8 @@
app_name = app_info.loc[0]["appName"]
gpu_stage_log[app_name] = {}

# - process sql_plan_metrics_for_application.csv
# - load in "duration" (CPU) or "op time" (GPU)
# - mapping_info.parent = sql_times.nodeName
gpu_sql_info = pd.read_csv(gpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv")
gpu_sql_times = gpu_sql_info[gpu_sql_info["name"] == "op time"]

# - process sql_to_stage_information.csv to get stage durations
# - split up duration by operators listed in each stage
gpu_stage_info = pd.read_csv(gpu_dir + "/" + app + "/sql_to_stage_information.csv")
gpu_stage_times = gpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']]

@@ -111,41 +122,76 @@
else:
gpu_stage_log[app_name][op_key] = duration

# Sum up SQL operators for each
stage_totals = {}
cpu_stage_totals = {}
gpu_stage_totals = {}
cpu_stage_total = 0.0
gpu_stage_total = 0.0

# Sum up SQL operators for each operator found in CPU and GPU
for app_key in cpu_stage_log:
for op_key in cpu_stage_log[app_key]:
if op_key not in stage_totals:
stage_totals[op_key] = cpu_stage_log[app_key][op_key]
if op_key not in cpu_stage_totals:
cpu_stage_totals[op_key] = cpu_stage_log[app_key][op_key]
else:
stage_totals[op_key] = stage_totals[op_key] + cpu_stage_log[app_key][op_key]
cpu_stage_totals[op_key] = cpu_stage_totals[op_key] + cpu_stage_log[app_key][op_key]
cpu_stage_total = cpu_stage_total + cpu_stage_log[app_key][op_key]


for app_key in gpu_stage_log:
for op_key in gpu_stage_log[app_key]:
if op_key not in stage_totals:
stage_totals[op_key] = gpu_stage_log[app_key][op_key]
if op_key not in gpu_stage_totals:
gpu_stage_totals[op_key] = gpu_stage_log[app_key][op_key]
else:
stage_totals[op_key] = stage_totals[op_key] + gpu_stage_log[app_key][op_key]
gpu_stage_totals[op_key] = gpu_stage_totals[op_key] + gpu_stage_log[app_key][op_key]
gpu_stage_total = gpu_stage_total + gpu_stage_log[app_key][op_key]

# Create dictionary of execs where speedup factors can be calculated
scores_dict = {}

if 'Filter' in cpu_stage_totals and 'GpuFilter' in gpu_stage_totals:
scores_dict["FilterExec"] = str(round(cpu_stage_totals['Filter'] / gpu_stage_totals['GpuFilter'], 2))
if 'SortMergeJoin' in cpu_stage_totals and 'GpuShuffledHashJoin' in gpu_stage_totals:
scores_dict["SortExec"] = str(round(cpu_stage_totals['SortMergeJoin'] / gpu_stage_totals['GpuShuffledHashJoin'], 2))
if 'BroadcastHashJoin' in cpu_stage_totals and 'GpuBroadcastHashJoin' in gpu_stage_totals:
scores_dict["BroadcastHashJoinExec"] = str(round(cpu_stage_totals['BroadcastHashJoin'] / gpu_stage_totals['GpuBroadcastHashJoin'], 2))
if 'Exchange' in cpu_stage_totals and 'GpuColumnarExchange' in gpu_stage_totals:
scores_dict["ShuffleExchangeExec"] = str(round(cpu_stage_totals['Exchange'] / gpu_stage_totals['GpuColumnarExchange'], 2))
if 'HashAggregate' in cpu_stage_totals and 'GpuHashAggregate' in gpu_stage_totals:
scores_dict["HashAggregateExec"] = str(round(cpu_stage_totals['HashAggregate'] / gpu_stage_totals['GpuHashAggregate'], 2))
if all(cpu_keys in cpu_stage_totals for cpu_keys in ('SortMergeJoin', 'Sort' )) and all(gpu_keys in gpu_stage_totals for gpu_keys in ('GpuShuffledHashJoin', 'GpuSort')):
scores_dict["SortMergeJoinExec"] = str(round((cpu_stage_totals['SortMergeJoin'] + cpu_stage_totals['Sort']) / (gpu_stage_totals['GpuShuffledHashJoin'] + gpu_stage_totals['GpuSort']), 2))

overall_speedup = str(round(cpu_stage_total/gpu_stage_total, 2))

# Print out node metrics (if verbose)
if verbose:
print("# Operator metrics ")
for key in stage_totals:
print(key + "," + str(stage_totals[key]))
print("CPU Total," + str(cpu_stage_total))
print("GPU Total," + str(gpu_stage_total))

# Print out speedup factors
print("# Speedup Factors ")
print("FilterExec," + str(round(stage_totals['Filter'] / stage_totals['GpuFilter'], 2)))
print("SortExec," + str(round(stage_totals['SortMergeJoin'] / stage_totals['GpuShuffledHashJoin'], 2)))
print("BroadcastHashJoinExec," + str(round(stage_totals['BroadcastHashJoin'] / stage_totals['GpuBroadcastHashJoin'], 2)))
print("ShuffleExchangeExec," + str(round(stage_totals['Exchange'] / stage_totals['GpuColumnarExchange'], 2)))
print("HashAggregateExec," + str(round(stage_totals['HashAggregate'] / stage_totals['GpuHashAggregate'], 2)))
print("SortMergeJoinExec," + str(round((stage_totals['SortMergeJoin']+stage_totals['Sort']) / (stage_totals['GpuShuffledHashJoin']+stage_totals['GpuSort']), 2)))
print("# CPU Operator Metrics")
for key in cpu_stage_totals:
print(key + " = " + str(cpu_stage_totals[key]))
print("# GPU Operator Metrics")
for key in gpu_stage_totals:
print(key + " = " + str(gpu_stage_totals[key]))
print("# Summary Metrics")
print("CPU Total = " + str(cpu_stage_total))
print("GPU Total = " + str(gpu_stage_total))
print("Overall speedup = " + overall_speedup)

# Print out individual exec speedup factors
print("# Speedup Factors ")
for key in scores_dict:
print(f"{key} = {scores_dict[key]}")

# Load in list of operators and set initial values to default speedup
scores_df = pd.read_csv("operatorsList.csv")
scores_df["Score"] = overall_speedup

# Update operators that are found in benchmark
for key in scores_dict:
scores_df.loc[scores_df['CPUOperator'] == key, 'Score'] = scores_dict[key]

# Add in hard-coded defaults
defaults_df = pd.read_csv("defaultScores.csv")

# Generate output CSV file
final_df = pd.concat([scores_df, defaults_df])
final_df.to_csv(output, index=False)
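
For readers skimming the diff, a self-contained sketch of the calculation the updated script performs: per-exec speedup factors are the ratio of accumulated CPU time to GPU time for matching operator pairs, and operators without a measured pair fall back to the overall CPU/GPU ratio. The operator totals below are hypothetical placeholders, not values taken from the benchmarks.
```
# Hypothetical per-operator totals (seconds); the real script accumulates these
# from the profiler CSVs under --cpu and --gpu.
cpu_stage_totals = {"Filter": 1200.0, "HashAggregate": 900.0, "Exchange": 1500.0}
gpu_stage_totals = {"GpuFilter": 400.0, "GpuHashAggregate": 300.0, "GpuColumnarExchange": 500.0}

cpu_stage_total = sum(cpu_stage_totals.values())
gpu_stage_total = sum(gpu_stage_totals.values())
overall_speedup = round(cpu_stage_total / gpu_stage_total, 2)

# CPU exec -> GPU exec pairs for which a dedicated factor can be computed.
pairs = {
    "FilterExec": ("Filter", "GpuFilter"),
    "HashAggregateExec": ("HashAggregate", "GpuHashAggregate"),
    "ShuffleExchangeExec": ("Exchange", "GpuColumnarExchange"),
}

scores_dict = {}
for exec_name, (cpu_key, gpu_key) in pairs.items():
    # Only emit a factor when both sides were observed in the profiles.
    if cpu_key in cpu_stage_totals and gpu_key in gpu_stage_totals:
        scores_dict[exec_name] = round(cpu_stage_totals[cpu_key] / gpu_stage_totals[gpu_key], 2)

print("Overall speedup =", overall_speedup)  # operators without a pair get this value
for exec_name, score in scores_dict.items():
    print(f"{exec_name} = {score}")
```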