Spark Runtime Environment (SparkEnv) is the runtime environment with Spark services that interact with each other to build the entire Spark computing platform.
Spark Runtime Environment is represented by a SparkEnv object that holds all the required services for a running Spark instance, i.e. the driver or an executor.
Tip
|
Enable INFO or DEBUG logging levels for org.apache.spark.SparkEnv logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.SparkEnv=DEBUG
Refer to Logging. |
SparkEnv holds all runtime objects for a running Spark instance, using SparkEnv.createDriverEnv() for a driver and SparkEnv.createExecutorEnv() for an executor.
You can access the Spark environment using SparkEnv.get.
scala> import org.apache.spark._
import org.apache.spark._
scala> SparkEnv.get
res0: org.apache.spark.SparkEnv = org.apache.spark.SparkEnv@2220c5f7
create(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
numUsableCores: Int,
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv
create is an internal helper method to create a "base" SparkEnv regardless of the target environment, i.e. a driver or an executor.
When executed, create creates a Serializer (based on spark.serializer). You should see the following DEBUG message in the logs:
DEBUG Using serializer: [serializer.getClass]
It creates another Serializer (based on spark.closure.serializer).
It creates a ShuffleManager based on spark.shuffle.manager setting.
It creates a MemoryManager based on spark.memory.useLegacyMode setting (with UnifiedMemoryManager being the default).
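The decision points above can be pictured with the following sketch (a simplified illustration, not the actual SparkEnv sources; it only reads the same keys and defaults that are listed in the Settings section below):

import org.apache.spark.SparkConf

object SparkEnvDecisions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // the same keys and defaults create consults (the real code goes on to instantiate the classes)
    val serializer      = conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    val shuffleManager  = conf.get("spark.shuffle.manager", "sort")
    val useLegacyMemory = conf.getBoolean("spark.memory.useLegacyMode", defaultValue = false)

    println(s"Using serializer: $serializer")
    println(s"Shuffle manager: $shuffleManager")
    println("Memory manager: " + (if (useLegacyMemory) "StaticMemoryManager" else "UnifiedMemoryManager"))
  }
}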
It creates a NettyBlockTransferService.
It creates a BlockManagerMaster object with the BlockManagerMaster RPC endpoint reference (registered as a BlockManagerMasterEndpoint on the driver or looked up by name on executors), the input SparkConf, and the input isDriver flag.
Note
|
create registers the BlockManagerMaster RPC endpoint for the driver and looks it up for executors.
|
It creates a BlockManager (using the above BlockManagerMaster object and other services).
It creates a BroadcastManager.
It creates a CacheManager.
It creates a MetricsSystem for a driver and an executor separately.
It initializes the userFiles directory that is used for downloading dependencies: a temporary directory for a driver and the current working directory for an executor.
An OutputCommitCoordinator is created.
Note
|
create is called by createDriverEnv and createExecutorEnv.
|
registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint)
registerOrLookupEndpoint registers or looks up an RPC endpoint by name.
If called from the driver, you should see the following INFO message in the logs:
INFO SparkEnv: Registering [name]
And the RPC endpoint is registered in the RPC environment.
Otherwise, it obtains an RPC endpoint reference by name.
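A way to picture the behaviour is the following sketch (hypothetical types and a mutable map stand in for Spark's private RPC classes): the driver creates and registers the endpoint, while executors only obtain a reference to it by name.

import scala.collection.mutable

trait RpcEndpoint
case class RpcEndpointRef(name: String)

class RegisterOrLookupSketch(isDriver: Boolean) {
  private val registered = mutable.Map.empty[String, RpcEndpointRef]

  def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef =
    if (isDriver) {
      println(s"INFO SparkEnv: Registering $name")   // the INFO message above
      val endpoint = endpointCreator                 // the endpoint is only ever created on the driver
      registered(name) = RpcEndpointRef(name)
      registered(name)
    } else {
      RpcEndpointRef(name)                           // executors only build a reference by name
    }
}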
createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv
createDriverEnv creates a SparkEnv execution environment for the driver.
The method accepts an instance of SparkConf, whether it runs in local mode or not, a LiveListenerBus, the number of the driver's cores to use for execution in local mode or 0 otherwise, and an OutputCommitCoordinator (default: none).
createDriverEnv ensures that the spark.driver.host and spark.driver.port settings are set in the conf SparkConf.
It then passes the call straight on to the create helper method (with driver as the executor id, isDriver enabled, and the input parameters).
Note
|
createDriverEnv is exclusively used by SparkContext to create a SparkEnv (while a SparkContext is being created for the driver).
|
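For example, creating a SparkContext is all it takes to have createDriverEnv called (a sketch; the master URL and application name below are arbitrary):

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

val conf = new SparkConf().setMaster("local[*]").setAppName("SparkEnv demo")
val sc = new SparkContext(conf)               // internally calls SparkEnv.createDriverEnv

println(SparkEnv.get)                         // the driver's SparkEnv
println(sc.getConf.get("spark.driver.port"))  // set while the driver's SparkEnv was being created

sc.stop()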
SparkEnv.createExecutorEnv creates the Spark execution environment for an executor.
It uses SparkConf, the executor’s identifier, hostname, port, the number of cores, and whether or not it runs in local mode.
For the Akka-based RPC Environment (obsolete since Spark 1.6.0-SNAPSHOT), the name of the actor system for an executor is sparkExecutor.
It creates a MapOutputTrackerWorker object and looks up the MapOutputTracker RPC endpoint. See MapOutputTracker.
It creates a MetricsSystem for an executor and starts it.
An OutputCommitCoordinator is created and the OutputCommitCoordinator RPC endpoint is looked up.
spark.driver.host
is the name of the machine where the driver runs. It is set when SparkContext is created.
spark.driver.port
is the port the driver listens to. It is first set to 0 in the driver when SparkContext is initialized. It is later set to the port of the RpcEnv of the driver (in SparkEnv.create).
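Both can also be set explicitly on a SparkConf before the SparkContext is created (the host and port below are illustrative values only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.host", "192.168.1.100")  // illustrative: the machine the driver runs on
  .set("spark.driver.port", "35000")          // illustrative: a fixed port for the driver to listen to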
spark.serializer
(default: org.apache.spark.serializer.JavaSerializer) - the Serializer used to serialize objects, e.g. data sent over the network or cached in serialized form.
spark.closure.serializer
(default: org.apache.spark.serializer.JavaSerializer) is the Serializer used to serialize closures.
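For example, to switch to Kryo for data serialization while keeping the default closure serializer:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// spark.closure.serializer is left at its default, org.apache.spark.serializer.JavaSerializer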
spark.shuffle.manager
(default: sort) - one of the three available short names of ShuffleManager implementations, or a fully-qualified class name of a custom implementation of ShuffleManager:
- hash or org.apache.spark.shuffle.hash.HashShuffleManager
- sort or org.apache.spark.shuffle.sort.SortShuffleManager
- tungsten-sort or org.apache.spark.shuffle.sort.SortShuffleManager
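Either the short name or the fully-qualified class name can be used, e.g.:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "tungsten-sort")
// equivalent to: .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")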
spark.memory.useLegacyMode
(default: false) controls the MemoryManager in use. It is StaticMemoryManager when enabled (true) or UnifiedMemoryManager when disabled (false).
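For example, to fall back to the pre-Spark 1.6 memory management:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")  // picks StaticMemoryManager instead of UnifiedMemoryManager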