From 097ed8d5764446ada7112376f575ff1b8c04d954 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Fri, 13 Oct 2023 16:27:49 -0700 Subject: [PATCH] Add support in core tools for Dataproc GKE L4 instances Signed-off-by: Partho Sarthi --- core/docs/spark-qualification-tool.md | 5 +- .../operatorsScore-dataproc-gke-l4.csv | 256 ++++++++++++++++++ .../qualification/PluginTypeChecker.scala | 2 + .../qualification/QualificationArgs.scala | 5 +- .../PluginTypeCheckerSuite.scala | 6 + .../qualification/QualificationSuite.scala | 24 ++ 6 files changed, 294 insertions(+), 4 deletions(-) create mode 100644 core/src/main/resources/operatorsScore-dataproc-gke-l4.csv diff --git a/core/docs/spark-qualification-tool.md b/core/docs/spark-qualification-tool.md index 64ce149f0..0f9db8720 100644 --- a/core/docs/spark-qualification-tool.md +++ b/core/docs/spark-qualification-tool.md @@ -30,6 +30,7 @@ applicable environments. Here are the cluster information for the ETL benchmark | Dataproc (L4) | 8x n1-standard-16 | 8x g2-standard-16 | | Dataproc Serverless (L4) | 8x 16 cores | 8x 16 cores + 8x L4 24GB | | Dataproc GKE (T4) | 8x n1-standard-32 | 8x n1-standard-32 + 8x T4 16GB | +| Dataproc GKE (L4) | 8x n1-standard-32 | 8x n1-standard-32 + 8x L4 24GB | | EMR (T4) | 8x m5d.8xlarge | 4x g4dn.12xlarge | | EMR (A10) | 8x m5d.8xlarge | 8x g5.8xlarge | | Databricks AWS | 8x m6gd.8xlage | 8x g5.8xlarge | @@ -250,8 +251,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* --platform Cluster platform where Spark CPU workloads were executed. Options include onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, - emr-t4, emr-a10, databricks-aws, and databricks-azure. - Default is onprem. + dataproc-gke-l4, emr-t4, emr-a10, databricks-aws, + and databricks-azure. Default is onprem. -r, --report-read-schema Whether to output the read formats and datatypes to the CSV file. This can be very long. Default is false. diff --git a/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv b/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv new file mode 100644 index 000000000..1426aa047 --- /dev/null +++ b/core/src/main/resources/operatorsScore-dataproc-gke-l4.csv @@ -0,0 +1,256 @@ +CPUOperator,Score +CoalesceExec,3.74 +CollectLimitExec,3.74 +ExpandExec,4.07 +FileSourceScanExec,2.65 +FilterExec,3.52 +GenerateExec,3.74 +GlobalLimitExec,3.74 +LocalLimitExec,3.74 +ProjectExec,3.74 +RangeExec,3.74 +SampleExec,3.74 +SortExec,3.74 +TakeOrderedAndProjectExec,3.74 +HashAggregateExec,4.29 +ObjectHashAggregateExec,4.29 +SortAggregateExec,4.29 +DataWritingCommandExec,3.74 +ExecutedCommandExec,3.74 +BatchScanExec,2.65 +ShuffleExchangeExec,3.2 +BroadcastHashJoinExec,3.4 +BroadcastNestedLoopJoinExec,1.62 +CartesianProductExec,3.74 +ShuffledHashJoinExec,3.74 +SortMergeJoinExec,5.1 +WindowExec,3.74 +Abs,3.74 +Acos,3.74 +Acosh,3.74 +Add,3.74 +AggregateExpression,3.74 +Alias,3.74 +And,3.74 +ApproximatePercentile,3.74 +ArrayContains,3.74 +ArrayExcept,3.74 +ArrayExists,3.74 +ArrayIntersect,3.74 +ArrayMax,3.74 +ArrayMin,3.74 +ArrayRemove,3.74 +ArrayRepeat,3.74 +ArrayTransform,3.74 +ArrayUnion,3.74 +ArraysOverlap,3.74 +ArraysZip,3.74 +Asin,3.74 +Asinh,3.74 +AtLeastNNonNulls,3.74 +Atan,3.74 +Atanh,3.74 +AttributeReference,3.74 +Average,3.74 +BRound,3.74 +BitLength,3.74 +BitwiseAnd,3.74 +BitwiseNot,3.74 +BitwiseOr,3.74 +BitwiseXor,3.74 +CaseWhen,3.74 +Cbrt,3.74 +Ceil,3.74 +CheckOverflow,3.74 +Coalesce,3.74 +CollectList,3.74 +CollectSet,3.74 +Concat,3.74 +ConcatWs,3.74 +Contains,3.74 +Conv,3.74 +Cos,3.74 +Cosh,3.74 +Cot,3.74 +Count,3.74 +CreateArray,3.74 +CreateMap,3.74 +CreateNamedStruct,3.74 +CurrentRow$,3.74 +DateAdd,3.74 +DateAddInterval,3.74 +DateDiff,3.74 +DateFormatClass,3.74 +DateSub,3.74 +DayOfMonth,3.74 +DayOfWeek,3.74 +DayOfYear,3.74 +DenseRank,3.74 +Divide,3.74 +DynamicPruningExpression,3.74 +ElementAt,3.74 +EndsWith,3.74 +EqualNullSafe,3.74 +EqualTo,3.74 +Exp,3.74 +Explode,3.74 +Expm1,3.74 +First,3.74 +Flatten,3.74 +Floor,3.74 +FromUTCTimestamp,3.74 +FromUnixTime,3.74 +GetArrayItem,3.74 +GetArrayStructFields,3.74 +GetJsonObject,3.74 +GetMapValue,3.74 +GetStructField,3.74 +GetTimestamp,3.74 +GreaterThan,3.74 +GreaterThanOrEqual,3.74 +Greatest,3.74 +HiveGenericUDF,3.74 +HiveSimpleUDF,3.74 +Hour,3.74 +Hypot,3.74 +If,3.74 +In,3.74 +InSet,3.74 +InitCap,3.74 +InputFileBlockLength,3.74 +InputFileBlockStart,3.74 +InputFileName,3.74 +IntegralDivide,3.74 +IsNaN,3.74 +IsNotNull,3.74 +IsNull,3.74 +JsonToStructs,3.74 +JsonTuple,3.74 +KnownFloatingPointNormalized,3.74 +KnownNotNull,3.74 +Lag,3.74 +LambdaFunction,3.74 +Last,3.74 +LastDay,3.74 +Lead,3.74 +Least,3.74 +Length,3.74 +LessThan,3.74 +LessThanOrEqual,3.74 +Like,3.74 +Literal,3.74 +Log,3.74 +Log10,3.74 +Log1p,3.74 +Log2,3.74 +Logarithm,3.74 +Lower,3.74 +MakeDecimal,3.74 +MapConcat,3.74 +MapEntries,3.74 +MapFilter,3.74 +MapKeys,3.74 +MapValues,3.74 +Max,3.74 +Md5,3.74 +MicrosToTimestamp,3.74 +MillisToTimestamp,3.74 +Min,3.74 +Minute,3.74 +MonotonicallyIncreasingID,3.74 +Month,3.74 +Multiply,3.74 +Murmur3Hash,3.74 +NaNvl,3.74 +NamedLambdaVariable,3.74 +NormalizeNaNAndZero,3.74 +Not,3.74 +NthValue,3.74 +OctetLength,3.74 +Or,3.74 +PercentRank,3.74 +PivotFirst,3.74 +Pmod,3.74 +PosExplode,3.74 +Pow,3.74 +PreciseTimestampConversion,3.74 +PromotePrecision,3.74 +PythonUDF,3.74 +Quarter,3.74 +RLike,3.74 +RaiseError,3.74 +Rand,3.74 +Rank,3.74 +RegExpExtract,3.74 +RegExpExtractAll,3.74 +RegExpReplace,3.74 +Remainder,3.74 +ReplicateRows,3.74 +Reverse,3.74 +Rint,3.74 +Round,3.74 +RowNumber,3.74 +ScalaUDF,3.74 +ScalarSubquery,3.74 +Second,3.74 +SecondsToTimestamp,3.74 +Sequence,3.74 +ShiftLeft,3.74 +ShiftRight,3.74 +ShiftRightUnsigned,3.74 +Signum,3.74 +Sin,3.74 +Sinh,3.74 +Size,3.74 +SortArray,3.74 +SortOrder,3.74 +SparkPartitionID,3.74 +SpecifiedWindowFrame,3.74 +Sqrt,3.74 +StartsWith,3.74 +StddevPop,3.74 +StddevSamp,3.74 +StringInstr,3.74 +StringLPad,3.74 +StringLocate,3.74 +StringRPad,3.74 +StringRepeat,3.74 +StringReplace,3.74 +StringSplit,3.74 +StringToMap,3.74 +StringTranslate,3.74 +StringTrim,3.74 +StringTrimLeft,3.74 +StringTrimRight,3.74 +Substring,3.74 +SubstringIndex,3.74 +Subtract,3.74 +Sum,3.74 +Tan,3.74 +Tanh,3.74 +TimeAdd,3.74 +ToDegrees,3.74 +ToRadians,3.74 +ToUnixTimestamp,3.74 +TransformKeys,3.74 +TransformValues,3.74 +UnaryMinus,3.74 +UnaryPositive,3.74 +UnboundedFollowing$,3.74 +UnboundedPreceding$,3.74 +UnixTimestamp,3.74 +UnscaledValue,3.74 +Upper,3.74 +VariancePop,3.74 +VarianceSamp,3.74 +WeekDay,3.74 +WindowExpression,3.74 +WindowSpecDefinition,3.74 +XxHash64,3.74 +Year,3.74 +AggregateInPandasExec,1.2 +ArrowEvalPythonExec,1.2 +FlatMapGroupsInPandasExec,1.2 +FlatMapCoGroupsInPandasExec,1.2 +MapInPandasExec,1.2 +WindowInPandasExec,1.2 diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala index d14336618..7baf5455e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala @@ -49,6 +49,7 @@ class PluginTypeChecker(platform: String = "onprem", private val OPERATORS_SCORE_FILE_DATAPROC_L4 = "operatorsScore-dataproc-l4.csv" private val OPERATORS_SCORE_FILE_DATAPROC_SL_L4 = "operatorsScore-dataproc-serverless-l4.csv" private val OPERATORS_SCORE_FILE_DATAPROC_GKE_T4 = "operatorsScore-dataproc-gke-t4.csv" + private val OPERATORS_SCORE_FILE_DATAPROC_GKE_L4 = "operatorsScore-dataproc-gke-l4.csv" private val OPERATORS_SCORE_FILE_EMR_T4 = "operatorsScore-emr-t4.csv" private val OPERATORS_SCORE_FILE_EMR_A10 = "operatorsScore-emr-a10.csv" private val OPERATORS_SCORE_FILE_DATABRICKS_AWS = "operatorsScore-databricks-aws.csv" @@ -106,6 +107,7 @@ class PluginTypeChecker(platform: String = "onprem", case "dataproc-l4" => OPERATORS_SCORE_FILE_DATAPROC_L4 case "dataproc-serverless-l4" => OPERATORS_SCORE_FILE_DATAPROC_SL_L4 case "dataproc-gke-t4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_T4 + case "dataproc-gke-l4" => OPERATORS_SCORE_FILE_DATAPROC_GKE_L4 // if no GPU specified, then default to emr-t4 for backward compatibility case "emr-t4" | "emr" => OPERATORS_SCORE_FILE_EMR_T4 case "emr-a10" => OPERATORS_SCORE_FILE_EMR_A10 diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index 76a2fba8e..0685d0e3f 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -148,8 +148,9 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* val platform: ScallopOption[String] = opt[String](required = false, descr = "Cluster platform where Spark CPU workloads were executed. Options include " + - "onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, emr-t4, " + - "emr-a10, databricks-aws, and databricks-azure. Default is onprem.", + "onprem, dataproc-t4, dataproc-l4, dataproc-serverless-l4, dataproc-gke-t4, " + + "dataproc-gke-l4, emr-t4, emr-a10, databricks-aws, and databricks-azure. Default " + + "is onprem.", default = Some("onprem")) val speedupFactorFile: ScallopOption[String] = opt[String](required = false, diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala index 3ba8badc9..61e8acf40 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeCheckerSuite.scala @@ -201,6 +201,12 @@ class PluginTypeCheckerSuite extends FunSuite with Logging { assert(checker.getSpeedupFactor("Ceil") == 3.65) } + test("supported operator score from dataproc-gke-l4") { + val checker = new PluginTypeChecker("dataproc-gke-l4") + assert(checker.getSpeedupFactor("WindowExec") == 3.74) + assert(checker.getSpeedupFactor("Ceil") == 3.74) + } + test("supported operator score from emr-a10") { val checker = new PluginTypeChecker("emr-a10") assert(checker.getSpeedupFactor("UnionExec") == 2.59) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 982a65ec8..307dcb00a 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -1553,6 +1553,30 @@ class QualificationSuite extends BaseTestSuite { assert(outputActual.collect().size == 1) } + // run the qualification tool for dataproc-gke-l4 + TrampolineUtil.withTempDir { outpath => + val appArgs = new QualificationArgs(Array( + "--output-directory", + outpath.getAbsolutePath, + "--platform", + "dataproc-gke-l4", + eventLog)) + + val (exit, _) = + QualificationMain.mainInternal(appArgs) + assert(exit == 0) + + // the code above that runs the Spark query stops the Sparksession + // so create a new one to read in the csv file + createSparkSession() + + // validate that the SQL description in the csv file escapes commas properly + val outputResults = s"$outpath/rapids_4_spark_qualification_output/" + + s"rapids_4_spark_qualification_output.csv" + val outputActual = readExpectedFile(new File(outputResults)) + assert(outputActual.collect().size == 1) + } + // run the qualification tool for databricks-aws TrampolineUtil.withTempDir { outpath => val appArgs = new QualificationArgs(Array(