Skip to content

Commit

Permalink
host watermark metric
Browse files Browse the repository at this point in the history
Signed-off-by: Zach Puller <[email protected]>
  • Loading branch information
zpuller committed Nov 15, 2024
1 parent d208004 commit 6c3a566
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
25 changes: 25 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.{CpuRetryOOM, RmmSpark}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.GpuTaskMetrics

private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging {
private var currentNonPinnedAllocated: Long = 0L
Expand Down Expand Up @@ -52,17 +54,35 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
}
}

private def reportHostAllocMetrics(metrics: GpuTaskMetrics): String = {
try {
val taskId = TaskContext.get().taskAttemptId()
val totalSize = metrics.getHostBytesAllocated
val maxSize = metrics.getMaxHostBytesAllocated
s"total size for task $taskId is $totalSize, max size is $maxSize"
} catch {
case _: NullPointerException =>
"allocated memory outside of a task context"
}
}

private def releasePinned(ptr: Long, amount: Long): Unit = {
synchronized {
currentPinnedAllocated -= amount
}
val metrics = GpuTaskMetrics.get
metrics.decHostBytesAllocated(amount)
logDebug(reportHostAllocMetrics(metrics))
RmmSpark.cpuDeallocate(ptr, amount)
}

private def releaseNonPinned(ptr: Long, amount: Long): Unit = {
synchronized {
currentNonPinnedAllocated -= amount
}
val metrics = GpuTaskMetrics.get
metrics.decHostBytesAllocated(amount)
logDebug(reportHostAllocMetrics(metrics))
RmmSpark.cpuDeallocate(ptr, amount)
}

Expand Down Expand Up @@ -186,6 +206,11 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
allocAttemptFinishedWithoutException = true
} finally {
if (ret.isDefined) {
// Alternatively we could do the host watermark tracking in the JNI code to make
// it consistent with how we handle device memory tracking
val metrics = GpuTaskMetrics.get
metrics.incHostBytesAllocated(amount)
logDebug(reportHostAllocMetrics(metrics))
RmmSpark.postCpuAllocSuccess(ret.get.getAddress, amount, blocking, isRecursive)
} else {
// shouldRetry should indicate if spill did anything for us and we should try again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,36 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val maxDeviceMemoryBytes = new HighWatermarkAccumulator
private val maxHostMemoryBytes = new HighWatermarkAccumulator
private val maxDiskMemoryBytes = new HighWatermarkAccumulator

private var hostBytesAllocated: Long = 0
private var maxHostBytesAllocated: Long = 0

private var diskBytesAllocated: Long = 0
private var maxDiskBytesAllocated: Long = 0

def getDiskBytesAllocated: Long = diskBytesAllocated

def getMaxDiskBytesAllocated: Long = maxDiskBytesAllocated

def getHostBytesAllocated: Long = hostBytesAllocated

def getMaxHostBytesAllocated: Long = maxHostBytesAllocated

def incHostBytesAllocated(bytes: Long): Unit = {
hostBytesAllocated += bytes
maxHostBytesAllocated = maxHostBytesAllocated.max(hostBytesAllocated)
}

def decHostBytesAllocated(bytes: Long): Unit = {
hostBytesAllocated -= bytes
// For some reason it's possible for the task to start out by releasing resources,
// possibly from a previous task, in such case we probably should just ignore it.
hostBytesAllocated = hostBytesAllocated.max(0)
}


def incDiskBytesAllocated(bytes: Long): Unit = {
diskBytesAllocated += bytes
maxDiskBytesAllocated = maxDiskBytesAllocated.max(diskBytesAllocated)
Expand All @@ -153,6 +174,7 @@ class GpuTaskMetrics extends Serializable {
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes,
"gpuMaxHostMemoryBytes" -> maxHostMemoryBytes,
"gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes
)

Expand Down Expand Up @@ -242,6 +264,9 @@ class GpuTaskMetrics extends Serializable {
// add method instead of adding a dedicated max method to the accumulator.
maxDeviceMemoryBytes.add(maxMem)
}
if (maxHostBytesAllocated > 0) {
maxHostMemoryBytes.add(maxHostBytesAllocated)
}
if (maxDiskBytesAllocated > 0) {
maxDiskMemoryBytes.add(maxDiskBytesAllocated)
}
Expand Down

0 comments on commit 6c3a566

Please sign in to comment.