Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

host watermark metric #11725

Merged
merged 7 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.{CpuRetryOOM, RmmSpark}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.GpuTaskMetrics

private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging {
private var currentNonPinnedAllocated: Long = 0L
Expand Down Expand Up @@ -52,17 +54,32 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
}
}

private def getHostAllocMetricsLogStr(metrics: GpuTaskMetrics): String = {
Option(TaskContext.get()).map { context =>
val taskId = context.taskAttemptId()
val totalSize = metrics.getHostBytesAllocated
val maxSize = metrics.getMaxHostBytesAllocated
s"total size for task $taskId is $totalSize, max size is $maxSize"
}.getOrElse("allocated memory outside of a task context")
}

private def releasePinned(ptr: Long, amount: Long): Unit = {
synchronized {
currentPinnedAllocated -= amount
}
val metrics = GpuTaskMetrics.get
metrics.decHostBytesAllocated(amount)
logTrace(getHostAllocMetricsLogStr(metrics))
RmmSpark.cpuDeallocate(ptr, amount)
}

private def releaseNonPinned(ptr: Long, amount: Long): Unit = {
synchronized {
currentNonPinnedAllocated -= amount
}
val metrics = GpuTaskMetrics.get
metrics.decHostBytesAllocated(amount)
logTrace(getHostAllocMetricsLogStr(metrics))
RmmSpark.cpuDeallocate(ptr, amount)
}

Expand Down Expand Up @@ -186,6 +203,9 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
allocAttemptFinishedWithoutException = true
} finally {
if (ret.isDefined) {
val metrics = GpuTaskMetrics.get
metrics.incHostBytesAllocated(amount)
logTrace(getHostAllocMetricsLogStr(metrics))
RmmSpark.postCpuAllocSuccess(ret.get.getAddress, amount, blocking, isRecursive)
} else {
// shouldRetry should indicate if spill did anything for us and we should try again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,33 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val maxDeviceMemoryBytes = new HighWatermarkAccumulator
private val maxHostMemoryBytes = new HighWatermarkAccumulator
private val maxDiskMemoryBytes = new HighWatermarkAccumulator

private var hostBytesAllocated: Long = 0
private var maxHostBytesAllocated: Long = 0

private var diskBytesAllocated: Long = 0
private var maxDiskBytesAllocated: Long = 0

def getDiskBytesAllocated: Long = diskBytesAllocated

def getMaxDiskBytesAllocated: Long = maxDiskBytesAllocated

def getHostBytesAllocated: Long = hostBytesAllocated

def getMaxHostBytesAllocated: Long = maxHostBytesAllocated

def incHostBytesAllocated(bytes: Long): Unit = {
hostBytesAllocated += bytes
maxHostBytesAllocated = maxHostBytesAllocated.max(hostBytesAllocated)
}

def decHostBytesAllocated(bytes: Long): Unit = {
hostBytesAllocated -= bytes
}


def incDiskBytesAllocated(bytes: Long): Unit = {
diskBytesAllocated += bytes
maxDiskBytesAllocated = maxDiskBytesAllocated.max(diskBytesAllocated)
Expand All @@ -153,6 +171,7 @@ class GpuTaskMetrics extends Serializable {
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes,
"gpuMaxHostMemoryBytes" -> maxHostMemoryBytes,
"gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes
)

Expand Down Expand Up @@ -242,6 +261,9 @@ class GpuTaskMetrics extends Serializable {
// add method instead of adding a dedicated max method to the accumulator.
maxDeviceMemoryBytes.add(maxMem)
}
if (maxHostBytesAllocated > 0) {
maxHostMemoryBytes.add(maxHostBytesAllocated)
}
if (maxDiskBytesAllocated > 0) {
maxDiskMemoryBytes.add(maxDiskBytesAllocated)
}
Expand Down
Loading