Exercise: Developing Custom SparkListener to monitor DAGScheduler in Scala

Introduction

This exercise shows how to develop a custom Spark listener. You should read Spark Listeners first to understand the motivation for the exercise.

Requirements

  1. Typesafe Activator

  2. Access to the Internet to download the Spark dependency - spark-core

Setting up Scala project using Typesafe Activator

$ activator new custom-spark-listener minimal-scala
$ cd custom-spark-listener

Add the following line to build.sbt (the main configuration file of the sbt project) to add the dependency on Spark 1.5.1. Note the double %% that selects the variant of the dependency built for the project's Scala version, i.e. 2.11.7.

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"
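For reference, the complete build.sbt could look as follows. This is a minimal sketch only; the name, organization and version values below are illustrative (they are not dictated by the exercise), so adjust them to your project.

name := "custom-spark-listener"

organization := "pl.japila"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"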

Custom Listener - pl.japila.spark.CustomSparkListener

Create the directory where the custom Spark listener is going to live, e.g. pl/japila/spark for the pl.japila.spark package.

$ mkdir -p src/main/scala/pl/japila/spark

Create the following file CustomSparkListener.scala in the src/main/scala/pl/japila/spark directory. The aim of the class is to listen to events about jobs being started and stages being completed.

package pl.japila.spark

import org.apache.spark.scheduler.{SparkListenerStageCompleted, SparkListener, SparkListenerJobStart}

// A custom SparkListener that intercepts job-start and stage-completed events
class CustomSparkListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"Job started with ${jobStart.stageInfos.size} stages: $jobStart")
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed with ${stageCompleted.stageInfo.numTasks} tasks.")
  }
}

Creating deployable package

Package it, i.e. execute the package command in the activator shell.

[custom-spark-listener]> package
[info] Compiling 1 Scala source to /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/classes...
[info] Packaging /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/custom-spark-listener_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 0 s, completed Nov 4, 2015 8:59:30 AM

You should now have the resulting jar file with the custom scheduler listener ready (mine is /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/custom-spark-listener_2.11-1.0.jar).

Activating Custom Listener in Spark shell

Start the Spark shell with the custom listener registered via spark.extraListeners and the jar that includes the class added to the driver's class path.

$ ./bin/spark-shell --conf spark.logConf=true --conf spark.extraListeners=pl.japila.spark.CustomSparkListener --driver-class-path /Users/jacek/dev/sandbox/custom-spark-listener/target/scala-2.11/custom-spark-listener_2.11-1.0.jar

Create an RDD and execute an action to start a job as follows:

scala> sc.parallelize(0 to 10).count
15/11/04 12:27:29 INFO SparkContext: Starting job: count at <console>:25
...
Job started with 1 stages: SparkListenerJobStart(0,1446636449441,WrappedArray(org.apache.spark.scheduler.StageInfo@4b08f37b),{})
...
Stage 0 completed with 8 tasks.

The lines Job started with 1 stages: ... and Stage 0 completed with 8 tasks. come from the custom Spark listener you’ve just created. Congratulations! The exercise’s over.

BONUS Activating Custom Listener in Spark Application

Tip
Use sc.addSparkListener(myListener)
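For completeness, below is a minimal sketch of a standalone Spark application that registers the listener programmatically with sc.addSparkListener (the CustomSparkApp object name and the local[*] master are illustrative choices, not part of the original exercise).

package pl.japila.spark

import org.apache.spark.{SparkConf, SparkContext}

object CustomSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("custom-spark-listener").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Register the listener programmatically instead of using spark.extraListeners
    sc.addSparkListener(new CustomSparkListener)

    // Run a job so the listener has something to report
    sc.parallelize(0 to 10).count()

    sc.stop()
  }
}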

Questions

  1. What are the pros and cons of activating the listener on the command line vs registering it inside a Spark application?