HDFSBackedStateStoreProvider — Default StateStoreProvider

HDFSBackedStateStoreProvider is the default StateStoreProvider (as specified by the spark.sql.streaming.stateStore.providerClass internal configuration property).
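For illustration, a minimal sketch of setting the property explicitly when building a SparkSession (rarely needed in practice, since HDFSBackedStateStoreProvider is the default):

import org.apache.spark.sql.SparkSession

// Explicitly request the default state store provider
// (this is what Spark uses when the property is left unset).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("state-store-demo")
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
  .getOrCreate()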

HDFSBackedStateStoreProvider is created and immediately requested to initialize when the StateStoreProvider helper object is requested to create and initialize a StateStoreProvider.

HDFSBackedStateStoreProvider takes no arguments when created.

HDFSBackedStateStoreProvider uses the state checkpoint base directory (that is the storeCheckpointLocation of the StateStoreId) for delta and snapshot state files. The checkpoint directory is created when HDFSBackedStateStoreProvider is requested to initialize.
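For example, the state checkpoint directory of operator 0 and partition 3 could contain files like the following (a hypothetical listing; snapshot files appear once doSnapshot has run):

/tmp/checkpoint/state/0/3/
  1.delta
  2.delta
  3.delta
  4.delta
  5.snapshot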

Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider=ALL

Refer to Logging.

StateStoreId — Unique Identifier of State Store

As a StateStoreProvider, HDFSBackedStateStoreProvider is associated with a StateStoreId (which is a unique identifier of a state store for an operator and a partition).

HDFSBackedStateStoreProvider is given the StateStoreId at initialization (as requested by the StateStoreProvider contract).

The StateStoreId is then used for the state checkpoint base directory (the storeCheckpointLocation) and for the textual representation of the HDFSBackedStateStoreProvider.

Textual Representation — toString Method

toString(): String
Note
toString is part of the java.lang.Object contract for the string representation of the object.

HDFSBackedStateStoreProvider uses the StateStoreId and the state checkpoint base directory for the textual representation:

HDFSStateStoreProvider[id = (op=[operatorId],part=[partitionId]),dir = [baseDir]]
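For example, for operator 0, partition 3 and a /tmp/checkpoint checkpoint location (hypothetical values):

HDFSStateStoreProvider[id = (op=0,part=3),dir = /tmp/checkpoint/state/0/3]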

Retrieving State Store by Version — getStore Method

getStore(version: Long): StateStore
Note
getStore is part of the StateStoreProvider Contract to get the StateStore for a given version.

getStore loads the state map for the given version (using loadMap) and creates a new HDFSBackedStateStore for the version.

deltaFile Internal Method

deltaFile(version: Long): Path

deltaFile creates a Hadoop Path of the [version].delta file in the baseDir.

Note
deltaFile is used when…​FIXME
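A self-contained sketch of the naming scheme (the baseDir value is hypothetical):

import org.apache.hadoop.fs.Path

val baseDir = new Path("/tmp/checkpoint/state/0/3")

def deltaFile(version: Long): Path =
  new Path(baseDir, s"$version.delta")

deltaFile(5)  // /tmp/checkpoint/state/0/3/5.delta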

fetchFiles Internal Method

fetchFiles(): Seq[StoreFile]

fetchFiles…​FIXME

Note
fetchFiles is used when HDFSBackedStateStoreProvider is requested to latestIterator, doSnapshot and cleanup.

Initializing StateStoreProvider — init Method

init(
  stateStoreId: StateStoreId,
  keySchema: StructType,
  valueSchema: StructType,
  indexOrdinal: Option[Int],
  storeConf: StateStoreConf,
  hadoopConf: Configuration): Unit
Note
init is part of the StateStoreProvider Contract to initialize itself.

init assigns the values of the input arguments to stateStoreId, keySchema, valueSchema, storeConf, and hadoopConf.

init requests the StateStoreConf for the spark.sql.streaming.maxBatchesToRetainInMemory configuration property (which becomes the numberOfVersionsToRetainInMemory internal property).

In the end, init requests the CheckpointFileManager to create the baseDir directory (with subdirectories).
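Since spark.sql.streaming.maxBatchesToRetainInMemory is a regular Spark SQL configuration property, it can be tuned before a streaming query starts, e.g. (a sketch, assuming an active SparkSession in scope as spark):

// Keep at most the 2 most recent state versions in memory (2 is the default)
spark.conf.set("spark.sql.streaming.maxBatchesToRetainInMemory", "2")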

latestIterator Internal Method

latestIterator(): Iterator[UnsafeRowPair]

latestIterator…​FIXME

Note
latestIterator seems to be used exclusively in tests.

doSnapshot Internal Method

doSnapshot(): Unit

doSnapshot…​FIXME

Note
doSnapshot is used exclusively when HDFSBackedStateStoreProvider is requested to doMaintenance.

Cleaning Up — cleanup Internal Method

cleanup(): Unit

cleanup…​FIXME

Note
cleanup is used exclusively when HDFSBackedStateStoreProvider is requested to doMaintenance.

Doing Maintenance — doMaintenance Method

doMaintenance(): Unit
Note
doMaintenance is part of the StateStoreProvider Contract to do maintenance if needed.

doMaintenance does state snapshotting (doSnapshot) followed by cleaning up (cleanup).

Closing State Store Provider — close Method

close(): Unit
Note
close is part of the StateStoreProvider Contract to close the state store provider.

close…​FIXME

putStateIntoStateCacheMap Internal Method

putStateIntoStateCacheMap(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow]): Unit

putStateIntoStateCacheMap…​FIXME

Note
putStateIntoStateCacheMap is used when HDFSBackedStateStoreProvider is requested to commitUpdates and loadMap.
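The caching idea can be sketched as follows (a simplification, not Spark's exact code; UnsafeRow is stubbed as String to keep the sketch self-contained):

import java.util.{Collections, TreeMap}
import java.util.concurrent.ConcurrentHashMap

type MapType = ConcurrentHashMap[String, String]

val numberOfVersionsToRetainInMemory = 2
// newest version first, i.e. the reversed natural ordering of the keys
val loadedMaps =
  new TreeMap[java.lang.Long, MapType](Collections.reverseOrder[java.lang.Long]())

def putStateIntoStateCacheMap(newVersion: Long, map: MapType): Unit = {
  loadedMaps.put(newVersion, map)
  while (loadedMaps.size > numberOfVersionsToRetainInMemory)
    loadedMaps.remove(loadedMaps.lastKey)  // evict the oldest cached version
}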

commitUpdates Internal Method

commitUpdates(
  newVersion: Long,
  map: ConcurrentHashMap[UnsafeRow, UnsafeRow],
  output: DataOutputStream): Unit

commitUpdates…​FIXME

Note
commitUpdates is used exclusively when HDFSBackedStateStore is requested to commit state changes.

loadMap Internal Method

loadMap(version: Long): ConcurrentHashMap[UnsafeRow, UnsafeRow]

loadMap…​FIXME

Note
loadMap is used when HDFSBackedStateStoreProvider is requested to retrieve the state store for a specified version and latestIterator.
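The gist (serve a version from the in-memory cache when possible, otherwise rebuild it on top of an earlier version) can be sketched as follows (a simplification, not Spark's exact code; applyDelta is a hypothetical stand-in for replaying a [version].delta file):

import scala.collection.mutable

val cache = mutable.Map.empty[Long, Map[String, String]]

// hypothetical stand-in for reading and replaying a [version].delta file
def applyDelta(base: Map[String, String], version: Long): Map[String, String] =
  base + (s"key-$version" -> s"value-$version")

def loadMap(version: Long): Map[String, String] =
  if (version <= 0) Map.empty
  else cache.getOrElseUpdate(version, applyDelta(loadMap(version - 1), version))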

writeSnapshotFile Internal Method

writeSnapshotFile(
  version: Long,
  map: MapType): Unit

writeSnapshotFile…​FIXME

Note
writeSnapshotFile is used when…​FIXME

updateFromDeltaFile Internal Method

updateFromDeltaFile(
  version: Long,
  map: MapType): Unit

updateFromDeltaFile…​FIXME

Note
updateFromDeltaFile is used exclusively when HDFSBackedStateStoreProvider is requested to loadMap.

Internal Properties


loadedMaps

loadedMaps: TreeMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]]

java.util.TreeMap of state versions and their state maps (ConcurrentHashMap[UnsafeRow, UnsafeRow]), sorted according to the reversed natural ordering of the keys (the most recent version first)

The current size estimation of loadedMaps is reported as the memoryUsedBytes metric in the metrics.

A new entry (a version and the associated map) is added when HDFSBackedStateStoreProvider is requested to putStateIntoStateCacheMap.

Used when…​FIXME

numberOfVersionsToRetainInMemory

numberOfVersionsToRetainInMemory: Int

numberOfVersionsToRetainInMemory is the value of the spark.sql.streaming.maxBatchesToRetainInMemory configuration property and sets the upper limit on the number of entries in the loadedMaps internal registry.

numberOfVersionsToRetainInMemory is used when HDFSBackedStateStoreProvider is requested to putStateIntoStateCacheMap.