Skip to content

Commit

Permalink
Add metrics GpuPartitioning.CopyToHostTime (#11882)
Browse files Browse the repository at this point in the history
Add metrics GpuPartitioning.CopyToHostTime

Signed-off-by: sperlingxx <[email protected]>
Co-authored-by: Jason Lowe <[email protected]>
  • Loading branch information
sperlingxx and jlowe authored Dec 19, 2024
1 parent 3f26d33 commit 231a9c6
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ object GpuMetric extends Logging {
val DELETION_VECTOR_SIZE = "deletionVectorSize"
val CONCAT_HEADER_TIME = "concatHeaderTime"
val CONCAT_BUFFER_TIME = "concatBufferTime"
val COPY_TO_HOST_TIME = "d2hMemCopyTime"

// Metric Descriptions.
val DESCRIPTION_BUFFER_TIME = "buffer time"
Expand Down Expand Up @@ -133,6 +134,7 @@ object GpuMetric extends Logging {
val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"
val DESCRIPTION_CONCAT_HEADER_TIME = "concat header time"
val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"

def unwrap(input: GpuMetric): SQLMetric = input match {
case w :WrappedGpuMetric => w.sqlMetric
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,23 @@ trait GpuPartitioning extends Partitioning {
val totalInputSize = GpuColumnVector.getTotalDeviceMemoryUsed(partitionColumns)
val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize

val hostPartColumns = withResource(partitionColumns) { _ =>
withRetryNoSplit {
partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
// We have to wrap the NvtxWithMetrics over both copyToHostAsync and corresponding CudaSync,
// because the copyToHostAsync calls above are not guaranteed to be asynchronous (e.g.: when
// the copy is from pageable memory, and we're not guaranteed to be using pinned memory).
val hostPartColumns = withResource(
new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
val hostColumns = withResource(partitionColumns) { _ =>
withRetryNoSplit {
partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
}
}
closeOnExcept(hostColumns) { _ =>
Cuda.DEFAULT_STREAM.sync()
}
hostColumns
}

withResource(hostPartColumns) { _ =>
Cuda.DEFAULT_STREAM.sync()
// Leaving the GPU for a while
GpuSemaphore.releaseIfNecessary(TaskContext.get())

Expand Down Expand Up @@ -241,4 +251,14 @@ trait GpuPartitioning extends Partitioning {
}
}
}

private var memCopyTime: GpuMetric = NoopMetric

/**
* Setup sub-metrics for the performance debugging of GpuPartition. This method is expected to
* be called at the query planning stage. Therefore, this method is NOT thread safe.
*/
def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
metrics.get(GpuMetric.COPY_TO_HOST_TIME).foreach(memCopyTime = _)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ abstract class GpuShuffleExchangeExecBase(
PARTITION_SIZE -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_PARTITION_SIZE),
NUM_PARTITIONS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_PARTITIONS),
NUM_OUTPUT_ROWS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_OUTPUT_ROWS),
NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES)
NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES),
COPY_TO_HOST_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_COPY_TO_HOST_TIME)
) ++ additionalMetrics

override def nodeName: String = "GpuColumnarExchange"
Expand Down Expand Up @@ -364,6 +365,12 @@ object GpuShuffleExchangeExecBase {
rdd
}
val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
// Inject debugging subMetrics, such as D2HTime before SliceOnCpu
// The injected metrics will be serialized as the members of GpuPartitioning
partitioner match {
case pt: GpuPartitioning => pt.setupDebugMetrics(metrics)
case _ =>
}
val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
def getPartitioned: ColumnarBatch => Any = {
batch => partitionTime.ns {
Expand Down

0 comments on commit 231a9c6

Please sign in to comment.