StateStoreRestoreExec is a unary physical operator that reads (restores) a streaming state from a state store (for the keys from the child physical operator).
Note
|
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator. Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book. |
StateStoreRestoreExec is created exclusively when the StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation for execution (Aggregate logical operators in the logical plan of a streaming query).
The optional StatefulOperatorStateInfo is initially undefined (i.e. when StateStoreRestoreExec is created). StateStoreRestoreExec is updated to hold the streaming batch-specific execution property when IncrementalExecution prepares a streaming physical plan for execution (the state preparation rule is executed when StreamExecution plans a streaming query for a streaming batch).
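As a rough illustration of that lifecycle, the following sketch models an operator whose state info is None at creation and is filled in by a separate preparation step. All names here (StateInfo, RestoreOp, prepareForBatch) are hypothetical stand-ins, not Spark's classes; the field names only mirror the state info printed in the physical plan, and using the batch number as the version is an assumption of the sketch.

```scala
import java.util.UUID

// Hypothetical, simplified stand-in for the state info of a stateful operator
final case class StateInfo(
    checkpoint: String,
    runId: UUID,
    opId: Long,
    ver: Long,
    numPartitions: Int)

// Hypothetical operator model: state info is undefined (None) when the operator is created
final case class RestoreOp(keys: Seq[String], stateInfo: Option[StateInfo] = None)

// Hypothetical "state preparation" step that fills in the batch-specific state info
def prepareForBatch(op: RestoreOp, checkpoint: String, runId: UUID, batchId: Long): RestoreOp =
  op.copy(stateInfo = Some(
    StateInfo(checkpoint, runId, opId = 0L, ver = batchId, numPartitions = 200)))

val planned = RestoreOp(keys = Seq("window"))
val prepared = prepareForBatch(planned, "/tmp/checkpoint/state", UUID.randomUUID(), batchId = 0L)

println(planned.stateInfo.isDefined)  // false
println(prepared.stateInfo.isDefined) // true
```

Keeping the operator immutable and returning a copy follows the way plan-rewriting rules usually produce a new plan rather than mutate the existing one.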
When executed, StateStoreRestoreExec executes the child physical operator and creates a StateStoreRDD to map over partitions with a storeUpdateFunction that restores the state for the keys in the input rows, if available.
The output schema of StateStoreRestoreExec is exactly the child's output schema.
The output partitioning of StateStoreRestoreExec is exactly the child's output partitioning.
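The pass-through of schema and partitioning can be pictured with a tiny model of a unary operator that delegates both properties to its child. This is only a sketch: Op, Leaf and Restore are hypothetical types, with column names standing in for attributes and a plain label standing in for a partitioning.

```scala
// Hypothetical minimal operator model; this is not Spark's SparkPlan API
trait Op {
  def output: Seq[String]        // column names standing in for output attributes
  def outputPartitioning: String // a label standing in for a partitioning
}

final case class Leaf(output: Seq[String], outputPartitioning: String) extends Op

// A unary operator that exposes the child's schema and partitioning unchanged
final case class Restore(child: Op) extends Op {
  def output: Seq[String] = child.output
  def outputPartitioning: String = child.outputPartitioning
}

val child = Leaf(Seq("window", "count"), "hashpartitioning(window, 200)")
val restore = Restore(child)
```

Because both properties are delegated, restored state rows have to fit the child's schema, and an operator placed on top can rely on the child's partitioning.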
Demo: StateStoreRestoreExec Operator in Physical Plan of Streaming Aggregation (Dataset.groupBy Operator)
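// spark-shell pre-imports these; they are listed for use outside the shell
import org.apache.spark.sql.functions.{count, window}
import spark.implicits._

// Streaming aggregation: count values per 5-second event-time window
val query = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "20 seconds").
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "value_count").
  orderBy($"value_count".asc)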
// Logical plan with Aggregate logical operator
val logicalPlan = query.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'Sort ['value_count ASC NULLS FIRST], true
01 +- Aggregate [window#25-T20000ms], [window#25-T20000ms AS group#19, count(value#15L) AS value_count#24L]
02    +- Filter isnotnull(timestamp#14-T20000ms)
03       +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000), LongType, TimestampType)) AS window#25-T20000ms, timestamp#14-T20000ms, value#15L]
04          +- EventTimeWatermark timestamp#14: timestamp, interval 20 seconds
05             +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@6364e814, rate, [timestamp#14, value#15L]
// Physical plan with StateStoreRestoreExec (as StateStoreRestore in the output)
scala> query.explain
== Physical Plan ==
*(5) Sort [value_count#24L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(value_count#24L ASC NULLS FIRST, 200)
   +- *(4) HashAggregate(keys=[window#25-T20000ms], functions=[count(value#15L)])
      +- StateStoreSave [window#25-T20000ms], state info [ checkpoint = <unknown>, runId = 13c39b49-2fda-4575-bffb-08e94614420a, opId = 0, ver = 0, numPartitions = 200], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#25-T20000ms], functions=[merge_count(value#15L)])
            +- StateStoreRestore [window#25-T20000ms], state info [ checkpoint = <unknown>, runId = 13c39b49-2fda-4575-bffb-08e94614420a, opId = 0, ver = 0, numPartitions = 200], 2
               +- *(2) HashAggregate(keys=[window#25-T20000ms], functions=[merge_count(value#15L)])
                  +- Exchange hashpartitioning(window#25-T20000ms, 200)
                     +- *(1) HashAggregate(keys=[window#25-T20000ms], functions=[partial_count(value#15L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#14-T20000ms, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#25-T20000ms, value#15L]
                           +- *(1) Filter isnotnull(timestamp#14-T20000ms)
                              +- EventTimeWatermark timestamp#14: timestamp, interval 20 seconds
                                 +- StreamingRelation rate, [timestamp#14, value#15L]
Key | Name (in UI) | Description |
---|---|---|
numOutputRows | number of output rows | The number of input rows from the child physical operator |
StateStoreRestoreExec takes the following to be created:
- Key expressions, i.e. Catalyst attributes for the grouping keys
- Optional StatefulOperatorStateInfo (default: None)
- Version of the state format (based on the spark.sql.streaming.aggregation.stateFormatVersion configuration property)
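To check which state format version a session would use, the configuration property can be read from the runtime configuration. The fragment below is a sketch that assumes a spark-shell session where `spark` is the SparkSession; the fallback string passed as the second argument is made up for the example and is only returned if the property is not defined in the Spark version at hand.

```scala
// Assumes spark-shell, where `spark` is the active SparkSession
val key = "spark.sql.streaming.aggregation.stateFormatVersion"

// Read the value currently in effect; "<undefined>" is a made-up fallback for this example
val stateFormatVersion = spark.conf.get(key, "<undefined>")
println(s"$key = $stateFormatVersion")
```

The same version number appears as the last argument of StateStoreRestore in the physical plan.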
stateManager: StreamingAggregationStateManager
stateManager is a StreamingAggregationStateManager that is created together with StateStoreRestoreExec.
The StreamingAggregationStateManager is created for the keys, the output schema of the child physical operator and the version of the state format.
The StreamingAggregationStateManager is used when StateStoreRestoreExec is requested to generate a recipe for a distributed computation (as an RDD[InternalRow]).
doExecute(): RDD[InternalRow]
Note
|
doExecute is part of the SparkPlan contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
|
Internally, doExecute executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that does the following per partition of the child operator's RDD:
- Generates an unsafe projection to access the key field (using keyExpressions and the output schema of the child operator).
- For every input row (as InternalRow):
  - Extracts the key from the row (using the unsafe projection above)
  - Gets the saved state in StateStore for the key if available (it might not be if the key appeared in the input for the first time)
  - Increments the numOutputRows metric (that in the end is the number of rows from the child operator)
  - Generates a collection made up of the current row and possibly the state for the key if available
Note
|
The number of rows from StateStoreRestoreExec is the number of rows from the child operator with additional rows for the saved state.
|
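The per-partition steps of storeUpdateFunction can be sketched in plain Scala. This is a simplified model under stated assumptions, not Spark's implementation: Row and restore are hypothetical, an immutable Map stands in for the state store, a plain variable stands in for the numOutputRows metric, and emitting the saved state before the current row is a choice made for the example.

```scala
// Hypothetical, simplified types: a row is (key, count); the "state store" is a Map
type Key = String
final case class Row(key: Key, count: Long)

// Stand-in for the numOutputRows metric
var numOutputRows = 0L

// Models the per-partition function: for every input row, emit the saved state
// for its key (if there is any) together with the row itself
def restore(store: Map[Key, Row])(rows: Iterator[Row]): Iterator[Row] =
  rows.flatMap { row =>
    val key = row.key          // extract the key (stands in for the unsafe projection)
    val saved = store.get(key) // look up the saved state; None for a key seen for the first time
    numOutputRows += 1         // count the input row
    saved.toList :+ row        // saved state (if any) plus the current row
  }

val store = Map("w1" -> Row("w1", 10L))          // state saved by an earlier batch
val input = Iterator(Row("w1", 3L), Row("w2", 5L))
val out = restore(store)(input).toList

println(out)           // List(Row(w1,10), Row(w1,3), Row(w2,5))
println(numOutputRows) // 2
```

In this model two input rows produce three output rows because only the key w1 has saved state, while the counter still reports the two rows that came from the child.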
Note
|
There is no way in StateStoreRestoreExec to find out how many rows had associated state available in a state store. You would have to use the corresponding StateStoreSaveExec operator’s metrics (most likely number of total state rows but that could depend on the output mode).
|