From 967d345093943219c2a346403ced8120377ced25 Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Tue, 17 Dec 2024 12:52:36 +0900
Subject: [PATCH 1/5] add metrics GpuPartitioning.CopyToHostTime

Signed-off-by: sperlingxx
---
 .../nvidia/spark/rapids/GpuPartitioning.scala | 28 ++++++++++++++++++-
 .../GpuShuffleExchangeExecBase.scala          | 12 +++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index 6394e2974b4..d2338a91384 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -32,6 +32,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 object GpuPartitioning {
   // The maximum size of an Array minus a bit for overhead for metadata
   val MaxCpuBatchSize = 2147483639L - 2048L
+
+  // The SQLMetric key for the DeviceToHost memory copy time
+  val CopyToHostTime: String = "d2hMemCpyTime"
 }
 
 trait GpuPartitioning extends Partitioning {
@@ -132,7 +135,15 @@ trait GpuPartitioning extends Partitioning {
       }
     }
     withResource(hostPartColumns) { _ =>
-      Cuda.DEFAULT_STREAM.sync()
+      lazy val memCpyNvtxRange = memCopyTime.map(
+        new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, _))
+        .getOrElse(
+          new NvtxRange("PartitionD2H", NvtxColor.CYAN))
+      // Wait for copyToHostAsync
+      withResource(memCpyNvtxRange) { _ =>
+        Cuda.DEFAULT_STREAM.sync()
+      }
+
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())
 
@@ -241,4 +252,19 @@ trait GpuPartitioning extends Partitioning {
       }
     }
   }
+
+  private var memCopyTime: Option[GpuMetric] = None
+
+  /**
+   * Set up Spark SQL metrics for the details of GpuPartitioning. This method is expected to be
+   * called only once, at the query planning stage.
+   */
+  def setupMetrics(metrics: Map[String, GpuMetric]): Unit = {
+    metrics.get(GpuPartitioning.CopyToHostTime).foreach { metric =>
+      // Check and set GpuPartitioning.CopyToHostTime
+      require(memCopyTime.isEmpty,
+        s"The GpuMetric[${GpuPartitioning.CopyToHostTime}] has already been set")
+      memCopyTime = Some(metric)
+    }
+  }
 }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index 332545a99e1..6fb78d85554 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -294,6 +294,8 @@ object GpuShuffleExchangeExecBase {
   val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time"
   val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
   val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time"
+  val METRIC_COPY_TO_HOST_TIME = GpuPartitioning.CopyToHostTime
+  val METRIC_DESC_COPY_TO_HOST_TIME = "RAPIDS shuffle DeviceToHost copy time"
 
   def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(
     // dataSize and dataReadSize are uncompressed, one is on write and the other on read
@@ -322,7 +324,9 @@ object GpuShuffleExchangeExecBase {
     METRIC_SHUFFLE_SER_COPY_HEADER_TIME ->
      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME),
     METRIC_SHUFFLE_SER_COPY_BUFFER_TIME ->
-      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME)
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME),
+    METRIC_COPY_TO_HOST_TIME ->
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_COPY_TO_HOST_TIME)
   )
 
   def prepareBatchShuffleDependency(
@@ -364,6 +368,12 @@ object GpuShuffleExchangeExecBase {
       rdd
     }
     val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
+    // Inject detailed metrics, such as D2HTime, before slicing on the CPU.
+    // The injected metrics will be serialized as members of GpuPartitioning.
+    partitioner match {
+      case pt: GpuPartitioning => pt.setupMetrics(additionalMetrics)
+      case _ =>
+    }
     val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
     def getPartitioned: ColumnarBatch => Any = { batch =>
       partitionTime.ns {
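Illustrative sketch, not part of the patch series: the comment in the last hunk above notes that the injected metric becomes a member of GpuPartitioning and is therefore serialized along with it when Spark ships the shuffle dependency to executors. The minimal, self-contained Scala sketch below shows that idea with hypothetical stand-in types (SketchPartitioner, roundTrip, and the string-valued "metric" are assumptions for illustration, not the plugin's API):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical partitioner-like class: a value injected at planning time is an
// ordinary field, so Java serialization carries it to the executor side.
class SketchPartitioner extends Serializable {
  private var metricKey: Option[String] = None
  def setupDebugMetrics(metrics: Map[String, String]): Unit =
    metricKey = metrics.keys.find(_ == "d2hMemCpyTime")
  def injectedKey: Option[String] = metricKey
}

object SerdeDemo {
  // Serialize then deserialize, mimicking the driver-to-executor hop.
  private def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val planned = new SketchPartitioner
    planned.setupDebugMetrics(Map("d2hMemCpyTime" -> "RAPIDS shuffle DeviceToHost copy time"))
    println(roundTrip(planned).injectedKey) // prints Some(d2hMemCpyTime)
  }
}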
From 00580a04420ff524ac7f3dafb49af1b771ed8cea Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Tue, 17 Dec 2024 14:44:04 +0900
Subject: [PATCH 2/5] fix

---
 .../nvidia/spark/rapids/GpuPartitioning.scala  | 16 ++++++++--------
 .../execution/GpuShuffleExchangeExecBase.scala |  4 ++--
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index d2338a91384..616e2df721b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -256,15 +256,15 @@ trait GpuPartitioning extends Partitioning {
   private var memCopyTime: Option[GpuMetric] = None
 
   /**
-   * Set up Spark SQL metrics for the details of GpuPartitioning. This method is expected to be
-   * called only once, at the query planning stage.
+   * Set up sub-metrics for performance debugging of GpuPartitioning. This method is expected to
+   * be called at the query planning stage. Therefore, this method is NOT thread safe.
    */
-  def setupMetrics(metrics: Map[String, GpuMetric]): Unit = {
-    metrics.get(GpuPartitioning.CopyToHostTime).foreach { metric =>
-      // Check and set GpuPartitioning.CopyToHostTime
-      require(memCopyTime.isEmpty,
-        s"The GpuMetric[${GpuPartitioning.CopyToHostTime}] has already been set")
-      memCopyTime = Some(metric)
+  def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
+    // Check and set GpuPartitioning.CopyToHostTime
+    if (memCopyTime.isEmpty) {
+      metrics.get(GpuPartitioning.CopyToHostTime).foreach { metric =>
+        memCopyTime = Some(metric)
+      }
     }
   }
 }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index 6fb78d85554..fa755de3dc9 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -368,10 +368,10 @@ object GpuShuffleExchangeExecBase {
       rdd
     }
     val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
-    // Inject detailed metrics, such as D2HTime, before slicing on the CPU.
+    // Inject debugging sub-metrics, such as D2HTime, before slicing on the CPU.
     // The injected metrics will be serialized as members of GpuPartitioning.
     partitioner match {
-      case pt: GpuPartitioning => pt.setupMetrics(additionalMetrics)
+      case pt: GpuPartitioning => pt.setupDebugMetrics(additionalMetrics)
       case _ =>
     }
     val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
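Illustrative sketch, not part of the patch series: this "fix" commit swaps the hard require for a first-write-wins guard, so a repeated call to the setter no longer throws. A tiny self-contained Scala illustration of the behavioral difference, with types simplified to plain strings (names and signatures here are assumptions, not the plugin's API):

object IdempotentSetupDemo {
  private var memCopyTime: Option[String] = None

  // Patch 1 behavior: a second injection attempt fails fast.
  def setupStrict(metrics: Map[String, String]): Unit =
    metrics.get("d2hMemCpyTime").foreach { m =>
      require(memCopyTime.isEmpty, "metric already set")
      memCopyTime = Some(m)
    }

  // Patch 2 behavior: only the first injection wins, later calls are no-ops.
  def setupIdempotent(metrics: Map[String, String]): Unit =
    if (memCopyTime.isEmpty) {
      memCopyTime = metrics.get("d2hMemCpyTime")
    }

  def main(args: Array[String]): Unit = {
    val metrics = Map("d2hMemCpyTime" -> "nano timing metric")
    setupIdempotent(metrics)
    setupIdempotent(metrics) // safe: keeps the first value instead of throwing
    println(memCopyTime)     // Some(nano timing metric)
  }
}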
From 85e84eb27d1bd6380c3afb18a3697928557bb5cc Mon Sep 17 00:00:00 2001
From: Alfred Xu
Date: Wed, 18 Dec 2024 09:24:17 +0800
Subject: [PATCH 3/5] Update
 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala

Co-authored-by: Jason Lowe
---
 .../scala/com/nvidia/spark/rapids/GpuPartitioning.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index 616e2df721b..1632eb5f2a2 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -260,11 +260,6 @@ trait GpuPartitioning extends Partitioning {
    * be called at the query planning stage. Therefore, this method is NOT thread safe.
    */
   def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
-    // Check and set GpuPartitioning.CopyToHostTime
-    if (memCopyTime.isEmpty) {
-      metrics.get(GpuPartitioning.CopyToHostTime).foreach { metric =>
-        memCopyTime = Some(metric)
-      }
-    }
+    memCopyTime = metrics.get(GpuPartitioning.CopyToHostTime).getOrElse(NoopMetric)
   }
 }
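Illustrative sketch, not part of the patch series: this review suggestion replaces the Option plumbing with a NoopMetric default, i.e. the null-object pattern, so downstream code can update the metric unconditionally without per-batch Option handling. A self-contained Scala sketch of the pattern under assumed, simplified types (the real GpuMetric and NoopMetric live in the plugin):

// Simplified stand-ins for GpuMetric/NoopMetric: a do-nothing default means the
// hot path never branches on Option and never risks a missing-metric check.
trait Metric extends Serializable { def add(ns: Long): Unit }
object NoopMetric extends Metric { override def add(ns: Long): Unit = () }
final class NanoTimingMetric extends Metric {
  private var total = 0L
  override def add(ns: Long): Unit = total += ns
  def value: Long = total
}

object NullObjectDemo {
  private var memCopyTime: Metric = NoopMetric // safe default before injection

  def main(args: Array[String]): Unit = {
    memCopyTime.add(100L) // silently dropped: no metric was injected yet
    val real = new NanoTimingMetric
    memCopyTime = real    // planning-time injection
    memCopyTime.add(100L)
    println(real.value)   // 100
  }
}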
From 377f87c39559d37a74bc189f0d5d3e9dcd57419e Mon Sep 17 00:00:00 2001
From: Alfred Xu
Date: Wed, 18 Dec 2024 09:24:32 +0800
Subject: [PATCH 4/5] Update
 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala

Co-authored-by: Jason Lowe
---
 .../scala/com/nvidia/spark/rapids/GpuPartitioning.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index 1632eb5f2a2..bb279d847aa 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -135,12 +135,7 @@ trait GpuPartitioning extends Partitioning {
       }
     }
     withResource(hostPartColumns) { _ =>
-      lazy val memCpyNvtxRange = memCopyTime.map(
-        new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, _))
-        .getOrElse(
-          new NvtxRange("PartitionD2H", NvtxColor.CYAN))
-      // Wait for copyToHostAsync
-      withResource(memCpyNvtxRange) { _ =>
+      withResource(new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
         Cuda.DEFAULT_STREAM.sync()
       }
 
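Illustrative sketch, not part of the patch series: with memCopyTime now a plain GpuMetric (NoopMetric by default), the NVTX range can be built unconditionally. The withResource idiom here is a loan pattern: the range is closed, and the metric charged, even if the body throws. A minimal self-contained Scala sketch with assumed stand-in types (the real NvtxWithMetrics and withResource are in the plugin):

// A stand-in for NvtxWithMetrics: report elapsed nanoseconds to a callback
// when the range is closed.
final class TimedRange(onClose: Long => Unit) extends AutoCloseable {
  private val start = System.nanoTime()
  override def close(): Unit = onClose(System.nanoTime() - start)
}

object LoanPatternDemo {
  // Simplified withResource: always close, even on exception.
  def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T =
    try body(resource) finally resource.close()

  def main(args: Array[String]): Unit = {
    var d2hNanos = 0L
    withResource(new TimedRange(d2hNanos += _)) { _ =>
      Thread.sleep(3) // stand-in for Cuda.DEFAULT_STREAM.sync()
    }
    println(s"charged $d2hNanos ns to the copy-to-host metric")
  }
}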
From 505b7e8b50b563359b367380a1bace1c9b7d2fa7 Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Wed, 18 Dec 2024 10:54:57 +0800
Subject: [PATCH 5/5] refine

Signed-off-by: sperlingxx
---
 .../com/nvidia/spark/rapids/GpuExec.scala     |  2 ++
 .../nvidia/spark/rapids/GpuPartitioning.scala | 26 +++++++++++--------
 .../GpuShuffleExchangeExecBase.scala          | 11 +++-----
 3 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
index 850a04f390f..bc67366d347 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
@@ -92,6 +92,7 @@ object GpuMetric extends Logging {
   val DELETION_VECTOR_SIZE = "deletionVectorSize"
   val CONCAT_HEADER_TIME = "concatHeaderTime"
   val CONCAT_BUFFER_TIME = "concatBufferTime"
+  val COPY_TO_HOST_TIME = "d2hMemCopyTime"
 
   // Metric Descriptions.
   val DESCRIPTION_BUFFER_TIME = "buffer time"
@@ -133,6 +134,7 @@ object GpuMetric extends Logging {
   val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"
   val DESCRIPTION_CONCAT_HEADER_TIME = "concat header time"
   val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
+  val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"
 
   def unwrap(input: GpuMetric): SQLMetric = input match {
     case w :WrappedGpuMetric => w.sqlMetric
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index bb279d847aa..4fbc612591b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -32,9 +32,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 object GpuPartitioning {
   // The maximum size of an Array minus a bit for overhead for metadata
   val MaxCpuBatchSize = 2147483639L - 2048L
-
-  // The SQLMetric key for the DeviceToHost memory copy time
-  val CopyToHostTime: String = "d2hMemCpyTime"
 }
 
 trait GpuPartitioning extends Partitioning {
@@ -129,16 +126,23 @@ trait GpuPartitioning extends Partitioning {
     val totalInputSize = GpuColumnVector.getTotalDeviceMemoryUsed(partitionColumns)
     val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize
 
-    val hostPartColumns = withResource(partitionColumns) { _ =>
-      withRetryNoSplit {
-        partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+    // We have to wrap both the copyToHostAsync calls and the corresponding CUDA sync in the
+    // NvtxWithMetrics range, because copyToHostAsync is not guaranteed to be asynchronous (e.g.,
+    // when the copy is from pageable memory, since we are not guaranteed to use pinned memory).
+    val hostPartColumns = withResource(
+      new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
+      val hostColumns = withResource(partitionColumns) { _ =>
+        withRetryNoSplit {
+          partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+        }
       }
-    }
-    withResource(hostPartColumns) { _ =>
-      withResource(new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
+      closeOnExcept(hostColumns) { _ =>
         Cuda.DEFAULT_STREAM.sync()
       }
+      hostColumns
+    }
 
+    withResource(hostPartColumns) { _ =>
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())
 
@@ -248,13 +252,13 @@ trait GpuPartitioning extends Partitioning {
     }
   }
 
-  private var memCopyTime: Option[GpuMetric] = None
+  private var memCopyTime: GpuMetric = NoopMetric
 
   /**
    * Set up sub-metrics for performance debugging of GpuPartitioning. This method is expected to
    * be called at the query planning stage. Therefore, this method is NOT thread safe.
    */
   def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
-    memCopyTime = metrics.get(GpuPartitioning.CopyToHostTime).getOrElse(NoopMetric)
+    metrics.get(GpuMetric.COPY_TO_HOST_TIME).foreach(memCopyTime = _)
  }
 }
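Illustrative sketch, not part of the patch series: the hunk above uses closeOnExcept rather than withResource around the stream sync, because the freshly copied host columns must be closed if the sync throws but must stay open on success so they can be returned. A self-contained Scala sketch of that helper's contract (simplified signature; the plugin's version operates on its own resource types):

object CloseOnExceptDemo {
  // Close the resource only if the body throws; on success, ownership passes on.
  def closeOnExcept[R <: AutoCloseable, T](resource: R)(body: R => T): T =
    try {
      body(resource)
    } catch {
      case t: Throwable =>
        resource.close()
        throw t
    }

  final class HostBuffer extends AutoCloseable {
    override def close(): Unit = println("host buffer released")
  }

  def main(args: Array[String]): Unit = {
    val buf = new HostBuffer
    val kept = closeOnExcept(buf) { b =>
      // stand-in for Cuda.DEFAULT_STREAM.sync(); had it thrown, buf would be closed
      b
    }
    kept.close() // success path: the caller still owns and closes the buffer
  }
}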
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index fa755de3dc9..0e1b857317c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -208,7 +208,8 @@ abstract class GpuShuffleExchangeExecBase(
     PARTITION_SIZE -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_PARTITION_SIZE),
     NUM_PARTITIONS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_PARTITIONS),
     NUM_OUTPUT_ROWS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_OUTPUT_ROWS),
-    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES)
+    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES),
+    COPY_TO_HOST_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_COPY_TO_HOST_TIME)
   ) ++ additionalMetrics
 
   override def nodeName: String = "GpuColumnarExchange"
@@ -294,8 +295,6 @@ object GpuShuffleExchangeExecBase {
   val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time"
   val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
   val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time"
-  val METRIC_COPY_TO_HOST_TIME = GpuPartitioning.CopyToHostTime
-  val METRIC_DESC_COPY_TO_HOST_TIME = "RAPIDS shuffle DeviceToHost copy time"
 
   def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(
     // dataSize and dataReadSize are uncompressed, one is on write and the other on read
@@ -324,9 +323,7 @@ object GpuShuffleExchangeExecBase {
     METRIC_SHUFFLE_SER_COPY_HEADER_TIME ->
       gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME),
     METRIC_SHUFFLE_SER_COPY_BUFFER_TIME ->
-      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME),
-    METRIC_COPY_TO_HOST_TIME ->
-      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_COPY_TO_HOST_TIME)
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME)
   )
 
   def prepareBatchShuffleDependency(
@@ -368,10 +365,10 @@ object GpuShuffleExchangeExecBase {
       rdd
     }
     val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
     // Inject debugging sub-metrics, such as D2HTime, before slicing on the CPU.
     // The injected metrics will be serialized as members of GpuPartitioning.
     partitioner match {
-      case pt: GpuPartitioning => pt.setupDebugMetrics(additionalMetrics)
+      case pt: GpuPartitioning => pt.setupDebugMetrics(metrics)
       case _ =>
     }
     val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
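Illustrative sketch, not part of the patch series: after this final commit, the key lives in GpuMetric.COPY_TO_HOST_TIME, the exchange registers a nano-timing metric for it in its metric map, and the partitioner looks the metric up by key at planning time, keeping a no-op default otherwise. A compact, self-contained Scala sketch of that end-to-end wiring under simplified stand-in types (only the key string is taken from the patch):

object MetricWiringDemo {
  trait GpuMetricLike extends Serializable { def add(ns: Long): Unit }
  object NoopMetric extends GpuMetricLike { override def add(ns: Long): Unit = () }
  final class NanoTimingMetric extends GpuMetricLike {
    var total = 0L
    override def add(ns: Long): Unit = total += ns
  }

  val COPY_TO_HOST_TIME = "d2hMemCopyTime" // key as defined in GpuMetric after patch 5

  // Exchange side: register the metric in the exec's metric map.
  val allMetrics: Map[String, GpuMetricLike] = Map(COPY_TO_HOST_TIME -> new NanoTimingMetric)

  // Partitioner side: pick up the metric by key, keeping the no-op default otherwise.
  private var memCopyTime: GpuMetricLike = NoopMetric
  def setupDebugMetrics(metrics: Map[String, GpuMetricLike]): Unit =
    metrics.get(COPY_TO_HOST_TIME).foreach(memCopyTime = _)

  def main(args: Array[String]): Unit = {
    setupDebugMetrics(allMetrics)
    memCopyTime.add(1234L) // what the PartitionD2H NVTX range would record
    println(allMetrics(COPY_TO_HOST_TIME).asInstanceOf[NanoTimingMetric].total) // 1234
  }
}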