add a few more stage level metrics #11821

Merged

GpuSemaphore.scala

@@ -133,6 +133,7 @@ object GpuSemaphore {
}

private val MAX_PERMITS = 1000
+ val DEFAULT_PRIORITY = 0L

def computeNumPermits(conf: SQLConf): Int = {
val concurrentStr = conf.getConfString(RapidsConf.CONCURRENT_GPU_TASKS.key, null)
@@ -184,7 +185,8 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long)
* If this task holds the GPU semaphore or not.
*/
private var hasSemaphore = false
- private var lastHeld: Long = 0
+ private var lastAcquired: Long = GpuSemaphore.DEFAULT_PRIORITY
+ private var lastReleased: Long = GpuSemaphore.DEFAULT_PRIORITY

type GpuBackingSemaphore = PrioritySemaphore[Long]

@@ -256,11 +258,12 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long)
if (!done && shouldBlockOnSemaphore) {
// We cannot be in a synchronized block and wait on the semaphore
// so we have to release it and grab it again afterwards.
- semaphore.acquire(numPermits, lastHeld, taskAttemptId)
+ semaphore.acquire(numPermits, lastReleased, taskAttemptId)
synchronized {
// We now own the semaphore so we need to wake up all of the other tasks that are
// waiting.
hasSemaphore = true
+ lastAcquired = System.nanoTime()
if (trackSemaphore) {
nvtxRange =
Some(new NvtxUniqueRange(s"Stage ${stageId} Task ${taskAttemptId} owning GPU",
@@ -296,9 +299,10 @@
} else {
if (blockedThreads.size() == 0) {
// No other threads for this task are waiting, so we might be able to grab this directly
- val ret = semaphore.tryAcquire(numPermits, lastHeld, taskAttemptId)
+ val ret = semaphore.tryAcquire(numPermits, lastReleased, taskAttemptId)
if (ret) {
hasSemaphore = true
+ lastAcquired = System.nanoTime()
activeThreads.add(t)
// no need to notify because there are no other threads and we are holding the lock
// to ensure that.
@@ -316,7 +320,8 @@
if (hasSemaphore) {
semaphore.release(numPermits)
hasSemaphore = false
- lastHeld = System.currentTimeMillis()
+ lastReleased = System.nanoTime()
+ GpuTaskMetrics.get.addSemaphoreHoldingTime(lastReleased - lastAcquired)
nvtxRange.foreach(_.close())
nvtxRange = None
}
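Taken together, these hunks replace the wall-clock lastHeld stamp with a pair of monotonic System.nanoTime() stamps (nanoTime is immune to system clock adjustments), so each release can report how long the task actually held the GPU. A minimal sketch of the pairing, with a record callback standing in for GpuTaskMetrics.get.addSemaphoreHoldingTime; all names here are illustrative, not the actual GpuSemaphore code:

// Minimal sketch: each acquire stamps lastAcquired, each release stamps
// lastReleased and reports the difference as semaphore holding time.
class HoldTimer(record: Long => Unit) {
  private var lastAcquired = 0L
  private var lastReleased = 0L // also reused as the re-acquisition priority

  def onAcquire(): Unit = lastAcquired = System.nanoTime()

  def onRelease(): Unit = {
    lastReleased = System.nanoTime()
    record(lastReleased - lastAcquired) // nanoseconds the semaphore was held
  }
}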
@@ -333,7 +338,7 @@ private final class GpuSemaphore() extends Logging {
import GpuSemaphore._

type GpuBackingSemaphore = PrioritySemaphore[Long]
- private val semaphore = new GpuBackingSemaphore(MAX_PERMITS)
+ private val semaphore = new GpuBackingSemaphore(MAX_PERMITS, GpuSemaphore.DEFAULT_PRIORITY)
// A map of taskAttemptId => semaphoreTaskInfo.
// This map keeps track of all tasks that are both active on the GPU and blocked waiting
// on the GPU.
PrioritySemaphore.scala

@@ -19,7 +19,12 @@ package com.nvidia.spark.rapids
import java.util.PriorityQueue
import java.util.concurrent.locks.{Condition, ReentrantLock}

- class PrioritySemaphore[T](val maxPermits: Int)(implicit ordering: Ordering[T]) {
+ import scala.collection.JavaConverters.asScalaIteratorConverter
+
+ import org.apache.spark.sql.rapids.GpuTaskMetrics
+
+ class PrioritySemaphore[T](val maxPermits: Int, val priorityForNonStarted: T)
+ (implicit ordering: Ordering[T]) {

Collaborator:
nit: is there any case where we want to use a different value of priorityForNonStarted than GpuSemaphore.DEFAULT_PRIORITY? If not, we can probably just use GpuSemaphore.DEFAULT_PRIORITY directly in this class instead of passing it to the constructor. The reason I prefer using the static variable directly is that this constructor parameter suggests it can take values other than GpuSemaphore.DEFAULT_PRIORITY in some cases, which I'm not sure is true. This is more for code readability and ease of change later if needed, so I'm OK with the current code as well.

Collaborator (author):
The major reason for val priorityForNonStarted: T is that PrioritySemaphore has a type parameter T, so it cannot reference the Long-typed GpuSemaphore.DEFAULT_PRIORITY directly. (I have no idea why the type parameter is needed here.)
// This lock is used to generate condition variables, which affords us the flexibility to notify
// specific threads at a time. If we use the regular synchronized pattern, we have to either
// notify randomly, or if we try creating condition variables not tied to a shared lock, they
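A hedged sketch of the author's point in the thread above: a class that is generic in T cannot use the Long-typed GpuSemaphore.DEFAULT_PRIORITY as a value of type T, so the "never started" sentinel has to arrive through the constructor. All names below are illustrative only:

// Illustrative only: the sentinel must be supplied by the caller because a
// generic class cannot name a Long constant as a value of its type parameter.
class SentinelQueue[T](nonStarted: T)(implicit ord: Ordering[T]) {
  private var entries = List.empty[T]
  def add(priority: T): Unit = entries = priority :: entries
  // Entries whose priority differs from the sentinel have held the resource.
  def startedCount: Int = entries.count(_ != nonStarted)
}

object SentinelQueueExample extends App {
  val q = new SentinelQueue[Long](0L) // 0L plays the role of DEFAULT_PRIORITY
  q.add(0L)                  // a task that never held the semaphore
  q.add(System.nanoTime())   // a task that released it at some point
  println(q.startedCount)    // prints 1
}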
@@ -69,6 +74,11 @@ class PrioritySemaphore[T](val maxPermits: Int)(implicit ordering: Ordering[T])
val info = ThreadInfo(priority, condition, numPermits, taskAttemptId)
try {
waitingQueue.add(info)
+ // Only count tasks that have held the semaphore before,
+ // since they are very likely to still have data on the GPU.
+ GpuTaskMetrics.get.recordOnGpuTasksWaitingNumber(
+ waitingQueue.iterator().asScala.count(_.priority != priorityForNonStarted))

while (!info.signaled) {
info.condition.await()
}
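To make the new count concrete, a hedged walk-through with hypothetical values: suppose three tasks are waiting, one of which has never held the semaphore (its priority is still the sentinel), while two have released it before (their priorities are real nanoTime stamps):

// Hypothetical snapshot of waitingQueue priorities at enqueue time.
val priorityForNonStarted = 0L
val waiting = Seq(0L, 93000000000L, 95500000000L)
val onGpuWaiting = waiting.count(_ != priorityForNonStarted) // 2 is recorded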
GpuTaskMetrics.scala

@@ -107,12 +107,95 @@ class HighWatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
override def value: Long = _value
}

+ class MaxLongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {
+ private var _v = 0L

+ override def isZero: Boolean = _v == 0

+ override def copy(): MaxLongAccumulator = {
+ val newAcc = new MaxLongAccumulator
+ newAcc._v = this._v
+ newAcc
+ }

+ override def reset(): Unit = {
+ _v = 0L
+ }

+ override def add(v: jl.Long): Unit = {
+ if (v > _v) {
+ _v = v
+ }
+ }

+ def add(v: Long): Unit = {
+ if (v > _v) {
+ _v = v
+ }
+ }

+ override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match {
+ case o: MaxLongAccumulator =>
+ add(o.value)
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+ }

+ override def value: jl.Long = _v
+ }

+ class AvgLongAccumulator extends AccumulatorV2[jl.Long, jl.Double] {
+ private var _sum = 0L
+ private var _count = 0L

+ override def isZero: Boolean = _count == 0L

+ override def copy(): AvgLongAccumulator = {
+ val newAcc = new AvgLongAccumulator
+ newAcc._sum = this._sum
+ newAcc._count = this._count
+ newAcc
+ }

+ override def reset(): Unit = {
+ _sum = 0L
+ _count = 0L
+ }

+ override def add(v: jl.Long): Unit = {
+ _sum += v
+ _count += 1
+ }

+ override def merge(other: AccumulatorV2[jl.Long, jl.Double]): Unit = other match {
+ case o: AvgLongAccumulator =>
+ _sum += o._sum
+ _count += o._count
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+ }

+ override def value: jl.Double = if (_count != 0) {
+ 1.0 * _sum / _count
+ } else 0.0
+ }

class GpuTaskMetrics extends Serializable {
+ private val semaphoreHoldingTime = new NanoSecondAccumulator
private val semWaitTimeNs = new NanoSecondAccumulator
private val retryCount = new LongAccumulator
private val splitAndRetryCount = new LongAccumulator
private val retryBlockTime = new NanoSecondAccumulator
private val retryComputationTime = new NanoSecondAccumulator
+ // An "on-GPU task" is a task that has data in GPU memory.
+ // Since it is not easy to decide whether a task has data in GPU memory,
+ // we only count tasks that have held the semaphore before,
+ // as they are very likely to have data in GPU memory.
+ private val onGpuTasksInWaitingQueueAvgCount = new AvgLongAccumulator
+ private val onGpuTasksInWaitingQueueMaxCount = new MaxLongAccumulator

// Spill
private val spillToHostTimeNs = new NanoSecondAccumulator
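The two accumulators above follow the usual AccumulatorV2 life cycle: each task updates its own copy, and Spark merges the copies back on the driver. A hedged usage sketch (the object name is illustrative; it assumes the definitions above are in scope):

object AccumulatorMergeExample extends App {
  val max = new MaxLongAccumulator
  val avg = new AvgLongAccumulator

  // Simulate two task-side copies reporting waiting-queue observations.
  val taskA = max.copy(); taskA.add(3L)
  val taskB = max.copy(); taskB.add(7L)
  max.merge(taskA); max.merge(taskB)
  println(max.value) // 7: the high-water mark across tasks

  val avgA = avg.copy(); avgA.add(2L); avgA.add(4L)
  val avgB = avg.copy(); avgB.add(6L)
  avg.merge(avgA); avg.merge(avgB)
  println(avg.value) // 4.0: (2 + 4 + 6) / 3 observations
}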
@@ -156,6 +239,7 @@
}

private val metrics = Map[String, AccumulatorV2[_, _]](
"gpuTime" -> semaphoreHoldingTime,
"gpuSemaphoreWait" -> semWaitTimeNs,
"gpuRetryCount" -> retryCount,
"gpuSplitAndRetryCount" -> splitAndRetryCount,
@@ -167,7 +251,9 @@
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes,
"gpuMaxHostMemoryBytes" -> maxHostMemoryBytes,
"gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes
"gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes,
"gpuOnGpuTasksWaitingGPUAvgCount" -> onGpuTasksInWaitingQueueAvgCount,
"gpuOnGpuTasksWaitingGPUMaxCount" -> onGpuTasksInWaitingQueueMaxCount
)

def register(sc: SparkContext): Unit = {
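The body of register(sc) is collapsed in the view above; assuming it simply walks the metrics map, it would look roughly like the standalone helper below (registerAll is a hypothetical name, not part of this PR):

import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

// Hedged sketch: register each accumulator under its stage-level metric name.
def registerAll(sc: SparkContext, metrics: Map[String, AccumulatorV2[_, _]]): Unit =
  metrics.foreach { case (name, acc) => sc.register(acc, name) }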
@@ -204,6 +290,8 @@
}
}

+ def addSemaphoreHoldingTime(duration: Long): Unit = semaphoreHoldingTime.add(duration)

def getSemWaitTime(): Long = semWaitTimeNs.value.value

def semWaitTime[A](f: => A): A = timeIt(semWaitTimeNs, "Acquire GPU", NvtxColor.RED, f)
@@ -263,6 +351,11 @@
maxDiskMemoryBytes.add(maxDiskBytesAllocated)
}
}

+ def recordOnGpuTasksWaitingNumber(num: Int): Unit = {
+ onGpuTasksInWaitingQueueAvgCount.add(num)
+ onGpuTasksInWaitingQueueMaxCount.add(num)
+ }
}

/**
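Note the sampling model the two counters imply: the waiting-queue size is observed once per enqueue, so the average is over enqueue events rather than over time. A small hypothetical illustration:

// Hypothetical on-GPU waiter counts captured at four enqueue events.
val observations = Seq(0, 2, 5, 3)
val avgCount = observations.sum.toDouble / observations.size // 2.5 -> gpuOnGpuTasksWaitingGPUAvgCount
val maxCount = observations.max // 5 -> gpuOnGpuTasksWaitingGPUMaxCount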
PrioritySemaphoreSuite.scala

@@ -24,7 +24,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite {
type TestPrioritySemaphore = PrioritySemaphore[Long]

test("tryAcquire should return true if permits are available") {
- val semaphore = new TestPrioritySemaphore(10)
+ val semaphore = new TestPrioritySemaphore(10, GpuSemaphore.DEFAULT_PRIORITY)

assert(semaphore.tryAcquire(5, 0, 0))
assert(semaphore.tryAcquire(3, 0, 0))
@@ -33,7 +33,7 @@
}

test("acquire and release should work correctly") {
- val semaphore = new TestPrioritySemaphore(1)
+ val semaphore = new TestPrioritySemaphore(1, GpuSemaphore.DEFAULT_PRIORITY)

assert(semaphore.tryAcquire(1, 0, 0))

@@ -57,7 +57,7 @@
}

test("multiple threads should handle permits and priority correctly") {
- val semaphore = new TestPrioritySemaphore(0)
+ val semaphore = new TestPrioritySemaphore(0, GpuSemaphore.DEFAULT_PRIORITY)
val results = new java.util.ArrayList[Int]()

def taskWithPriority(priority: Int) = new Runnable {
@@ -83,7 +83,7 @@
}

test("low priority thread cannot surpass high priority thread") {
- val semaphore = new TestPrioritySemaphore(10)
+ val semaphore = new TestPrioritySemaphore(10, GpuSemaphore.DEFAULT_PRIORITY)
semaphore.acquire(5, 0, 0)
val t = new Thread(() => {
semaphore.acquire(10, 2, 0)
@@ -103,7 +103,7 @@

// this case is described at https://github.com/NVIDIA/spark-rapids/pull/11574/files#r1795652488
test("thread with larger task id should not surpass smaller task id in the waiting queue") {
- val semaphore = new TestPrioritySemaphore(10)
+ val semaphore = new TestPrioritySemaphore(10, GpuSemaphore.DEFAULT_PRIORITY)
semaphore.acquire(8, 0, 0)
val t = new Thread(() => {
semaphore.acquire(5, 0, 0)
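The PR only touches PrioritySemaphoreSuite; a test for the new accumulators is sketched below in the same AnyFunSuite style, purely as an illustration (the suite name is hypothetical, and it assumes the accumulators live alongside GpuTaskMetrics in org.apache.spark.sql.rapids):

import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.rapids.{AvgLongAccumulator, MaxLongAccumulator}

class GpuTaskMetricsAccumulatorSuite extends AnyFunSuite {
  test("MaxLongAccumulator keeps the largest value seen") {
    val acc = new MaxLongAccumulator
    acc.add(3L); acc.add(7L); acc.add(5L)
    assert(acc.value == 7L)
  }

  test("AvgLongAccumulator averages values and merges counts") {
    val a = new AvgLongAccumulator
    a.add(2L); a.add(4L)
    val b = new AvgLongAccumulator
    b.add(9L)
    a.merge(b)
    assert(a.value == 5.0) // (2 + 4 + 9) / 3
  }
}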