From 924f52cce59d921afc24bfc93eb810eb82c56043 Mon Sep 17 00:00:00 2001
From: Matt Ahrens
Date: Fri, 29 Sep 2023 12:58:43 -0500
Subject: [PATCH] Adding speedup factors for Dataproc Serverless and docs fix
 (#603)

Signed-off-by: Matt Ahrens
---
 core/docs/spark-qualification-tool.md         |  23 +-
 .../operatorsScore-dataproc-serverless-l4.csv | 266 ++++++++++++++++++
 .../qualification/PluginTypeChecker.scala     |   2 +
 .../qualification/QualificationArgs.scala     |   4 +-
 .../PluginTypeCheckerSuite.scala              |   5 +
 .../qualification/QualificationSuite.scala    |  24 ++
 user_tools/custom_speedup_factors/README.md   |   4 +-
 7 files changed, 313 insertions(+), 15 deletions(-)
 create mode 100644 core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv

diff --git a/core/docs/spark-qualification-tool.md b/core/docs/spark-qualification-tool.md
index 8d5312fdd..023f4318b 100644
--- a/core/docs/spark-qualification-tool.md
+++ b/core/docs/spark-qualification-tool.md
@@ -23,15 +23,16 @@ Spark resources.
 The estimations for GPU duration are available for different environments and are based on
 benchmarks run in the applicable environments. Here are the cluster information for the ETL
 benchmarks used for the estimates:
-| Environment      | CPU Cluster       | GPU Cluster                    |
-|------------------|-------------------|--------------------------------|
-| On-prem          | 8x 128-core       | 8x 128-core + 8x A100 40 GB    |
-| Dataproc (T4)    | 4x n1-standard-32 | 4x n1-standard-32 + 8x T4 16GB |
-| Dataproc (L4)    | 8x n1-standard-16 | 8x g2-standard-16              |
-| EMR (T4)         | 8x m5d.8xlarge    | 4x g4dn.12xlarge               |
-| EMR (A10)        | 8x m5d.8xlarge    | 8x g5.8xlarge                  |
-| Databricks AWS   | 8x m6gd.8xlage    | 8x g5.8xlarge                  |
-| Databricks Azure | 8x E8ds_v4        | 8x NC8as_T4_v3                 |
+| Environment              | CPU Cluster       | GPU Cluster                    |
+|--------------------------|-------------------|--------------------------------|
+| On-prem                  | 8x 128-core       | 8x 128-core + 8x A100 40 GB    |
+| Dataproc (T4)            | 4x n1-standard-32 | 4x n1-standard-32 + 8x T4 16GB |
+| Dataproc (L4)            | 8x n1-standard-16 | 8x g2-standard-16              |
+| Dataproc Serverless (L4) | 8x 16 cores       | 8x 16 cores + 8x L4 24GB       |
+| EMR (T4)                 | 8x m5d.8xlarge    | 4x g4dn.12xlarge               |
+| EMR (A10)                | 8x m5d.8xlarge    | 8x g5.8xlarge                  |
+| Databricks AWS           | 8x m6gd.8xlarge   | 8x g5.8xlarge                  |
+| Databricks Azure         | 8x E8ds_v4        | 8x NC8as_T4_v3                 |
 
 Note that all benchmarks were run using the
 [NDS benchmark](https://github.com/NVIDIA/spark-rapids-benchmarks/tree/dev/nds) at SF3K (3 TB).
@@ -247,8 +248,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
   -p, --per-sql                   Report at the individual SQL query level.
       --platform                  Cluster platform where Spark CPU workloads were
                                   executed. Options include onprem, dataproc-t4,
-                                  dataproc-l4, emr-t4, emr-a10, databricks-aws, and
-                                  databricks-azure.
+                                  dataproc-l4, dataproc-serverless-l4, emr-t4,
+                                  emr-a10, databricks-aws, and databricks-azure.
                                   Default is onprem.
   -r, --report-read-schema        Whether to output the read formats and
                                   datatypes to the CSV file. This can be very
diff --git a/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv b/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv
new file mode 100644
index 000000000..d740eed27
--- /dev/null
+++ b/core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv
@@ -0,0 +1,266 @@
+CPUOperator,Score
+CoalesceExec,4.25
+CollectLimitExec,4.25
+ExpandExec,7.76
+FileSourceScanExec,3.64
+FilterExec,4.47
+GenerateExec,4.25
+GlobalLimitExec,4.25
+LocalLimitExec,4.25
+ProjectExec,4.25
+RangeExec,4.25
+SampleExec,4.25
+SortExec,4.25
+TakeOrderedAndProjectExec,20.96
+HashAggregateExec,5.54
+ObjectHashAggregateExec,5.54
+SortAggregateExec,5.54
+DataWritingCommandExec,4.25
+ExecutedCommandExec,4.25
+BatchScanExec,3.64
+ShuffleExchangeExec,5.21
+BroadcastHashJoinExec,6.42
+BroadcastNestedLoopJoinExec,17.46
+CartesianProductExec,4.25
+ShuffledHashJoinExec,4.25
+SortMergeJoinExec,7.4
+WindowExec,4.25
+Abs,4.25
+Acos,4.25
+Acosh,4.25
+Add,4.25
+AggregateExpression,4.25
+Alias,4.25
+And,4.25
+ApproximatePercentile,4.25
+ArrayContains,4.25
+ArrayExcept,4.25
+ArrayExists,4.25
+ArrayIntersect,4.25
+ArrayMax,4.25
+ArrayMin,4.25
+ArrayRemove,4.25
+ArrayRepeat,4.25
+ArrayTransform,4.25
+ArrayUnion,4.25
+ArraysOverlap,4.25
+ArraysZip,4.25
+Asin,4.25
+Asinh,4.25
+AtLeastNNonNulls,4.25
+Atan,4.25
+Atanh,4.25
+AttributeReference,4.25
+Average,4.25
+BRound,4.25
+BitLength,4.25
+BitwiseAnd,4.25
+BitwiseNot,4.25
+BitwiseOr,4.25
+BitwiseXor,4.25
+CaseWhen,4.25
+Cbrt,4.25
+Ceil,4.25
+CheckOverflow,4.25
+Coalesce,4.25
+CollectList,4.25
+CollectSet,4.25
+Concat,4.25
+ConcatWs,4.25
+Contains,4.25
+Conv,4.25
+Cos,4.25
+Cosh,4.25
+Cot,4.25
+Count,4.25
+CreateArray,4.25
+CreateMap,4.25
+CreateNamedStruct,4.25
+CurrentRow$,4.25
+DateAdd,4.25
+DateAddInterval,4.25
+DateDiff,4.25
+DateFormatClass,4.25
+DateSub,4.25
+DayOfMonth,4.25
+DayOfWeek,4.25
+DayOfYear,4.25
+DenseRank,4.25
+Divide,4.25
+ElementAt,4.25
+EndsWith,4.25
+EqualNullSafe,4.25
+EqualTo,4.25
+Exp,4.25
+Explode,4.25
+Expm1,4.25
+First,4.25
+Flatten,4.25
+Floor,4.25
+FromUTCTimestamp,4.25
+FromUnixTime,4.25
+GetArrayItem,4.25
+GetArrayStructFields,4.25
+GetJsonObject,4.25
+GetMapValue,4.25
+GetStructField,4.25
+GetTimestamp,4.25
+GreaterThan,4.25
+GreaterThanOrEqual,4.25
+Greatest,4.25
+HiveGenericUDF,4.25
+HiveSimpleUDF,4.25
+Hour,4.25
+Hypot,4.25
+If,4.25
+In,4.25
+InSet,4.25
+InitCap,4.25
+InputFileBlockLength,4.25
+InputFileBlockStart,4.25
+InputFileName,4.25
+IntegralDivide,4.25
+IsNaN,4.25
+IsNotNull,4.25
+IsNull,4.25
+JsonTuple,4.25
+KnownFloatingPointNormalized,4.25
+KnownNotNull,4.25
+Lag,4.25
+LambdaFunction,4.25
+Last,4.25
+LastDay,4.25
+Lead,4.25
+Least,4.25
+Length,4.25
+LessThan,4.25
+LessThanOrEqual,4.25
+Like,4.25
+Literal,4.25
+Log,4.25
+Log10,4.25
+Log1p,4.25
+Log2,4.25
+Logarithm,4.25
+Lower,4.25
+MakeDecimal,4.25
+MapConcat,4.25
+MapEntries,4.25
+MapFilter,4.25
+MapKeys,4.25
+MapValues,4.25
+Max,4.25
+Md5,4.25
+MicrosToTimestamp,4.25
+MillisToTimestamp,4.25
+Min,4.25
+Minute,4.25
+MonotonicallyIncreasingID,4.25
+Month,4.25
+Multiply,4.25
+Murmur3Hash,4.25
+NaNvl,4.25
+NamedLambdaVariable,4.25
+NormalizeNaNAndZero,4.25
+Not,4.25
+NthValue,4.25
+OctetLength,4.25
+Or,4.25
+PercentRank,4.25
+PivotFirst,4.25
+Pmod,4.25
+PosExplode,4.25
+Pow,4.25
+PreciseTimestampConversion,4.25
+PromotePrecision,4.25
+PythonUDF,4.25
+Quarter,4.25
+RLike,4.25
+RaiseError,4.25
+Rand,4.25
+Rank,4.25
+RegExpExtract,4.25
+RegExpExtractAll,4.25
+RegExpReplace,4.25
+Remainder,4.25
+ReplicateRows,4.25
+Reverse,4.25
+Rint,4.25
+Round,4.25
+RowNumber,4.25
+ScalaUDF,4.25
+ScalarSubquery,4.25
+Second,4.25
+SecondsToTimestamp,4.25
+Sequence,4.25
+ShiftLeft,4.25
+ShiftRight,4.25
+ShiftRightUnsigned,4.25
+Signum,4.25
+Sin,4.25
+Sinh,4.25
+Size,4.25
+SortArray,4.25
+SortOrder,4.25
+SparkPartitionID,4.25
+SpecifiedWindowFrame,4.25
+Sqrt,4.25
+StartsWith,4.25
+StddevPop,4.25
+StddevSamp,4.25
+StringInstr,4.25
+StringLPad,4.25
+StringLocate,4.25
+StringRPad,4.25
+StringRepeat,4.25
+StringReplace,4.25
+StringSplit,4.25
+StringToMap,4.25
+StringTranslate,4.25
+StringTrim,4.25
+StringTrimLeft,4.25
+StringTrimRight,4.25
+Substring,4.25
+SubstringIndex,4.25
+Subtract,4.25
+Sum,4.25
+Tan,4.25
+Tanh,4.25
+TimeAdd,4.25
+ToDegrees,4.25
+ToRadians,4.25
+ToUnixTimestamp,4.25
+TransformKeys,4.25
+TransformValues,4.25
+UnaryMinus,4.25
+UnaryPositive,4.25
+UnboundedFollowing$,4.25
+UnboundedPreceding$,4.25
+UnixTimestamp,4.25
+UnscaledValue,4.25
+Upper,4.25
+VariancePop,4.25
+VarianceSamp,4.25
+WeekDay,4.25
+WindowExpression,4.25
+WindowSpecDefinition,4.25
+XxHash64,4.25
+Year,4.25
+AggregateInPandasExec,1.2
+ArrowEvalPythonExec,1.2
+FlatMapGroupsInPandasExec,1.2
+FlatMapCoGroupsInPandasExec,1.2
+MapInPandasExec,1.2
+WindowInPandasExec,1.2
+KMeans-pyspark,8.86
+KMeans-scala,1.0
+PCA-pyspark,2.24
+PCA-scala,2.69
+LinearRegression-pyspark,2.0
+LinearRegression-scala,1.0
+RandomForestClassifier-pyspark,6.31
+RandomForestClassifier-scala,1.0
+RandomForestRegressor-pyspark,3.66
+RandomForestRegressor-scala,1.0
+XGBoost-pyspark,1.0
+XGBoost-scala,3.31
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
index 2b41fcd41..bd5641363 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
@@ -47,6 +47,7 @@ class PluginTypeChecker(platform: String = "onprem",
   private val OPERATORS_SCORE_FILE_ONPREM = "operatorsScore.csv"
   private val OPERATORS_SCORE_FILE_DATAPROC_T4 = "operatorsScore-dataproc-t4.csv"
   private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv"
+  private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv"
   private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv"
   private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv"
   private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv"
@@ -102,6 +103,7 @@ class PluginTypeChecker(platform: String = "onprem",
       // if no GPU specified, then default to dataproc-t4 for backward compatibility
       case "dataproc-t4" | "dataproc" => OPERATORS_SCORE_FILE_DATAPROC_T4
       case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4
+      case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4
       // if no GPU specified, then default to emr-t4 for backward compatibility
       case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4
       case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
index 925001724..075c3512f 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
@@ -148,8 +148,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
   val platform: ScallopOption[String] =
     opt[String](required = false,
      descr = "Cluster platform where Spark CPU workloads were executed. Options include " +
-       "onprem, dataproc-t4, dataproc-l4, emr-t4, emr-a10, databricks-aws, and " +
-       "databricks-azure. " +
+       "onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, emr-t4, emr-a10, " +
+       "databricks-aws, and databricks-azure. " +
        "Default is onprem.",
      default = Some("onprem"))
   val speedupFactorFile: ScallopOption[String] =
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
index 00933d7cc..e7720fd28 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala
@@ -183,6 +183,11 @@ class PluginTypeCheckerSuite extends FunSuite with Logging {
     assert(checker.getSpeedupFactor("Ceil") == 2.73)
   }
 
+  test("supported operator score from dataproc-serverless-l4") {
+    val checker = new PluginTypeChecker("dataproc-serverless-l4")
+    assert(checker.getSpeedupFactor("WindowExec") == 4.25)
+    assert(checker.getSpeedupFactor("Ceil") == 4.25)
+  }
   test("supported operator score from dataproc-l4") {
     val checker = new PluginTypeChecker("dataproc-l4")
     assert(checker.getSpeedupFactor("UnionExec") == 4.16)
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index 59c55a58e..502ddffc1 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -1505,6 +1505,30 @@ class QualificationSuite extends BaseTestSuite {
       assert(outputActual.collect().size == 1)
     }
 
+    // run the qualification tool for dataproc-serverless-l4
+    TrampolineUtil.withTempDir { outpath =>
+      val appArgs = new QualificationArgs(Array(
+        "--output-directory",
+        outpath.getAbsolutePath,
+        "--platform",
+        "dataproc-serverless-l4",
+        eventLog))
+
+      val (exit, sumInfo) =
+        QualificationMain.mainInternal(appArgs)
+      assert(exit == 0)
+
+      // the code above that runs the Spark query stops the SparkSession
+      // so create a new one to read in the csv file
+      createSparkSession()
+
+      // validate that the SQL description in the csv file escapes commas properly
+      val outputResults = s"$outpath/rapids_4_spark_qualification_output/" +
+        s"rapids_4_spark_qualification_output.csv"
+      val outputActual = readExpectedFile(new File(outputResults))
+      assert(outputActual.collect().size == 1)
+    }
+
     // run the qualification tool for databricks-aws
     TrampolineUtil.withTempDir { outpath =>
       val appArgs = new QualificationArgs(Array(
diff --git a/user_tools/custom_speedup_factors/README.md b/user_tools/custom_speedup_factors/README.md
index 4e8aace35..2746965be 100644
--- a/user_tools/custom_speedup_factors/README.md
+++ b/user_tools/custom_speedup_factors/README.md
@@ -44,12 +44,12 @@ There is a utility script in the directory to allow for validation of custom spe
 
 Example execution of the script:
 ```
-python validate_speedup_factors.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup
+python validate_qualification_estimates.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup
 ```
 
 The script also allows you to pass in a custom speedup factor file if you have previously generated them. Example:
 ```
-python validate_speedup_factors.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup --speedups test-scores.csv
+python validate_qualification_estimates.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup --speedups test-scores.csv
 ```
 
 Other options include passing in the CPU and/or GPU profiler output if that has already been done via the `cpu_profile` and `gpu_profile` arguments. Additionally, you can pass in a custom tools jar via `--jar` if that is needed.
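
For reference, a combined invocation of the renamed validation script might look like the following sketch. The event log names, profiler output paths, and jar filename are placeholders, and the `--cpu_profile`/`--gpu_profile` spellings assume the script exposes the README's `cpu_profile` and `gpu_profile` arguments as standard long options:
```
# Hypothetical end-to-end run; paths and flag spellings are assumptions, not from the patch.
# Reuses previously generated profiler output and supplies a custom tools jar.
python validate_qualification_estimates.py \
  --cpu_log CPU-nds-eventlog \
  --gpu_log GPU-nds-eventlog \
  --output test-speedup \
  --speedups test-scores.csv \
  --cpu_profile cpu-profiler-output \
  --gpu_profile gpu-profiler-output \
  --jar rapids-4-spark-tools_2.12-<version>.jar
```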