StateStoreRDD is an RDD for executing storeUpdateFunction with a StateStore (and data from the partitions of the data RDD).
StateStoreRDD is created for the following stateful physical operators (using StateStoreOps.mapPartitionsWithStateStore):
StateStoreRDD uses StateStoreCoordinator for the preferred locations of a partition for job scheduling.
getPartitions returns exactly the partitions of the data RDD.
compute(
partition: Partition,
ctxt: TaskContext): Iterator[U]
Note: compute is part of the RDD Contract to compute a given partition.
compute computes dataRDD and passes the result on to storeUpdateFunction (together with a configured StateStore).
Internally (and similarly to getPreferredLocations), compute creates a StateStoreProviderId with a StateStoreId (using checkpointLocation, operatorId and the index of the input partition) and queryRunId.
compute then requests StateStore for the store for the StateStoreProviderId.
In the end, compute computes dataRDD (using the input partition and ctxt) and then executes storeUpdateFunction (with the store and the result).
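The compute flow above can be modeled without a Spark dependency. The sketch below is a hypothetical, simplified model: StoreId, ProviderId, MiniStateStore, MiniStateStoreRegistry and MiniStateStoreRDD are illustrative stand-ins, not Spark's StateStoreId, StateStoreProviderId, StateStore or StateStoreRDD. It builds a provider id from the checkpoint location, operator id, partition index and query run id, looks up (or creates) the store for that id, "computes" the parent partition and hands both to the store update function. getPartitions simply reuses the parent's partitions. Store versions, commits and schemas are not modeled.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-ins for StateStoreId and StateStoreProviderId (not Spark's classes).
final case class StoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
final case class ProviderId(storeId: StoreId, queryRunId: UUID)

// Hypothetical in-memory key-value store standing in for a StateStore.
final class MiniStateStore {
  private val data = mutable.Map.empty[String, Long]
  def get(key: String): Option[Long] = data.get(key)
  def put(key: String, value: Long): Unit = data.update(key, value)
}

// Hypothetical registry playing the role of "requesting StateStore for the store":
// it hands out one store per provider id and keeps it between calls.
object MiniStateStoreRegistry {
  private val stores = mutable.Map.empty[ProviderId, MiniStateStore]
  def get(id: ProviderId): MiniStateStore = stores.getOrElseUpdate(id, new MiniStateStore)
}

// Hypothetical model of StateStoreRDD; each element of dataPartitions is one partition of the data "RDD".
final class MiniStateStoreRDD[T, U](
    dataPartitions: IndexedSeq[Seq[T]],
    checkpointLocation: String,
    operatorId: Long,
    queryRunId: UUID,
    storeUpdateFunction: (MiniStateStore, Iterator[T]) => Iterator[U]) {

  // Exactly the partitions of the data "RDD" (represented by their indices).
  def getPartitions: Range = dataPartitions.indices

  def compute(partitionIndex: Int): Iterator[U] = {
    // 1. Provider id from checkpointLocation, operatorId, partition index and queryRunId.
    val providerId = ProviderId(StoreId(checkpointLocation, operatorId, partitionIndex), queryRunId)
    // 2. Look up (or create) the store for that id.
    val store = MiniStateStoreRegistry.get(providerId)
    // 3. "Compute" the parent partition and pass it, with the store, to the update function.
    storeUpdateFunction(store, dataPartitions(partitionIndex).iterator)
  }
}

// Usage: a running count per word, kept in the per-partition store.
object ComputeExample {
  val rdd = new MiniStateStoreRDD[String, (String, Long)](
    IndexedSeq(Seq("a", "b", "a"), Seq("c")),
    "/tmp/checkpoint/state",
    0L,
    UUID.fromString("00000000-0000-0000-0000-000000000001"),
    (store, rows) => rows.map { word =>
      val count = store.get(word).getOrElse(0L) + 1
      store.put(word, count)
      word -> count
    })

  val firstRun: List[(String, Long)] = rdd.compute(0).toList
  val secondRun: List[(String, Long)] = rdd.compute(0).toList // same partition, same store
  val otherPartition: List[(String, Long)] = rdd.compute(1).toList
}
```

Because the registry returns the same store for the same provider id, computing partition 0 a second time continues the counts, while partition 1 gets its own, fresh store.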
getPreferredLocations(partition: Partition): Seq[String]
Note: getPreferredLocations is part of the RDD Contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
getPreferredLocations creates a StateStoreProviderId with a StateStoreId (using checkpointLocation, operatorId and the index of the input partition) and queryRunId.
Note: checkpointLocation and operatorId are shared across different partitions, so the only difference between the StateStoreProviderIds is the partition index.
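To make the point about shared fields concrete, the following hypothetical sketch builds one id per partition. StoreId, ProviderId and ProviderIds.forPartitions are illustrative stand-ins, not Spark classes; they only mirror the fields named above (checkpointLocation, operatorId, the partition index and queryRunId).

```scala
import java.util.UUID

// Hypothetical stand-ins mirroring the fields described above (not Spark's classes).
final case class StoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
final case class ProviderId(storeId: StoreId, queryRunId: UUID)

object ProviderIds {
  // One provider id per partition index; checkpointLocation, operatorId and queryRunId are shared.
  def forPartitions(
      checkpointLocation: String,
      operatorId: Long,
      queryRunId: UUID,
      numPartitions: Int): Seq[ProviderId] =
    (0 until numPartitions).map { i =>
      ProviderId(StoreId(checkpointLocation, operatorId, i), queryRunId)
    }
}

object ProviderIdExample {
  val runId: UUID = UUID.fromString("00000000-0000-0000-0000-000000000001")
  val ids: Seq[ProviderId] = ProviderIds.forPartitions("/tmp/checkpoint/state", 0L, runId, 3)
}
```

All three ids are distinct, yet they become identical once the partition index is normalized away, which is exactly the property the note describes.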
In the end, getPreferredLocations requests StateStoreCoordinatorRef for the location of the state store for the StateStoreProviderId.
Note: StateStoreCoordinator coordinates instances of StateStores across Spark executors in the cluster and tracks their locations for job scheduling.
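The location lookup can be sketched as follows, assuming the coordinator reference is optional (as listed among the creation arguments in this section) and that an unknown location simply yields no preference. MiniCoordinator, reportActiveInstance, getLocation and PreferredLocations are hypothetical names; the real lookup goes through StateStoreCoordinatorRef, which this sketch replaces with a local map.

```scala
import scala.collection.mutable

// Hypothetical flattened provider id (not Spark's StateStoreProviderId).
final case class ProviderId(checkpointLocation: String, operatorId: Long, partitionId: Int, queryRunId: String)

// Hypothetical, single-JVM stand-in for the coordinator:
// it remembers which executor last reported an active store instance.
final class MiniCoordinator {
  private val locations = mutable.Map.empty[ProviderId, String]
  def reportActiveInstance(id: ProviderId, executorLocation: String): Unit =
    locations.update(id, executorLocation)
  def getLocation(id: ProviderId): Option[String] = locations.get(id)
}

object PreferredLocations {
  // Build the id for the partition, then ask the (optional) coordinator for its location.
  def getPreferredLocations(
      partitionIndex: Int,
      checkpointLocation: String,
      operatorId: Long,
      queryRunId: String,
      coordinator: Option[MiniCoordinator]): Seq[String] = {
    val id = ProviderId(checkpointLocation, operatorId, partitionIndex, queryRunId)
    coordinator.flatMap(_.getLocation(id)).toList
  }
}

object PreferredLocationsExample {
  val coordinator = new MiniCoordinator
  coordinator.reportActiveInstance(ProviderId("/tmp/cp", 0L, 0, "run-1"), "executor-host-a")

  val p0: Seq[String] = PreferredLocations.getPreferredLocations(0, "/tmp/cp", 0L, "run-1", Some(coordinator))
  val p1: Seq[String] = PreferredLocations.getPreferredLocations(1, "/tmp/cp", 0L, "run-1", Some(coordinator))
  val noCoordinator: Seq[String] = PreferredLocations.getPreferredLocations(0, "/tmp/cp", 0L, "run-1", None)
}
```

Returning an empty sequence when nothing is known leaves the scheduler free to place the task anywhere; "executor-host-a" is a placeholder, not Spark's location format.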
StateStoreRDD takes the following to be created:
- Store update function ((StateStore, Iterator[T]) ⇒ Iterator[U], where T is the type of rows in the data RDD)
- Optional StateStoreCoordinatorRef
StateStoreRDD initializes the internal properties.
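A store update function is an ordinary function value of shape (StateStore, Iterator[T]) ⇒ Iterator[U]. The hypothetical sketch below uses a tiny InMemoryStore instead of Spark's StateStore (whose API is not reproduced here) and, with T = U = String, emits a row only the first time its key appears in the store. The names InMemoryStore, StoreUpdateFunctions.dropDuplicates and StoreUpdateExample are illustrative only.

```scala
import scala.collection.mutable

// Hypothetical minimal store (not Spark's StateStore API).
final class InMemoryStore {
  private val seen = mutable.Set.empty[String]
  def contains(key: String): Boolean = seen.contains(key)
  def add(key: String): Unit = seen += key
}

object StoreUpdateFunctions {
  // Same shape as (StateStore, Iterator[T]) => Iterator[U], with T = U = String:
  // keep a row only the first time its key is seen in this store.
  val dropDuplicates: (InMemoryStore, Iterator[String]) => Iterator[String] =
    (store, rows) => rows.filter { row =>
      val isNew = !store.contains(row)
      if (isNew) store.add(row)
      isNew
    }
}

object StoreUpdateExample {
  val store = new InMemoryStore
  // Two "batches" against the same store: state carries over between them.
  val batch1: List[String] = StoreUpdateFunctions.dropDuplicates(store, Iterator("a", "b", "a")).toList
  val batch2: List[String] = StoreUpdateFunctions.dropDuplicates(store, Iterator("b", "c")).toList
}
```

Since Iterator.filter is lazy, the store is only updated as the output is consumed; here toList forces it.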