Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fine-grained spill metrics #9509

Merged
merged 19 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import ai.rapids.cudf.{Cuda, Rmm, RmmEventHandler}
import com.sun.management.HotSpotDiagnosticMXBean

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.GpuTaskMetrics
import org.apache.spark.sql.rapids.execution.TrampolineUtil

/**
Expand Down Expand Up @@ -115,52 +114,49 @@ class DeviceMemoryEventHandler(
s"onAllocFailure invoked with invalid retryCount $retryCount")

try {
GpuTaskMetrics.get.spillTime {
val storeSize = store.currentSize
val storeSpillableSize = store.currentSpillableSize
val storeSize = store.currentSize
val storeSpillableSize = store.currentSpillableSize

val attemptMsg = if (retryCount > 0) {
s"Attempt ${retryCount}. "
} else {
"First attempt. "
}
val attemptMsg = if (retryCount > 0) {
s"Attempt ${retryCount}. "
} else {
"First attempt. "
}

val retryState = oomRetryState.get()
retryState.resetIfNeeded(retryCount, storeSpillableSize)

logInfo(s"Device allocation of $allocSize bytes failed, device store has " +
s"$storeSize total and $storeSpillableSize spillable bytes. $attemptMsg" +
s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes. ")
if (storeSpillableSize == 0) {
if (retryState.shouldTrySynchronizing(retryCount)) {
Cuda.deviceSynchronize()
logWarning(s"[RETRY ${retryState.getRetriesSoFar}] " +
s"Retrying allocation of $allocSize after a synchronize. " +
s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
true
} else {
logWarning(s"Device store exhausted, unable to allocate $allocSize bytes. " +
s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
synchronized {
if (dumpStackTracesOnFailureToHandleOOM) {
dumpStackTracesOnFailureToHandleOOM = false
GpuSemaphore.dumpActiveStackTracesToLog()
}
}
oomDumpDir.foreach(heapDump)
false
}
val retryState = oomRetryState.get()
retryState.resetIfNeeded(retryCount, storeSpillableSize)

logInfo(s"Device allocation of $allocSize bytes failed, device store has " +
s"$storeSize total and $storeSpillableSize spillable bytes. $attemptMsg" +
s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes. ")
if (storeSpillableSize == 0) {
if (retryState.shouldTrySynchronizing(retryCount)) {
Cuda.deviceSynchronize()
logWarning(s"[RETRY ${retryState.getRetriesSoFar}] " +
s"Retrying allocation of $allocSize after a synchronize. " +
s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
true
} else {
val targetSize = Math.max(storeSpillableSize - allocSize, 0)
logDebug(s"Targeting device store size of $targetSize bytes")
val maybeAmountSpilled =
catalog.synchronousSpill(store, targetSize, Cuda.DEFAULT_STREAM)
maybeAmountSpilled.foreach { amountSpilled =>
logInfo(s"Spilled $amountSpilled bytes from the device store")
TrampolineUtil.incTaskMetricsMemoryBytesSpilled(amountSpilled)
logWarning(s"Device store exhausted, unable to allocate $allocSize bytes. " +
s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
synchronized {
if (dumpStackTracesOnFailureToHandleOOM) {
dumpStackTracesOnFailureToHandleOOM = false
GpuSemaphore.dumpActiveStackTracesToLog()
}
}
true
oomDumpDir.foreach(heapDump)
false
}
} else {
val targetSize = Math.max(storeSpillableSize - allocSize, 0)
logDebug(s"Targeting device store size of $targetSize bytes")
val maybeAmountSpilled = catalog.synchronousSpill(store, targetSize, Cuda.DEFAULT_STREAM)
maybeAmountSpilled.foreach { amountSpilled =>
logInfo(s"Spilled $amountSpilled bytes from the device store")
TrampolineUtil.incTaskMetricsMemoryBytesSpilled(amountSpilled)
}
true
}
} catch {
case t: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import ai.rapids.cudf.{ColumnView, HostMemoryBuffer, HostMemoryReservation, Memo
import com.nvidia.spark.rapids.HostAlloc.align

import org.apache.spark.TaskContext
import org.apache.spark.sql.rapids.GpuTaskMetrics

private class HostAlloc(nonPinnedLimit: Long) {
private var currentNonPinnedAllocated: Long = 0L
Expand Down Expand Up @@ -415,7 +416,9 @@ object HostAlloc {
}

/**
 * Allocates a host memory buffer of `amount` bytes, preferring pinned memory when
 * `preferPinned` is true. The allocation (including any time spent waiting for host
 * memory to become available) is recorded in the task's host-alloc wait metric.
 *
 * NOTE(review): this span was a scraped diff interleaving the superseded call with
 * review-UI text; reconstructed here as the post-change method.
 */
def alloc(amount: Long, preferPinned: Boolean = true): HostMemoryBuffer = {
  GpuTaskMetrics.get.hostAllocTime {
    getSingleton.alloc(amount, preferPinned)
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import com.nvidia.spark.rapids.StorageTier.{DEVICE, StorageTier}
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.GpuTaskMetrics
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -477,45 +476,43 @@ abstract class RapidsBufferStore(val tier: StorageTier)
* hides the RapidsBuffer from clients and simplifies locking.
*/
/**
 * Materializes this buffer on the device and returns it as a `DeviceMemoryBuffer`.
 *
 * When unspill is enabled, repeatedly tries (up to MAX_UNSPILL_ATTEMPTS) to either
 * acquire an already-resident device copy from the catalog or unspill this buffer
 * back to the device store; a `DuplicateBufferException` means another thread won
 * the registration race, so the acquire is retried. Otherwise the buffer is
 * materialized wherever it lives and, if host-resident, copied to a freshly
 * allocated device buffer.
 *
 * NOTE(review): this span was a scraped diff interleaving the pre-change and
 * post-change bodies; reconstructed here as the post-change method.
 */
override def getDeviceMemoryBuffer: DeviceMemoryBuffer = {
  if (RapidsBufferCatalog.shouldUnspill) {
    (0 until MAX_UNSPILL_ATTEMPTS).foreach { _ =>
      catalog.acquireBuffer(id, DEVICE) match {
        case Some(buffer) =>
          withResource(buffer) { _ =>
            return buffer.getDeviceMemoryBuffer
          }
        case _ =>
          try {
            logDebug(s"Unspilling $this $id to $DEVICE")
            val newBuffer = catalog.unspillBufferToDeviceStore(
              this,
              Cuda.DEFAULT_STREAM)
            withResource(newBuffer) { _ =>
              return newBuffer.getDeviceMemoryBuffer
            }
          } catch {
            case _: DuplicateBufferException =>
              // Another thread registered the device copy first; retry the acquire.
              logDebug(s"Lost device buffer registration race for buffer $id, retrying...")
          }
      }
    }
    throw new IllegalStateException(s"Unable to get device memory buffer for ID: $id")
  } else {
    materializeMemoryBuffer match {
      case h: HostMemoryBuffer =>
        withResource(h) { _ =>
          closeOnExcept(DeviceMemoryBuffer.allocate(h.getLength)) { deviceBuffer =>
            logDebug(s"copying ${h.getLength} from host $h to device $deviceBuffer " +
              s"of size ${deviceBuffer.getLength}")
            deviceBuffer.copyFromHostBuffer(h)
            deviceBuffer
          }
        }
      case d: DeviceMemoryBuffer => d
      case b => throw new IllegalStateException(s"Unrecognized buffer: $b")
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableSeq
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.sql.rapids.GpuTaskMetrics
import org.apache.spark.sql.rapids.storage.RapidsStorageUtils
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -64,10 +65,12 @@ class RapidsDeviceMemoryStore(
memoryBuffer match {
case d: DeviceMemoryBuffer => d
case h: HostMemoryBuffer =>
closeOnExcept(DeviceMemoryBuffer.allocate(memoryBuffer.getLength)) { deviceBuffer =>
logDebug(s"copying from host $h to device $deviceBuffer")
deviceBuffer.copyFromHostBuffer(h, stream)
deviceBuffer
GpuTaskMetrics.get.readSpillHost2GpuTime {
closeOnExcept(DeviceMemoryBuffer.allocate(memoryBuffer.getLength)) { deviceBuffer =>
logDebug(s"copying from host $h to device $deviceBuffer")
deviceBuffer.copyFromHostBuffer(h, stream)
deviceBuffer
}
}
case b => throw new IllegalStateException(s"Unrecognized buffer: $b")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta
import org.apache.commons.io.IOUtils

import org.apache.spark.sql.rapids.RapidsDiskBlockManager
import org.apache.spark.sql.rapids.{GpuTaskMetrics, RapidsDiskBlockManager}
import org.apache.spark.sql.rapids.execution.SerializedHostTableUtils
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -108,16 +108,19 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
Array(StandardOpenOption.CREATE, StandardOpenOption.WRITE)
}
var currentPos, writtenBytes = 0L
withResource(FileChannel.open(path.toPath, option: _*)) { fc =>
currentPos = fc.position()
withResource(Channels.newOutputStream(fc)) { os =>
withResource(diskBlockManager.getSerializerManager()
.wrapStream(incoming.id, os)) { cos =>
val outputChannel = Channels.newChannel(cos)
writtenBytes = fileWritable.writeToChannel(outputChannel, stream)

GpuTaskMetrics.get.spillToDiskTime {
withResource(FileChannel.open(path.toPath, option: _*)) { fc =>
currentPos = fc.position()
withResource(Channels.newOutputStream(fc)) { os =>
withResource(diskBlockManager.getSerializerManager()
.wrapStream(incoming.id, os)) { cos =>
val outputChannel = Channels.newChannel(cos)
writtenBytes = fileWritable.writeToChannel(outputChannel, stream)
}
}
(currentPos, writtenBytes, path.length() - currentPos)
}
(currentPos, writtenBytes, path.length() - currentPos)
}
case other =>
throw new IllegalStateException(
Expand Down Expand Up @@ -153,14 +156,16 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
val memBuffer = if (serializerManager.isRapidsSpill(id)) {
// Only go through serializerManager's stream wrapper for spill case
closeOnExcept(HostMemoryBuffer.allocate(uncompressedSize)) { decompressed =>
withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c =>
c.position(fileOffset)
withResource(Channels.newInputStream(c)) { compressed =>
withResource(serializerManager.wrapStream(id, compressed)) { in =>
withResource(new HostMemoryOutputStream(decompressed)) { out =>
IOUtils.copy(in, out)
GpuTaskMetrics.get.readSpillDisk2HostTime {
withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c =>
c.position(fileOffset)
withResource(Channels.newInputStream(c)) { compressed =>
withResource(serializerManager.wrapStream(id, compressed)) { in =>
withResource(new HostMemoryOutputStream(decompressed)) { out =>
IOUtils.copy(in, out)
}
decompressed
}
decompressed
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable

import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer, NvtxColor, NvtxRange}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, freeOnExcept, withResource}
import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_SPILL_OFFSET}
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.sql.rapids.GpuTaskMetrics
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.storage.RapidsStorageUtils
import org.apache.spark.sql.types.DataType
Expand Down Expand Up @@ -123,7 +124,7 @@ class RapidsHostMemoryStore(
val totalCopySize = otherBufferIterator.getTotalCopySize
closeOnExcept(HostAlloc.allocHighPriority(totalCopySize)) { hb =>
hb.map { hostBuffer =>
withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ =>
val spillNs = GpuTaskMetrics.get.spillToHostTime {
var hostOffset = 0L
val start = System.nanoTime()
while (otherBufferIterator.hasNext) {
Expand All @@ -140,12 +141,12 @@ class RapidsHostMemoryStore(
}
}
stream.sync()
val end = System.nanoTime()
val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong
logDebug(s"Spill to host (chunked=$isChunked) " +
s"size=$szMB MiB bandwidth=$bw MiB/sec")
System.nanoTime() - start
}
val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
val bw = (szMB.toDouble / (spillNs.toDouble / 1000000000.0)).toLong
gerashegalov marked this conversation as resolved.
Show resolved Hide resolved
logDebug(s"Spill to host (chunked=$isChunked) " +
s"size=$szMB MiB bandwidth=$bw MiB/sec")
new RapidsHostMemoryBuffer(
other.id,
totalCopySize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,31 @@ class NanoSecondAccumulator extends AccumulatorV2[jl.Long, NanoTime] {

class GpuTaskMetrics extends Serializable {
private val semWaitTimeNs = new NanoSecondAccumulator
private val spillBlockTimeNs = new NanoSecondAccumulator
private val readSpillTimeNs = new NanoSecondAccumulator
private val retryCount = new LongAccumulator
private val splitAndRetryCount = new LongAccumulator
private val retryBlockTime = new NanoSecondAccumulator
private val retryComputationTime = new NanoSecondAccumulator

// Spill
private val spillToHostTimeNs = new NanoSecondAccumulator
private val spillToDiskTimeNs = new NanoSecondAccumulator
private val readSpillFromHostTimeNs = new NanoSecondAccumulator
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val hostAllocWaitTimeNs = new NanoSecondAccumulator

private val metrics = Map[String, AccumulatorV2[_, _]](
"gpuSemaphoreWait" -> semWaitTimeNs,
"gpuSpillBlockTime" -> spillBlockTimeNs,
"gpuReadSpillTime" -> readSpillTimeNs,
"gpuRetryCount" -> retryCount,
"gpuSplitAndRetryCount" -> splitAndRetryCount,
"gpuRetryBlockTime" -> retryBlockTime,
"gpuRetryComputationTime" -> retryComputationTime)
"gpuRetryComputationTime" -> retryComputationTime,
"gpuSpillToHostTime" -> spillToHostTimeNs,
"gpuSpillToDiskTime" -> spillToDiskTimeNs,
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuHostAllocationWaitTime" -> hostAllocWaitTimeNs
)

def register(sc: SparkContext): Unit = {
metrics.foreach { case (k, m) =>
Expand Down Expand Up @@ -132,9 +142,25 @@ class GpuTaskMetrics extends Serializable {

def semWaitTime[A](f: => A): A = timeIt(semWaitTimeNs, "Acquire GPU", NvtxColor.RED, f)

def spillTime[A](f: => A): A = timeIt(spillBlockTimeNs, "OnAllocFailure", NvtxColor.RED, f)
/**
 * Runs `f`, accumulating its wall-clock duration into the device-to-host spill
 * time metric (with an NVTX range named "spillToHostTime").
 */
def spillToHostTime[A](f: => A): A =
  timeIt(spillToHostTimeNs, "spillToHostTime", NvtxColor.RED, f)

/**
 * Runs `f`, accumulating its wall-clock duration into the host-to-disk spill
 * time metric (with an NVTX range named "spillToDiskTime").
 */
def spillToDiskTime[A](f: => A): A =
  timeIt(spillToDiskTimeNs, "spillToDiskTime", NvtxColor.RED, f)

/**
 * Runs `f`, accumulating its wall-clock duration into the metric tracking reads
 * of spilled data from host memory back to the GPU.
 *
 * NOTE(review): scraped review-UI text was interleaved in this span; removed so
 * the method body is valid Scala again.
 */
def readSpillHost2GpuTime[A](f: => A): A = {
  timeIt(readSpillFromHostTimeNs, "readSpillFromHostTime", NvtxColor.ORANGE, f)
}

def readSpillTime[A](f: => A): A = timeIt(readSpillTimeNs, "Read Spill", NvtxColor.ORANGE, f)
/**
 * Runs `f`, accumulating its wall-clock duration into the metric tracking reads
 * of spilled data from disk back to host memory.
 *
 * NOTE(review): scraped review-UI text was interleaved in this span; removed so
 * the method body is valid Scala again.
 */
def readSpillDisk2HostTime[A](f: => A): A = {
  timeIt(readSpillFromDiskTimeNs, "readSpillFromDiskTime", NvtxColor.ORANGE, f)
}

/**
 * Runs `f`, accumulating its wall-clock duration into the host allocation wait
 * time metric (with an NVTX range named "hostAllocWaitTime").
 */
def hostAllocTime[A](f: => A): A =
  timeIt(hostAllocWaitTimeNs, "hostAllocWaitTime", NvtxColor.RED, f)

def updateRetry(taskAttemptId: Long): Unit = {
val rc = RmmSpark.getAndResetNumRetryThrow(taskAttemptId)
Expand Down