BlockManagerMaster runs on the driver and executors.
BlockManagerMaster uses BlockManagerMasterEndpoint, registered under the BlockManagerMaster RPC endpoint name on the driver (with endpoint references on executors), to allow executors to send block status updates to it and hence keep track of block statuses.
Note: An instance of BlockManagerMaster is created in SparkEnv (for the driver and executors) and immediately used to create their BlockManagers.
Tip: Enable logging for BlockManagerMaster by adding the corresponding logger line to the logging configuration. Refer to Logging.
An instance of BlockManagerMaster requires a BlockManagerMaster RPC endpoint reference, SparkConf, and the isDriver flag that controls whether it is created for the driver or executors.
Note: An instance of BlockManagerMaster is created as part of creating an instance of SparkEnv for the driver and executors.
removeExecutor(execId: String): Unit
removeExecutor posts RemoveExecutor(execId) to the BlockManagerMaster RPC endpoint and waits for a response.
If the response is false, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: Removed executor [execId]
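removeExecutor, stop and registerBlockManager share the same pattern: post a message, wait for the reply and fail unless it is true. The following is a minimal sketch of that pattern under stated assumptions. RpcRef, TellSketch and the local SparkException class are hypothetical stand-ins, not Spark's RpcEndpointRef or API, and the 120-second wait is just a default for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for Spark's SparkException (a plain RuntimeException here).
class SparkException(message: String) extends RuntimeException(message)

// Hypothetical stand-in for an RPC endpoint reference whose ask returns a Future of the reply.
trait RpcRef {
  def ask(message: Any): Future[Boolean]
}

// Hypothetical message type mirroring RemoveExecutor(execId).
final case class RemoveExecutor(execId: String)

object TellSketch {
  // Posts `message`, waits up to `timeout` for the reply and fails unless the reply is true.
  def tell(ref: RpcRef, message: Any, timeout: FiniteDuration = 120.seconds): Unit = {
    val reply = Await.result(ref.ask(message), timeout)
    if (!reply) {
      throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
    }
  }

  // removeExecutor expressed with the pattern above: only logs after a successful reply.
  def removeExecutor(ref: RpcRef, execId: String): Unit = {
    tell(ref, RemoveExecutor(execId))
    println(s"INFO BlockManagerMaster: Removed executor $execId")
  }
}
```

Factoring the check into one helper keeps the "expected true" failure message identical across the methods that use it.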
removeBlock(blockId: BlockId)
removeBlock removes the blockId block …FIXME
It posts a RemoveBlock message to the BlockManagerMaster RPC endpoint and waits for a response.
removeRdd(rddId: Int, blocking: Boolean)
removeRdd removes all the blocks of the rddId RDD, possibly in a blocking fashion.
It posts a RemoveRdd(rddId) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove RDD [rddId] - [exception]
If it is a blocking operation, it waits for the result for up to spark.rpc.askTimeout (falling back to spark.network.timeout, and then to 120 seconds).
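The asynchronous-with-optional-blocking behaviour of removeRdd can be sketched with plain Scala futures. This is a simplified model, not Spark's implementation: the `ask` function parameter is a hypothetical stand-in for posting RemoveRdd(rddId) to the endpoint, and the warning is printed rather than logged.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RemoveRddSketch {
  // `ask` is a hypothetical stand-in for posting RemoveRdd(rddId) to the endpoint;
  // it returns a Future, so the caller's thread is not blocked by default.
  def removeRdd(
      ask: Int => Future[Any],
      rddId: Int,
      blocking: Boolean,
      timeout: FiniteDuration = 120.seconds)(implicit ec: ExecutionContext): Unit = {
    val future = ask(rddId)
    // Print a warning whenever the asynchronous removal fails.
    future.onComplete {
      case Failure(e) => println(s"WARN Failed to remove RDD $rddId - ${e.getMessage}")
      case Success(_) => ()
    }
    // Only a blocking call waits; Await.result rethrows the failure, if any.
    if (blocking) Await.result(future, timeout)
  }
}
```

removeShuffle and removeBroadcast follow the same shape with their own messages and warning texts.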
removeShuffle(shuffleId: Int, blocking: Boolean)
removeShuffle removes all the blocks of the shuffleId shuffle, possibly in a blocking fashion.
It posts a RemoveShuffle(shuffleId) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove shuffle [shuffleId] - [exception]
If it is a blocking operation, it waits for the result for up to spark.rpc.askTimeout (falling back to spark.network.timeout, and then to 120 seconds).
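The wait limit used by the blocking variants comes from a chain of settings. The sketch below is a hypothetical helper that resolves it from a plain Map standing in for SparkConf. It assumes values are whole seconds, optionally with an "s" suffix; Spark's own time-string parser accepts more formats than this.

```scala
import scala.concurrent.duration._

object AskTimeoutSketch {
  // Hypothetical helper; a Map stands in for SparkConf.
  // Lookup order: spark.rpc.askTimeout, then spark.network.timeout, then 120 seconds.
  def askTimeout(conf: Map[String, String]): FiniteDuration = {
    val raw = conf.get("spark.rpc.askTimeout")
      .orElse(conf.get("spark.network.timeout"))
      .getOrElse("120s")
    // Assumes "30" or "30s" style values only.
    raw.stripSuffix("s").trim.toLong.seconds
  }
}
```

For example, setting only spark.network.timeout to "300s" yields a 300-second wait, while an explicit spark.rpc.askTimeout takes precedence over it.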
removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean)
removeBroadcast removes all the blocks of the broadcastId broadcast, possibly in a blocking fashion.
It posts a RemoveBroadcast(broadcastId, removeFromMaster) message to the BlockManagerMaster RPC endpoint on a separate thread.
If there is an issue, you should see the following WARN message in the logs, followed by the entire exception:
WARN Failed to remove broadcast [broadcastId] with removeFromMaster = [removeFromMaster] - [exception]
If it is a blocking operation, it waits for the result for up to spark.rpc.askTimeout (falling back to spark.network.timeout, and then to 120 seconds).
stop(): Unit
stop sends a StopBlockManagerMaster message to the BlockManagerMaster RPC endpoint and waits for a response.
Note: It is only executed for the driver.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: BlockManagerMaster stopped
Otherwise, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
registerBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long,
slaveEndpoint: RpcEndpointRef): Unit
When registerBlockManager runs, you should see the following INFO message in the logs:
INFO BlockManagerMaster: Trying to register BlockManager
It then informs the driver about the new BlockManager by sending RegisterBlockManager to the BlockManagerMaster RPC endpoint and waiting for a response.
If all goes fine, you should see the following INFO message in the logs:
INFO BlockManagerMaster: Registered BlockManager
Otherwise, a SparkException is thrown with the following message:
BlockManagerMasterEndpoint returned false, expected true.
Note: registerBlockManager is called while BlockManager is being initialized (on the driver and executors) and while re-registering blocks to the driver.
updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Boolean
updateBlockInfo sends an UpdateBlockInfo message to the BlockManagerMaster RPC endpoint and waits for a response.
You should see the following DEBUG message in the logs:
DEBUG BlockManagerMaster: Updated info of block [blockId]
The response from the BlockManagerMaster RPC endpoint is returned.
getLocations(blockId: BlockId): Seq[BlockManagerId]
getLocations posts a GetLocations(blockId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]
getLocations posts a GetLocationsMultipleBlockIds(blockIds) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId]
getPeers posts a GetPeers(blockManagerId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef]
getExecutorEndpointRef posts a GetExecutorEndpointRef(executorId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getMemoryStatus: Map[BlockManagerId, (Long, Long)]
getMemoryStatus posts a GetMemoryStatus message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getStorageStatus: Array[StorageStatus]
getStorageStatus posts a GetStorageStatus message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the return value.
getBlockStatus(
blockId: BlockId,
askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus]
getBlockStatus posts a GetBlockStatus(blockId, askSlaves) message to the BlockManagerMaster RPC endpoint and waits for a response (of type Map[BlockManagerId, Future[Option[BlockStatus]]]).
It then builds a sequence of future results that are BlockStatus statuses and waits for the result for up to spark.rpc.askTimeout (falling back to spark.network.timeout, and then to 120 seconds).
No result leads to a SparkException with the following message:
BlockManager returned null for BlockStatus query: [blockId]
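The step that turns the per-BlockManager futures into one map can be sketched as follows. This is a simplified model under stated assumptions: BlockManagerId and BlockStatus are hypothetical case classes with far fewer fields than Spark's, RuntimeException stands in for SparkException, and dropping managers that report no status is this sketch's reading of the Option in the response type.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockStatusSketch {
  // Hypothetical simplified types; Spark's real classes carry more fields.
  final case class BlockManagerId(executorId: String)
  final case class BlockStatus(memSize: Long, diskSize: Long)

  def toStatusMap(
      response: Map[BlockManagerId, Future[Option[BlockStatus]]],
      blockId: String,
      timeout: FiniteDuration = 120.seconds)(implicit ec: ExecutionContext): Map[BlockManagerId, BlockStatus] = {
    val (ids, futures) = response.toSeq.unzip
    // Combine the per-BlockManager futures into one future and wait for all of them at once.
    val statuses = Await.result(Future.sequence(futures), timeout)
    // Mirrors the documented null check (RuntimeException stands in for SparkException).
    if (statuses == null) {
      throw new RuntimeException(s"BlockManager returned null for BlockStatus query: $blockId")
    }
    // Keep only the BlockManagers that actually reported a status for the block.
    ids.zip(statuses).collect { case (id, Some(status)) => id -> status }.toMap
  }
}
```

Waiting on a single combined future means the timeout applies to the whole query rather than to each BlockManager separately.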
getMatchingBlockIds(
filter: BlockId => Boolean,
askSlaves: Boolean): Seq[BlockId]
getMatchingBlockIds posts a GetMatchingBlockIds(filter, askSlaves) message to the BlockManagerMaster RPC endpoint and waits up to spark.rpc.askTimeout (falling back to spark.network.timeout, and then to 120 seconds) for a response, which becomes the result.
hasCachedBlocks(executorId: String): Boolean
hasCachedBlocks posts a HasCachedBlocks(executorId) message to the BlockManagerMaster RPC endpoint and waits for a response, which becomes the result.
BlockManagerMasterEndpoint is the RPC endpoint for BlockManagerMaster on the driver (aka master node) to track statuses of the block managers on executors.
Note: It is used to register the BlockManagerMaster RPC endpoint when creating SparkEnv.
Tip: Enable logging for BlockManagerMasterEndpoint by adding the corresponding logger line to the logging configuration. Refer to Logging.
blockLocations is a collection of BlockId and its locations (as BlockManagerId).
Note: It is used in removeRdd to remove blocks for an RDD, removeBlockManager to remove blocks after a BlockManager gets removed, removeBlockFromWorkers, updateBlockInfo, and getLocations.
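A registry like blockLocations is naturally a map from a block to the set of BlockManagers holding it. The sketch below is a simplified model under stated assumptions, not Spark's code. BlockId and BlockManagerId are hypothetical case classes, and addLocation is a hypothetical helper standing in for what updateBlockInfo does when a block is reported.

```scala
import scala.collection.mutable

object BlockLocationsSketch {
  // Hypothetical simplified ids; Spark's BlockId and BlockManagerId are richer types.
  final case class BlockId(name: String)
  final case class BlockManagerId(executorId: String, host: String)

  // For every block, the BlockManagers that hold a copy of it.
  val blockLocations = mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]

  // Hypothetical helper: records that `bm` holds `blockId` (a set, so duplicates collapse).
  def addLocation(blockId: BlockId, bm: BlockManagerId): Unit =
    blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty[BlockManagerId]) += bm

  // Unknown blocks simply have no locations.
  def getLocations(blockId: BlockId): Seq[BlockManagerId] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)
}
```

Using a set per block makes repeated reports from the same BlockManager idempotent.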
RemoveExecutor(execId: String)
When RemoveExecutor is received, the execId executor is removed and true is sent back as the response.
GetLocations(blockId: BlockId)
When GetLocations comes in, the internal getLocations method is executed and the result becomes the response sent back.
Note: GetLocations is used to get the block locations of a single block.
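The endpoint's handling of RemoveExecutor and GetLocations amounts to pattern matching on the incoming message and replying with the handler's result. The sketch below is a simplified, hypothetical model. In Spark the endpoint replies through an RPC call context; here the method's return value stands in for the reply, and block locations are plain strings keyed by block id.

```scala
import scala.collection.mutable

object EndpointDispatchSketch {
  // Hypothetical simplified messages.
  sealed trait Message
  final case class RemoveExecutor(execId: String) extends Message
  final case class GetLocations(blockId: String) extends Message

  // Hypothetical state: block id -> ids of the executors holding it.
  private val locations = mutable.HashMap("rdd_0_0" -> Set("1", "2"))

  // The value returned for each message plays the role of the response sent back.
  def receiveAndReply(message: Message): Any = message match {
    case RemoveExecutor(execId) =>
      // Forget the executor in every block's locations, then acknowledge with true.
      locations.keys.toList.foreach { blockId =>
        locations(blockId) = locations(blockId) - execId
      }
      true
    case GetLocations(blockId) =>
      locations.getOrElse(blockId, Set.empty[String]).toSeq.sorted
  }
}
```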
RegisterBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long,
sender: RpcEndpointRef)
When RegisterBlockManager is received, the internal register method is executed.
Note: RegisterBlockManager is used to register a BlockManager to the driver.
register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit
register records the current time and registers the BlockManager by id if it has not already been registered (using the internal blockManagerInfo registry).
Registering a BlockManager can only happen once for an executor (identified by BlockManagerId.executorId using the internal blockManagerIdByExecutor registry).
If another BlockManager has earlier been registered for the executor, you should see the following ERROR message in the logs:
ERROR Got two different block manager registrations on same executor - will replace old one [oldId] with new one [id]
The executor is then removed.
You should see the following INFO message in the logs:
INFO Registering block manager [hostPort] with [bytes] RAM, [id]
The BlockManager is recorded in the internal registries: blockManagerIdByExecutor and blockManagerInfo.
Caution: FIXME Why does blockManagerInfo require a new System.currentTimeMillis() since time was already recorded?
In either case, SparkListenerBlockManagerAdded(time, id, maxMemSize) is posted to listenerBus.
Note: The method can only be executed on the driver where listenerBus is available.
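The registration steps above can be condensed into a small model. This is a sketch under stated assumptions, not Spark's code. BlockManagerId and BlockManagerInfo are hypothetical case classes, postedEvents is a hypothetical stand-in for listenerBus, and log messages are printed. The second timestamp for BlockManagerInfo mirrors the Caution above.

```scala
import scala.collection.mutable

object RegisterSketch {
  // Hypothetical simplified types standing in for BlockManagerId and BlockManagerInfo.
  final case class BlockManagerId(executorId: String, hostPort: String)
  final case class BlockManagerInfo(id: BlockManagerId, registeredAt: Long, maxMemSize: Long)

  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  // Hypothetical stand-in for listenerBus: collects posted events as strings.
  val postedEvents = mutable.ArrayBuffer.empty[String]

  def register(id: BlockManagerId, maxMemSize: Long): Unit = {
    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
      // One BlockManager per executor: an older registration is replaced.
      blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
        println(s"ERROR Got two different block manager registrations on same executor - will replace old one $oldId with new one $id")
        removeExecutor(id.executorId)
      }
      println(s"INFO Registering block manager ${id.hostPort} with $maxMemSize RAM, $id")
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize)
    }
    // Posted in either case, i.e. also when id was already registered.
    postedEvents += s"SparkListenerBlockManagerAdded($time, $id, $maxMemSize)"
  }

  private def removeExecutor(execId: String): Unit =
    blockManagerIdByExecutor.remove(execId).foreach(oldId => blockManagerInfo.remove(oldId))
}
```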
Caution: FIXME Describe listenerBus + omnigraffle it.
Other messages handled by BlockManagerMasterEndpoint:
- UpdateBlockInfo
- GetLocationsMultipleBlockIds
- GetPeers
- GetRpcHostPortForExecutor
- GetMemoryStatus
- GetStorageStatus
- GetBlockStatus
- GetMatchingBlockIds
- RemoveRdd
- RemoveShuffle
- RemoveBroadcast
- RemoveBlock
- StopBlockManagerMaster
- BlockManagerHeartbeat
- HasCachedBlocks
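Messages like those listed above lend themselves to a sealed family of case classes, so that a handler matching on them can be checked for exhaustiveness by the compiler. The sketch below shows this for a handful of the messages. The type names and parameters follow the method descriptions earlier in this section. The sealed trait ToMasterMessage, the simplified parameter types (String instead of BlockId) and the describe function are hypothetical.

```scala
object MessagesSketch {
  // Hypothetical, simplified message family; real messages use BlockId, BlockManagerId, etc.
  sealed trait ToMasterMessage
  final case class RemoveRdd(rddId: Int) extends ToMasterMessage
  final case class RemoveShuffle(shuffleId: Int) extends ToMasterMessage
  final case class RemoveBroadcast(broadcastId: Long, removeFromMaster: Boolean) extends ToMasterMessage
  final case class RemoveBlock(blockId: String) extends ToMasterMessage
  final case class HasCachedBlocks(executorId: String) extends ToMasterMessage
  case object StopBlockManagerMaster extends ToMasterMessage

  // Because the trait is sealed, the compiler can warn if a message type is not handled here.
  def describe(message: ToMasterMessage): String = message match {
    case RemoveRdd(id)                   => s"remove blocks of RDD $id"
    case RemoveShuffle(id)               => s"remove blocks of shuffle $id"
    case RemoveBroadcast(id, fromMaster) => s"remove blocks of broadcast $id (removeFromMaster = $fromMaster)"
    case RemoveBlock(id)                 => s"remove block $id"
    case HasCachedBlocks(execId)         => s"check cached blocks on executor $execId"
    case StopBlockManagerMaster          => "stop the endpoint"
  }
}
```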
removeExecutor(execId: String)
When executed, removeExecutor prints the following INFO message to the logs:
INFO BlockManagerMasterEndpoint: Trying to remove executor [execId] from BlockManagerMaster.
If the execId executor is found in the internal blockManagerIdByExecutor registry, the BlockManager for the executor is removed.
removeBlockManager(blockManagerId: BlockManagerId)
When executed, removeBlockManager looks up blockManagerId and removes the executor it was working on from the internal blockManagerIdByExecutor registry, and the BlockManager itself from blockManagerInfo.
Note: It is a private helper method that is exclusively used while removing an executor.
It then goes over all the blocks of the BlockManager and removes the BlockManager from the locations of each of those blocks in the blockLocations registry.
SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId) is posted to listenerBus.
You should then see the following INFO message in the logs:
INFO BlockManagerMasterEndpoint: Removing block manager [blockManagerId]
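removeExecutor and its helper removeBlockManager can be modelled with three mutable maps. This is a simplified sketch, not Spark's implementation. The case classes are hypothetical, block ids are plain strings, listenerBus is only mentioned in a comment, and dropping a block whose location set becomes empty is an assumption of this sketch.

```scala
import scala.collection.mutable

object RemoveBlockManagerSketch {
  // Hypothetical simplified types; only the block ids are kept per BlockManager.
  final case class BlockManagerId(executorId: String)
  final case class BlockManagerInfo(blocks: mutable.Set[String])

  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  val blockLocations = mutable.HashMap.empty[String, mutable.Set[BlockManagerId]]

  def removeExecutor(execId: String): Unit = {
    println(s"INFO BlockManagerMasterEndpoint: Trying to remove executor $execId from BlockManagerMaster.")
    // Unknown executors are silently ignored.
    blockManagerIdByExecutor.get(execId).foreach(id => removeBlockManager(id))
  }

  private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
    val info = blockManagerInfo(blockManagerId)
    blockManagerIdByExecutor -= blockManagerId.executorId
    blockManagerInfo -= blockManagerId
    // Remove this BlockManager from the locations of every block it held.
    for (blockId <- info.blocks) {
      blockLocations.get(blockId).foreach { locations =>
        locations -= blockManagerId
        // Assumption of this sketch: a block with no remaining locations is dropped.
        if (locations.isEmpty) blockLocations -= blockId
      }
    }
    // A SparkListenerBlockManagerRemoved event would be posted to listenerBus here.
    println(s"INFO BlockManagerMasterEndpoint: Removing block manager $blockManagerId")
  }
}
```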