diff --git a/.github/workflows/mvn-verify-check.yml b/.github/workflows/mvn-verify-check.yml index b9b9e11c9..12bcb9d79 100644 --- a/.github/workflows/mvn-verify-check.yml +++ b/.github/workflows/mvn-verify-check.yml @@ -22,7 +22,9 @@ on: jobs: build: runs-on: ubuntu-latest - + strategy: + matrix: + spark-version: ['311', '320', '330', '341'] steps: - uses: actions/checkout@v3 @@ -32,5 +34,5 @@ jobs: distribution: adopt java-version: 8 - - name: Run mvn verify - run: cd core && mvn verify + - name: Run mvn verify with Spark ${{ matrix.spark-version }} + run: cd core && mvn -Dbuildver=${{ matrix.spark-version }} verify diff --git a/.github/workflows/python-unit-test.yml b/.github/workflows/python-unit-test.yml index e10911978..f29f3caea 100644 --- a/.github/workflows/python-unit-test.yml +++ b/.github/workflows/python-unit-test.yml @@ -22,20 +22,23 @@ on: jobs: build: runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.8', '3.9', '3.10'] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v3 - - name: Set up Python - uses: actions/setup-python@v2 - with: - python-version: '3.8' + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} - - name: Install tox - run: | - python -m pip install --upgrade pip - python -m pip install tox - python -m pip install --pre tox-gh-actions + - name: Install tox + run: | + python -m pip install --upgrade pip + python -m pip install tox + python -m pip install --pre tox-gh-actions - - name: Run tox test - run: cd user_tools && tox -e pylint,flake8,python3.8 -- tests/test_diagnostic.py + - name: Run tox test + run: cd user_tools && tox diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e6b776e4f..002bf95ba 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,7 +16,9 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 + with: + token: ${{ secrets.NVAUTO_TOKEN }} - name: Build Changelog id: build_changelog @@ -25,7 +27,7 @@ jobs: configuration: ".github/workflows/configuration.json" # Configuration file for the changelog builder (optional)z outputFile: "CHANGELOG_BODY.md" env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GITHUB_TOKEN: ${{ secrets.NVAUTO_TOKEN }} - name: Commit and Push Changelog if: steps.build_changelog.outputs.changes > 0 @@ -38,15 +40,15 @@ jobs: cat CURRENT_CHANGELOG.md CHANGELOG_BODY.md >> TEMP_CHANGELOG.md cat TEMP_CHANGELOG.md CHANGELOG.md > NEW_CHANGELOG.md - git config user.name "GitHub Actions" - git config user.email "actions@github.com" + git config user.name ${{ secrets.NVAUTO_USER }} + git config user.email ${{ secrets.NVAUTO_EMAIL }} git fetch origin main git checkout main mv NEW_CHANGELOG.md CHANGELOG.md git add CHANGELOG.md - git commit -m "Update changelogs" - git push origin main + git commit -s -m "Update changelogs" + git push -f https://nvauto:${{ secrets.NVAUTO_TOKEN }}@github.com/${GITHUB_REPOSITORY}.git main - name: Set Version Number id: set_version diff --git a/core/pom.xml b/core/pom.xml index 14af8997a..0a93b6cea 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -23,7 +23,7 @@ rapids-4-spark-tools_2.12 RAPIDS Accelerator for Apache Spark tools RAPIDS Accelerator for Apache Spark tools - 23.06.4-SNAPSHOT + 23.08.0-SNAPSHOT jar http://github.com/NVIDIA/spark-rapids-tools @@ -327,7 +327,7 @@ 3.3.2 3.3.3-SNAPSHOT 3.4.0 - 3.4.1-SNAPSHOT + 3.4.1 3.5.0-SNAPSHOT 2.12 4.3.0 @@ -361,7 +361,7 @@ 2.1.1 2.2.0 2.3.0 - 
2.4.0rc1 + 2.4.0 ${delta10x.version} 1.8 3.11.0 diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala index 8af8dacb2..6a974cf21 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/AutoTuner.scala @@ -763,8 +763,6 @@ class AutoTuner( && appInfoProvider.getRedundantReadSize > DEF_READ_SIZE_THRESHOLD) { appendRecommendation("spark.rapids.filecache.enabled", "true") appendComment("Enable file cache only if Spark local disks bandwidth is > 1 GB/s") - } else { - null } } diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index e20affa40..59c55a58e 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -1154,7 +1154,16 @@ class QualificationSuite extends BaseTestSuite { try { val lines = inputSource.getLines.toSeq // 1 for header, 1 for values - assert(lines.size == 6) + + val expLinesSize = + if (ToolUtils.isSpark340OrLater()) { + 8 + } else if (!ToolUtils.isSpark320OrLater()) { + 6 + } else { + 7 + } + assert(lines.size == expLinesSize) assert(lines.head.contains("App ID,Unsupported Type,")) assert(lines(1).contains("\"Read\",\"JSON\",\"Types not supported - bigint:int\"")) } finally { diff --git a/user_tools/custom_speedup_factors/README.md b/user_tools/custom_speedup_factors/README.md index 118c81099..d2f3afa2d 100644 --- a/user_tools/custom_speedup_factors/README.md +++ b/user_tools/custom_speedup_factors/README.md @@ -24,16 +24,16 @@ spark_rapids_user_tools onprem profiling --csv --eventlogs CPU-3k --local_folder spark_rapids_user_tools onprem profiling --csv --eventlogs GPU-3k --local_folder GPU-3k-profile ``` 3. Speedup factor generation - 1. Run the speedup factor generation script, passing the CPU and GPU profiler output. + 1. Run the speedup factor generation script, passing the CPU and GPU profiler output along with a CSV output filename. ``` -python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile +python generate_speedup_factors.py --cpu CPU-3k-profile/rapids_4_spark_profile --gpu GPU-3k-profile/rapids_4_spark_profile --output newScores.csv ``` -The output will showcase what operators were detected in the benchmarks to be used as custom speedups. You can then update values from the default [operatorsScore.csv](https://github.com/NVIDIA/spark-rapids-tools/blob/dev/core/src/main/resources/operatorsScore.csv) file to create your own version with the custom speedup factors generated by the output. +The script will generate the new scores in the output specified by the `--output` argument. ## Running Workload Qualification with Custom Speedup Factors Now that you have a custom *operatorsScore.csv* file, you can run the Spark RAPIDS qualification tool using it to get estimations applicable for your environment. 
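For reference, each generated score is the ratio of aggregated CPU exec time to the matching GPU exec time, which is what the updated `generate_speedup_factors.py` computes. Below is a minimal sketch of that calculation; the totals are made-up placeholders, while the real script aggregates them from the profiler CSV outputs (`sql_plan_metrics_for_application.csv` and `sql_to_stage_information.csv`).

```
# Illustrative only: mirrors the ratio logic in generate_speedup_factors.py.
# The totals below are placeholders, not real benchmark numbers.
cpu_stage_totals = {"Filter": 1250.0, "Exchange": 4800.0, "HashAggregate": 2100.0}
gpu_stage_totals = {"GpuFilter": 310.0, "GpuColumnarExchange": 960.0, "GpuHashAggregate": 420.0}

pairs = {
    "FilterExec": ("Filter", "GpuFilter"),
    "ShuffleExchangeExec": ("Exchange", "GpuColumnarExchange"),
    "HashAggregateExec": ("HashAggregate", "GpuHashAggregate"),
}
scores = {}
for exec_name, (cpu_key, gpu_key) in pairs.items():
    # A factor is emitted only when both sides were observed in the benchmark runs.
    if cpu_key in cpu_stage_totals and gpu_key in gpu_stage_totals:
        scores[exec_name] = round(cpu_stage_totals[cpu_key] / gpu_stage_totals[gpu_key], 2)
print(scores)  # {'FilterExec': 4.03, 'ShuffleExchangeExec': 5.0, 'HashAggregateExec': 5.0}
```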
Here is the command to run with a custom speedup factor file: ``` -spark_rapids_user_tools onprem qualification --speedup-factor-file operatorsScore.csv --eventlogs +spark_rapids_user_tools onprem qualification --speedup-factor-file newScores.csv --eventlogs ``` diff --git a/user_tools/custom_speedup_factors/defaultScores.csv b/user_tools/custom_speedup_factors/defaultScores.csv new file mode 100644 index 000000000..b958090b8 --- /dev/null +++ b/user_tools/custom_speedup_factors/defaultScores.csv @@ -0,0 +1,18 @@ +CPUOperator,Score +AggregateInPandasExec,1.2 +ArrowEvalPythonExec,1.2 +FlatMapGroupsInPandasExec,1.2 +MapInPandasExec,1.2 +WindowInPandasExec,1.2 +KMeans-pyspark,8.86 +KMeans-scala,1 +PCA-pyspark,2.24 +PCA-scala,2.69 +LinearRegression-pyspark,2 +LinearRegression-scala,1 +RandomForestClassifier-pyspark,6.31 +RandomForestClassifier-scala,1 +RandomForestRegressor-pyspark,3.66 +RandomForestRegressor-scala,1 +XGBoost-pyspark,1 +XGBoost-scala,3.31 diff --git a/user_tools/custom_speedup_factors/generate_speedup_factors.py b/user_tools/custom_speedup_factors/generate_speedup_factors.py index 5224c588a..8ce04e566 100644 --- a/user_tools/custom_speedup_factors/generate_speedup_factors.py +++ b/user_tools/custom_speedup_factors/generate_speedup_factors.py @@ -27,12 +27,14 @@ parser = argparse.ArgumentParser(description="Speedup Factor Analysis") parser.add_argument("--cpu", type=str, help="Directory of CPU profiler logs", required=True) parser.add_argument("--gpu", type=str, help="Directory of GPU profiler logs", required=True) +parser.add_argument("--output", type=str, help="Filename for custom speedup factors", required=True) parser.add_argument("--verbose", action="store_true", help="flag to generate full verbose output for logging raw node results") parser.add_argument("--chdir", action="store_true", help="flag to change to work dir that's the script located") args = parser.parse_args() cpu_dir = args.cpu gpu_dir = args.gpu +output = args.output verbose = args.verbose cpu_stage_log = {} @@ -55,13 +57,26 @@ mapping_info = mapping_info.groupby(['SQL Node'])['Child Node'].apply(','.join).reset_index() # - process sql_plan_metrics_for_application.csv - # - load in "duration" (CPU) or "op time" (GPU) + # - load in "duration" (CPU) # - replace WholeStageCodegen (CPU only) with list of operators from mapping lookup file # - mapping_info.parent = sql_times.nodeName cpu_sql_info = pd.read_csv(cpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv") cpu_sql_times = cpu_sql_info[cpu_sql_info["name"] == "duration"] cpu_sql_combined = cpu_sql_times.set_index('nodeName').join(mapping_info.set_index('SQL Node'), how='left') + # - parse WholeStageCodegen durations with child node mapping + cpu_sql_times_df = cpu_sql_combined[['Child Node', 'max_value']] + + for index, row in cpu_sql_times_df.iterrows(): + operators = str(row['Child Node']).split(',') + duration = row['max_value']/len(operators)/1000.0 + for operator in operators: + if operator in cpu_stage_log[app_name]: + cpu_stage_log[app_name][operator] = cpu_stage_log[app_name][operator] + duration + else: + cpu_stage_log[app_name][operator] = duration + + # - parse top-level execs from sql_to_stage_information.csv cpu_stage_info = pd.read_csv(cpu_dir + "/" + app + "/sql_to_stage_information.csv") cpu_stage_times = cpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']] @@ -92,12 +107,8 @@ app_name = app_info.loc[0]["appName"] gpu_stage_log[app_name] = {} - # - process sql_plan_metrics_for_application.csv - # - load in "duration" (CPU) or "op 
time" (GPU) - # - mapping_info.parent = sql_times.nodeName - gpu_sql_info = pd.read_csv(gpu_dir + "/" + app + "/sql_plan_metrics_for_application.csv") - gpu_sql_times = gpu_sql_info[gpu_sql_info["name"] == "op time"] - + # - process sql_to_stage_information.csv to get stage durations + # - split up duration by operators listed in each stage gpu_stage_info = pd.read_csv(gpu_dir + "/" + app + "/sql_to_stage_information.csv") gpu_stage_times = gpu_stage_info[['Stage Duration', 'SQL Nodes(IDs)']] @@ -111,41 +122,76 @@ else: gpu_stage_log[app_name][op_key] = duration -# Sum up SQL operators for each -stage_totals = {} +cpu_stage_totals = {} +gpu_stage_totals = {} cpu_stage_total = 0.0 gpu_stage_total = 0.0 +# Sum up SQL operators for each operator found in CPU and GPU for app_key in cpu_stage_log: for op_key in cpu_stage_log[app_key]: - if op_key not in stage_totals: - stage_totals[op_key] = cpu_stage_log[app_key][op_key] + if op_key not in cpu_stage_totals: + cpu_stage_totals[op_key] = cpu_stage_log[app_key][op_key] else: - stage_totals[op_key] = stage_totals[op_key] + cpu_stage_log[app_key][op_key] + cpu_stage_totals[op_key] = cpu_stage_totals[op_key] + cpu_stage_log[app_key][op_key] cpu_stage_total = cpu_stage_total + cpu_stage_log[app_key][op_key] for app_key in gpu_stage_log: for op_key in gpu_stage_log[app_key]: - if op_key not in stage_totals: - stage_totals[op_key] = gpu_stage_log[app_key][op_key] + if op_key not in gpu_stage_totals: + gpu_stage_totals[op_key] = gpu_stage_log[app_key][op_key] else: - stage_totals[op_key] = stage_totals[op_key] + gpu_stage_log[app_key][op_key] + gpu_stage_totals[op_key] = gpu_stage_totals[op_key] + gpu_stage_log[app_key][op_key] gpu_stage_total = gpu_stage_total + gpu_stage_log[app_key][op_key] +# Create dictionary of execs where speedup factors can be calculated +scores_dict = {} + +if 'Filter' in cpu_stage_totals and 'GpuFilter' in gpu_stage_totals: + scores_dict["FilterExec"] = str(round(cpu_stage_totals['Filter'] / gpu_stage_totals['GpuFilter'], 2)) +if 'SortMergeJoin' in cpu_stage_totals and 'GpuShuffledHashJoin' in gpu_stage_totals: + scores_dict["SortExec"] = str(round(cpu_stage_totals['SortMergeJoin'] / gpu_stage_totals['GpuShuffledHashJoin'], 2)) +if 'BroadcastHashJoin' in cpu_stage_totals and 'GpuBroadcastHashJoin' in gpu_stage_totals: + scores_dict["BroadcastHashJoinExec"] = str(round(cpu_stage_totals['BroadcastHashJoin'] / gpu_stage_totals['GpuBroadcastHashJoin'], 2)) +if 'Exchange' in cpu_stage_totals and 'GpuColumnarExchange' in gpu_stage_totals: + scores_dict["ShuffleExchangeExec"] = str(round(cpu_stage_totals['Exchange'] / gpu_stage_totals['GpuColumnarExchange'], 2)) +if 'HashAggregate' in cpu_stage_totals and 'GpuHashAggregate' in gpu_stage_totals: + scores_dict["HashAggregateExec"] = str(round(cpu_stage_totals['HashAggregate'] / gpu_stage_totals['GpuHashAggregate'], 2)) +if all(cpu_keys in cpu_stage_totals for cpu_keys in ('SortMergeJoin', 'Sort' )) and all(gpu_keys in gpu_stage_totals for gpu_keys in ('GpuShuffledHashJoin', 'GpuSort')): + scores_dict["SortMergeJoinExec"] = str(round((cpu_stage_totals['SortMergeJoin'] + cpu_stage_totals['Sort']) / (gpu_stage_totals['GpuShuffledHashJoin'] + gpu_stage_totals['GpuSort']), 2)) + +overall_speedup = str(round(cpu_stage_total/gpu_stage_total, 2)) + # Print out node metrics (if verbose) if verbose: - print("# Operator metrics ") - for key in stage_totals: - print(key + "," + str(stage_totals[key])) - print("CPU Total," + str(cpu_stage_total)) - print("GPU Total," + str(gpu_stage_total)) - -# 
Print out speedup factors -print("# Speedup Factors ") -print("FilterExec," + str(round(stage_totals['Filter'] / stage_totals['GpuFilter'], 2))) -print("SortExec," + str(round(stage_totals['SortMergeJoin'] / stage_totals['GpuShuffledHashJoin'], 2))) -print("BroadcastHashJoinExec," + str(round(stage_totals['BroadcastHashJoin'] / stage_totals['GpuBroadcastHashJoin'], 2))) -print("ShuffleExchangeExec," + str(round(stage_totals['Exchange'] / stage_totals['GpuColumnarExchange'], 2))) -print("HashAggregateExec," + str(round(stage_totals['HashAggregate'] / stage_totals['GpuHashAggregate'], 2))) -print("SortMergeJoinExec," + str(round((stage_totals['SortMergeJoin']+stage_totals['Sort']) / (stage_totals['GpuShuffledHashJoin']+stage_totals['GpuSort']), 2))) + print("# CPU Operator Metrics") + for key in cpu_stage_totals: + print(key + " = " + str(cpu_stage_totals[key])) + print("# GPU Operator Metrics") + for key in gpu_stage_totals: + print(key + " = " + str(gpu_stage_totals[key])) + print("# Summary Metrics") + print("CPU Total = " + str(cpu_stage_total)) + print("GPU Total = " + str(gpu_stage_total)) + print("Overall speedup = " + overall_speedup) + + # Print out individual exec speedup factors + print("# Speedup Factors ") + for key in scores_dict: + print(f"{key} = {scores_dict[key]}") + +# Load in list of operators and set initial values to default speedup +scores_df = pd.read_csv("operatorsList.csv") +scores_df["Score"] = overall_speedup + +# Update operators that are found in benchmark +for key in scores_dict: + scores_df.loc[scores_df['CPUOperator'] == key, 'Score'] = scores_dict[key] + +# Add in hard-coded defaults +defaults_df = pd.read_csv("defaultScores.csv") + +# Generate output CSV file +final_df = pd.concat([scores_df, defaults_df]) +final_df.to_csv(output, index=False) diff --git a/user_tools/custom_speedup_factors/operatorsList.csv b/user_tools/custom_speedup_factors/operatorsList.csv new file mode 100644 index 000000000..9e04f5c67 --- /dev/null +++ b/user_tools/custom_speedup_factors/operatorsList.csv @@ -0,0 +1,247 @@ +CPUOperator +CoalesceExec +CollectLimitExec +ExpandExec +FileSourceScanExec +FilterExec +GenerateExec +GlobalLimitExec +LocalLimitExec +ProjectExec +RangeExec +SampleExec +SortExec +SubqueryBroadcastExec +TakeOrderedAndProjectExec +UnionExec +CustomShuffleReaderExec +HashAggregateExec +ObjectHashAggregateExec +SortAggregateExec +InMemoryTableScanExec +DataWritingCommandExec +ExecutedCommandExec +BatchScanExec +BroadcastExchangeExec +ShuffleExchangeExec +BroadcastHashJoinExec +BroadcastNestedLoopJoinExec +CartesianProductExec +ShuffledHashJoinExec +SortMergeJoinExec +FlatMapCoGroupsInPandasExec +WindowExec +HiveTableScanExec +Abs +Acos +Acosh +Add +AggregateExpression +Alias +And +ApproximatePercentile +ArrayContains +ArrayExcept +ArrayExists +ArrayIntersect +ArrayMax +ArrayMin +ArrayRemove +ArrayRepeat +ArrayTransform +ArrayUnion +ArraysOverlap +ArraysZip +Asin +Asinh +AtLeastNNonNulls +Atan +Atanh +AttributeReference +Average +BRound +BitLength +BitwiseAnd +BitwiseNot +BitwiseOr +BitwiseXor +CaseWhen +Cbrt +Ceil +CheckOverflow +Coalesce +CollectList +CollectSet +Concat +ConcatWs +Contains +Cos +Cosh +Cot +Count +CreateArray +CreateMap +CreateNamedStruct +CurrentRow$ +DateAdd +DateAddInterval +DateDiff +DateFormatClass +DateSub +DayOfMonth +DayOfWeek +DayOfYear +DenseRank +Divide +ElementAt +EndsWith +EqualNullSafe +EqualTo +Exp +Explode +Expm1 +First +Floor +FromUTCTimestamp +FromUnixTime +GetArrayItem +GetArrayStructFields +GetJsonObject +GetMapValue 
+GetStructField +GetTimestamp +GreaterThan +GreaterThanOrEqual +Greatest +HiveGenericUDF +HiveSimpleUDF +Hour +Hypot +If +In +InSet +InitCap +InputFileBlockLength +InputFileBlockStart +InputFileName +IntegralDivide +IsNaN +IsNotNull +IsNull +KnownFloatingPointNormalized +KnownNotNull +Lag +LambdaFunction +Last +LastDay +Lead +Least +Length +LessThan +LessThanOrEqual +Like +Literal +Log +Log10 +Log1p +Log2 +Logarithm +Lower +MakeDecimal +MapConcat +MapEntries +MapFilter +MapKeys +MapValues +Max +Md5 +Min +Minute +MonotonicallyIncreasingID +Month +Multiply +Murmur3Hash +NaNvl +NamedLambdaVariable +NormalizeNaNAndZero +Not +NthValue +OctetLength +Or +PercentRank +PivotFirst +Pmod +PosExplode +Pow +PreciseTimestampConversion +PromotePrecision +PythonUDF +Quarter +RLike +RaiseError +Rand +Rank +RegExpExtract +RegExpExtractAll +RegExpReplace +Remainder +ReplicateRows +Reverse +Rint +Round +RowNumber +ScalaUDF +ScalarSubquery +Second +Sequence +ShiftLeft +ShiftRight +ShiftRightUnsigned +Signum +Sin +Sinh +Size +SortArray +SortOrder +SparkPartitionID +SpecifiedWindowFrame +Sqrt +StartsWith +StddevPop +StddevSamp +StringInstr +StringLPad +StringLocate +StringRPad +StringRepeat +StringReplace +StringSplit +StringToMap +StringTrim +StringTrimLeft +StringTrimRight +Substring +SubstringIndex +Subtract +Sum +Tan +Tanh +TimeAdd +ToDegrees +ToRadians +ToUnixTimestamp +TransformKeys +TransformValues +UnaryMinus +UnaryPositive +UnboundedFollowing$ +UnboundedPreceding$ +UnixTimestamp +UnscaledValue +Upper +VariancePop +VarianceSamp +WeekDay +WindowExpression +WindowSpecDefinition +Year diff --git a/user_tools/docs/index.md b/user_tools/docs/index.md index d37d95ead..48481b004 100644 --- a/user_tools/docs/index.md +++ b/user_tools/docs/index.md @@ -74,7 +74,8 @@ The following table summarizes the commands supported for each cloud platform: | EMR | qualification | spark_rapids_user_tools \ | 23.02+ | | | | emr qualification [ARGS] | | | +---------------+-----------------------------------------+----------+ -| | profiling | N/A | TBD | +| | profiling | spark_rapids_user_tools \ | 23.08+ | +| | | emr profiling [ARGS] | | | +---------------+-----------------------------------------+----------+ | | bootstrap | spark_rapids_user_tools \ | 23.02+ | | | | emr bootstrap [ARGS] | | diff --git a/user_tools/docs/user-tools-aws-emr.md b/user_tools/docs/user-tools-aws-emr.md index d65f6fe91..da464f591 100644 --- a/user_tools/docs/user-tools-aws-emr.md +++ b/user_tools/docs/user-tools-aws-emr.md @@ -161,6 +161,148 @@ The command creates a directory with UUID that contains the following: └── ui ``` +## Profiling command + +### Local deployment + +``` +spark_rapids_user_tools emr profiling [options] +spark_rapids_user_tools emr profiling -- --help +``` + +The local deployment runs on the local development machine. It requires: +1. Installing and configuring the AWS CLI +2. Java 1.8+ development environment +3. Internet access to download JAR dependencies from mvn: `spark-*.jar`, `hadoop-aws-*.jar`, and `aws-java-sdk-bundle*.jar` +4. Dependencies are cached on the local disk to reduce the overhead of the download. 
+ +#### Command options + +| Option | Description | Default | Required | +|----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------| +| **gpu_cluster** | The EMR-cluster on which the Spark applications were executed. The argument can be an EMR-cluster or a valid path to the cluster's properties file (json format) generated by the AWS CLI command `aws emr describe-cluster` | If missing, then the argument worker_info has to be provided. | N | +| **worker_info** | A path pointing to a yaml file containing the system information of a worker node. It is assumed that all workers are homogenous. The format of the file is described in the following section. | None | N | +| **eventlogs** | A comma seperated list of S3 urls pointing to event logs or S3 directory | Reads the Spark's property `spark.eventLog.dir` defined in `gpu_cluster`. This property should be included in the output of `emr describe-cluster`. Note that the wrapper will raise an exception if the property is not set. | N | +| **remote_folder** | The S3 folder where the output of the wrapper's output is copied. If missing, the output will be available only on local disk | N/A | N | +| **local_folder** | Local work-directory path to store the output and to be used as root directory for temporary folders/files. The final output will go into a subdirectory named `prof-${EXEC_ID}` where `exec_id` is an auto-generated unique identifier of the execution. | If the argument is NONE, the default value is the env variable `RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY` if any; or the current working directory. | N | +| **profile** | A named AWS profile to get the settings/credentials of the AWS account. | "DEFAULT" | N | +| **jvm_heap_size** | The maximum heap size of the JVM in gigabytes | 24 | N | +| **tools_jar** | Path to a bundled jar including RAPIDS tool. The path is a local filesystem, or remote S3 url | Downloads the latest `rapids-4-spark-tools_*.jar` from mvn repo | N | +| **verbose** | True or False to enable verbosity to the wrapper script | False if `RAPIDS_USER_TOOLS_LOG_DEBUG` is not set | N | +| **rapids_options**** | A list of valid [Profiling tool options](../../core/docs/spark-profiling-tool.md#qualification-tool-options). Note that (`output-directory`, `auto-tuner`, `combined`) flags are ignored | N/A | N | + +If the CLI does not provide an argument `gpu_cluster`, then a valid path to yaml file must be +provided through the arg `worker_info`. +The `worker_info` is a yaml file that contains the HW description of the workers. It must contain +the following properties: +- `system.numCores`: number of cores of a single worker node +- `system.memory`: RAM size in MiB of a single node +- `system.numWorkers`: number of workers +- `gpu.name`: the accelerator installed on the worker node +- `gpu.memory`: memory size of the accelerator in MiB. 
(i.e., 16GB for Nvidia-T4) +- `softwareProperties`: Spark default-configurations of the target cluster + +An example of valid `worker_info.yaml`: + + ``` + system: + numCores: 32 + memory: 212992MiB + numWorkers: 5 + gpu: + memory: 15109MiB + count: 4 + name: T4 + softwareProperties: + spark.driver.maxResultSize: 7680m + spark.driver.memory: 15360m + spark.executor.cores: '8' + spark.executor.instances: '2' + spark.executor.memory: 47222m + spark.executorEnv.OPENBLAS_NUM_THREADS: '1' + spark.scheduler.mode: FAIR + spark.sql.cbo.enabled: 'true' + spark.ui.port: '0' + spark.yarn.am.memory: 640m + ``` + +#### Use case scenario + +A typical workflow to successfully run the `profiling` command in local mode is described as follows: + +1. Store the Apache Spark event logs in S3 folder. +2. A user sets up his development machine: + 1. configures Java + 2. installs AWS CLI and configures the profile and the credentials to make sure the AWS CLI + commands can access the S3 resources `LOGS_BUCKET`. + 3. installs `spark_rapids_user_tools` +3. If the results of the wrapper need to be stored on S3, then another S3 uri is required `REMOTE_FOLDER=s3://OUT_BUCKET/` +4. Depending on the accessibility of the cluster properties, the user chooses one of the 2 cases below (_"Case-A"_, and _"Case-B"_) to trigger the CLI. + +For each successful execution, the wrapper generates a new directory in the format of +`prof__<0x%08X>`. The directory contains `profiling_summary.log` in addition to +the actual folder of the RAPIDS Profiling tool. The directory will be mirrored to S3 folder if the +argument `--remote_folder` was a valid S3 path. + + ``` + ./prof__<0x%08X>/profiling_summary.log + ./prof__<0x%08X>/rapids_4_spark_profile/ + ``` + +**Case-A: A gpu-cluster property file is accessible:** + +A cluster property is still accessible if one of the following conditions applies: + +1. The cluster is listed by the `aws emr list-clusters` cmd. In this case, the CLI will be triggered by providing + `--gpu_cluster $CLUSTER_NAME` + + ``` + # run the command using the GPU cluster name + export RAPIDS_USER_TOOLS_CACHE_FOLDER=my_cache_folder + export EVENTLOGS=s3://LOGS_BUCKET/eventlogs/ + export CLUSTER_NAME=my-emr-gpu-cluster + export REMOTE_FOLDER=s3://OUT_BUCKET/wrapper_output + + spark_rapids_user_tools emr profiling \ + --eventlogs $EVENTLOGS \ + --gpu_cluster $CLUSTER_NAME \ + --remote_folder $REMOTE_FOLDER + ``` +2. The cluster properties file is accessible on local disk or a valid S3 path. + + ``` + $> export CLUSTER_PROPS_FILE=cluster-props.json + $> aws emr describe-cluster --cluster-id $(aws emr list-clusters --query 'Clusters[?Name==$CLUSTER_NAME].Id' --output text) > $CLUSTER_PROPS_FILE + ``` + Trigger the CLI by providing the path to the properties file `--gpu_cluster $CLUSTER_PROPS_FILE` + + ``` + $> spark_rapids_user_tools emr profiling \ + --eventlogs $EVENTLOGS \ + --gpu_cluster $CLUSTER_PROPS_FILE \ + --remote_folder $REMOTE_FOLDER + ``` + +**Case-B: GPU cluster information is missing:** + +In this scenario, users can write down a simple yaml file to describe the shape of the worker nodes. +This case is relevant to the following plans: +1. Users who might want to experiment with different configurations before deciding on the final + cluster shape. +2. Users who have no access to the properties of the cluster. 
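Before triggering the CLI, it can be worth sanity-checking that the YAML carries the properties listed earlier. The following is an illustrative sketch only (it is not part of the wrapper) and assumes PyYAML plus the `worker-info.yaml` file name used in the example below.

```
# Illustrative check that worker-info.yaml has the documented properties.
import yaml  # PyYAML

REQUIRED = {
    "system": ["numCores", "memory", "numWorkers"],
    "gpu": ["name", "memory"],
    "softwareProperties": [],
}

with open("worker-info.yaml", "r", encoding="utf-8") as f:
    info = yaml.safe_load(f)

for section, keys in REQUIRED.items():
    if section not in info:
        raise ValueError(f"worker-info.yaml is missing the '{section}' section")
    missing = [k for k in keys if k not in (info[section] or {})]
    if missing:
        raise ValueError(f"worker-info.yaml is missing {section}.{missing}")
print("worker-info.yaml looks complete")
```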
+ +The CLI is triggered by providing the location where the yaml file is stored `--worker_info $WORKER_INFO_PATH` + + ``` + # First, create a yaml file as described in previous section + $> export WORKER_INFO_PATH=worker-info.yaml + # Run the profiling cmd + $> spark_rapids_user_tools emr profiling \ + --eventlogs $EVENTLOGS \ + --worker_info $WORKER_INFO_PATH \ + --remote_folder $REMOTE_FOLDER + ``` + ## Bootstrap command ``` diff --git a/user_tools/src/spark_rapids_pytools/__init__.py b/user_tools/src/spark_rapids_pytools/__init__.py index f678a18d6..ebfe23cb0 100644 --- a/user_tools/src/spark_rapids_pytools/__init__.py +++ b/user_tools/src/spark_rapids_pytools/__init__.py @@ -16,5 +16,5 @@ from spark_rapids_pytools.build import get_version -VERSION = '23.06.4' +VERSION = '23.08.0' __version__ = get_version(VERSION) diff --git a/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py b/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py index 082ff4738..c390b9d88 100644 --- a/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py +++ b/user_tools/src/spark_rapids_pytools/wrappers/emr_wrapper.py @@ -21,6 +21,7 @@ from spark_rapids_pytools.rapids.diagnostic import Diagnostic from spark_rapids_pytools.rapids.qualification import QualFilterApp, QualificationAsLocal, \ QualGpuClusterReshapeType +from spark_rapids_pytools.rapids.profiling import ProfilingAsLocal class CliEmrLocalMode: # pylint: disable=too-few-public-methods @@ -119,6 +120,77 @@ def qualification(cpu_cluster: str = None, wrapper_options=wrapper_qual_options, rapids_options=rapids_options).launch() + @staticmethod + def profiling(gpu_cluster: str = None, + worker_info: str = None, + eventlogs: str = None, + profile: str = None, + local_folder: str = None, + remote_folder: str = None, + tools_jar: str = None, + jvm_heap_size: int = 24, + verbose: bool = False, + **rapids_options) -> None: + """ + The Profiling tool analyzes both CPU or GPU generated event logs and generates information + which can be used for debugging and profiling Apache Spark applications. + + :param gpu_cluster: The EMR-cluster on which the Spark applications were executed. The argument + can be an EMR-cluster or a valid path to the cluster's properties file (json format) + generated by the AWS CLI. If missing, then the argument worker_info has to be provided. + :param worker_info: A path pointing to a yaml file containing the system information of a + worker node. It is assumed that all workers are homogenous. + If missing, the wrapper pulls the worker info from the "gpu_cluster". + :param eventlogs: Event log filenames or S3 storage directories + containing event logs (comma separated). If missing, the wrapper Reads the Spark's + property `spark.eventLog.dir` defined in `gpu_cluster`. This property should be included + in the output of `aws emr describe-cluster`. Note that the wrapper will raise an exception + if the property is not set. + :param profile: A named AWS profile to get the settings/credentials of the AWS account. + :param local_folder: Local work-directory path to store the output and to be used as root + directory for temporary folders/files. The final output will go into a subdirectory called + ${local_folder}/prof-${EXEC_ID} where exec_id is an auto-generated unique identifier of the + execution. If the argument is NONE, the default value is the env variable + RAPIDS_USER_TOOLS_OUTPUT_DIRECTORY if any; or the current working directory. + :param remote_folder: A S3 folder where the output is uploaded at the end of execution. 
+ If no value is provided, the output will be only available on local disk. + :param tools_jar: Path to a bundled jar including Rapids tool. The path is a local filesystem, + or remote S3 url. If missing, the wrapper downloads the latest rapids-4-spark-tools_*.jar + from maven repo. + :param verbose: True or False to enable verbosity to the wrapper script. + :param jvm_heap_size: The maximum heap size of the JVM in gigabytes. + :param rapids_options: A list of valid Profiling tool options. + Note that the wrapper ignores ["output-directory", "worker-info"] flags, and it does not support + multiple "spark-property" arguments. + For more details on Profiling tool options, please visit + https://nvidia.github.io/spark-rapids/docs/spark-profiling-tool.html#profiling-tool-options + """ + if verbose: + # when debug is set to true set it in the environment. + ToolLogging.enable_debug_mode() + wrapper_prof_options = { + 'platformOpts': { + 'profile': profile, + 'deployMode': DeployMode.LOCAL, + }, + 'migrationClustersProps': { + 'gpuCluster': gpu_cluster + }, + 'jobSubmissionProps': { + 'remoteFolder': remote_folder, + 'platformArgs': { + 'jvmMaxHeapSize': jvm_heap_size + } + }, + 'eventlogs': eventlogs, + 'toolsJar': tools_jar, + 'autoTunerFileInput': worker_info + } + ProfilingAsLocal(platform_type=CloudPlatform.EMR, + output_folder=local_folder, + wrapper_options=wrapper_prof_options, + rapids_options=rapids_options).launch() + @staticmethod def bootstrap(cluster: str, profile: str = None, @@ -211,5 +283,6 @@ class EMRWrapper: # pylint: disable=too-few-public-methods def __init__(self): self.qualification = CliEmrLocalMode.qualification + self.profiling = CliEmrLocalMode.profiling self.bootstrap = CliEmrLocalMode.bootstrap self.diagnostic = CliEmrLocalMode.diagnostic diff --git a/user_tools/tox.ini b/user_tools/tox.ini index 0aa43cd25..9e8f6b87c 100644 --- a/user_tools/tox.ini +++ b/user_tools/tox.ini @@ -11,6 +11,12 @@ envlist = flake8 isolated_build = True +[gh-actions] +python = + 3.8: python3.8, pylint, flake8 + 3.9: python3.9, pylint, flake8 + 3.10: python3.10, pylint, flake8 + [testenv] deps = pytest-cov
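The new `profiling` entry point is wired into `EMRWrapper` alongside `qualification`, `bootstrap`, and `diagnostic`, so it can also be invoked from Python rather than the `spark_rapids_user_tools emr profiling` CLI. A minimal sketch, assuming the package layout shown above and placeholder S3/YAML locations:

```
# Illustrative programmatic call of the new EMR profiling wrapper; paths are placeholders.
from spark_rapids_pytools.wrappers.emr_wrapper import CliEmrLocalMode

CliEmrLocalMode.profiling(
    worker_info="worker-info.yaml",           # HW shape of the GPU workers (Case-B above)
    eventlogs="s3://LOGS_BUCKET/eventlogs/",  # comma-separated event logs or an S3 directory
    remote_folder="s3://OUT_BUCKET/wrapper_output",
    jvm_heap_size=24,                         # default from the wrapper signature
    verbose=True,
)
```

Either path ends up in `ProfilingAsLocal.launch()`, so the output layout (the `prof-${EXEC_ID}` directory with `profiling_summary.log`) is the same as described in the documentation section above.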