Executors are distributed agents that execute tasks.
They typically (though not always) run for the entire lifetime of a Spark application. Executors send metrics for active tasks to the driver and inform executor backends about task status updates (including task results).
Note: Executors are managed exclusively by executor backends.
Executors provide in-memory storage for RDDs that are cached in Spark applications (via Block Manager).
When executors start, they register themselves with the driver and then communicate with it directly to execute tasks.
Executor offers are described by executor id and the host on which an executor runs (see Resource Offers in this document).
Executors can run multiple tasks over their lifetime, both in parallel and sequentially. They track running tasks (by their task ids in the internal runningTasks map). Consult the Launching Tasks section.
Executors use a thread pool for launching tasks and sending metrics.
It is recommended to have as many executors as data nodes and as many cores as you can get from the cluster.
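For example, a minimal sketch of how such sizing could be expressed in a SparkConf; the master URL and the numbers (4 executors, 8 cores, 2g) are assumptions for illustration only, not values prescribed by this document:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sizing for a 4-node cluster with 8 cores per node.
val conf = new SparkConf()
  .setAppName("executor-sizing-sketch")
  .setMaster("spark://master:7077")       // hypothetical master URL
  .set("spark.executor.instances", "4")   // one executor per data node (assumed)
  .set("spark.executor.cores", "8")       // cores offered by each node (assumed)
  .set("spark.executor.memory", "2g")     // memory per executor (assumed)

val sc = new SparkContext(conf)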
Executors are described by their id, hostname, environment (as SparkEnv), and classpath (and, less importantly and more for internal optimization, whether they run in local or cluster mode).
Tip: Enable INFO logging level for the org.apache.spark.executor.Executor logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.executor.Executor=INFO. Refer to Logging.
Executor requires executorId, executorHostname, a SparkEnv (as env), userClassPath, and whether it runs in local or non-local mode (as isLocal, which is non-local by default).
Note: isLocal is enabled exclusively for LocalEndpoint (for Spark in local mode).
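Put together, the constructor has roughly the following shape; the class below is an illustrative approximation of the parameters described above, not the exact signature of any particular Spark version:

import java.net.URL
import org.apache.spark.SparkEnv

// Approximate shape of the constructor described above (a sketch, not the real class).
class ExecutorSketch(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,  // user's classpath entries
    isLocal: Boolean = false)       // non-local (false) by default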
While an executor is being created you should see the following INFO messages in the logs:
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
INFO Executor: Using REPL class URI: http://[executorHostname]:56131
It creates an RPC endpoint for sending heartbeats to the driver (using the internal startDriverHeartbeater method).
The BlockManager is initialized (only when in non-local/cluster mode).
Note: The BlockManager for an executor is available in the SparkEnv passed to the constructor.
A worker requires the additional services (besides the common ones like …):
- executorActorSystemName
- RPC Environment (for Akka only)
- MetricsSystem with the name executor
Note: An Executor is created when CoarseGrainedExecutorBackend receives a RegisteredExecutor message, in MesosExecutorBackend.registered, and when LocalEndpoint is created.
Caution: FIXME How many cores are assigned per executor?
launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit
launchTask creates a TaskRunner object, registers it in the internal runningTasks map (by taskId), and executes it on the Executor task launch worker thread pool.
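A simplified, self-contained sketch of that flow follows; SketchTaskRunner and SketchExecutor are illustrative stand-ins for Spark's TaskRunner and Executor, not the real classes:

import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Illustrative stand-in for Spark's TaskRunner.
class SketchTaskRunner(val taskId: Long, serializedTask: ByteBuffer) extends Runnable {
  override def run(): Unit = {
    // deserialize the task, run it, and report its status back (omitted)
  }
}

class SketchExecutor {
  // running tasks are tracked by task id
  private val runningTasks = new ConcurrentHashMap[Long, SketchTaskRunner]
  // stand-in for the "Executor task launch worker" thread pool
  private val threadPool = Executors.newCachedThreadPool()

  def launchTask(taskId: Long, serializedTask: ByteBuffer): Unit = {
    val runner = new SketchTaskRunner(taskId, serializedTask)
    runningTasks.put(taskId, runner)  // register the runner by task id
    threadPool.execute(runner)        // execute it on the thread pool
  }
}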
Note: launchTask is called by CoarseGrainedExecutorBackend (when it handles a LaunchTask message), MesosExecutorBackend, and LocalEndpoint, which represent different cluster managers.
Executors keep sending metrics for active tasks to the driver every spark.executor.heartbeatInterval (defaults to 10s with some random initial delay so the heartbeats from different executors do not pile up on the driver).
An executor sends heartbeats using the internal heartbeater - Heartbeat Sender Thread.
For each task in a TaskRunner (in the internal runningTasks registry), the task's metrics are computed (i.e. mergeShuffleReadMetrics and setJvmGCTime) and become part of the heartbeat (with accumulators).
Caution: FIXME How do mergeShuffleReadMetrics and setJvmGCTime influence accumulators?
Note: Executors track the TaskRunners that run tasks. A task might not be assigned to a TaskRunner yet when the executor sends a heartbeat.
A blocking Heartbeat message that holds the executor id, all accumulator updates (per task id), and BlockManagerId is sent to HeartbeatReceiver RPC endpoint (with spark.executor.heartbeatInterval timeout).
Caution: FIXME When is heartbeatReceiverRef created?
If the response requests to reregister BlockManager, you should see the following INFO message in the logs:
INFO Executor: Told to re-register on heartbeat
The internal heartbeatFailures counter is reset (i.e. becomes 0).
If there are any issues with communicating with the driver, you should see the following WARN message in the logs:
WARN Executor: Issue communicating with driver in heartbeater
The internal heartbeatFailures counter is incremented and checked against the acceptable number of failures. If the number is greater, the following ERROR message is printed out to the logs:
ERROR Executor: Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
The executor exits (using System.exit and exit code 56).
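The failure handling described above can be pictured with a small sketch; the method and constant names below are illustrative (only the reset-on-success, the failure counting, and exit code 56 come from the description above):

object HeartbeatFailureSketch {
  // Illustrative constant; spark.executor.heartbeat.maxFailures defaults to 60.
  val HEARTBEAT_MAX_FAILURES = 60
  var heartbeatFailures = 0

  def reportHeartBeatSketch(sendHeartbeat: () => Unit): Unit = {
    try {
      sendHeartbeat()        // blocking Heartbeat message to the HeartbeatReceiver RPC endpoint
      heartbeatFailures = 0  // a successful round trip resets the counter
    } catch {
      case e: Exception =>
        println(s"WARN Issue communicating with driver in heartbeater: $e")
        heartbeatFailures += 1
        if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
          println(s"ERROR Exit as unable to send heartbeats to driver more than $HEARTBEAT_MAX_FAILURES times")
          System.exit(56)    // exit code 56, as described above
        }
    }
  }
}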
Tip: Read about TaskMetrics in TaskMetrics.
heartbeater is a daemon ScheduledThreadPoolExecutor with a single thread. The name of the thread pool is driver-heartbeater.
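A sketch of how such a single-threaded daemon scheduler could be created and used for heartbeats; it uses plain java.util.concurrent (Spark has its own thread utilities), and the interval and jitter values are assumptions based on the 10s default described above:

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object DriverHeartbeaterSketch {
  // single daemon thread named "driver-heartbeater"
  private val heartbeater = Executors.newSingleThreadScheduledExecutor(
    new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "driver-heartbeater")
        t.setDaemon(true)
        t
      }
    })

  def startDriverHeartbeaterSketch(reportHeartBeat: () => Unit): Unit = {
    val intervalMs = 10 * 1000L  // spark.executor.heartbeatInterval default (10s)
    // random initial delay so executors do not all heartbeat at once
    val initialDelayMs = intervalMs + (scala.util.Random.nextDouble() * intervalMs).toLong
    heartbeater.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = reportHeartBeat() },
      initialDelayMs, intervalMs, TimeUnit.MILLISECONDS)
  }
}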
Coarse-grained executors are executors that use CoarseGrainedExecutorBackend for task scheduling.
Caution: FIXME
A FetchFailedException is thrown when an executor (more specifically its TaskRunner) has failed to fetch a shuffle block.
It contains the following:
- the unique identifier for a BlockManager (as BlockManagerId)
- shuffleId
- mapId
- reduceId
- message - a short exception message
- cause - a Throwable object
TaskRunner catches it and informs ExecutorBackend about the case (using statusUpdate with TaskState.FAILED task state).
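A sketch of what the exception carries and how a task runner could react to it; the case class and the handler below are simplified stand-ins for Spark's FetchFailedException and TaskRunner, not the actual definitions:

// Simplified stand-in for Spark's FetchFailedException with the fields listed above.
case class FetchFailedSketch(
    bmAddress: String,   // stands in for BlockManagerId
    shuffleId: Int,
    mapId: Int,
    reduceId: Int,
    message: String,
    cause: Throwable) extends Exception(message, cause)

// Illustrative reaction of a task runner: report the task as failed.
def runTaskSketch(runTask: () => Unit, reportFailed: FetchFailedSketch => Unit): Unit = {
  try {
    runTask()
  } catch {
    case ffe: FetchFailedSketch =>
      // in Spark this is statusUpdate with the TaskState.FAILED task state
      reportFailed(ffe)
  }
}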
Caution: FIXME Image with the call to ExecutorBackend.
Read resourceOffers in TaskSchedulerImpl and resourceOffer in TaskSetManager.
Executors use daemon cached thread pools called Executor task launch worker-ID (with ID being the task id) for launching tasks.
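A sketch of how a daemon cached thread pool with such named threads could be built with plain java.util.concurrent; Spark uses its own thread utilities for this, and the simple counter standing in for the ID below is an assumption for illustration:

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{Executors, ThreadFactory}

object TaskLaunchPoolSketch {
  private val counter = new AtomicLong(0)

  // daemon threads named "Executor task launch worker-<ID>"
  val threadPool = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"Executor task launch worker-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  })
}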
You can control the amount of memory per executor using the spark.executor.memory setting. It sets the available memory equally for all executors per application.
Note: The amount of memory per executor is looked up when SparkContext is created.
You can change the assigned memory per executor per node in standalone cluster using SPARK_EXECUTOR_MEMORY environment variable.
You can find the value displayed as Memory per Node in web UI for standalone Master (as depicted in the figure below).
The above figure shows the result of running Spark shell with the amount of memory per executor defined explicitly (on command line), i.e.
./bin/spark-shell --master spark://localhost:7077 --conf spark.executor.memory=2g
Executors use the Metrics System (via ExecutorSource) to report metrics about internal status.
Note: Metrics are only available for cluster modes, i.e. local mode turns metrics off.
The name of the source is executor.
It emits the following numbers (see the sketch after this list):
- threadpool.activeTasks - the approximate number of threads that are actively executing tasks (using ThreadPoolExecutor.getActiveCount())
- threadpool.completeTasks - the approximate total number of tasks that have completed execution (using ThreadPoolExecutor.getCompletedTaskCount())
- threadpool.currentPool_size - the current number of threads in the pool (using ThreadPoolExecutor.getPoolSize())
- threadpool.maxPool_size - the maximum allowed number of threads that have ever simultaneously been in the pool (using ThreadPoolExecutor.getMaximumPoolSize())
- filesystem.hdfs.read_bytes using FileSystem.getAllStatistics() and getBytesRead()
- filesystem.hdfs.write_bytes using FileSystem.getAllStatistics() and getBytesWritten()
- filesystem.hdfs.read_ops using FileSystem.getAllStatistics() and getReadOps()
- filesystem.hdfs.largeRead_ops using FileSystem.getAllStatistics() and getLargeReadOps()
- filesystem.hdfs.write_ops using FileSystem.getAllStatistics() and getWriteOps()
- filesystem.file.read_bytes
- filesystem.file.write_bytes
- filesystem.file.read_ops
- filesystem.file.largeRead_ops
- filesystem.file.write_ops
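A minimal sketch of how the thread-pool gauges above could be registered with Dropwizard Metrics; the source name executor and the metric names come from the list, while the registry wiring is simplified compared to Spark's MetricsSystem and ExecutorSource:

import java.util.concurrent.{Executors, ThreadPoolExecutor}
import com.codahale.metrics.{Gauge, MetricRegistry}

object ExecutorSourceSketch {
  val registry = new MetricRegistry
  // stand-in for the executor's task launch worker pool
  val pool = Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]

  private def gauge[T](name: String)(value: => T): Unit =
    registry.register(MetricRegistry.name("executor", name), new Gauge[T] {
      override def getValue: T = value
    })

  gauge("threadpool.activeTasks")(pool.getActiveCount)
  gauge("threadpool.completeTasks")(pool.getCompletedTaskCount)
  gauge("threadpool.currentPool_size")(pool.getPoolSize)
  gauge("threadpool.maxPool_size")(pool.getMaximumPoolSize)
}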
spark.executor.extraClassPath is a list of URLs representing a user's CLASSPATH. Each entry is separated by the system-dependent path separator, i.e. : on Unix/MacOS systems and ; on Microsoft Windows.
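For instance, a small sketch of building such a value portably; the jar paths are hypothetical:

import java.io.File

// ":" on Unix/MacOS, ";" on Windows
val extraClassPath = Seq("/opt/libs/custom.jar", "/opt/libs/extra.jar")
  .mkString(File.pathSeparator)
// e.g. conf.set("spark.executor.extraClassPath", extraClassPath)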
spark.executor.extraJavaOptions - extra Java options for executors.
spark.executor.extraLibraryPath - a list of additional library paths separated by the system-dependent path separator, i.e. : on Unix/MacOS systems and ; on Microsoft Windows.
spark.executor.userClassPathFirst (default: false) controls whether to load classes in user jars before those in Spark jars.
spark.executor.heartbeatInterval (default: 10s) - the interval after which an executor reports heartbeat and metrics for active tasks to the driver. Refer to Sending heartbeats and partial metrics for active tasks.
spark.executor.heartbeat.maxFailures (default: 60) controls how many times an executor will try to send heartbeats to the driver before it gives up and exits (with exit code 56).
Note: It was introduced in SPARK-13522 Executor should kill itself when it's unable to heartbeat to the driver more than N times.
spark.executor.instances (default: 0) sets the number of executors to use. When greater than 0, it disables dynamic allocation.
spark.executor.memory (default: 1g) - the amount of memory to use per executor process (equivalent to the SPARK_EXECUTOR_MEMORY environment variable). See Executor Memory - spark.executor.memory setting in this document.
- spark.executor.logs.rolling.maxSize
- spark.executor.logs.rolling.maxRetainedFiles
- spark.executor.logs.rolling.strategy
- spark.executor.logs.rolling.time.interval
- spark.executor.port
- spark.executor.uri - equivalent to SPARK_EXECUTOR_URI
- spark.repl.class.uri (default: null) used in spark-shell to create a REPL ClassLoader to load new classes defined in the Scala REPL as a user types code. Enable INFO logging level for the org.apache.spark.executor.Executor logger to have the value printed out to the logs: INFO Using REPL class URI: [classUri]
- spark.akka.frameSize (default: 128 MB, maximum: 2047 MB) - the configured max frame size for Akka messages. If a task result is bigger, executors use the block manager to send results back.
- spark.driver.maxResultSize (default: 1g)
Caution: FIXME spark.driver.maxResultSize is used in a few other pages, so decide where it should belong and link the other places.