Skip to content

Latest commit

 

History

History
177 lines (129 loc) · 5.71 KB

spark-taskscheduler-taskcontext.adoc

File metadata and controls

177 lines (129 loc) · 5.71 KB

TaskContext

TaskContext allows a task to access contextual information about itself as well as register task listeners.

Using TaskContext you can access local properties that were set by the driver. You can also access task metrics.

You can access the active TaskContext instance using TaskContext.get method.

TaskContext belongs to org.apache.spark package.

import org.apache.spark.TaskContext
Note
TaskContext is serializable.

Contextual Information

  • stageId is the id of the stage the task belongs to.

  • partitionId is the id of the partition computed by the task.

  • attemptNumber is to denote how many times the task has been attempted (starting from 0).

  • taskAttemptId is the id of the attempt of the task.

  • isCompleted returns true when a task is completed.

  • isInterrupted returns true when a task was killed.

All these attributes are accessible using appropriate getters, e.g. getPartitionId for the partition id.

Registering Task Listeners

Using TaskContext object you can register task listeners for task completion regardless of the final state and task failures only.

addTaskCompletionListener

addTaskCompletionListener registers a TaskCompletionListener listener that will be executed on task completion.

Note
It will be executed regardless of the final state of a task - success, failure, or cancellation.
val rdd = sc.range(0, 5, numSlices = 1)

import org.apache.spark.TaskContext
val printTaskInfo = (tc: TaskContext) => {
  val msg = s"""|-------------------
                |partitionId:   ${tc.partitionId}
                |stageId:       ${tc.stageId}
                |attemptNum:    ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |-------------------""".stripMargin
  println(msg)
}

rdd.foreachPartition { _ =>
  val tc = TaskContext.get
  tc.addTaskCompletionListener(printTaskInfo)
}

addTaskFailureListener

addTaskFailureListener registers a TaskFailureListener listener that will only be executed on task failure. It can be executed multiple times since a task can be re-attempted when it fails.

val rdd = sc.range(0, 2, numSlices = 2)

import org.apache.spark.TaskContext
val printTaskErrorInfo = (tc: TaskContext, error: Throwable) => {
  val msg = s"""|-------------------
                |partitionId:   ${tc.partitionId}
                |stageId:       ${tc.stageId}
                |attemptNum:    ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |error:         ${error.toString}
                |-------------------""".stripMargin
  println(msg)
}

val throwExceptionForOddNumber = (n: Long) => {
  if (n % 2 == 1) {
    throw new Exception(s"No way it will pass for odd number: $n")
  }
}

// FIXME It won't work.
rdd.map(throwExceptionForOddNumber).foreachPartition { _ =>
  val tc = TaskContext.get
  tc.addTaskFailureListener(printTaskErrorInfo)
}

// Listener registration matters.
rdd.mapPartitions { (it: Iterator[Long]) =>
  val tc = TaskContext.get
  tc.addTaskFailureListener(printTaskErrorInfo)
  it
}.map(throwExceptionForOddNumber).count

Accessing Local Properties (getLocalProperty method)

getLocalProperty(key: String): String

You can use getLocalProperty method to access local properties that were set by the driver using SparkContext.setLocalProperty.

Task Metrics

taskMetrics(): TaskMetrics

taskMetrics method is part of the Developer API that allows to access the instance of TaskMetrics for a task.

getMetricsSources(sourceName: String): Seq[Source]

getMetricsSources allows to access all metrics sources for sourceName name which are associated with the instance that runs the task.

Accessing Active TaskContext (TaskContext.get method)

get(): TaskContext

TaskContext.get method returns TaskContext instance for the active task (as a TaskContextImpl object). There can only be one instance and tasks can use the object to access contextual information about themselves.

val rdd = sc.range(0, 3, numSlices = 3)

scala> rdd.partitions.size
res0: Int = 3

rdd.foreach { n =>
  import org.apache.spark.TaskContext
  val tc = TaskContext.get
  val msg = s"""|-------------------
                |partitionId:   ${tc.partitionId}
                |stageId:       ${tc.stageId}
                |attemptNum:    ${tc.attemptNumber}
                |taskAttemptId: ${tc.taskAttemptId}
                |-------------------""".stripMargin
  println(msg)
}
Note
TaskContext object uses ThreadLocal to keep it thread-local, i.e. to associate state with the thread of a task.

TaskContextImpl

TaskContextImpl is the only implementation of TaskContext abstract class.

Caution
FIXME
  • stage

  • partition

  • task attempt

  • attempt number

  • runningLocally = false

  • taskMemoryManager

Caution
FIXME Where and how is TaskMemoryManager used?

markInterrupted

Caution
FIXME

Creating TaskContextImpl Instance

Caution
FIXME