The example shows how to develop a custom Spark listener. Read Spark Listeners first to understand the motivation for the exercise.
- Access to the Internet to download the Spark dependency - spark-core
Create a minimal Scala project using Typesafe Activator and step into the project's directory:
$ activator new custom-spark-listener minimal-scala
$ cd custom-spark-listener
Add the following line to build.sbt (the main configuration file of the sbt project) to add the dependency on Spark 1.5.1. Note the double %% that selects the proper version of the dependency for Scala 2.11.7.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"
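For reference, the entire build.sbt could look like the following minimal sketch (the project name and version are illustrative, but they match the name of the jar produced later in the exercise):

name := "custom-spark-listener"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"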
Create the directory for the package where the custom Spark listener is going to live, e.g. pl/japila/spark.
$ mkdir -p src/main/scala/pl/japila/spark
Create the following file CustomSparkListener.scala in the src/main/scala/pl/japila/spark directory. The aim of the class is to listen to events about jobs being started and stages being completed.
package pl.japila.spark

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}

class CustomSparkListener extends SparkListener {
  // Called when a job starts - prints the number of stages the job consists of.
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"Job started with ${jobStart.stageInfos.size} stages: $jobStart")
  }

  // Called when a stage completes - prints the stage id and its number of tasks.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed with ${stageCompleted.stageInfo.numTasks} tasks.")
  }
}
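If you want to see more of a job's lifecycle, the listener could also react to task completions. A minimal sketch of one more callback, assuming you additionally import org.apache.spark.scheduler.SparkListenerTaskEnd (this is not part of the exercise):

  // Called when a task finishes - taskInfo.duration is the task's wall-clock time in ms.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"Task ${taskEnd.taskInfo.taskId} in stage ${taskEnd.stageId} finished in ${taskEnd.taskInfo.duration} ms.")
  }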
Package it, i.e. execute the package command in the activator shell.
[custom-spark-listener]> package
[info] Compiling 1 Scala source to /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/classes...
[info] Packaging /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/custom-spark-listener_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Nov 4, 2015 8:59:30 AM
You should now have the resulting jar file with the custom scheduler listener ready (mine is /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/custom-spark-listener_2.11-1.0.jar).
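If you want to double-check that the class made it into the jar (this is optional), you can list the jar's contents with the standard JDK jar tool; the output should include pl/japila/spark/CustomSparkListener.class:

$ jar tf target/scala-2.11/custom-spark-listener_2.11-1.0.jar | grep CustomSparkListener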
Start Spark shell with the configuration for the extra custom listener and with the jar that includes the class on the driver's class path.
$ ./bin/spark-shell --conf spark.logConf=true --conf spark.extraListeners=pl.japila.spark.CustomSparkListener --driver-class-path /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/custom-spark-listener_2.11-1.0.jar
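Alternatively (not required for the exercise), once the jar is on the driver's class path you could skip the spark.extraListeners setting and register the listener programmatically from the Spark shell using SparkContext.addSparkListener:

scala> sc.addSparkListener(new pl.japila.spark.CustomSparkListener)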
Create an RDD and execute an action to start a job as follows:
scala> sc.parallelize(0 to 10).count
15/11/04 12:27:29 INFO SparkContext: Starting job: count at <console>:25
...
Job started with 1 stages: SparkListenerJobStart(0,1446636449441,WrappedArray(org.apache.spark.scheduler.StageInfo@4b08f37b),{})
...
Stage 0 completed with 8 tasks.
The lines that start with Job started and Stage 0 completed come from the custom Spark listener you've just created. Congratulations! The exercise is over.