Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

host watermark metric #11725

Merged
merged 7 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.{CpuRetryOOM, RmmSpark}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.GpuTaskMetrics

private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging {
private var currentNonPinnedAllocated: Long = 0L
Expand Down Expand Up @@ -52,17 +54,32 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
}
}

/**
 * Builds the trace-log message describing host allocation totals.
 *
 * When `TaskContext.get()` yields a context, the message carries the task attempt id together
 * with the current and maximum host byte counts read from `metrics`; otherwise a fixed
 * message is returned.
 *
 * @param metrics source of the host byte counts
 * @return the formatted message
 */
private def getHostAllocMetricsLogStr(metrics: GpuTaskMetrics): String =
  Option(TaskContext.get()) match {
    case Some(tc) =>
      val attemptId = tc.taskAttemptId()
      val currentBytes = metrics.getHostBytesAllocated
      val peakBytes = metrics.getMaxHostBytesAllocated
      s"total size for task $attemptId is $currentBytes, max size is $peakBytes"
    case None =>
      "allocated memory outside of a task context"
  }

/**
 * Subtracts `amount` from `currentPinnedAllocated`, records the release in the task metrics,
 * trace-logs the totals, and then hands the deallocation to `RmmSpark.cpuDeallocate`.
 *
 * @param ptr address passed through to `RmmSpark.cpuDeallocate`
 * @param amount number of bytes being released
 */
private def releasePinned(ptr: Long, amount: Long): Unit = {
  // Only the counter update runs under this allocator's monitor.
  this.synchronized {
    currentPinnedAllocated = currentPinnedAllocated - amount
  }
  val taskMetrics = GpuTaskMetrics.get
  taskMetrics.decHostBytesAllocated(amount)
  logTrace(getHostAllocMetricsLogStr(taskMetrics))
  RmmSpark.cpuDeallocate(ptr, amount)
}

/**
 * Subtracts `amount` from `currentNonPinnedAllocated`, records the release in the task
 * metrics, trace-logs the totals, and then hands the deallocation to `RmmSpark.cpuDeallocate`.
 *
 * @param ptr address passed through to `RmmSpark.cpuDeallocate`
 * @param amount number of bytes being released
 */
private def releaseNonPinned(ptr: Long, amount: Long): Unit = {
  // Only the counter update runs under this allocator's monitor.
  this.synchronized {
    currentNonPinnedAllocated = currentNonPinnedAllocated - amount
  }
  val taskMetrics = GpuTaskMetrics.get
  taskMetrics.decHostBytesAllocated(amount)
  logTrace(getHostAllocMetricsLogStr(taskMetrics))
  RmmSpark.cpuDeallocate(ptr, amount)
}

Expand Down Expand Up @@ -186,6 +203,9 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
allocAttemptFinishedWithoutException = true
} finally {
if (ret.isDefined) {
val metrics = GpuTaskMetrics.get
metrics.incHostBytesAllocated(amount)
logTrace(getHostAllocMetricsLogStr(metrics))
RmmSpark.postCpuAllocSuccess(ret.get.getAddress, amount, blocking, isRecursive)
} else {
// shouldRetry should indicate if spill did anything for us and we should try again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,25 +121,38 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val maxDeviceMemoryBytes = new HighWatermarkAccumulator
private val maxHostMemoryBytes = new HighWatermarkAccumulator
private val maxDiskMemoryBytes = new HighWatermarkAccumulator

private var diskBytesAllocated: Long = 0
private var maxHostBytesAllocated: Long = 0

private var maxDiskBytesAllocated: Long = 0

// NOTE(review): two definitions of getDiskBytesAllocated appear back to back. This text looks
// like a diff view whose +/- markers were lost, so presumably the first is the pre-change line
// and the second is its replacement — TODO confirm against the real file.
// Returns the diskBytesAllocated field declared on this class.
def getDiskBytesAllocated: Long = diskBytesAllocated
// Returns the diskBytesAllocated counter held on the GpuTaskMetrics companion object.
def getDiskBytesAllocated: Long = GpuTaskMetrics.diskBytesAllocated

/** Returns the largest disk byte count this metrics instance has recorded. */
def getMaxDiskBytesAllocated: Long = {
  maxDiskBytesAllocated
}

/** Returns the host byte counter held on the GpuTaskMetrics companion object. */
def getHostBytesAllocated: Long = {
  GpuTaskMetrics.hostBytesAllocated
}

/** Returns the largest host byte count this metrics instance has recorded. */
def getMaxHostBytesAllocated: Long = {
  maxHostBytesAllocated
}

/**
 * Adds `bytes` to the host byte counter on the GpuTaskMetrics companion object, then raises
 * this instance's recorded maximum if the counter now exceeds it.
 *
 * @param bytes number of host bytes just allocated
 */
def incHostBytesAllocated(bytes: Long): Unit = {
  GpuTaskMetrics.incHostBytesAllocated(bytes)
  // The maximum is taken against the companion-object counter, read after the increment.
  maxHostBytesAllocated = math.max(maxHostBytesAllocated, GpuTaskMetrics.hostBytesAllocated)
}

/**
 * Subtracts `bytes` from the host byte counter on the GpuTaskMetrics companion object.
 *
 * @param bytes number of host bytes just released
 */
def decHostBytesAllocated(bytes: Long): Unit =
  GpuTaskMetrics.decHostBytesAllocated(bytes)


// Records `bytes` of additional disk usage and refreshes the maximum kept by this instance.
// NOTE(review): this span appears to interleave pre- and post-change lines of a diff whose
// +/- markers were lost — presumably the first two statements are the older bookkeeping on the
// class field and the last two are the replacement that uses the companion-object counter.
// TODO confirm against the real file.
def incDiskBytesAllocated(bytes: Long): Unit = {
// Counter declared on this class, and the maximum derived from it.
diskBytesAllocated += bytes
maxDiskBytesAllocated = maxDiskBytesAllocated.max(diskBytesAllocated)
// Counter held on the companion object, and the maximum derived from it.
GpuTaskMetrics.incDiskBytesAllocated(bytes)
maxDiskBytesAllocated = maxDiskBytesAllocated.max(GpuTaskMetrics.diskBytesAllocated)
}

/**
 * Records that `bytes` of disk usage have been released.
 *
 * NOTE(review): the statements below appear to interleave pre- and post-change lines of a diff
 * whose +/- markers were lost; all of them are kept. TODO confirm against the real file.
 *
 * @param bytes number of disk bytes released
 */
def decDiskBytesAllocated(bytes: Long): Unit = {
  diskBytesAllocated -= bytes
  // For some reason it's possible for the task to start out by releasing resources,
  // possibly from a previous task, in such case we probably should just ignore it.
  diskBytesAllocated = diskBytesAllocated.max(0)
  // A disk release must lower the disk counter on the companion object; the host counter is
  // tracked separately and must not be touched here.
  GpuTaskMetrics.decDiskBytesAllocated(bytes)
}

private val metrics = Map[String, AccumulatorV2[_, _]](
Expand All @@ -153,6 +166,7 @@ class GpuTaskMetrics extends Serializable {
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes,
"gpuMaxHostMemoryBytes" -> maxHostMemoryBytes,
"gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes
)

Expand Down Expand Up @@ -242,6 +256,9 @@ class GpuTaskMetrics extends Serializable {
// add method instead of adding a dedicated max method to the accumulator.
maxDeviceMemoryBytes.add(maxMem)
}
if (maxHostBytesAllocated > 0) {
maxHostMemoryBytes.add(maxHostBytesAllocated)
}
if (maxDiskBytesAllocated > 0) {
maxDiskMemoryBytes.add(maxDiskBytesAllocated)
}
Expand All @@ -254,6 +271,25 @@ class GpuTaskMetrics extends Serializable {
object GpuTaskMetrics extends Logging {
private val taskLevelMetrics = mutable.Map[Long, GpuTaskMetrics]()

private var hostBytesAllocated: Long = 0
private var diskBytesAllocated: Long = 0

/** Adds `bytes` to `hostBytesAllocated` while holding this object's monitor. */
private def incHostBytesAllocated(bytes: Long): Unit = {
  this.synchronized {
    hostBytesAllocated = hostBytesAllocated + bytes
  }
}

/** Subtracts `bytes` from `hostBytesAllocated` while holding this object's monitor. */
private def decHostBytesAllocated(bytes: Long): Unit = {
  this.synchronized {
    hostBytesAllocated = hostBytesAllocated - bytes
  }
}

/** Adds `bytes` to `diskBytesAllocated` while holding this object's monitor. */
def incDiskBytesAllocated(bytes: Long): Unit = {
  this.synchronized {
    diskBytesAllocated = diskBytesAllocated + bytes
  }
}

/** Subtracts `bytes` from `diskBytesAllocated` while holding this object's monitor. */
def decDiskBytesAllocated(bytes: Long): Unit = {
  this.synchronized {
    diskBytesAllocated = diskBytesAllocated - bytes
  }
}

def registerOnTask(metrics: GpuTaskMetrics): Unit = synchronized {
val tc = TaskContext.get()
if (tc != null) {
Expand Down