Adding speedup factors for Dataproc Serverless and docs fix (#603)

Signed-off-by: Matt Ahrens <[email protected]>
mattahrens authored Sep 29, 2023
1 parent da8f88f commit 924f52c
Showing 7 changed files with 313 additions and 15 deletions.
23 changes: 12 additions & 11 deletions core/docs/spark-qualification-tool.md
@@ -23,15 +23,16 @@ Spark resources.
The estimates for GPU duration are available for different environments and are based on benchmarks run in the
applicable environments. Here is the cluster information for the ETL benchmarks used for the estimates:

-| Environment      | CPU Cluster       | GPU Cluster                    |
-|------------------|-------------------|--------------------------------|
-| On-prem          | 8x 128-core       | 8x 128-core + 8x A100 40 GB    |
-| Dataproc (T4)    | 4x n1-standard-32 | 4x n1-standard-32 + 8x T4 16GB |
-| Dataproc (L4)    | 8x n1-standard-16 | 8x g2-standard-16              |
-| EMR (T4)         | 8x m5d.8xlarge    | 4x g4dn.12xlarge               |
-| EMR (A10)        | 8x m5d.8xlarge    | 8x g5.8xlarge                  |
-| Databricks AWS   | 8x m6gd.8xlarge   | 8x g5.8xlarge                  |
-| Databricks Azure | 8x E8ds_v4        | 8x NC8as_T4_v3                 |
+| Environment              | CPU Cluster       | GPU Cluster                    |
+|--------------------------|-------------------|--------------------------------|
+| On-prem                  | 8x 128-core       | 8x 128-core + 8x A100 40 GB    |
+| Dataproc (T4)            | 4x n1-standard-32 | 4x n1-standard-32 + 8x T4 16GB |
+| Dataproc (L4)            | 8x n1-standard-16 | 8x g2-standard-16              |
+| Dataproc Serverless (L4) | 8x 16 cores       | 8x 16 cores + 8x L4 24GB       |
+| EMR (T4)                 | 8x m5d.8xlarge    | 4x g4dn.12xlarge               |
+| EMR (A10)                | 8x m5d.8xlarge    | 8x g5.8xlarge                  |
+| Databricks AWS           | 8x m6gd.8xlarge   | 8x g5.8xlarge                  |
+| Databricks Azure         | 8x E8ds_v4        | 8x NC8as_T4_v3                 |

Note that all benchmarks were run using the [NDS benchmark](https://github.com/NVIDIA/spark-rapids-benchmarks/tree/dev/nds) at SF3K (3 TB).
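
To make the role of these per-operator speedup factors concrete, here is a minimal sketch of a duration-weighted estimate. The operator durations, the fallback factor of 1.0, and the weighting itself are illustrative assumptions, not the qualification tool's exact algorithm; only the factor values are taken from the Dataproc Serverless (L4) file added in this commit.

```scala
// Illustrative sketch only: estimate an overall speedup by dividing each
// operator's CPU time by its per-operator factor. Operator durations here
// are made up; the factors come from operatorsScore-dataproc-serverless-l4.csv.
object SpeedupSketch {
  val operatorCpuMs: Seq[(String, Long)] = Seq(
    ("FilterExec", 12000L),
    ("HashAggregateExec", 30000L),
    ("SortMergeJoinExec", 18000L))

  val factors: Map[String, Double] = Map(
    "FilterExec" -> 4.47,
    "HashAggregateExec" -> 5.54,
    "SortMergeJoinExec" -> 7.4)

  def main(args: Array[String]): Unit = {
    val totalCpu = operatorCpuMs.map(_._2).sum.toDouble
    // Operators without a factor keep their CPU time (factor 1.0) -- an assumption.
    val estimatedGpu = operatorCpuMs.map { case (op, ms) =>
      ms / factors.getOrElse(op, 1.0)
    }.sum
    println(f"Estimated speedup: ${totalCpu / estimatedGpu}%.2f")
  }
}
```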

@@ -247,8 +248,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
-p, --per-sql Report at the individual SQL query level.
--platform <arg> Cluster platform where Spark CPU workloads were
executed. Options include onprem, dataproc-t4,
-dataproc-l4, emr-t4, emr-a10, databricks-aws, and
-databricks-azure.
+dataproc-l4, dataproc-serverless-l4, emr-t4,
+emr-a10, databricks-aws, and databricks-azure.
Default is onprem.
-r, --report-read-schema Whether to output the read formats and
datatypes to the CSV file. This can be very
266 changes: 266 additions & 0 deletions core/src/main/resources/operatorsScore-dataproc-serverless-l4.csv
@@ -0,0 +1,266 @@
CPUOperator,Score
CoalesceExec,4.25
CollectLimitExec,4.25
ExpandExec,7.76
FileSourceScanExec,3.64
FilterExec,4.47
GenerateExec,4.25
GlobalLimitExec,4.25
LocalLimitExec,4.25
ProjectExec,4.25
RangeExec,4.25
SampleExec,4.25
SortExec,4.25
TakeOrderedAndProjectExec,20.96
HashAggregateExec,5.54
ObjectHashAggregateExec,5.54
SortAggregateExec,5.54
DataWritingCommandExec,4.25
ExecutedCommandExec,4.25
BatchScanExec,3.64
ShuffleExchangeExec,5.21
BroadcastHashJoinExec,6.42
BroadcastNestedLoopJoinExec,17.46
CartesianProductExec,4.25
ShuffledHashJoinExec,4.25
SortMergeJoinExec,7.4
WindowExec,4.25
Abs,4.25
Acos,4.25
Acosh,4.25
Add,4.25
AggregateExpression,4.25
Alias,4.25
And,4.25
ApproximatePercentile,4.25
ArrayContains,4.25
ArrayExcept,4.25
ArrayExists,4.25
ArrayIntersect,4.25
ArrayMax,4.25
ArrayMin,4.25
ArrayRemove,4.25
ArrayRepeat,4.25
ArrayTransform,4.25
ArrayUnion,4.25
ArraysOverlap,4.25
ArraysZip,4.25
Asin,4.25
Asinh,4.25
AtLeastNNonNulls,4.25
Atan,4.25
Atanh,4.25
AttributeReference,4.25
Average,4.25
BRound,4.25
BitLength,4.25
BitwiseAnd,4.25
BitwiseNot,4.25
BitwiseOr,4.25
BitwiseXor,4.25
CaseWhen,4.25
Cbrt,4.25
Ceil,4.25
CheckOverflow,4.25
Coalesce,4.25
CollectList,4.25
CollectSet,4.25
Concat,4.25
ConcatWs,4.25
Contains,4.25
Conv,4.25
Cos,4.25
Cosh,4.25
Cot,4.25
Count,4.25
CreateArray,4.25
CreateMap,4.25
CreateNamedStruct,4.25
CurrentRow$,4.25
DateAdd,4.25
DateAddInterval,4.25
DateDiff,4.25
DateFormatClass,4.25
DateSub,4.25
DayOfMonth,4.25
DayOfWeek,4.25
DayOfYear,4.25
DenseRank,4.25
Divide,4.25
ElementAt,4.25
EndsWith,4.25
EqualNullSafe,4.25
EqualTo,4.25
Exp,4.25
Explode,4.25
Expm1,4.25
First,4.25
Flatten,4.25
Floor,4.25
FromUTCTimestamp,4.25
FromUnixTime,4.25
GetArrayItem,4.25
GetArrayStructFields,4.25
GetJsonObject,4.25
GetMapValue,4.25
GetStructField,4.25
GetTimestamp,4.25
GreaterThan,4.25
GreaterThanOrEqual,4.25
Greatest,4.25
HiveGenericUDF,4.25
HiveSimpleUDF,4.25
Hour,4.25
Hypot,4.25
If,4.25
In,4.25
InSet,4.25
InitCap,4.25
InputFileBlockLength,4.25
InputFileBlockStart,4.25
InputFileName,4.25
IntegralDivide,4.25
IsNaN,4.25
IsNotNull,4.25
IsNull,4.25
JsonTuple,4.25
KnownFloatingPointNormalized,4.25
KnownNotNull,4.25
Lag,4.25
LambdaFunction,4.25
Last,4.25
LastDay,4.25
Lead,4.25
Least,4.25
Length,4.25
LessThan,4.25
LessThanOrEqual,4.25
Like,4.25
Literal,4.25
Log,4.25
Log10,4.25
Log1p,4.25
Log2,4.25
Logarithm,4.25
Lower,4.25
MakeDecimal,4.25
MapConcat,4.25
MapEntries,4.25
MapFilter,4.25
MapKeys,4.25
MapValues,4.25
Max,4.25
Md5,4.25
MicrosToTimestamp,4.25
MillisToTimestamp,4.25
Min,4.25
Minute,4.25
MonotonicallyIncreasingID,4.25
Month,4.25
Multiply,4.25
Murmur3Hash,4.25
NaNvl,4.25
NamedLambdaVariable,4.25
NormalizeNaNAndZero,4.25
Not,4.25
NthValue,4.25
OctetLength,4.25
Or,4.25
PercentRank,4.25
PivotFirst,4.25
Pmod,4.25
PosExplode,4.25
Pow,4.25
PreciseTimestampConversion,4.25
PromotePrecision,4.25
PythonUDF,4.25
Quarter,4.25
RLike,4.25
RaiseError,4.25
Rand,4.25
Rank,4.25
RegExpExtract,4.25
RegExpExtractAll,4.25
RegExpReplace,4.25
Remainder,4.25
ReplicateRows,4.25
Reverse,4.25
Rint,4.25
Round,4.25
RowNumber,4.25
ScalaUDF,4.25
ScalarSubquery,4.25
Second,4.25
SecondsToTimestamp,4.25
Sequence,4.25
ShiftLeft,4.25
ShiftRight,4.25
ShiftRightUnsigned,4.25
Signum,4.25
Sin,4.25
Sinh,4.25
Size,4.25
SortArray,4.25
SortOrder,4.25
SparkPartitionID,4.25
SpecifiedWindowFrame,4.25
Sqrt,4.25
StartsWith,4.25
StddevPop,4.25
StddevSamp,4.25
StringInstr,4.25
StringLPad,4.25
StringLocate,4.25
StringRPad,4.25
StringRepeat,4.25
StringReplace,4.25
StringSplit,4.25
StringToMap,4.25
StringTranslate,4.25
StringTrim,4.25
StringTrimLeft,4.25
StringTrimRight,4.25
Substring,4.25
SubstringIndex,4.25
Subtract,4.25
Sum,4.25
Tan,4.25
Tanh,4.25
TimeAdd,4.25
ToDegrees,4.25
ToRadians,4.25
ToUnixTimestamp,4.25
TransformKeys,4.25
TransformValues,4.25
UnaryMinus,4.25
UnaryPositive,4.25
UnboundedFollowing$,4.25
UnboundedPreceding$,4.25
UnixTimestamp,4.25
UnscaledValue,4.25
Upper,4.25
VariancePop,4.25
VarianceSamp,4.25
WeekDay,4.25
WindowExpression,4.25
WindowSpecDefinition,4.25
XxHash64,4.25
Year,4.25
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
FlatMapGroupsInPandasExec,1.2
FlatMapCoGroupsInPandasExec,1.2
MapInPandasExec,1.2
WindowInPandasExec,1.2
KMeans-pyspark,8.86
KMeans-scala,1.0
PCA-pyspark,2.24
PCA-scala,2.69
LinearRegression-pyspark,2.0
LinearRegression-scala,1.0
RandomForestClassifier-pyspark,6.31
RandomForestClassifier-scala,1.0
RandomForestRegressor-pyspark,3.66
RandomForestRegressor-scala,1.0
XGBoost-pyspark,1.0
XGBoost-scala,3.31
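
As a rough illustration of how a score file like this can be consumed, the sketch below parses the header-plus-rows CSV from the classpath into a lookup map. This is an assumption made for illustration, not the tool's actual loader; the real resource selection lives in PluginTypeChecker, shown next.

```scala
// Minimal sketch, assuming the CSV is packaged as a classpath resource.
// Not the tool's actual parser; it only shows the file's shape:
// a "CPUOperator,Score" header followed by one operator per row.
import scala.io.Source

object OperatorScoreSketch {
  def loadScores(resource: String): Map[String, Double] = {
    val stream = getClass.getResourceAsStream(s"/$resource")
    Source.fromInputStream(stream)
      .getLines()
      .drop(1) // skip the CPUOperator,Score header
      .map { line =>
        val Array(op, score) = line.split(",", 2)
        op -> score.toDouble
      }.toMap
  }

  def main(args: Array[String]): Unit = {
    val scores = loadScores("operatorsScore-dataproc-serverless-l4.csv")
    // Fallback of 1.0 for unknown operators is an assumption for this sketch.
    println(scores.getOrElse("SortMergeJoinExec", 1.0)) // 7.4 per the file above
  }
}
```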
@@ -47,6 +47,7 @@ class PluginTypeChecker(platform: String = "onprem",
private val OPERATORS_SCORE_FILE_ONPREM = "operatorsScore.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_T4 = "operatorsScore-dataproc-t4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv"
private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv"
private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv"
private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv"
private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv"
@@ -102,6 +103,7 @@ class PluginTypeChecker(platform: String = "onprem",
// if no GPU specified, then default to dataproc-t4 for backward compatibility
case "dataproc-t4" | "dataproc" => OPERATORS_SCORE_FILE_DATAPROC_T4
case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4
case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4
// if no GPU specified, then default to emr-t4 for backward compatibility
case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4
case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10
@@ -148,8 +148,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
val platform: ScallopOption[String] =
opt[String](required = false,
descr = "Cluster platform where Spark CPU workloads were executed. Options include " +
"onprem, dataproc-t4, dataproc-l4, emr-t4, emr-a10, databricks-aws, and " +
"databricks-azure. " +
"onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, emr-t4, emr-a10, " +
"databricks-aws, and databricks-azure. " +
"Default is onprem.",
default = Some("onprem"))
val speedupFactorFile: ScallopOption[String] =
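
For reference, the new platform value can be exercised programmatically in the same way the QualificationSuite test below does. In this sketch the output directory and event log path are placeholders, and running it assumes the tools jar and Spark are on the classpath.

```scala
// Illustrative usage mirroring the test added in QualificationSuite below.
// Paths are placeholders; mainInternal returns an exit code and summaries.
val appArgs = new QualificationArgs(Array(
  "--output-directory", "/tmp/qual-output",
  "--platform", "dataproc-serverless-l4",
  "/path/to/eventlog"))

val (exitCode, _) = QualificationMain.mainInternal(appArgs)
assert(exitCode == 0)
```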
@@ -183,6 +183,11 @@ class PluginTypeCheckerSuite extends FunSuite with Logging {
assert(checker.getSpeedupFactor("Ceil") == 2.73)
}

test("supported operator score from dataproc-serverless-l4") {
val checker = new PluginTypeChecker("dataproc-serverless-l4")
assert(checker.getSpeedupFactor("WindowExec") == 4.25)
assert(checker.getSpeedupFactor("Ceil") == 4.25)
}
test("supported operator score from dataproc-l4") {
val checker = new PluginTypeChecker("dataproc-l4")
assert(checker.getSpeedupFactor("UnionExec") == 4.16)
@@ -1505,6 +1505,30 @@ class QualificationSuite extends BaseTestSuite {
assert(outputActual.collect().size == 1)
}

// run the qualification tool for dataproc-serverless-l4
TrampolineUtil.withTempDir { outpath =>
val appArgs = new QualificationArgs(Array(
"--output-directory",
outpath.getAbsolutePath,
"--platform",
"dataproc-serverless-l4",
eventLog))

val (exit, sumInfo) =
QualificationMain.mainInternal(appArgs)
assert(exit == 0)

// the code above that runs the Spark query stops the SparkSession
// so create a new one to read in the csv file
createSparkSession()

// validate that the SQL description in the csv file escapes commas properly
val outputResults = s"$outpath/rapids_4_spark_qualification_output/" +
s"rapids_4_spark_qualification_output.csv"
val outputActual = readExpectedFile(new File(outputResults))
assert(outputActual.collect().size == 1)
}

// run the qualification tool for databricks-aws
TrampolineUtil.withTempDir { outpath =>
val appArgs = new QualificationArgs(Array(
4 changes: 2 additions & 2 deletions user_tools/custom_speedup_factors/README.md
@@ -44,12 +44,12 @@ There is a utility script in the directory to allow for validation of custom spe

Example execution of the script:
```
-python validate_speedup_factors.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup
+python validate_qualification_estimates.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup
```

The script also allows you to pass in a custom speedup factor file if you have previously generated them. Example:
```
-python validate_speedup_factors.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup --speedups test-scores.csv
+python validate_qualification_estimates.py --cpu_log CPU-nds-eventlog --gpu_log GPU-nds-eventlog --output test-speedup --speedups test-scores.csv
```

Other options include passing in the CPU and/or GPU profiler output if that has already been done via the `cpu_profile` and `gpu_profile` arguments. Additionally, you can pass in a custom tools jar via `--jar` if that is needed.
