Query Execution

QueryExecution is a part of Dataset and represents the query execution that will eventually produce the data in a Dataset.

QueryExecution is the result of executing a LogicalPlan in a SparkSession (so you can either create a Dataset from a LogicalPlan or work with the QueryExecution after the LogicalPlan has been executed).

When created, QueryExecution uses the input SparkSession to access the current SparkPlanner (through SessionState, which could also be a HiveSessionState). It then computes a SparkPlan (a physical plan, to be exact) using the planner. The result is available as the lazy sparkPlan attribute.

A streaming variant of QueryExecution is IncrementalExecution.

debug package object contains methods for debugging query execution that you can use to do the full analysis of your queries (as Dataset objects).

Caution
FIXME What’s planner? analyzed? Why do we need assertAnalyzed and assertSupported?

QueryExecution belongs to the org.apache.spark.sql.execution package.

Note
QueryExecution is a transient feature of a Dataset, i.e. it is not preserved across serializations.
val ds = spark.range(5)
scala> ds.queryExecution
res17: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Analyzed Logical Plan ==
id: bigint
Range 0, 5, 1, 8, [id#39L]

== Optimized Logical Plan ==
Range 0, 5, 1, 8, [id#39L]

== Physical Plan ==
WholeStageCodegen
:  +- Range 0, 1, 8, 5, [id#39L]

Creating QueryExecution Instance

class QueryExecution(
  val sparkSession: SparkSession,
  val logical: LogicalPlan)

QueryExecution requires a SparkSession and a LogicalPlan.
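
For illustration, here is a minimal sketch (assuming a spark-shell session with the SparkSession available as spark) that creates a QueryExecution directly from the logical plan of an existing Dataset:

import org.apache.spark.sql.execution.QueryExecution

val ds = spark.range(5)
// reuse the Dataset's logical plan to build a new QueryExecution
val qe = new QueryExecution(spark, ds.queryExecution.logical)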

Lazy Attributes

QueryExecution holds the following lazy attributes (initialized when first accessed), with a short example of inspecting them after the descriptions:

analyzed Attribute

analyzed is a LogicalPlan that is the input logical plan with unresolved attributes and relations resolved.

withCachedData Attribute

withCachedData is a LogicalPlan that is the analyzed plan after it has been checked (for unsupported operations) and with cached segments substituted in where available.

Optimized Logical Query Plan (optimizedPlan Attribute)

optimizedPlan is the LogicalPlan (of a query) that results from applying the session-owned Optimizer to withCachedData.

sparkPlan Attribute

sparkPlan is a SparkPlan computed by the planner from optimizedPlan.

executedPlan Attribute

executedPlan is a SparkPlan that is sparkPlan prepared for execution, with shuffle operations and internal row format conversions inserted where needed.

toRdd Attribute

toRdd is an RDD[InternalRow] that is the result of executing executedPlan.
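
A quick way to see all of these is to walk them on a Dataset's queryExecution in spark-shell (output elided):

val qe = spark.range(5).queryExecution

qe.analyzed       // analyzed LogicalPlan
qe.withCachedData // analyzed plan with cached data substituted
qe.optimizedPlan  // optimized LogicalPlan
qe.sparkPlan      // physical SparkPlan
qe.executedPlan   // SparkPlan ready for execution
qe.toRdd          // RDD[InternalRow]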

IncrementalExecution

IncrementalExecution is a custom QueryExecution with OutputMode, checkpointLocation, and currentBatchId.

It lives in the org.apache.spark.sql.execution.streaming package.

Caution
FIXME What is stateStrategy?

Stateful operators in the query plan are numbered using operatorId that starts with 0.

IncrementalExecution adds one Rule[SparkPlan] called state as the first element of the preparations sequence of rules.

Caution
FIXME What does IncrementalExecution do? Where is it used?

preparations - Rules to apply before Query Execution

QueryExecution contains a sequence of rules called preparations (of type Seq[Rule[SparkPlan]]) that are applied in order to the physical plan before execution, i.e. when the executedPlan lazy value is computed.

The preparations rules are meant to give developers access to the intermediate phases of query execution.
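
Each rule is a Rule[SparkPlan]. As a purely hypothetical sketch (not one of the built-in rules), a preparation rule that leaves the physical plan untouched could look like this:

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// a do-nothing preparation rule; a real rule would rewrite the SparkPlan,
// e.g. insert exchanges or row format conversions
object NoopPreparationRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}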

executedPlan SparkPlan

The executedPlan lazy value is a SparkPlan ready for execution, i.e. sparkPlan after the rules in preparations have been applied.

Debugging Query Execution

debug package object contains methods for debugging query execution that you can use to do the full analysis of your queries (as Dataset objects).

debug()
debugCodegen()

The debug package object belongs to the org.apache.spark.sql.execution.debug package.

Import the package object and do the full analysis using the debug method.

import org.apache.spark.sql.execution.debug._

scala> spark.range(10).where('id === 4).debug
Results returned: 1
== WholeStageCodegen ==
Tuples output: 1
 id LongType: {java.lang.Long}
== Filter (id#12L = 4) ==
Tuples output: 0
 id LongType: {}
== Range (0, 10, splits=8) ==
Tuples output: 0
 id LongType: {}

You can also inspect the generated code using debugCodegen.

import org.apache.spark.sql.execution.debug._

scala> spark.range(10).where('id === 4).debugCodegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Filter (id#8L = 4)
+- *Range (0, 10, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
 * Filter (id#8L = 4)
 * +- Range (0, 10, splits=8)
 */
...
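
The generated code can also be printed through the debug object on QueryExecution, using its codegen() method:
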
scala> spark.range(1, 1000).select('id+1+2+3, 'id+4+5+6).queryExecution.debug.codegen()
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [(((id#0L + 1) + 2) + 3) AS (((id + 1) + 2) + 3)#3L,(((id#0L + 4) + 5) + 6) AS (((id + 4) + 5) + 6)#4L]
+- *Range (1, 1000, splits=8)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for
...
/* 111 */       if (shouldStop()) return;
/* 112 */     }
/* 113 */   }
/* 114 */ }