QueryExecution is a part of Dataset and represents the query execution that will eventually produce the data in a Dataset.
QueryExecution is the result of executing a LogicalPlan in a SparkSession (and so you could create a Dataset from a LogicalPlan or use the QueryExecution after executing the LogicalPlan).
QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState, which could also return a HiveSessionState) when it is created. It then computes a SparkPlan (the physical plan) using the planner. The result is available as the (lazy) sparkPlan attribute.
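sparkPlan being lazy means the physical plan is only computed on first access. A minimal sketch in the Scala shell (spark is the shell's predefined SparkSession):

// Access the physical plan the current SparkPlanner computed.
// sparkPlan is a lazy attribute, so this triggers planning on first access.
val ds = spark.range(5)
val physicalPlan = ds.queryExecution.sparkPlan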
A streaming variant of QueryExecution is IncrementalExecution.
The debug package object contains methods for debugging query execution that you can use to do the full analysis of your queries (as Dataset objects); see the debug section below.
Caution: FIXME What's planner? analyzed? Why do we need assertAnalyzed and assertSupported?
It belongs to the org.apache.spark.sql.execution package.
Note: QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]
== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]
== Physical Plan ==
WholeStageCodegen
: +- Range 0, 1, 8, 5, [id#39L]
class QueryExecution(
val sparkSession: SparkSession,
val logical: LogicalPlan)
QueryExecution requires a SparkSession and a LogicalPlan.
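Since the constructor takes exactly these two arguments, you can build a QueryExecution by hand. A hedged sketch for the Scala shell, reusing an existing Dataset's logical plan (LogicalPlan instances are rarely created directly):

import org.apache.spark.sql.execution.QueryExecution

// Borrow the logical plan of an existing Dataset...
val plan = spark.range(5).queryExecution.logical
// ...and wrap it in a fresh QueryExecution for the same session.
val qe = new QueryExecution(spark, plan)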
QueryExecution holds the following lazy attributes (initialized when first accessed; a shell walkthrough follows the list):

analyzed being a LogicalPlan that is the input logical plan with unresolved attributes and relations resolved.

withCachedData being a LogicalPlan that is the analyzed plan after being checked for unsupported operations and with cached data substituted for matching plan segments.

optimizedPlan being the LogicalPlan (of a query) that results from applying the session-owned Optimizer to withCachedData.

executedPlan being a SparkPlan that is the sparkPlan prepared for execution by inserting shuffle operations and internal row format conversions where needed.
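A minimal walkthrough in the Scala shell that touches each phase in order (spark is the shell's predefined SparkSession):

val qe = spark.range(5).queryExecution
qe.analyzed       // logical plan with attributes and relations resolved
qe.withCachedData // analyzed plan with cached data substituted
qe.optimizedPlan  // analyzed plan after the session-owned Optimizer
qe.sparkPlan      // physical plan computed by the SparkPlanner
qe.executedPlan   // physical plan prepared for execution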
IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId. It lives in the org.apache.spark.sql.execution.streaming package.
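A hedged sketch of the constructor, reconstructed from the parameters listed above (exact modifiers and parameter lists differ across Spark versions):

class IncrementalExecution(
  sparkSession: SparkSession,
  logicalPlan: LogicalPlan,
  val outputMode: OutputMode,
  val checkpointLocation: String,
  val currentBatchId: Long)
  extends QueryExecution(sparkSession, logicalPlan)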
Caution: FIXME What is stateStrategy?
Stateful operators in the query plan are numbered using operatorId that starts at 0.
IncrementalExecution adds one Rule[SparkPlan] called state as the first element of the preparations sequence of rules (see the sketch below).
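A one-line sketch of that idea, assuming state is the rule named above (the actual code may differ across Spark versions):

// Prepend the state rule to the rules inherited from QueryExecution.
override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations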
Caution: FIXME What does IncrementalExecution do? Where is it used?
QueryExecution contains a sequence of rules called preparations (of type Seq[Rule[SparkPlan]]) that are applied in order to the physical plan before execution, i.e. while computing the executedPlan lazy value. The preparations rules are meant to give developers access to the intermediate phases of query execution. The executedPlan lazy value is thus a SparkPlan ready for execution after applying the rules in preparations.
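As a short sketch, executedPlan is what actually runs; its execute() method produces the underlying RDD of internal rows (spark is the shell's predefined SparkSession):

// execute() runs the prepared physical plan and returns an RDD[InternalRow].
val rdd = spark.range(5).queryExecution.executedPlan.execute()
rdd.count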
debug

debug package object contains methods for debugging query execution that you can use to do the full analysis of your queries (as Dataset objects):

debug()
debugCodegen()

The debug package object belongs to the org.apache.spark.sql.execution.debug package.

Import the package and do the full analysis using the debug method.
import org.apache.spark.sql.execution.debug._
scala> spark.range(10).where('id === 4).debug
Results returned: 1
== WholeStageCodegen ==
Tuples output: 1
id LongType: {java.lang.Long}
== Filter (id#12L = 4) ==
Tuples output: 0
id LongType: {}
== Range (0, 10, splits=8) ==
Tuples output: 0
id LongType: {}
You can also perform debugCodegen.
import org.apache.spark.sql.execution.debug._
scala> spark.range(10).where('id === 4).debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Filter (id#8L = 4)
+- *Range (0, 10, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
/* 006 */  * Codegend pipeline for
/* 007 */  * Filter (id#8L = 4)
/* 008 */  * +- Range (0, 10, splits=8)
/* 009 */  */
...
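The same generated code can also be printed through the codegen() method on a QueryExecution's debug object: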
scala> spark.range(1, 1000).select('id+1+2+3, 'id+4+5+6).queryExecution.debug.codegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [(((id#0L + 1) + 2) + 3) AS (((id + 1) + 2) + 3)#3L,(((id#0L + 4) + 5) + 6) AS (((id + 4) + 5) + 6)#4L]
+- *Range (1, 1000, splits=8)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
/* 006 */  * Codegend pipeline for
...
/* 111 */ if (shouldStop()) return;
/* 112 */ }
/* 113 */ }
/* 114 */ }