StateStoreSaveExec Unary Physical Operator — Saving State of Streaming Aggregates

StateStoreSaveExec is a unary physical operator that writes a streaming state to a state store with support for streaming watermark.

Note

A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.

Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book.

StateStoreSaveExec is created exclusively when StatefulAggregationStrategy execution planning strategy is requested to plan a streaming aggregation for execution (Aggregate logical operators in the logical plan of a streaming query).

Figure 1. StateStoreSaveExec and StatefulAggregationStrategy

The optional properties, i.e. the StatefulOperatorStateInfo, the output mode, and the event-time watermark, are initially undefined when StateStoreSaveExec is created. StateStoreSaveExec is updated to hold execution-specific configuration when IncrementalExecution is requested to prepare the logical plan (of a streaming query) for execution (when the state preparation rule is executed).

Figure 2. StateStoreSaveExec and IncrementalExecution
Note
Unlike the StateStoreRestoreExec operator, StateStoreSaveExec takes the output mode and the event-time watermark when created.

When executed, StateStoreSaveExec creates a StateStoreRDD to map over partitions with storeUpdateFunction that manages the StateStore.

Figure 3. StateStoreSaveExec creates StateStoreRDD
Figure 4. StateStoreSaveExec and StateStoreRDD (after streamingBatch.toRdd.count)
Note

The number of partitions of StateStoreRDD (and hence the number of Spark tasks) is what was defined for the child physical plan.

There will be as many StateStores as there are partitions in StateStoreRDD.

Note
StateStoreSaveExec behaves differently per output mode.

When executed, StateStoreSaveExec executes the child physical operator and creates a StateStoreRDD (with storeUpdateFunction specific to the output mode).

The output schema of StateStoreSaveExec is exactly the child's output schema.

The output partitioning of StateStoreSaveExec is exactly the child's output partitioning.

StateStoreSaveExec uses a StreamingAggregationStateManager (that is created for the keyExpressions, the output of the child physical operator, and the stateFormatVersion).

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.StateStoreSaveExec to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StateStoreSaveExec=ALL

Refer to Logging.

Demo: StateStoreSaveExec Operator in Physical Plan of Streaming Aggregation (Dataset.groupBy Operator)

// START: Only for easier debugging
// The state is then only for one partition
// which should make monitoring easier
val numShufflePartitions = 1
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, numShufflePartitions)

assert(spark.sessionState.conf.numShufflePartitions == numShufflePartitions)
// END: Only for easier debugging

val counts = spark
  .readStream
  .format("rate")
  .load
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count") // <-- creates an Aggregate logical operator
  .orderBy("group")  // <-- makes for easier checking
scala> counts.explain
== Physical Plan ==
*(5) Sort [group#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(group#5 ASC NULLS FIRST, 1)
   +- *(4) HashAggregate(keys=[window#11], functions=[count(value#1L)])
      +- StateStoreSave [window#11], state info [ checkpoint = <unknown>, runId = 3c4d9c60-5fda-4053-aeef-4ddd2b485605, opId = 0, ver = 0, numPartitions = 1], Append, 0, 2
         +- *(3) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
            +- StateStoreRestore [window#11], state info [ checkpoint = <unknown>, runId = 3c4d9c60-5fda-4053-aeef-4ddd2b485605, opId = 0, ver = 0, numPartitions = 1], 2
               +- *(2) HashAggregate(keys=[window#11], functions=[merge_count(value#1L)])
                  +- Exchange hashpartitioning(window#11, 1)
                     +- *(1) HashAggregate(keys=[window#11], functions=[partial_count(value#1L)])
                        +- *(1) Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / 5000000.0)) END + 0) - 1) * 5000000) + 5000000), LongType, TimestampType)) AS window#11, value#1L]
                           +- *(1) Filter isnotnull(timestamp#0)
                              +- StreamingRelation rate, [timestamp#0, value#1L]

// Start the query and hence execute StateStoreSaveExec
import org.apache.spark.sql.streaming.OutputMode
val queryName = "rate2memory"
val checkpointLocation = s"/tmp/checkpoint-$queryName"
val query = counts
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(OutputMode.Complete)
  .start

// Give the streaming query a moment (one micro-batch)
// So lastExecution is available for the checkpointLocation
import scala.concurrent.duration._
query.awaitTermination(1.second.toMillis)

import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
val lastExecution = query
  .asInstanceOf[StreamingQueryWrapper]
  .streamingQuery
  .lastExecution
val logicalPlan = lastExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 Sort [group#5 ASC NULLS FIRST], true
01 +- Aggregate [window#11], [window#11 AS group#5, count(value#1L) AS value_count#10L]
02    +- Filter isnotnull(timestamp#0)
03       +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#0, TimestampType, LongType) - 0) as double) / cast(5000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 5000000) + 0) + 5000000), LongType, TimestampType)) AS window#11, timestamp#0, value#1L]
04          +- Project [timestamp#249 AS timestamp#0, value#250L AS value#1L]
05             +- Streaming RelationV2 rate[timestamp#249, value#250L]
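
To see the execution-specific properties once IncrementalExecution has prepared the plan, the following is a minimal sketch (a continuation of the demo above; it reuses the lastExecution value already defined, and the field names follow the StateStoreSaveExec case class):

// Sketch only: collect StateStoreSaveExec from the prepared physical plan
// and print its execution-specific properties.
import org.apache.spark.sql.execution.streaming.StateStoreSaveExec
val saveExecs = lastExecution.executedPlan.collect {
  case s: StateStoreSaveExec => s
}
saveExecs.foreach { s =>
  println(s.stateInfo)  // e.g. Some(StatefulOperatorStateInfo(...))
  println(s.outputMode) // e.g. Some(Complete)
}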

Performance Metrics (SQLMetrics)

StateStoreSaveExec uses the performance metrics of the parent StateStoreWriter.

Key Name (in UI) / Description

allUpdatesTimeMs

allRemovalsTimeMs

commitTimeMs

numOutputRows

numTotalStateRows

Number of the state keys in the state store

Corresponds to numRowsTotal in stateOperators in StreamingQueryProgress (and is available as sq.lastProgress.stateOperators(0).numRowsTotal for 0th operator).

numUpdatedStateRows

Number of the state keys that were stored as updates in the state store in a trigger (for the keys in the result rows of the upstream physical operator).

  • In Complete output mode, numUpdatedStateRows is the number of input rows (which should be exactly the number of output rows from the upstream operator)

Caution
FIXME
  • In Append output mode, numUpdatedStateRows is the number of input rows with keys that have not expired yet (per the required watermark)

  • In Update output mode, numUpdatedStateRows is exactly the number of output rows, i.e. the number of keys that have not expired yet, if a watermark has been defined at all (which is optional).

Caution
FIXME
Note
You can see the current value as the numRowsUpdated attribute in stateOperators in StreamingQueryProgress (that is available as StreamingQuery.lastProgress.stateOperators(n).numRowsUpdated for the nth operator).

stateMemory

Memory used by the StateStore
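
To read these metrics from the streaming side, a quick sketch (assuming the query from the demo above is still running and has completed at least one trigger):

// Sketch only: the state-store metrics surface as StateOperatorProgress
// in StreamingQueryProgress.
val stateOps = query.lastProgress.stateOperators
stateOps.foreach { op =>
  println(s"numRowsTotal    = ${op.numRowsTotal}")    // numTotalStateRows
  println(s"numRowsUpdated  = ${op.numRowsUpdated}")  // numUpdatedStateRows
  println(s"memoryUsedBytes = ${op.memoryUsedBytes}") // stateMemory
}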

Figure 5. StateStoreSaveExec in web UI (Details for Query)

Creating StateStoreSaveExec Instance

StateStoreSaveExec takes the following to be created:

  • Key expressions, i.e. Catalyst attributes of the grouping keys

  • Execution-specific StatefulOperatorStateInfo (initially undefined)

  • Output mode (initially undefined)

  • Event-time watermark (initially undefined)

  • stateFormatVersion

  • Child physical operator

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of the SparkPlan contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

Internally, doExecute initializes metrics.

Note
doExecute requires that the optional outputMode is defined at this point (which should have happened when IncrementalExecution prepared the streaming aggregation for execution).

doExecute executes the child physical operator and creates a StateStoreRDD with storeUpdateFunction that:

  1. Generates an unsafe projection to access the key field (using keyExpressions and the output schema of child).

  2. Branches off per output mode.

Table 1. doExecute’s Behaviour per Output Mode

Append

Note
Append is the default output mode when unspecified.
Note
Append output mode requires that a streaming query defines an event-time watermark (using the withWatermark operator) on the event-time column that is used in aggregation (directly or via the window function). See the Append-mode sketch at the end of this section.
  1. Finds late (aggregate) rows from child physical operator (that have expired per watermark)

  2. Stores the late rows in the state store (and increments numUpdatedStateRows metric)

  3. Gets all the added (late) rows from the state store

  4. Creates an iterator that removes the late rows from the state store when the next row is requested and, in the end, commits the state updates

Note
numUpdatedStateRows metric is the number of rows that… FIXME
Tip
Refer to Demo: StateStoreSaveExec with Append Output Mode for an example of StateStoreSaveExec in Append output mode.
Caution
FIXME When is "Filtering state store on:" printed out?
Caution
FIXME Track numUpdatedStateRows metric

  1. Uses watermarkPredicateForData predicate to exclude matching rows and (like in Complete output mode) stores all the remaining rows in StateStore.

  2. (like in Complete output mode) While storing the rows, increments numUpdatedStateRows metric (for every row) and records the total time in allUpdatesTimeMs metric.

  3. Takes all the rows from StateStore and returns a NextIterator that:
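
A minimal Append-mode sketch (a variation of the demo query above, which itself runs in Complete mode; the 10-second watermark duration and the query/checkpoint names are made up for illustration):

// Sketch only: Append output mode requires an event-time watermark
// (withWatermark) on the column used in the windowed aggregation.
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode
val appendCounts = spark
  .readStream
  .format("rate")
  .load
  .withWatermark("timestamp", "10 seconds") // <-- required for Append
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count")
appendCounts
  .writeStream
  .format("memory")
  .queryName("rate2memory_append")
  .option("checkpointLocation", "/tmp/checkpoint-rate2memory_append")
  .outputMode(OutputMode.Append)
  .start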

Complete

  1. Takes all UnsafeRow rows (from the parent iterator)

  2. Stores the rows by key in the state store eagerly (i.e. all rows that are available in the parent iterator before proceeding)

  3. Commits the state updates

  4. In the end, doExecute reads the key-row pairs from the state store and passes the rows along (i.e. to the following physical operator)

The number of keys stored in the state store is recorded in numUpdatedStateRows metric.

Note
In Complete output mode numOutputRows metric is exactly numTotalStateRows metric.
Tip
Refer to Demo: StateStoreSaveExec with Complete Output Mode for an example of StateStoreSaveExec in Complete output mode.

  1. Stores all rows (as UnsafeRow) in StateStore.

  2. While storing the rows, increments numUpdatedStateRows metric (for every row) and records the total time in allUpdatesTimeMs metric.

  3. Records 0 in allRemovalsTimeMs metric.

  4. Commits the state updates to StateStore and records the time in commitTimeMs metric.

  5. Records StateStore metrics.

  6. In the end, takes all the rows stored in StateStore and increments numOutputRows metric.

Update

Returns an iterator that filters out late aggregate rows (per the watermark, if defined) and stores the "young" rows in the state store (one by one, i.e. on every next). When no more rows are available, it removes the late rows from the state store (all at once) and commits the state updates. See the Update-mode sketch at the end of this section.

Tip
Refer to Demo: StateStoreSaveExec with Update Output Mode for an example of StateStoreSaveExec in Update output mode.

Returns an Iterator of rows that uses the watermarkPredicateForData predicate to filter out late rows.

In hasNext, when rows are no longer available:

  1. Records the total time to iterate over all the rows in allUpdatesTimeMs metric.

  2. Removes the keys that are older than the watermark (removeKeysOlderThanWatermark) and records the time in allRemovalsTimeMs metric.

  3. Commits the updates to StateStore and records the time in commitTimeMs metric.

  4. Records StateStore metrics.

In next, stores a row in StateStore and increments numOutputRows and numUpdatedStateRows metrics.
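
For comparison, a minimal Update-mode sketch (again a variation of the demo query; the watermark is optional in Update mode, but without one no state is ever removed; the checkpoint path is made up):

// Sketch only: Update output mode emits only the updated aggregates per trigger
// (written to the console sink here).
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode
val updateQuery = spark
  .readStream
  .format("rate")
  .load
  .withWatermark("timestamp", "10 seconds") // optional in Update mode
  .groupBy(window($"timestamp", "5 seconds") as "group")
  .agg(count("value") as "value_count")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint-rate2console-update")
  .outputMode(OutputMode.Update)
  .start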

doExecute reports an UnsupportedOperationException when executed with an invalid output mode:

Invalid output mode: [outputMode]