SparkEnv - Spark Runtime Environment

Spark Runtime Environment (SparkEnv) is the runtime environment with Spark services that interact with each other to build the entire Spark computing platform.

Spark Runtime Environment is represented by a SparkEnv object that holds all the required services for a running Spark instance, i.e. the driver or an executor.

Tip

Enable INFO or DEBUG logging level for org.apache.spark.SparkEnv logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.SparkEnv=DEBUG

Refer to Logging.

SparkEnv

SparkEnv holds all runtime objects for a running Spark instance. It is created using SparkEnv.createDriverEnv() for the driver and SparkEnv.createExecutorEnv() for an executor.

You can access the Spark environment using SparkEnv.get.

scala> import org.apache.spark._
import org.apache.spark._

scala> SparkEnv.get
res0: org.apache.spark.SparkEnv = org.apache.spark.SparkEnv@2220c5f7
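
A SparkEnv instance exposes the services it holds as fields, e.g. blockManager, shuffleManager, or conf. For example (the object addresses below are illustrative and will differ in your session):

scala> SparkEnv.get.blockManager
res1: org.apache.spark.storage.BlockManager = org.apache.spark.storage.BlockManager@38cf0b36

scala> SparkEnv.get.shuffleManager
res2: org.apache.spark.shuffle.ShuffleManager = org.apache.spark.shuffle.sort.SortShuffleManager@1a2b3c4d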

Creating "Base" SparkEnv (create method)

create(
  conf: SparkConf,
  executorId: String,
  hostname: String,
  port: Int,
  isDriver: Boolean,
  isLocal: Boolean,
  numUsableCores: Int,
  listenerBus: LiveListenerBus = null,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

create is an internal helper method to create a "base" SparkEnv regardless of the target environment, be it the driver or an executor.

When executed, create creates a Serializer (based on spark.serializer). You should see the following DEBUG message in the logs:

DEBUG Using serializer: [serializer.getClass]

It creates another Serializer (based on spark.closure.serializer).
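
This step looks roughly as follows (a simplified sketch of SparkEnv.create; instantiateClassFromConf is a private SparkEnv helper that reflectively instantiates the class set under the given key, falling back to the default):

// creates the Serializer per spark.serializer and logs the DEBUG message above
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

// creates the Serializer for closures per spark.closure.serializer
val closureSerializer = instantiateClassFromConf[Serializer](
  "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")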

It creates a ShuffleManager based on spark.shuffle.manager setting.

It creates a MemoryManager based on spark.memory.useLegacyMode setting (with UnifiedMemoryManager being the default).
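
The choice boils down to a single flag check (a sketch; the constructors may take more parameters depending on the Spark version):

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    // the pre-1.6 behaviour with fixed storage and execution regions
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    // the default since Spark 1.6
    UnifiedMemoryManager(conf, numUsableCores)
  }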

It creates a BlockManagerMaster object with the BlockManagerMaster RPC endpoint reference (by registering or looking it up by name and BlockManagerMasterEndpoint), the input SparkConf, and the input isDriver flag.

Figure 1. Creating BlockManager for the Driver

Note
create registers the BlockManagerMaster RPC endpoint for the driver and looks it up for executors.

Figure 2. Creating BlockManager for Executor
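
In code, this step is roughly the following (a sketch; registerOrLookupEndpoint is described in its own section below):

// registers the endpoint on the driver or looks it up on an executor,
// then wraps the reference in a BlockManagerMaster
val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf,
  isDriver)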

It creates a BlockManager (using the above BlockManagerMaster object and other services).

It creates a BroadcastManager.

It creates a CacheManager.

It creates a MetricsSystem, registered separately for the driver and an executor (under the instance names driver and executor respectively).

It initializes the userFiles temporary directory that is used for downloading dependencies: a newly-created temporary directory on the driver, and the current working directory on an executor.

An OutputCommitCoordinator is created.
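
A sketch of this step (mockOutputCommitCoordinator is only ever set in tests):

// creates the coordinator unless a mock was passed in, registers or looks up
// its RPC endpoint, and hands the reference back to the coordinator
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)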

Note
create is called by createDriverEnv and createExecutorEnv.

Registering or Looking up RPC Endpoint by Name (registerOrLookupEndpoint method)

registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint)

registerOrLookupEndpoint registers or looks up an RPC endpoint by name.

If called from the driver, you should see the following INFO message in the logs:

INFO SparkEnv: Registering [name]

And the RPC endpoint is registered in the RPC environment.

Otherwise, it obtains an RPC endpoint reference by name.
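
The entire method fits in a few lines (a sketch; isDriver, conf, and rpcEnv come from the enclosing create):

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    // on the driver: register the endpoint in the RPC environment
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    // on an executor: obtain a reference to the driver's endpoint by name
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}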

Creating SparkEnv for Driver (createDriverEnv method)

createDriverEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus,
  numCores: Int,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

createDriverEnv creates a SparkEnv execution environment for the driver.

Figure 3. Spark Environment for driver

The method accepts a SparkConf, whether it runs in local mode or not, a LiveListenerBus, the number of the driver's cores to use for execution in local mode (or 0 otherwise), and an optional OutputCommitCoordinator (default: none).

createDriverEnv ensures that the spark.driver.host and spark.driver.port settings are set in the conf SparkConf.

It then passes the call straight on to the create helper method (with the driver executor id, isDriver enabled, and the input parameters), as the sketch below shows.
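
Put together, createDriverEnv is roughly the following (a sketch):

// the assertions guard the two required settings
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
create(
  conf,
  SparkContext.DRIVER_IDENTIFIER,
  hostname,
  port,
  isDriver = true,
  isLocal = isLocal,
  numUsableCores = numCores,
  listenerBus = listenerBus,
  mockOutputCommitCoordinator = mockOutputCommitCoordinator)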

Note
createDriverEnv is exclusively used by SparkContext to create a SparkEnv (while a SparkContext is being created for the driver).

Creating SparkEnv for Executor (createExecutorEnv method)

SparkEnv.createExecutorEnv creates the Spark execution environment for an executor.

Figure 4. Spark Environment for executor

It uses SparkConf, the executor’s identifier, hostname, port, the number of cores, and whether or not it runs in local mode.

For Akka-based RPC Environment (obsolete since Spark 1.6.0-SNAPSHOT), the name of the actor system for an executor is sparkExecutor.

It creates a MapOutputTrackerWorker object and looks up the MapOutputTracker RPC endpoint. See MapOutputTracker.

It creates a MetricsSystem for an executor and starts it.

An OutputCommitCoordinator is created and the OutputCommitCoordinator RPC endpoint is looked up.
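
A sketch of createExecutorEnv (it delegates to create and records the result via SparkEnv.set):

val env = create(
  conf,
  executorId,
  hostname,
  port,
  isDriver = false,
  isLocal = isLocal,
  numUsableCores = numCores)
// make the environment available via SparkEnv.get
SparkEnv.set(env)
env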

serializer

Caution
FIXME

closureSerializer

Caution
FIXME

Settings

spark.driver.host

spark.driver.host is the name of the machine where the driver runs. It is set when SparkContext is created.

spark.driver.port

spark.driver.port is the port the driver listens to. It is first set to 0 in the driver when SparkContext is initialized. It is later set to the port of RpcEnv of the driver (in SparkEnv.create).
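
You can inspect both settings in the Spark shell (the values below are illustrative):

scala> sc.getConf.get("spark.driver.host")
res0: String = 192.168.1.4

scala> sc.getConf.get("spark.driver.port")
res1: String = 64120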

spark.serializer

spark.serializer (default: org.apache.spark.serializer.JavaSerializer) is the Serializer used to serialize objects that are sent over the network or cached in serialized form.
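
For example, to switch to Kryo (a Serializer implementation that ships with Spark):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")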

spark.closure.serializer

spark.closure.serializer (default: org.apache.spark.serializer.JavaSerializer) is the Serializer used for closures.

spark.shuffle.manager

spark.shuffle.manager (default: sort) selects the ShuffleManager: either one of the short aliases below or a fully-qualified class name of a custom ShuffleManager implementation (see the resolution sketch after this list):

  • hash or org.apache.spark.shuffle.hash.HashShuffleManager

  • sort or org.apache.spark.shuffle.sort.SortShuffleManager

  • tungsten-sort or org.apache.spark.shuffle.sort.SortShuffleManager
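
The alias resolution in SparkEnv.create is roughly the following (a sketch; instantiateClass is a private SparkEnv helper that reflectively creates the instance):

// maps the short aliases to fully-qualified class names;
// any other value is treated as a custom class name
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)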

spark.memory.useLegacyMode

spark.memory.useLegacyMode (default: false) controls the MemoryManager in use. It is StaticMemoryManager when enabled (true) or UnifiedMemoryManager when disabled (false).