Note: Review SPARK-12795 (Whole stage codegen) to learn about the work to support it.
Whole-Stage Code Generation (aka WholeStageCodegen or WholeStageCodegenExec) fuses multiple operators (a subtree of plans that support codegen) into a single Java function aimed at improving execution performance. It collapses a query into a single optimized function, which eliminates virtual function calls and leverages CPU registers for intermediate data.
The WholeStageCodegenExec case class works with a SparkPlan to produce a codegened pipeline. It is a unary node in a SparkPlan with support for codegen.
Tip: Use the Dataset.explain method to see the physical plan of a query and find out whether or not WholeStageCodegen is in use.
Tip: Consider using the Debugging Query Execution facility to deep-dive into whole-stage codegen.
scala> spark.range(10).select('id as 'asId).where('id === 4).explain
== Physical Plan ==
WholeStageCodegen
: +- Project [id#0L AS asId#3L]
: +- Filter (id#0L = 4)
: +- Range 0, 1, 8, 10, [id#0L]
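To go beyond explain and inspect the Java source that whole-stage codegen produces for this query, the Debugging Query Execution facility mentioned in the Tip above offers debugCodegen; a minimal sketch (run in spark-shell, output omitted):

import org.apache.spark.sql.execution.debug._

// Prints the subtrees that were fused into WholeStageCodegen
// together with the generated Java source code
spark.range(10).select('id as 'asId).where('id === 4).debugCodegen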
SparkPlan plans with support for codegen extend CodegenSupport.
Note: Whole-stage codegen is used by some modern massively parallel processing (MPP) databases to achieve great performance. See Efficiently Compiling Efficient Query Plans for Modern Hardware (PDF).
Whole-stage codegen uses the spark.sql.codegen.wholeStage setting to control…FIXME
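The setting can be inspected and toggled at runtime through the SparkSession configuration. A short sketch (run in spark-shell) that compares the physical plans with the feature off and on:

// spark.sql.codegen.wholeStage is enabled by default
spark.conf.get("spark.sql.codegen.wholeStage")      // "true"

// Disable whole-stage codegen and compare the physical plans
spark.conf.set("spark.sql.codegen.wholeStage", false)
spark.range(10).where('id === 4).explain            // no WholeStageCodegen node

// Re-enable it
spark.conf.set("spark.sql.codegen.wholeStage", true)
spark.range(10).where('id === 4).explain            // WholeStageCodegen appears again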
Note: Janino is used to compile Java source code into a Java class.
Before a query is executed, the CollapseCodegenStages case class is used to find the plans that support codegen and collapse them together as WholeStageCodegen. It is part of the sequence of rules in QueryExecution.preparations that are applied in order to the physical plan before execution.
CodegenSupport is an extension of SparkPlan for operators that support codegen.
It allows custom implementations to optionally disable codegen using the supportCodegen predicate (which defaults to true).
It assumes that custom implementations define:
- doProduce(ctx: CodegenContext): String (see the sketch below)
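As a rough, hypothetical illustration of the contract (not from the book, and the exact method signatures vary across Spark versions), a minimal pass-through operator could implement doProduce by delegating to its codegen-enabled child:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}

// Hypothetical operator that adds no logic of its own: it only forwards
// produce/consume calls between its child and its parent.
case class PassThroughExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {

  override def output: Seq[Attribute] = child.output

  // Rows come from the codegen-enabled child
  override def inputRDDs(): Seq[RDD[InternalRow]] =
    child.asInstanceOf[CodegenSupport].inputRDDs()

  // Ask the child to generate the produce path of the pipeline
  override protected def doProduce(ctx: CodegenContext): String =
    child.asInstanceOf[CodegenSupport].produce(ctx, this)

  // Hand every row straight to the parent operator in the generated code
  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String =
    consume(ctx, input)

  // Non-codegen fallback
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}

Real operators such as FilterExec or ProjectExec emit their own Java code in doConsume instead of simply forwarding rows.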
The following SparkPlan operators support codegen (i.e. extend CodegenSupport):
- ProjectExec for as
- FilterExec for where or filter
- Range
- SampleExec for sample
- RowDataSourceScanExec
Caution: FIXME Where is RowDataSourceScanExec used?
- BatchedDataSourceScanExec
- ExpandExec
- BaseLimitExec
- SortExec
- WholeStageCodegenExec and InputAdapter
- TungstenAggregate
- SortMergeJoinExec
BroadcastHashJoinExec variables are prefixed with bhj (see CodegenSupport.variablePrefix).
val ds = Seq((0,"playing"), (1, "with"), (2, "broadcast")).toDS
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res18: String = 10485760
scala> ds.join(ds).explain(extended=true)
== Parsed Logical Plan ==
'Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#21, _2#22]
== Analyzed Logical Plan ==
_1: int, _2: string, _1: int, _2: string
Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#32, _2#33]
== Optimized Logical Plan ==
Join Inner
:- LocalRelation [_1#21, _2#22]
+- LocalRelation [_1#32, _2#33]
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, true
:- LocalTableScan [_1#21, _2#22]
+- BroadcastExchange IdentityBroadcastMode
+- LocalTableScan [_1#32, _2#33]
// Use broadcast function to mark the right-side Dataset
// eligible for broadcasting explicitly
scala> ds.join(broadcast(ds)).explain(extended=true)
== Parsed Logical Plan ==
'Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
+- LocalRelation [_1#21, _2#22]
== Analyzed Logical Plan ==
_1: int, _2: string, _1: int, _2: string
Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
+- LocalRelation [_1#43, _2#44]
== Optimized Logical Plan ==
Join Inner
:- LocalRelation [_1#21, _2#22]
+- BroadcastHint
+- LocalRelation [_1#43, _2#44]
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner, true
:- LocalTableScan [_1#21, _2#22]
+- BroadcastExchange IdentityBroadcastMode
+- LocalTableScan [_1#43, _2#44]
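The plans above use BroadcastNestedLoopJoin because the joins have no join condition. For the bhj prefix mentioned earlier to show up in generated code, the plan has to contain a BroadcastHashJoinExec, which requires an equi-join condition; a sketch (the column names are made up for the example):

import org.apache.spark.sql.functions.broadcast

val left  = Seq((0, "playing"), (1, "with"), (2, "broadcast")).toDF("id", "word")
val right = Seq((0, "zero"), (1, "one")).toDF("id", "name")

// The equi-join condition on id lets the planner choose BroadcastHashJoin
left.join(broadcast(right), "id").explain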
scala> spark.range(10).sample(false, 0.4).explain
== Physical Plan ==
WholeStageCodegen
: +- Sample 0.0, 0.4, false, -7634498724724501829
: +- Range 0, 1, 8, 10, [id#15L]
CollapseCodegenStages is a Rule[SparkPlan], i.e. a transformation of a SparkPlan into another SparkPlan.
Note: CollapseCodegenStages is part of the sequence of rules applied to a SparkPlan before query execution.
It searches for sub-plans (aka stages) that support codegen and collapses them together as a WholeStageCodegen.
Note: Only CodegenSupport SparkPlans whose supportCodegen is enabled (true) support codegen.
It is assumed that all Expression instances, except CodegenFallback ones, support codegen.
CollapseCodegenStages uses the internal spark.sql.codegen.maxFields setting (default: 200) to control the maximum number of fields in the input and output schemas before whole-stage codegen is deactivated. The fields inside complex types, i.e. StructType, MapType, ArrayType, UserDefinedType, and their combinations, are counted recursively. See SPARK-14554.
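A hedged sketch of the threshold in action (the lowered limit and the generated column names are only for illustration; run in spark-shell):

// Lower the limit so that even a modest schema exceeds it
spark.conf.set("spark.sql.codegen.maxFields", 10)
val wide = spark.range(1).selectExpr((0 until 20).map(i => s"id AS c$i"): _*)
wide.explain   // the plan is not collapsed into WholeStageCodegen

// Restore the default
spark.conf.set("spark.sql.codegen.maxFields", 200)
wide.explain   // WholeStageCodegen appears again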
It recursively inserts InputAdapter leaf nodes into a SparkPlan; they are then used to generate code that consumes an RDD iterator of InternalRow.
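The effect of CollapseCodegenStages and the inserted InputAdapter nodes can be observed by comparing the physical plan before and after the preparation rules; a brief sketch (run in spark-shell):

val qe = spark.range(10).where('id === 4).queryExecution

// Physical plan before QueryExecution.preparations (no WholeStageCodegen yet)
println(qe.sparkPlan.numberedTreeString)

// Physical plan after the preparation rules, i.e. after CollapseCodegenStages
// has collapsed the codegen-enabled subtree into WholeStageCodegen
println(qe.executedPlan.numberedTreeString)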
The BenchmarkWholeStageCodegen class provides a benchmark to measure whole-stage codegen performance.
You can execute it using the command:
build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
Note: You need to un-ignore the tests in BenchmarkWholeStageCodegen by replacing ignore with test.
$ build/sbt 'sql/testOnly *BenchmarkWholeStageCodegen'
...
Running benchmark: range/limit/sum
Running case: range/limit/sum codegen=false
22:55:23.028 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Running case: range/limit/sum codegen=true
Java HotSpot(TM) 64-Bit Server VM 1.8.0_77-b03 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
range/limit/sum codegen=false 376 / 433 1394.5 0.7 1.0X
range/limit/sum codegen=true 332 / 388 1581.3 0.6 1.1X
[info] - range/limit/sum (10 seconds, 74 milliseconds)