StateStoreSaveExec
is a unary physical operator that writes a streaming state to a state store with support for streaming watermark.
Note
|
A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator. Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book. |
StateStoreSaveExec
is created exclusively when StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation for execution (Aggregate
logical operators in the logical plan of a streaming query).
The optional properties, i.e. the StatefulOperatorStateInfo, the output mode, and the event-time watermark, are initially undefined when StateStoreSaveExec
is created. StateStoreSaveExec
is updated to hold execution-specific configuration when IncrementalExecution
is requested to prepare the logical plan (of a streaming query) for execution (when the state preparation rule is executed).
Note
|
Unlike StateStoreRestoreExec operator, StateStoreSaveExec takes output mode and event time watermark when created.
|
When executed, StateStoreSaveExec
creates a StateStoreRDD to map over partitions with storeUpdateFunction
that manages the StateStore
.
Note
|
The number of partitions of StateStoreRDD (and hence the number of Spark tasks) is what was defined for the child physical plan. There will be as many StateStores as there are partitions in StateStoreRDD. |
Note
|
StateStoreSaveExec behaves differently per output mode.
|
When executed, StateStoreSaveExec
executes the child physical operator and creates a StateStoreRDD (with storeUpdateFunction
specific to the output mode).
The output schema of StateStoreSaveExec
is exactly the child's output schema.
The output partitioning of StateStoreSaveExec
is exactly the child's output partitioning.
StateStoreSaveExec
uses a StreamingAggregationStateManager (that is created for the keyExpressions, the output of the child physical operator and the stateFormatVersion).
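A minimal sketch of creating such a state manager directly, using the internal StreamingAggregationStateManager.createStateManager factory with hypothetical key and child-output attributes (for exploration only; StateStoreSaveExec does this itself at execution time):
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManager
import org.apache.spark.sql.types.{LongType, StringType}
// Hypothetical grouping key and child output (stand-ins for keyExpressions and child.output)
val key = AttributeReference("group", StringType)()
val count = AttributeReference("value_count", LongType)()
val stateManager = StreamingAggregationStateManager.createStateManager(
  Seq(key),        // keyExpressions
  Seq(key, count), // output of the child physical operator
  2)               // stateFormatVersion
// With state format version 2, the value rows no longer repeat the key columns
println(stateManager.getStateValueSchema)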
Tip
|
Enable ALL logging level for org.apache.spark.sql.execution.streaming.StateStoreSaveExec logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL
Refer to Logging. |
Demo: StateStoreSaveExec Operator in Physical Plan of Streaming Aggregation (Dataset.groupBy Operator)
// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
val numShufflePartitions = 1
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, numShufflePartitions)
assert(spark.sessionState.conf.numShufflePartitions == numShufflePartitions)
// END: Only for easier debugging
val counts = spark
.readStream
.format("rate")
.load
.groupBy(window($"timestamp", "5 seconds") as "group")
.agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
.orderBy("group") // <-- makes for easier checking
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
+- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
+- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 3c4d9c60-5fda-4053-aeef-4ddd2b485605, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
+- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
+- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 3c4d9c60-5fda-4053-aeef-4ddd2b485605, opId = 0, ver = 0, numPartitions = 1], 2
+- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
+- Exchange hashpartitioning(window#11, 1)
+- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
+- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
+- *(1) Filter isnotnull(timestamp#0)
+- StreamingRelation rate, [timestamp#0, value#1L]
// Start the query and hence execute StateStoreSaveExec
import org.apache.spark.sql.streaming.OutputMode
val queryName = "rate2memory"
val checkpointLocation = s"/tmp/checkpoint-$queryName"
val query = counts
.writeStream
.format("memory")
.queryName(queryName)
.option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Complete)
.start
// Give the streaming query a moment (one micro-batch)
// So lastExecution is available for the checkpointLocation
import scala.concurrent.duration._
query.awaitTermination(1.second.toMillis)
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val lastExecution = query
.asInstanceOf[StreamingQueryWrapper]
.streamingQuery
.lastExecution
val logicalPlan = lastExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 Sort [group#5 ASC NULLS FIRST], true
01 +- Aggregate [window#11], [window#11 AS group#5, count(value#1L) AS value_count#10L]
02 +- Filter isnotnull(timestamp#0)
03 +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000), LongType, TimestampType)) AS window#11, timestamp#0, value#1L]
04 +- Project [timestamp#249 AS timestamp#0, value#250L AS value#1L]
05 +- Streaming RelationV2 rate[timestamp#249, value#250L]
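Continuing the demo, the executed (physical) plan of the last micro-batch should carry a StateStoreSaveExec with the execution-specific properties filled in. A quick way to inspect them (assuming the query above has completed at least one micro-batch):
import org.apache.spark.sql.execution.streaming.StateStoreSaveExec
val saveOperators = lastExecution.executedPlan.collect {
  case s: StateStoreSaveExec => s
}
saveOperators.foreach { s =>
  println(s.stateInfo)          // execution-specific StatefulOperatorStateInfo
  println(s.outputMode)         // Some(Complete) for this demo
  println(s.eventTimeWatermark) // event-time watermark (in millis), if defined
}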
StateStoreSaveExec
uses the performance metrics of the parent StateStoreWriter.
Key | Name (in UI) | Description
---|---|---
allUpdatesTimeMs | total time to update rows | Time taken to store the rows in the state store (for this operator and trigger)
allRemovalsTimeMs | total time to remove rows | Time taken to remove the expired rows from the state store
commitTimeMs | time to commit changes | Time taken to commit the state updates to the state store
numOutputRows | number of output rows | Number of rows the operator outputs
numTotalStateRows | number of total state rows | Number of the state keys in the state store. Corresponds to numRowsTotal in stateOperators of a StreamingQueryProgress
numUpdatedStateRows | number of updated state rows | Number of the state keys that were stored as updates in the state store in a trigger and for the keys in the result rows of the upstream physical operator
stateMemory | memory used by state | Memory used by the StateStore
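These metrics also surface in the progress reports of a streaming query (one StateOperatorProgress per stateful operator). Assuming the demo query above is still running:
Option(query.lastProgress).foreach { progress =>
  progress.stateOperators.foreach { op =>
    println(op.numRowsTotal)    // number of total state rows
    println(op.numRowsUpdated)  // number of updated state rows
    println(op.memoryUsedBytes) // memory used by the StateStore
  }
}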
StateStoreSaveExec
takes the following to be created:
- Key expressions, i.e. Catalyst attributes for the grouping keys
- Execution-specific StatefulOperatorStateInfo (default: None)
- Execution-specific output mode (default: None)
- Event-time watermark (default: None)
- Version of the state format (based on the spark.sql.streaming.aggregation.stateFormatVersion configuration property)
- Child physical operator
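As a quick check of the state format version, the default can be read from the (internal) configuration property, e.g. in spark-shell (a sketch, assuming Spark 2.4 or later where the property is available):
import org.apache.spark.sql.internal.SQLConf
val stateFormatVersion = spark.sessionState.conf.getConf(
  SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION)
println(stateFormatVersion) // 2 by default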
doExecute(): RDD[InternalRow]
Note
|
doExecute is part of the SparkPlan Contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).
|
Internally, doExecute
initializes metrics.
Note
|
doExecute requires that the optional outputMode is defined at this point (which should have happened when IncrementalExecution prepared the streaming aggregation for execution).
|
doExecute
executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that:
- Generates an unsafe projection to access the key field (using keyExpressions and the output schema of the child)
- Branches off per output mode
Output Mode | doExecute's Behaviour
---|---
Append | Filters out late aggregate rows (per the watermark), stores the remaining rows in the state store and, when no more rows are left, removes from the state store and returns only the aggregates that have expired per the event-time watermark (i.e. the final aggregates), committing the state updates in the end.
Complete | Takes all the rows (from the child physical operator), stores them in the state store, commits the state updates and, in the end, returns all the rows from the state store. The number of keys stored in the state store is recorded in the numUpdatedStateRows metric.
Update | Returns an iterator that filters out late aggregate rows (per the watermark, if defined) and stores the "young" rows in the state store (one by one, i.e. for every next row). When no more rows are left, removes the keys that have expired per the watermark from the state store and commits the state updates.
doExecute
reports an UnsupportedOperationException
when executed with an invalid output mode:
Invalid output mode: [outputMode]