From 57e8bfdfbbc6549a0a48cee8c2e98df83edbd15f Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Wed, 4 Dec 2024 16:35:06 +0800 Subject: [PATCH 1/3] add a few more stage level metrics Signed-off-by: Hongbin Ma (Mahone) --- .../nvidia/spark/rapids/GpuSemaphore.scala | 14 ++- .../spark/rapids/PrioritySemaphore.scala | 12 ++- .../spark/sql/rapids/GpuTaskMetrics.scala | 95 ++++++++++++++++++- .../spark/rapids/PrioritySemaphoreSuite.scala | 10 +- 4 files changed, 119 insertions(+), 12 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index 719c4525373..caf783de3af 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -184,7 +184,8 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) * If this task holds the GPU semaphore or not. */ private var hasSemaphore = false - private var lastHeld: Long = 0 + private var lastAcquired: Long = 0 + private var lastReleased: Long = 0 type GpuBackingSemaphore = PrioritySemaphore[Long] @@ -256,11 +257,12 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) if (!done && shouldBlockOnSemaphore) { // We cannot be in a synchronized block and wait on the semaphore // so we have to release it and grab it again afterwards. - semaphore.acquire(numPermits, lastHeld, taskAttemptId) + semaphore.acquire(numPermits, lastReleased, taskAttemptId) synchronized { // We now own the semaphore so we need to wake up all of the other tasks that are // waiting. hasSemaphore = true + lastAcquired = System.nanoTime() if (trackSemaphore) { nvtxRange = Some(new NvtxUniqueRange(s"Stage ${stageId} Task ${taskAttemptId} owning GPU", @@ -296,9 +298,10 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) } else { if (blockedThreads.size() == 0) { // No other threads for this task are waiting, so we might be able to grab this directly - val ret = semaphore.tryAcquire(numPermits, lastHeld, taskAttemptId) + val ret = semaphore.tryAcquire(numPermits, lastReleased, taskAttemptId) if (ret) { hasSemaphore = true + lastAcquired = System.nanoTime() activeThreads.add(t) // no need to notify because there are no other threads and we are holding the lock // to ensure that. @@ -316,7 +319,8 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) if (hasSemaphore) { semaphore.release(numPermits) hasSemaphore = false - lastHeld = System.currentTimeMillis() + lastReleased = System.nanoTime() + GpuTaskMetrics.get.addGpuTime(lastReleased - lastAcquired) nvtxRange.foreach(_.close()) nvtxRange = None } @@ -333,7 +337,7 @@ private final class GpuSemaphore() extends Logging { import GpuSemaphore._ type GpuBackingSemaphore = PrioritySemaphore[Long] - private val semaphore = new GpuBackingSemaphore(MAX_PERMITS) + private val semaphore = new GpuBackingSemaphore(MAX_PERMITS, 0L) // A map of taskAttemptId => semaphoreTaskInfo. // This map keeps track of all tasks that are both active on the GPU and blocked waiting // on the GPU. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala index dc90382d3a0..594059d3c87 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PrioritySemaphore.scala @@ -19,7 +19,12 @@ package com.nvidia.spark.rapids import java.util.PriorityQueue import java.util.concurrent.locks.{Condition, ReentrantLock} -class PrioritySemaphore[T](val maxPermits: Int)(implicit ordering: Ordering[T]) { +import scala.collection.JavaConverters.asScalaIteratorConverter + +import org.apache.spark.sql.rapids.GpuTaskMetrics + +class PrioritySemaphore[T](val maxPermits: Int, val priorityForNonStarted: T) + (implicit ordering: Ordering[T]) { // This lock is used to generate condition variables, which affords us the flexibility to notify // specific threads at a time. If we use the regular synchronized pattern, we have to either // notify randomly, or if we try creating condition variables not tied to a shared lock, they @@ -69,6 +74,11 @@ class PrioritySemaphore[T](val maxPermits: Int)(implicit ordering: Ordering[T]) val info = ThreadInfo(priority, condition, numPermits, taskAttemptId) try { waitingQueue.add(info) + // only count tasks that had held semaphore before, + // so they're very likely to have remaining data on GPU + GpuTaskMetrics.get.recordOnGpuTasksWaitingNumber( + waitingQueue.iterator().asScala.count(_.priority != priorityForNonStarted)) + while (!info.signaled) { info.condition.await() } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala index 84ca5e2ac51..1e261487c24 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala @@ -107,12 +107,95 @@ class HighWatermarkAccumulator extends AccumulatorV2[jl.Long, Long] { override def value: Long = _value } +class MaxLongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { + private var _v = 0L + + override def isZero: Boolean = _v == 0 + + override def copy(): MaxLongAccumulator = { + val newAcc = new MaxLongAccumulator + newAcc._v = this._v + newAcc + } + + override def reset(): Unit = { + _v = 0L + } + + override def add(v: jl.Long): Unit = { + if(v > _v) { + _v = v + } + } + + def add(v: Long): Unit = { + if(v > _v) { + _v = v + } + } + + override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match { + case o: MaxLongAccumulator => + add(o.value) + case _ => + throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def value: jl.Long = _v +} + +class AvgLongAccumulator extends AccumulatorV2[jl.Long, jl.Double] { + private var _sum = 0L + private var _count = 0L + + override def isZero: Boolean = _count == 0L + + override def copy(): AvgLongAccumulator = { + val newAcc = new AvgLongAccumulator + newAcc._sum = this._sum + newAcc._count = this._count + newAcc + } + + override def reset(): Unit = { + _sum = 0L + _count = 0L + } + + override def add(v: jl.Long): Unit = { + _sum += v + _count += 1 + } + + override def merge(other: AccumulatorV2[jl.Long, jl.Double]): Unit = other match { + case o: AvgLongAccumulator => + _sum += o._sum + _count += o._count + case _ => + throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def value: jl.Double = if (_count != 0) { + 1.0 * _sum / _count + } else 0; +} + class GpuTaskMetrics extends Serializable { + private val gpuTime = new NanoSecondAccumulator private val semWaitTimeNs = new NanoSecondAccumulator private val retryCount = new LongAccumulator private val splitAndRetryCount = new LongAccumulator private val retryBlockTime = new NanoSecondAccumulator private val retryComputationTime = new NanoSecondAccumulator + // onGpuTask means a task that has data in GPU memory. + // Since it's not easy to decided if a task has data in GPU memory, + // We only count the tasks that had held semaphore before, + // so it's very likely to have data in GPU memory + private val onGpuTasksInWaitingQueueAvgCount = new AvgLongAccumulator + private val onGpuTasksInWaitingQueueMaxCount = new MaxLongAccumulator + // Spill private val spillToHostTimeNs = new NanoSecondAccumulator @@ -156,6 +239,7 @@ class GpuTaskMetrics extends Serializable { } private val metrics = Map[String, AccumulatorV2[_, _]]( + "gpuTime" -> gpuTime, "gpuSemaphoreWait" -> semWaitTimeNs, "gpuRetryCount" -> retryCount, "gpuSplitAndRetryCount" -> splitAndRetryCount, @@ -167,7 +251,9 @@ class GpuTaskMetrics extends Serializable { "gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs, "gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes, "gpuMaxHostMemoryBytes" -> maxHostMemoryBytes, - "gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes + "gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes, + "gpuOnGpuTasksWaitingGPUAvgCount" -> onGpuTasksInWaitingQueueAvgCount, + "gpuOnGpuTasksWaitingGPUMaxCount" -> onGpuTasksInWaitingQueueMaxCount ) def register(sc: SparkContext): Unit = { @@ -204,6 +290,8 @@ class GpuTaskMetrics extends Serializable { } } + def addGpuTime(duration: Long): Unit = gpuTime.add(duration) + def getSemWaitTime(): Long = semWaitTimeNs.value.value def semWaitTime[A](f: => A): A = timeIt(semWaitTimeNs, "Acquire GPU", NvtxColor.RED, f) @@ -263,6 +351,11 @@ class GpuTaskMetrics extends Serializable { maxDiskMemoryBytes.add(maxDiskBytesAllocated) } } + + def recordOnGpuTasksWaitingNumber(num: Int): Unit = { + onGpuTasksInWaitingQueueAvgCount.add(num) + onGpuTasksInWaitingQueueMaxCount.add(num) + } } /** diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala index 7199aa55df6..8fb4ab2ec28 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala @@ -24,7 +24,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { type TestPrioritySemaphore = PrioritySemaphore[Long] test("tryAcquire should return true if permits are available") { - val semaphore = new TestPrioritySemaphore(10) + val semaphore = new TestPrioritySemaphore(10, 0L) assert(semaphore.tryAcquire(5, 0, 0)) assert(semaphore.tryAcquire(3, 0, 0)) @@ -33,7 +33,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { } test("acquire and release should work correctly") { - val semaphore = new TestPrioritySemaphore(1) + val semaphore = new TestPrioritySemaphore(1, 0L) assert(semaphore.tryAcquire(1, 0, 0)) @@ -57,7 +57,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { } test("multiple threads should handle permits and priority correctly") { - val semaphore = new TestPrioritySemaphore(0) + val semaphore = new TestPrioritySemaphore(0, 0L) val results = new java.util.ArrayList[Int]() def taskWithPriority(priority: Int) = new Runnable { @@ -83,7 +83,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { } test("low priority thread cannot surpass high priority thread") { - val semaphore = new TestPrioritySemaphore(10) + val semaphore = new TestPrioritySemaphore(10, 0L) semaphore.acquire(5, 0, 0) val t = new Thread(() => { semaphore.acquire(10, 2, 0) @@ -103,7 +103,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { // this case is described at https://github.com/NVIDIA/spark-rapids/pull/11574/files#r1795652488 test("thread with larger task id should not surpass smaller task id in the waiting queue") { - val semaphore = new TestPrioritySemaphore(10) + val semaphore = new TestPrioritySemaphore(10, 0L) semaphore.acquire(8, 0, 0) val t = new Thread(() => { semaphore.acquire(5, 0, 0) From 783cd27ea90c81aec34db9944346dfad428bebc2 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Thu, 5 Dec 2024 08:46:02 +0800 Subject: [PATCH 2/3] address comments Signed-off-by: Hongbin Ma (Mahone) --- .../scala/com/nvidia/spark/rapids/GpuSemaphore.scala | 7 ++++--- .../nvidia/spark/rapids/PrioritySemaphoreSuite.scala | 10 +++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index caf783de3af..3b841c6f50f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -133,6 +133,7 @@ object GpuSemaphore { } private val MAX_PERMITS = 1000 + val DEFAULT_PRIORITY = 0L def computeNumPermits(conf: SQLConf): Int = { val concurrentStr = conf.getConfString(RapidsConf.CONCURRENT_GPU_TASKS.key, null) @@ -184,8 +185,8 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) * If this task holds the GPU semaphore or not. */ private var hasSemaphore = false - private var lastAcquired: Long = 0 - private var lastReleased: Long = 0 + private var lastAcquired: Long = GpuSemaphore.DEFAULT_PRIORITY + private var lastReleased: Long = GpuSemaphore.DEFAULT_PRIORITY type GpuBackingSemaphore = PrioritySemaphore[Long] @@ -337,7 +338,7 @@ private final class GpuSemaphore() extends Logging { import GpuSemaphore._ type GpuBackingSemaphore = PrioritySemaphore[Long] - private val semaphore = new GpuBackingSemaphore(MAX_PERMITS, 0L) + private val semaphore = new GpuBackingSemaphore(MAX_PERMITS, GpuSemaphore.DEFAULT_PRIORITY) // A map of taskAttemptId => semaphoreTaskInfo. // This map keeps track of all tasks that are both active on the GPU and blocked waiting // on the GPU. diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala index 8fb4ab2ec28..a2356a68c1e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/PrioritySemaphoreSuite.scala @@ -24,7 +24,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { type TestPrioritySemaphore = PrioritySemaphore[Long] test("tryAcquire should return true if permits are available") { - val semaphore = new TestPrioritySemaphore(10, 0L) + val semaphore = new TestPrioritySemaphore(10, GpuSemaphore.DEFAULT_PRIORITY ) assert(semaphore.tryAcquire(5, 0, 0)) assert(semaphore.tryAcquire(3, 0, 0)) @@ -33,7 +33,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { } test("acquire and release should work correctly") { - val semaphore = new TestPrioritySemaphore(1, 0L) + val semaphore = new TestPrioritySemaphore(1, GpuSemaphore.DEFAULT_PRIORITY) assert(semaphore.tryAcquire(1, 0, 0)) @@ -57,7 +57,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { } test("multiple threads should handle permits and priority correctly") { - val semaphore = new TestPrioritySemaphore(0, 0L) + val semaphore = new TestPrioritySemaphore(0, GpuSemaphore.DEFAULT_PRIORITY) val results = new java.util.ArrayList[Int]() def taskWithPriority(priority: Int) = new Runnable { @@ -83,7 +83,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { } test("low priority thread cannot surpass high priority thread") { - val semaphore = new TestPrioritySemaphore(10, 0L) + val semaphore = new TestPrioritySemaphore(10, GpuSemaphore.DEFAULT_PRIORITY) semaphore.acquire(5, 0, 0) val t = new Thread(() => { semaphore.acquire(10, 2, 0) @@ -103,7 +103,7 @@ class PrioritySemaphoreSuite extends AnyFunSuite { // this case is described at https://github.com/NVIDIA/spark-rapids/pull/11574/files#r1795652488 test("thread with larger task id should not surpass smaller task id in the waiting queue") { - val semaphore = new TestPrioritySemaphore(10, 0L) + val semaphore = new TestPrioritySemaphore(10, GpuSemaphore.DEFAULT_PRIORITY) semaphore.acquire(8, 0, 0) val t = new Thread(() => { semaphore.acquire(5, 0, 0) From af093a6b31df265d9c499e90022a9e5dd75c061e Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Tue, 10 Dec 2024 14:48:39 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: Hongbin Ma (Mahone) --- .../main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala | 2 +- .../scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala index 3b841c6f50f..68912193920 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala @@ -321,7 +321,7 @@ private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) semaphore.release(numPermits) hasSemaphore = false lastReleased = System.nanoTime() - GpuTaskMetrics.get.addGpuTime(lastReleased - lastAcquired) + GpuTaskMetrics.get.addSemaphoreHoldingTime(lastReleased - lastAcquired) nvtxRange.foreach(_.close()) nvtxRange = None } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala index 1e261487c24..e21a9b71cb3 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala @@ -183,7 +183,7 @@ class AvgLongAccumulator extends AccumulatorV2[jl.Long, jl.Double] { } class GpuTaskMetrics extends Serializable { - private val gpuTime = new NanoSecondAccumulator + private val semaphoreHoldingTime = new NanoSecondAccumulator private val semWaitTimeNs = new NanoSecondAccumulator private val retryCount = new LongAccumulator private val splitAndRetryCount = new LongAccumulator @@ -239,7 +239,7 @@ class GpuTaskMetrics extends Serializable { } private val metrics = Map[String, AccumulatorV2[_, _]]( - "gpuTime" -> gpuTime, + "gpuTime" -> semaphoreHoldingTime, "gpuSemaphoreWait" -> semWaitTimeNs, "gpuRetryCount" -> retryCount, "gpuSplitAndRetryCount" -> splitAndRetryCount, @@ -290,7 +290,7 @@ class GpuTaskMetrics extends Serializable { } } - def addGpuTime(duration: Long): Unit = gpuTime.add(duration) + def addSemaphoreHoldingTime(duration: Long): Unit = semaphoreHoldingTime.add(duration) def getSemWaitTime(): Long = semWaitTimeNs.value.value