SparkContext (aka Spark context) is the entry point to Spark for a Spark application.
You can also think of a SparkContext instance as a Spark application.
It sets up internal services and establishes a connection to a Spark execution environment (deployment mode).
Once a SparkContext instance is created you can use it to create RDDs, accumulators and broadcast variables, access Spark services and run jobs.
A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application (don’t get confused with the other meaning of Master in Spark, though).
SparkContext offers the following functions:
Getting current configuration
Setting configuration
Creating objects
Accessing services, e.g. TaskScheduler, LiveListenerBus, BlockManager, SchedulerBackends, ShuffleManager.
Setting up custom Scheduler Backend, TaskScheduler and DAGScheduler
Read the scaladoc of org.apache.spark.SparkContext.
persistRDD(rdd: RDD[_])
persistRDD is a private[spark] method that registers rdd in the persistentRdds registry.
SparkContext offers a developer API for dynamic allocation of executors:
(private!) requestTotalExecutors
(private!) getExecutorIds
getExecutorIds is a private[spark] method that is part of the ExecutorAllocationClient contract. It simply passes the call on to the current coarse-grained scheduler backend, i.e. calls getExecutorIds.
It works for coarse-grained scheduler backends only.
When called for other scheduler backends you should see the following WARN message in the logs:
WARN Requesting executors is only supported in coarse-grained mode
FIXME Why does SparkContext implement the method for coarse-grained scheduler backends? Why doesn’t SparkContext throw an exception when the method is called? Nobody seems to be using it (!)
requestExecutors(numAdditionalExecutors: Int): Boolean
requestExecutors requests numAdditionalExecutors executors from CoarseGrainedSchedulerBackend.
killExecutors(executorIds: Seq[String]): Boolean
requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors is a private[spark] method that requests the exact number of executors from a coarse-grained scheduler backend.
It works for coarse-grained scheduler backends only.
When called for other scheduler backends you should see the following WARN message in the logs:
WARN Requesting executors is only supported in coarse-grained mode
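To make the coarse-grained-only behaviour concrete, here is a minimal sketch that calls the public developer-API method requestExecutors on a local-mode SparkContext. The local[2] master, the application name and the object name are assumptions made for this illustration. Since a local scheduler backend is not coarse-grained, the request is expected to be rejected, i.e. the call returns false and the WARN message is logged.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RequestExecutorsDemo {
  // Returns what requestExecutors reports for a local (not coarse-grained) backend.
  def run(): Boolean = {
    // Assumption: a local master, so no cluster manager is involved
    val conf = new SparkConf().setMaster("local[2]").setAppName("RequestExecutorsDemo")
    val sc = new SparkContext(conf)
    try sc.requestExecutors(2) // ask for two additional executors
    finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"Request acknowledged: ${run()}")
}
```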
You can create a SparkContext instance with or without creating a SparkConf object first.
getOrCreate(): SparkContext
getOrCreate(conf: SparkConf): SparkContext
getOrCreate methods allow you to get the existing SparkContext or create a new one.
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()
// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("SparkMe App")
val sc = SparkContext.getOrCreate(conf)
The no-param getOrCreate method requires that the two mandatory Spark settings, master and application name, are specified using spark-submit.
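The "get the existing one or create a new one" semantics can be sketched as follows. This is only an illustration: the local[*] master and the application name are assumptions, set in code because spark-submit is not involved here.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GetOrCreateDemo {
  // Returns true when both calls hand back the very same SparkContext instance.
  def run(): Boolean = {
    // Assumption: master and app name are given in code instead of via spark-submit
    val conf = new SparkConf().setMaster("local[*]").setAppName("GetOrCreateDemo")
    val first = SparkContext.getOrCreate(conf) // no active context yet, so one is created
    val second = SparkContext.getOrCreate()    // an active context exists, so it is reused
    try first eq second
    finally first.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"Same instance: ${run()}")
}
```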
SparkContext()
SparkContext(conf: SparkConf)
SparkContext(master: String, appName: String, conf: SparkConf)
SparkContext(
  master: String,
  appName: String,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map())
You can create a SparkContext instance using the four constructors.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("SparkMe App")
import org.apache.spark.SparkContext
val sc = new SparkContext(conf)
When a Spark context starts up you should see the following INFO in the logs (amongst the other messages that come from the Spark services):
INFO SparkContext: Running Spark version 2.0.0-SNAPSHOT
Only one SparkContext may be running in a single JVM (check out SPARK-2243 Support multiple SparkContexts in the same JVM). Sharing access to a SparkContext in the JVM is the solution to share data within Spark (without relying on other means of data sharing using external data stores).
getConf: SparkConf
getConf returns the current SparkConf.
Changing the SparkConf object does not change the current configuration (as the method returns a copy).
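A small sketch of the copy semantics, assuming a local master and a made-up configuration key (spark.demo.flag is hypothetical and has no meaning to Spark):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GetConfCopyDemo {
  // Returns true if a change made to the returned SparkConf leaked into the context.
  def run(): Boolean = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("GetConfCopyDemo"))
    try {
      val copy = sc.getConf
      copy.set("spark.demo.flag", "changed")  // hypothetical key, set on the copy only
      sc.getConf.contains("spark.demo.flag")  // the context's own configuration is consulted
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"Change visible in SparkContext: ${run()}")
}
```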
master: String
master method returns the current value of spark.master, which is the deployment environment in use.
appName: String
appName returns the value of the mandatory spark.app.name setting.
It is used in SparkDeploySchedulerBackend (to create an ApplicationDescription when it starts), for SparkUI.createLiveUI (when spark.ui.enabled is enabled), when postApplicationStart is executed, and for Mesos and checkpointing in Spark Streaming.
deployMode: String
deployMode returns the current value of spark.submit.deployMode setting or client if not set.
getSchedulingMode: SchedulingMode.SchedulingMode
getSchedulingMode returns the current Scheduling Mode.
getPoolForName(pool: String): Option[Schedulable]
getPoolForName returns a Schedulable by the pool name, if one exists.
getPoolForName is part of the Developer’s API and may change in the future.
Internally, it requests the TaskScheduler for the root pool and looks up the Schedulable by the pool name.
It is exclusively used to show pool details in web UI (for a stage).
getAllPools: Seq[Schedulable]
getAllPools collects the Pools in TaskScheduler.rootPool.
TaskScheduler.rootPool is part of the TaskScheduler Contract.
getAllPools is part of the Developer’s API.
FIXME Where is the method used?
getAllPools is used to calculate pool names for Stages tab in web UI with FAIR scheduling mode used.
Default level of parallelism is the number of partitions in RDDs when they are created without the number being specified explicitly by a user.
It is used for methods like SparkContext.parallelize, SparkContext.range and SparkContext.makeRDD (as well as Spark Streaming's DStream.countByValue and DStream.countByValueAndWindow and a few other places). It is also used to instantiate HashPartitioner or for the minimum number of partitions in HadoopRDDs.
Internally, defaultParallelism relays requests for the default level of parallelism to TaskScheduler (it is a part of its contract).
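As an illustration of the default level of parallelism, the sketch below compares sc.defaultParallelism with the number of partitions of an RDD created without an explicit partition count. It assumes a local[3] master and that spark.default.parallelism is not set, in which case the local backend is expected to report the number of local cores, i.e. 3.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DefaultParallelismDemo {
  // Returns (default parallelism, partitions of an RDD created without a partition count).
  def run(): (Int, Int) = {
    // Assumption: local[3] master and no spark.default.parallelism override
    val sc = new SparkContext(
      new SparkConf().setMaster("local[3]").setAppName("DefaultParallelismDemo"))
    try {
      val rdd = sc.parallelize(0 to 9) // no numSlices given, so the default is used
      (sc.defaultParallelism, rdd.partitions.length)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (default, partitions) = run()
    println(s"defaultParallelism=$default, partitions=$partitions")
  }
}
```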
setLocalProperty(key: String, value: String): Unit
setLocalProperty sets a local thread-scoped key property to value.
sc.setLocalProperty("spark.scheduler.pool", "myPool")
The goal of the local property concept is to differentiate between or group jobs submitted from different threads by local properties.
It is used to group jobs into pools in FAIR job scheduler by spark.scheduler.pool per-thread property and in SQLExecution.withNewExecutionId helper methods.
If value is null, the key property is removed from the local properties.
sc.setLocalProperty("spark.scheduler.pool", null)
A common use case for the local property concept is to set a local property in a thread, say spark.scheduler.pool, after which all jobs submitted within the thread will be grouped, say into a pool by FAIR job scheduler.
val rdd = sc.parallelize(0 to 9)
sc.setLocalProperty("spark.scheduler.pool", "myPool")
// these two jobs (one per action) will run in the myPool pool
rdd.count
rdd.collect
sc.setLocalProperty("spark.scheduler.pool", null)
// this job will run in the default pool
rdd.count
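The set-and-remove behaviour of local properties can be observed with getLocalProperty. The following is a minimal sketch under the assumption of a local master; the pool name myPool is just the example value used above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalPropertyDemo {
  // Returns the property as seen after setting it and after removing it with null.
  def run(): (Option[String], Option[String]) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("LocalPropertyDemo"))
    try {
      val key = "spark.scheduler.pool"
      sc.setLocalProperty(key, "myPool")
      val whenSet = Option(sc.getLocalProperty(key))
      sc.setLocalProperty(key, null) // a null value removes the key
      val whenRemoved = Option(sc.getLocalProperty(key))
      (whenSet, whenRemoved)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```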
submitJob submits a job in an asynchronous, non-blocking way (using DAGScheduler.submitJob method).
It cleans the processPartition input function argument and returns an instance of SimpleFutureAction that holds the JobWaiter instance (it has received from DAGScheduler.submitJob).
FIXME What are resultFunc ?
It is used in:
You use a Spark context to create RDDs (see Creating RDD).
When an RDD is created, it belongs to and is completely owned by the Spark context it originated from. RDDs can’t by design be shared between SparkContexts.
SparkContext allows you to create many different RDDs from input sources like:
Scala’s collections, e.g.
sc.parallelize(0 to 100)
local or remote filesystems, e.g.
Any Hadoop
It removes an RDD from the master’s Block Manager (calls removeRdd(rddId: Int, blocking: Boolean)) and the internal persistentRdds mapping.
It finally posts SparkListenerUnpersistRDD message to listenerBus.
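The persistentRdds registry is private, but its content can be inspected through the public getPersistentRDDs method. A minimal sketch, assuming a local master, that shows an RDD entering the registry when marked for caching and leaving it on unpersist:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PersistentRddsDemo {
  // Returns whether the RDD is registered (after cache, after unpersist).
  def run(): (Boolean, Boolean) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("PersistentRddsDemo"))
    try {
      val rdd = sc.parallelize(0 to 9).cache() // marks the RDD as persistent
      val afterCache = sc.getPersistentRDDs.contains(rdd.id)
      rdd.unpersist(blocking = true)           // removes blocks and the registry entry
      val afterUnpersist = sc.getPersistentRDDs.contains(rdd.id)
      (afterCache, afterUnpersist)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```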
setCheckpointDir(directory: String)
setCheckpointDir method is used to set up the checkpoint directory…FIXME
register(acc: AccumulatorV2[_, _]): Unit
register(acc: AccumulatorV2[_, _], name: String): Unit
register registers the acc accumulator. You can optionally give an accumulator a name.
You can create built-in accumulators for longs, doubles, and collection types using specialized methods.
longAccumulator: LongAccumulator
longAccumulator(name: String): LongAccumulator
doubleAccumulator: DoubleAccumulator
doubleAccumulator(name: String): DoubleAccumulator
collectionAccumulator[T]: CollectionAccumulator[T]
collectionAccumulator[T](name: String): CollectionAccumulator[T]
You can use longAccumulator, doubleAccumulator or collectionAccumulator to create and register accumulators for simple and collection values.
longAccumulator returns LongAccumulator with the zero value 0.
doubleAccumulator returns DoubleAccumulator with the zero value 0.0.
collectionAccumulator returns CollectionAccumulator with the zero value java.util.List[T].
scala> val acc = sc.longAccumulator
acc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: None, value: 0)
scala> val counter = sc.longAccumulator("counter")
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: Some(counter), value: 0)
scala> counter.value
res0: Long = 0
scala> sc.parallelize(0 to 9).foreach(n => counter.add(n))
scala> counter.value
res3: Long = 45
The name input parameter allows you to give a name to an accumulator and have it displayed in Spark UI (under Stages tab for a given stage).
You can register custom accumulators using register methods.
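As a sketch of registering a custom accumulator, the example below defines a hypothetical MaxAccumulator (not part of Spark) that tracks the largest Long it has seen, and registers it under a name. The local[2] master and all names are assumptions made for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

// Hypothetical accumulator: keeps the maximum of all added values.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var max = Long.MinValue
  override def isZero: Boolean = max == Long.MinValue
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc.max = max
    acc
  }
  override def reset(): Unit = { max = Long.MinValue }
  override def add(v: Long): Unit = { max = math.max(max, v) }
  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    max = math.max(max, other.value)
  }
  override def value: Long = max
}

object CustomAccumulatorDemo {
  // Returns the accumulator value after a job has added 1 to 10.
  def run(): Long = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("CustomAccumulatorDemo"))
    try {
      val acc = new MaxAccumulator
      sc.register(acc, "max") // the name makes it show up in the web UI
      sc.parallelize(1 to 10).foreach(n => acc.add(n))
      acc.value
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"max = ${run()}")
}
```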
broadcast[T](value: T): Broadcast[T]
broadcast method creates a broadcast variable, i.e. a shared value that is made available on all Spark executors.
scala> val hello = sc.broadcast("hello")
hello: org.apache.spark.broadcast.Broadcast[String] = Broadcast(0)
Spark transfers the value to Spark executors once, and tasks can share it without incurring repetitive network transmissions when requested multiple times.
When a broadcast value is created the following INFO message appears in the logs:
INFO SparkContext: Created broadcast [id] from broadcast at <console>:25
Spark does not support broadcasting RDDs.
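A minimal sketch of using a broadcast variable inside a transformation follows. The lookup table, the local[2] master and the object name are assumptions for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastDemo {
  // Translates numbers to names using a broadcast lookup table.
  def run(): Seq[String] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("BroadcastDemo"))
    try {
      val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))
      sc.parallelize(Seq(1, 2, 3))
        .map(n => lookup.value.getOrElse(n, "unknown")) // tasks read the shared value
        .collect()
        .toSeq
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```

Tasks access the value through lookup.value rather than capturing the map itself in the closure.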
The jar you specify with SparkContext.addJar will be copied to all the worker nodes.
The configuration setting spark.jars is a comma-separated list of jar paths to be included in all tasks executed from this SparkContext. A path can either be a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
scala> sc.addJar("build.sbt")
15/11/11 21:54:54 INFO SparkContext: Added JAR build.sbt at with timestamp 1447275294457
FIXME Why is HttpFileServer used for addJar?
SparkContext keeps track of:
shuffle ids using
internal field for registering shuffle dependencies to Shuffle Service.
runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit
runJob[T, U](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U](
  rdd: RDD[T],
  func: Iterator[T] => U,
  partitions: Seq[Int]): Array[U]
runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
runJob[T, U](rdd: RDD[T], func: Iterator[T] => U): Array[U]
runJob[T, U](
  rdd: RDD[T],
  processPartition: (TaskContext, Iterator[T]) => U,
  resultHandler: (Int, U) => Unit)
runJob[T, U: ClassTag](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  resultHandler: (Int, U) => Unit)
RDD actions in Spark run jobs using one of runJob methods. runJob executes a function on one or many partitions of an RDD to produce a collection of values per partition.
For some actions, e.g. first() and lookup(), there is no need to compute all the partitions of the RDD in a job. And Spark knows it.
import org.apache.spark.TaskContext
scala> sc.runJob(lines, (t: TaskContext, i: Iterator[String]) => 1) // (1)
res0: Array[Int] = Array(1, 1) // (2)
(1) Run a job using runJob on lines RDD with a function that returns 1 for every partition (of lines RDD).
(2) What can you say about the number of partitions of the lines RDD? Is your result res0 different than mine? Why?
Read about TaskContext in TaskContext.
Running a job is essentially executing a func function on all or a subset of partitions in an rdd RDD and returning the result as an array (with elements being the results per partition).
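To illustrate "all or a subset of partitions", the sketch below sums the elements of each partition, first for every partition and then only for partitions 0 and 3. The 4-partition RDD of 1 to 8, the local[2] master and the helper name sumPartition are assumptions chosen for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RunJobDemo {
  // Returns per-partition sums for all partitions and for partitions 0 and 3 only.
  def run(): (Seq[Int], Seq[Int]) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("RunJobDemo"))
    try {
      // 8 elements in 4 partitions: (1, 2), (3, 4), (5, 6), (7, 8)
      val rdd = sc.parallelize(1 to 8, 4)
      val sumPartition: Iterator[Int] => Int = _.sum
      val all = sc.runJob(rdd, sumPartition)               // one result per partition
      val subset = sc.runJob(rdd, sumPartition, Seq(0, 3)) // only the listed partitions
      (all.toSeq, subset.toSeq)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    val (all, subset) = run()
    println(s"all=${all.mkString(",")} subset=${subset.mkString(",")}")
  }
}
```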
When executed, runJob prints out the following INFO message:
INFO Starting job: ...
And it follows up on spark.logLineage and then hands over the execution to DAGScheduler.runJob.
Before the method finishes, it does checkpointing and posts JobSubmitted event (see Event loop).
Spark can only run jobs when a Spark context is available and active, i.e. started. See Stopping Spark context. Since SparkContext runs inside a Spark driver, i.e. a Spark application, it must be alive to run jobs.
stop(): Unit
You can stop a SparkContext using stop method. Stopping a Spark context stops the Spark Runtime Environment and shuts down the entire Spark application (see Anatomy of Spark Application).
Calling stop many times leads to the following INFO message in the logs:
INFO SparkContext: SparkContext already stopped.
An attempt to use a stopped SparkContext’s services will result in java.lang.IllegalStateException: SparkContext has been shutdown
scala> sc.stop
scala> sc.parallelize(0 to 5)
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
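The same behaviour can be checked programmatically. This is a minimal sketch, assuming a local master, that stops a context twice and then verifies that a later call is rejected with an IllegalStateException:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.{Failure, Try}

object StopDemo {
  // Returns (context reports stopped, later use rejected with IllegalStateException).
  def run(): (Boolean, Boolean) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("StopDemo"))
    sc.stop()
    sc.stop() // a repeated stop does nothing beyond logging
    val rejected = Try(sc.parallelize(0 to 5)) match {
      case Failure(_: IllegalStateException) => true
      case _ => false
    }
    (sc.isStopped, rejected)
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```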
When a SparkContext is being stopped, it does the following:
Posts an application end event SparkListenerApplicationEnd to LiveListenerBus
Stops web UI
Requests MetricsSystem to report metrics from all registered sinks
Stops ContextCleaner
Stops DAGScheduler
Stops LiveListenerBus
Stops EventLoggingListener
Stops HeartbeatReceiver
Stops optional ConsoleProgressBar
It clears the reference to TaskScheduler
Stops SparkEnv and calls SparkEnv.set(null)
FIXME SparkEnv.set(null) what is this doing?
It clears SPARK_YARN_MODE flag.
It calls SparkContext.clearActiveContext()
FIXME What is SparkContext.clearActiveContext() doing?
If all went fine till now you should see the following INFO message in the logs:
INFO SparkContext: Successfully stopped SparkContext
addSparkListener(listener: SparkListenerInterface): Unit
You can register a custom SparkListenerInterface using addSparkListener method.
You can also register custom listeners using spark.extraListeners setting.
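A possible shape of a custom listener is sketched below. JobCounter is a hypothetical class that extends SparkListener (the no-op base implementation of SparkListenerInterface) and counts finished jobs. Listener events are delivered asynchronously, so the sketch reads the counter only after stop, on the assumption that stopping the context lets the listener bus deliver the events already posted.

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listener that counts finished jobs.
class JobCounter extends SparkListener {
  val finishedJobs = new AtomicInteger(0)
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    finishedJobs.incrementAndGet()
  }
}

object JobCounterDemo {
  // Returns the number of job-end events seen for two actions.
  def run(): Int = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("JobCounterDemo"))
    val counter = new JobCounter
    sc.addSparkListener(counter)
    try {
      val rdd = sc.parallelize(0 to 9)
      rdd.count()   // first job
      rdd.collect() // second job
    } finally sc.stop() // assumption: pending events are delivered before stop returns
    counter.finishedJobs.get()
  }

  def main(args: Array[String]): Unit =
    println(s"Finished jobs: ${run()}")
}
```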
By default, SparkContext uses (private[spark] class) org.apache.spark.scheduler.DAGScheduler, but you can develop your own custom DAGScheduler implementation, and use (private[spark]) SparkContext.dagScheduler_=(ds: DAGScheduler) method to assign yours.
It is also applicable to SchedulerBackend and TaskScheduler using schedulerBackend_=(sb: SchedulerBackend) and taskScheduler_=(ts: TaskScheduler) methods, respectively.
FIXME Make it an advanced exercise.
When a Spark context starts, it triggers SparkListenerEnvironmentUpdate and SparkListenerApplicationStart messages.
Refer to the section SparkContext’s initialization.
setLogLevel(logLevel: String)
setLogLevel allows you to set the root logging level in a Spark application, e.g. Spark shell.
Internally, setLogLevel calls org.apache.log4j.Level.toLevel(logLevel) and org.apache.log4j.Logger.getRootLogger().setLevel(l).
SparkStatusTracker requires a Spark context to work. It is created as part of SparkContext’s initialization.
SparkStatusTracker is only used by ConsoleProgressBar.
ConsoleProgressBar shows the progress of active stages in console (to stderr). It polls the status of stages from SparkStatusTracker periodically and prints out active stages with more than one task. It keeps overwriting itself to stay on one line, showing at most the first 3 concurrent stages at a time.
[Stage 0:====> (316 + 4) / 1000][Stage 1:> (0 + 0) / 1000][Stage 2:> (0 + 0) / 1000]]]
The progress includes the stage’s id, the number of completed, active, and total tasks.
It is useful when you ssh to workers and want to see the progress of active stages.
It is only instantiated if the value of the boolean property spark.ui.showConsoleProgress (default: true) is true and the log level of org.apache.spark.SparkContext logger is WARN or higher (refer to Logging).
import org.apache.log4j._
To print the progress nicely ConsoleProgressBar uses COLUMNS environment variable to know the width of the terminal. It assumes 80 columns.
The progress bar prints out the status after a stage has run at least 500ms, every 200ms (the values are not configurable).
See the progress bar in Spark shell with the following:
$ ./bin/spark-shell --conf spark.ui.showConsoleProgress=true # (1)
scala> sc.setLogLevel("OFF") // (2)
scala> Logger.getLogger("org.apache.spark.SparkContext").setLevel(Level.WARN) // (3)
scala> sc.parallelize(1 to 4, 4).map { n => Thread.sleep(500 + 200 * n); n }.count // (4)
[Stage 2:> (0 + 4) / 4]
[Stage 2:==============> (1 + 3) / 4]
[Stage 2:=============================> (2 + 2) / 4]
[Stage 2:============================================> (3 + 1) / 4]
(1) Make sure spark.ui.showConsoleProgress is true. It is by default.
(2) Disable (OFF) the root logger (that includes Spark’s logger).
(3) Make sure org.apache.spark.SparkContext logger is at least WARN.
(4) Run a job with 4 tasks with 500ms initial sleep and 200ms sleep chunks to see the progress bar.
Watch the short video that shows ConsoleProgressBar in action.
You may want to use the following example to see the progress bar in full glory - all 3 concurrent stages in console (borrowed from a comment to [SPARK-4017] show progress bar in console #3029):
> ./bin/spark-shell
scala> val a = sc.makeRDD(1 to 1000, 10000).map(x => (x, x)).reduceByKey(_ + _)
scala> val b = sc.makeRDD(1 to 1000, 10000).map(x => (x, x)).reduceByKey(_ + _)
scala> a.union(b).count()
Every time an action is called, Spark cleans up the closure, i.e. the body of the action, before it is serialized and sent over the wire to executors.
SparkContext comes with clean(f: F, checkSerializable: Boolean = true) method that does this. It in turn calls ClosureCleaner.clean method.
Not only does ClosureCleaner.clean method clean the closure, but it also does so transitively, i.e. referenced closures are cleaned transitively.
A closure is considered serializable as long as it does not explicitly reference unserializable objects. ClosureCleaner achieves this by traversing the hierarchy of enclosing closures and nulling out any references that are not actually used by the starting closure.
With DEBUG logging level enabled for the org.apache.spark.util.ClosureCleaner logger, you should see the following messages in the logs:
+++ Cleaning closure [func] ([func.getClass.getName]) +++
+ declared fields: [declaredFields.size]
+++ closure [func] ([func.getClass.getName]) is now cleaned +++
Serialization is verified using a new instance of Serializer (as closure Serializer). Refer to Serialization.
FIXME an example, please.
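One way to observe the serialization check is to pass a closure that references an object of a non-serializable class. In the sketch below, Unserializable is a hypothetical helper class and the local master is an assumption. The transformation itself is expected to fail with a SparkException saying "Task not serializable", before any job is run.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import scala.util.{Failure, Try}

// Hypothetical helper that deliberately does not extend Serializable.
class Unserializable {
  val label = "item-"
}

object ClosureCheckDemo {
  // Returns true if the closure was rejected as not serializable.
  def run(): Boolean = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("ClosureCheckDemo"))
    try {
      val helper = new Unserializable
      // map cleans the closure and checks that it can be serialized
      Try(sc.parallelize(1 to 3).map(n => helper.label + n)) match {
        case Failure(e: SparkException) => e.getMessage.contains("Task not serializable")
        case _ => false
      }
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"Closure rejected: ${run()}")
}
```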
While a SparkContext is being created, so is a Hadoop configuration (as an instance of org.apache.hadoop.conf.Configuration that is available as _hadoopConfiguration).
SparkHadoopUtil.get.newConfiguration is used.
If a SparkConf is provided it is used to build the configuration as described. Otherwise, the default Configuration object is returned.
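If AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are both available, the following settings are set for the Hadoop configuration:
the access key settings are set to the value of AWS_ACCESS_KEY_ID
the secret key settings, including fs.s3a.secret.key, are set to the value of AWS_SECRET_ACCESS_KEY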
Every spark.hadoop. setting becomes a setting of the configuration with the prefix spark.hadoop. removed for the key.
The value of spark.buffer.size (default: 65536) is used as the value of io.file.buffer.size.
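The prefix handling and the buffer size default can be checked through the public hadoopConfiguration method. This is a sketch under assumptions: the key spark.hadoop.demo.setting is made up for the example, spark.buffer.size is left at its default, and a local master is used.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object HadoopConfDemo {
  // Returns (value of the un-prefixed demo key, io.file.buffer.size).
  def run(): (String, String) = {
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("HadoopConfDemo")
      .set("spark.hadoop.demo.setting", "42") // hypothetical key for illustration
    val sc = new SparkContext(conf)
    try {
      val hadoopConf = sc.hadoopConfiguration
      // the spark.hadoop. prefix is dropped from the key
      (hadoopConf.get("demo.setting"), hadoopConf.get("io.file.buffer.size"))
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```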
listenerBus is a LiveListenerBus object that acts as a mechanism to announce events to other services on the driver.
It is created while SparkContext is being created and, since it is a single-JVM event bus, it is exclusively used on the driver.
listenerBus is a private[spark] value in SparkContext.
startTime: Long
startTime is the time in milliseconds when SparkContext was created.
scala> sc.startTime
res0: Long = 1464425605653
sparkUser: String
sparkUser is the user who started the SparkContext.
It is computed when SparkContext is created using Utils.getCurrentUserName.
Quoting the scaladoc of org.apache.spark.SparkContext:
Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.
You can however control the behaviour using spark.driver.allowMultipleContexts.
It is disabled, i.e. false, by default.
If enabled (i.e. true), Spark prints the following WARN message to the logs:
WARN Multiple running SparkContexts detected in the same JVM!
If disabled (default), it will throw a SparkException:
Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
When creating an instance of SparkContext, Spark marks the current thread as having it being created (very early in the instantiation process).
It’s not guaranteed that Spark will work properly with two or more SparkContexts. Consider the feature a work in progress.
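The default (disabled) behaviour can be sketched as follows: while one SparkContext is active, an attempt to construct another one in the same JVM is expected to fail with a SparkException. The local master and object name are assumptions, and spark.driver.allowMultipleContexts is left unset.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import scala.util.{Failure, Try}

object SingleContextDemo {
  // Returns true if creating a second context was rejected with a SparkException.
  def run(): Boolean = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("SingleContextDemo")
    val first = new SparkContext(conf)
    try {
      Try(new SparkContext(conf)) match { // a second context in the same JVM
        case Failure(_: SparkException) => true
        case _ => false
      }
    } finally first.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"Second context rejected: ${run()}")
}
```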
sets the amount of memory to allocate to each executor. See Executor Memory.
is the user who is running SparkContext. It is available later as sparkUser.