diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala
index 2cfcc542315..72808d1f376 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DeviceMemoryEventHandler.scala
@@ -24,7 +24,6 @@ import ai.rapids.cudf.{Cuda, Rmm, RmmEventHandler}
 import com.sun.management.HotSpotDiagnosticMXBean
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.rapids.GpuTaskMetrics
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 
 /**
@@ -115,52 +114,49 @@ class DeviceMemoryEventHandler(
       s"onAllocFailure invoked with invalid retryCount $retryCount")
 
     try {
-      GpuTaskMetrics.get.spillTime {
-        val storeSize = store.currentSize
-        val storeSpillableSize = store.currentSpillableSize
+      val storeSize = store.currentSize
+      val storeSpillableSize = store.currentSpillableSize
 
-        val attemptMsg = if (retryCount > 0) {
-          s"Attempt ${retryCount}. "
-        } else {
-          "First attempt. "
-        }
+      val attemptMsg = if (retryCount > 0) {
+        s"Attempt ${retryCount}. "
+      } else {
+        "First attempt. "
+      }
 
-        val retryState = oomRetryState.get()
-        retryState.resetIfNeeded(retryCount, storeSpillableSize)
-
-        logInfo(s"Device allocation of $allocSize bytes failed, device store has " +
-          s"$storeSize total and $storeSpillableSize spillable bytes. $attemptMsg" +
-          s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes. ")
-        if (storeSpillableSize == 0) {
-          if (retryState.shouldTrySynchronizing(retryCount)) {
-            Cuda.deviceSynchronize()
-            logWarning(s"[RETRY ${retryState.getRetriesSoFar}] " +
-              s"Retrying allocation of $allocSize after a synchronize. " +
-              s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
-            true
-          } else {
-            logWarning(s"Device store exhausted, unable to allocate $allocSize bytes. " +
-              s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
-            synchronized {
-              if (dumpStackTracesOnFailureToHandleOOM) {
-                dumpStackTracesOnFailureToHandleOOM = false
-                GpuSemaphore.dumpActiveStackTracesToLog()
-              }
-            }
-            oomDumpDir.foreach(heapDump)
-            false
-          }
+      val retryState = oomRetryState.get()
+      retryState.resetIfNeeded(retryCount, storeSpillableSize)
+
+      logInfo(s"Device allocation of $allocSize bytes failed, device store has " +
+        s"$storeSize total and $storeSpillableSize spillable bytes. $attemptMsg" +
+        s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes. ")
+      if (storeSpillableSize == 0) {
+        if (retryState.shouldTrySynchronizing(retryCount)) {
+          Cuda.deviceSynchronize()
+          logWarning(s"[RETRY ${retryState.getRetriesSoFar}] " +
+            s"Retrying allocation of $allocSize after a synchronize. " +
+            s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
+          true
         } else {
-          val targetSize = Math.max(storeSpillableSize - allocSize, 0)
-          logDebug(s"Targeting device store size of $targetSize bytes")
-          val maybeAmountSpilled =
-            catalog.synchronousSpill(store, targetSize, Cuda.DEFAULT_STREAM)
-          maybeAmountSpilled.foreach { amountSpilled =>
-            logInfo(s"Spilled $amountSpilled bytes from the device store")
-            TrampolineUtil.incTaskMetricsMemoryBytesSpilled(amountSpilled)
+          logWarning(s"Device store exhausted, unable to allocate $allocSize bytes. " +
+            s"Total RMM allocated is ${Rmm.getTotalBytesAllocated} bytes.")
+          synchronized {
+            if (dumpStackTracesOnFailureToHandleOOM) {
+              dumpStackTracesOnFailureToHandleOOM = false
+              GpuSemaphore.dumpActiveStackTracesToLog()
+            }
           }
-          true
+          oomDumpDir.foreach(heapDump)
+          false
         }
+      } else {
+        val targetSize = Math.max(storeSpillableSize - allocSize, 0)
+        logDebug(s"Targeting device store size of $targetSize bytes")
+        val maybeAmountSpilled = catalog.synchronousSpill(store, targetSize, Cuda.DEFAULT_STREAM)
+        maybeAmountSpilled.foreach { amountSpilled =>
+          logInfo(s"Spilled $amountSpilled bytes from the device store")
+          TrampolineUtil.incTaskMetricsMemoryBytesSpilled(amountSpilled)
+        }
+        true
       }
     } catch {
       case t: Throwable =>
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala
index a58a8415cbf..511686f8557 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala
@@ -28,7 +28,6 @@ import com.nvidia.spark.rapids.StorageTier.{DEVICE, StorageTier}
 import com.nvidia.spark.rapids.format.TableMeta
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.rapids.GpuTaskMetrics
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -477,45 +476,43 @@ abstract class RapidsBufferStore(val tier: StorageTier)
      * hides the RapidsBuffer from clients and simplifies locking.
      */
     override def getDeviceMemoryBuffer: DeviceMemoryBuffer = {
-      GpuTaskMetrics.get.readSpillTime {
-        if (RapidsBufferCatalog.shouldUnspill) {
-          (0 until MAX_UNSPILL_ATTEMPTS).foreach { _ =>
-            catalog.acquireBuffer(id, DEVICE) match {
-              case Some(buffer) =>
-                withResource(buffer) { _ =>
-                  return buffer.getDeviceMemoryBuffer
-                }
-              case _ =>
-                try {
-                  logDebug(s"Unspilling $this $id to $DEVICE")
-                  val newBuffer = catalog.unspillBufferToDeviceStore(
-                    this,
-                    Cuda.DEFAULT_STREAM)
-                  withResource(newBuffer) { _ =>
-                    return newBuffer.getDeviceMemoryBuffer
-                  }
-                } catch {
-                  case _: DuplicateBufferException =>
-                    logDebug(s"Lost device buffer registration race for buffer $id, retrying...")
-                }
-            }
-          }
-          throw new IllegalStateException(s"Unable to get device memory buffer for ID: $id")
-        } else {
-          materializeMemoryBuffer match {
-            case h: HostMemoryBuffer =>
-              withResource(h) { _ =>
-                closeOnExcept(DeviceMemoryBuffer.allocate(h.getLength)) { deviceBuffer =>
-                  logDebug(s"copying ${h.getLength} from host $h to device $deviceBuffer " +
-                    s"of size ${deviceBuffer.getLength}")
-                  deviceBuffer.copyFromHostBuffer(h)
-                  deviceBuffer
+      if (RapidsBufferCatalog.shouldUnspill) {
+        (0 until MAX_UNSPILL_ATTEMPTS).foreach { _ =>
+          catalog.acquireBuffer(id, DEVICE) match {
+            case Some(buffer) =>
+              withResource(buffer) { _ =>
+                return buffer.getDeviceMemoryBuffer
+              }
+            case _ =>
+              try {
+                logDebug(s"Unspilling $this $id to $DEVICE")
+                val newBuffer = catalog.unspillBufferToDeviceStore(
+                  this,
+                  Cuda.DEFAULT_STREAM)
+                withResource(newBuffer) { _ =>
+                  return newBuffer.getDeviceMemoryBuffer
                 }
+              } catch {
+                case _: DuplicateBufferException =>
+                  logDebug(s"Lost device buffer registration race for buffer $id, retrying...")
               }
-            case d: DeviceMemoryBuffer => d
-            case b => throw new IllegalStateException(s"Unrecognized buffer: $b")
           }
         }
+        throw new IllegalStateException(s"Unable to get device memory buffer for ID: $id")
+      } else {
+        materializeMemoryBuffer match {
+          case h: HostMemoryBuffer =>
+            withResource(h) { _ =>
+              closeOnExcept(DeviceMemoryBuffer.allocate(h.getLength)) { deviceBuffer =>
+                logDebug(s"copying ${h.getLength} from host $h to device $deviceBuffer " +
+                  s"of size ${deviceBuffer.getLength}")
+                deviceBuffer.copyFromHostBuffer(h)
+                deviceBuffer
+              }
+            }
+          case d: DeviceMemoryBuffer => d
+          case b => throw new IllegalStateException(s"Unrecognized buffer: $b")
+        }
       }
     }
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala
index bc09752bfdf..f0dfd4a53a6 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala
@@ -27,6 +27,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableSeq
 import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
 
+import org.apache.spark.sql.rapids.GpuTaskMetrics
 import org.apache.spark.sql.rapids.storage.RapidsStorageUtils
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -64,10 +65,12 @@
         memoryBuffer match {
           case d: DeviceMemoryBuffer => d
           case h: HostMemoryBuffer =>
-            closeOnExcept(DeviceMemoryBuffer.allocate(memoryBuffer.getLength)) { deviceBuffer =>
-              logDebug(s"copying from host $h to device $deviceBuffer")
-              deviceBuffer.copyFromHostBuffer(h, stream)
-              deviceBuffer
+            GpuTaskMetrics.get.readSpillFromHostTime {
+              closeOnExcept(DeviceMemoryBuffer.allocate(memoryBuffer.getLength)) { deviceBuffer =>
+                logDebug(s"copying from host $h to device $deviceBuffer")
+                deviceBuffer.copyFromHostBuffer(h, stream)
+                deviceBuffer
+              }
             }
           case b => throw new IllegalStateException(s"Unrecognized buffer: $b")
         }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala
index dcfd08d861d..3a4c8cf1797 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala
@@ -28,7 +28,7 @@ import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
 import org.apache.commons.io.IOUtils
 
-import org.apache.spark.sql.rapids.RapidsDiskBlockManager
+import org.apache.spark.sql.rapids.{GpuTaskMetrics, RapidsDiskBlockManager}
 import org.apache.spark.sql.rapids.execution.SerializedHostTableUtils
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -108,16 +108,19 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
           Array(StandardOpenOption.CREATE, StandardOpenOption.WRITE)
         }
         var currentPos, writtenBytes = 0L
-        withResource(FileChannel.open(path.toPath, option: _*)) { fc =>
-          currentPos = fc.position()
-          withResource(Channels.newOutputStream(fc)) { os =>
-            withResource(diskBlockManager.getSerializerManager()
-              .wrapStream(incoming.id, os)) { cos =>
-              val outputChannel = Channels.newChannel(cos)
-              writtenBytes = fileWritable.writeToChannel(outputChannel, stream)
+
+        GpuTaskMetrics.get.spillToDiskTime {
+          withResource(FileChannel.open(path.toPath, option: _*)) { fc =>
+            currentPos = fc.position()
+            withResource(Channels.newOutputStream(fc)) { os =>
+              withResource(diskBlockManager.getSerializerManager()
+                .wrapStream(incoming.id, os)) { cos =>
+                val outputChannel = Channels.newChannel(cos)
+                writtenBytes = fileWritable.writeToChannel(outputChannel, stream)
+              }
             }
+            (currentPos, writtenBytes, path.length() - currentPos)
           }
-          (currentPos, writtenBytes, path.length() - currentPos)
         }
       case other =>
         throw new IllegalStateException(
@@ -153,14 +156,16 @@
       val memBuffer = if (serializerManager.isRapidsSpill(id)) {
         // Only go through serializerManager's stream wrapper for spill case
         closeOnExcept(HostMemoryBuffer.allocate(uncompressedSize)) { decompressed =>
-          withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c =>
-            c.position(fileOffset)
-            withResource(Channels.newInputStream(c)) { compressed =>
-              withResource(serializerManager.wrapStream(id, compressed)) { in =>
-                withResource(new HostMemoryOutputStream(decompressed)) { out =>
-                  IOUtils.copy(in, out)
+          GpuTaskMetrics.get.readSpillFromDiskTime {
+            withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c =>
+              c.position(fileOffset)
+              withResource(Channels.newInputStream(c)) { compressed =>
+                withResource(serializerManager.wrapStream(id, compressed)) { in =>
+                  withResource(new HostMemoryOutputStream(decompressed)) { out =>
+                    IOUtils.copy(in, out)
+                  }
+                  decompressed
                 }
-                decompressed
               }
             }
           }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala
index 05958a7e4b1..d0aa7c413d3 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala
@@ -22,12 +22,13 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 
-import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer, NvtxColor, NvtxRange}
+import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostColumnVector, HostMemoryBuffer, JCudfSerialization, MemoryBuffer}
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, freeOnExcept, withResource}
 import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_SPILL_OFFSET}
 import com.nvidia.spark.rapids.StorageTier.StorageTier
 import com.nvidia.spark.rapids.format.TableMeta
 
+import org.apache.spark.sql.rapids.GpuTaskMetrics
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.rapids.storage.RapidsStorageUtils
 import org.apache.spark.sql.types.DataType
@@ -123,7 +124,7 @@
       val totalCopySize = otherBufferIterator.getTotalCopySize
       closeOnExcept(HostAlloc.allocHighPriority(totalCopySize)) { hb =>
         hb.map { hostBuffer =>
-          withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ =>
+          val spillNs = GpuTaskMetrics.get.spillToHostTime {
             var hostOffset = 0L
             val start = System.nanoTime()
             while (otherBufferIterator.hasNext) {
@@ -140,12 +141,12 @@
               }
             }
             stream.sync()
-            val end = System.nanoTime()
-            val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
-            val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong
-            logDebug(s"Spill to host (chunked=$isChunked) " +
-              s"size=$szMB MiB bandwidth=$bw MiB/sec")
+            System.nanoTime() - start
           }
+          val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong
+          val bw = (szMB.toDouble / (spillNs.toDouble / 1000000000.0)).toLong
+          logDebug(s"Spill to host (chunked=$isChunked) " +
+            s"size=$szMB MiB bandwidth=$bw MiB/sec")
           new RapidsHostMemoryBuffer(
             other.id,
             totalCopySize,
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
index b1bcd73538f..da4f385ddfc 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala
@@ -80,21 +80,28 @@ class NanoSecondAccumulator extends AccumulatorV2[jl.Long, NanoTime] {
 
 class GpuTaskMetrics extends Serializable {
   private val semWaitTimeNs = new NanoSecondAccumulator
-  private val spillBlockTimeNs = new NanoSecondAccumulator
-  private val readSpillTimeNs = new NanoSecondAccumulator
   private val retryCount = new LongAccumulator
   private val splitAndRetryCount = new LongAccumulator
   private val retryBlockTime = new NanoSecondAccumulator
   private val retryComputationTime = new NanoSecondAccumulator
+
+  // Spill
+  private val spillToHostTimeNs = new NanoSecondAccumulator
+  private val spillToDiskTimeNs = new NanoSecondAccumulator
+  private val readSpillFromHostTimeNs = new NanoSecondAccumulator
+  private val readSpillFromDiskTimeNs = new NanoSecondAccumulator
 
   private val metrics = Map[String, AccumulatorV2[_, _]](
     "gpuSemaphoreWait" -> semWaitTimeNs,
-    "gpuSpillBlockTime" -> spillBlockTimeNs,
-    "gpuReadSpillTime" -> readSpillTimeNs,
     "gpuRetryCount" -> retryCount,
     "gpuSplitAndRetryCount" -> splitAndRetryCount,
     "gpuRetryBlockTime" -> retryBlockTime,
-    "gpuRetryComputationTime" -> retryComputationTime)
+    "gpuRetryComputationTime" -> retryComputationTime,
+    "gpuSpillToHostTime" -> spillToHostTimeNs,
+    "gpuSpillToDiskTime" -> spillToDiskTimeNs,
+    "gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
+    "gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs
+  )
 
   def register(sc: SparkContext): Unit = {
     metrics.foreach { case (k, m) =>
@@ -132,9 +139,21 @@ class GpuTaskMetrics extends Serializable {
 
   def semWaitTime[A](f: => A): A = timeIt(semWaitTimeNs, "Acquire GPU", NvtxColor.RED, f)
 
-  def spillTime[A](f: => A): A = timeIt(spillBlockTimeNs, "OnAllocFailure", NvtxColor.RED, f)
+  def spillToHostTime[A](f: => A): A = {
+    timeIt(spillToHostTimeNs, "spillToHostTime", NvtxColor.RED, f)
+  }
+
+  def spillToDiskTime[A](f: => A): A = {
+    timeIt(spillToDiskTimeNs, "spillToDiskTime", NvtxColor.RED, f)
+  }
 
-  def readSpillTime[A](f: => A): A = timeIt(readSpillTimeNs, "Read Spill", NvtxColor.ORANGE, f)
+  def readSpillFromHostTime[A](f: => A): A = {
+    timeIt(readSpillFromHostTimeNs, "readSpillFromHostTime", NvtxColor.ORANGE, f)
+  }
+
+  def readSpillFromDiskTime[A](f: => A): A = {
+    timeIt(readSpillFromDiskTimeNs, "readSpillFromDiskTime", NvtxColor.ORANGE, f)
+  }
 
   def updateRetry(taskAttemptId: Long): Unit = {
     val rc = RmmSpark.getAndResetNumRetryThrow(taskAttemptId)
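Note: the sketch below is illustrative only and is not part of the patch. It shows how a caller elsewhere in the plugin might wrap a spill write in one of the new GpuTaskMetrics timing helpers introduced above, so the elapsed time lands in the "gpuSpillToDiskTime" accumulator; writeBufferToDisk is a hypothetical stand-in for the real disk-store write path.

// Illustrative sketch only (not from the patch): time a spill-to-disk write with the
// new nanosecond accumulator. `writeBufferToDisk` is a hypothetical helper.
import org.apache.spark.sql.rapids.GpuTaskMetrics

def timedSpillToDisk(writeBufferToDisk: () => Long): Long = {
  GpuTaskMetrics.get.spillToDiskTime {
    // the block's result is returned unchanged; only wall-clock time is recorded
    // in the "gpuSpillToDiskTime" task metric registered above
    writeBufferToDisk()
  }
}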