diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index 850a04f390f..bc67366d347 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -92,6 +92,7 @@ object GpuMetric extends Logging {
   val DELETION_VECTOR_SIZE = "deletionVectorSize"
   val CONCAT_HEADER_TIME = "concatHeaderTime"
   val CONCAT_BUFFER_TIME = "concatBufferTime"
+  val COPY_TO_HOST_TIME = "d2hMemCopyTime"
 
   // Metric Descriptions.
   val DESCRIPTION_BUFFER_TIME = "buffer time"
@@ -133,6 +134,7 @@
   val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"
   val DESCRIPTION_CONCAT_HEADER_TIME = "concat header time"
   val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
+  val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"
 
   def unwrap(input: GpuMetric): SQLMetric = input match {
     case w :WrappedGpuMetric => w.sqlMetric
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index bb279d847aa..4fbc612591b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -32,9 +32,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 object GpuPartitioning {
   // The maximum size of an Array minus a bit for overhead for metadata
   val MaxCpuBatchSize = 2147483639L - 2048L
-
-  // The SQLMetric key for MemoryCopyFromDeviceToHost
-  val CopyToHostTime: String = "d2hMemCpyTime"
 }
 
 trait GpuPartitioning extends Partitioning {
@@ -129,16 +126,23 @@
     val totalInputSize = GpuColumnVector.getTotalDeviceMemoryUsed(partitionColumns)
     val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize
 
-    val hostPartColumns = withResource(partitionColumns) { _ =>
-      withRetryNoSplit {
-        partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+    // We have to wrap the NvtxWithMetrics over both copyToHostAsync and corresponding CudaSync,
+    // because the copyToHostAsync calls above are not guaranteed to be asynchronous (e.g.: when
+    // the copy is from pageable memory, and we're not guaranteed to be using pinned memory).
+    val hostPartColumns = withResource(
+      new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
+      val hostColumns = withResource(partitionColumns) { _ =>
+        withRetryNoSplit {
+          partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+        }
       }
-    }
-    withResource(hostPartColumns) { _ =>
-      withResource(new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
+      closeOnExcept(hostColumns) { _ =>
         Cuda.DEFAULT_STREAM.sync()
       }
+      hostColumns
+    }
+
+    withResource(hostPartColumns) { _ =>
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())
@@ -248,13 +252,13 @@
     }
   }
 
-  private var memCopyTime: Option[GpuMetric] = None
+  private var memCopyTime: GpuMetric = NoopMetric
 
   /**
    * Setup sub-metrics for the performance debugging of GpuPartition. This method is expected to
    * be called at the query planning stage. Therefore, this method is NOT thread safe.
    */
   def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
-    memCopyTime = metrics.get(GpuPartitioning.CopyToHostTime).getOrElse(NoopMetric)
+    metrics.get(GpuMetric.COPY_TO_HOST_TIME).foreach(memCopyTime = _)
   }
 }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index fa755de3dc9..0e1b857317c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -208,7 +208,8 @@ abstract class GpuShuffleExchangeExecBase(
     PARTITION_SIZE -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_PARTITION_SIZE),
     NUM_PARTITIONS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_PARTITIONS),
     NUM_OUTPUT_ROWS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_OUTPUT_ROWS),
-    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES)
+    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES),
+    COPY_TO_HOST_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_COPY_TO_HOST_TIME)
   ) ++ additionalMetrics
 
   override def nodeName: String = "GpuColumnarExchange"
@@ -294,8 +295,6 @@ object GpuShuffleExchangeExecBase {
   val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time"
   val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
   val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time"
-  val METRIC_COPY_TO_HOST_TIME = GpuPartitioning.CopyToHostTime
-  val METRIC_DESC_COPY_TO_HOST_TIME = "RAPIDS shuffle DeviceToHost copy time"
 
   def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(
     // dataSize and dataReadSize are uncompressed, one is on write and the other on read
@@ -324,9 +323,7 @@ object GpuShuffleExchangeExecBase {
     METRIC_SHUFFLE_SER_COPY_HEADER_TIME ->
       gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME),
     METRIC_SHUFFLE_SER_COPY_BUFFER_TIME ->
-      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME),
-    METRIC_COPY_TO_HOST_TIME ->
-      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_COPY_TO_HOST_TIME)
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME)
   )
 
   def prepareBatchShuffleDependency(
@@ -371,7 +368,7 @@
     // Inject debugging subMetrics, such as D2HTime before SliceOnCpu
     // The injected metrics will be serialized as the members of GpuPartitioning
     partitioner match {
-      case pt: GpuPartitioning => pt.setupDebugMetrics(additionalMetrics)
+      case pt: GpuPartitioning => pt.setupDebugMetrics(metrics)
      case _ =>
     }
     val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)