
Commit 505b7e8
refine
Signed-off-by: sperlingxx <[email protected]>
sperlingxx committed Dec 18, 2024
1 parent 377f87c · commit 505b7e8
Showing 3 changed files with 21 additions and 18 deletions.
File 1 of 3:

@@ -92,6 +92,7 @@ object GpuMetric extends Logging {
   val DELETION_VECTOR_SIZE = "deletionVectorSize"
   val CONCAT_HEADER_TIME = "concatHeaderTime"
   val CONCAT_BUFFER_TIME = "concatBufferTime"
+  val COPY_TO_HOST_TIME = "d2hMemCopyTime"
 
   // Metric Descriptions.
   val DESCRIPTION_BUFFER_TIME = "buffer time"
@@ -133,6 +134,7 @@ object GpuMetric extends Logging {
   val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"
   val DESCRIPTION_CONCAT_HEADER_TIME = "concat header time"
   val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
+  val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"
 
   def unwrap(input: GpuMetric): SQLMetric = input match {
     case w :WrappedGpuMetric => w.sqlMetric
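The two hunks above follow the file's existing convention: every metric key gets a matching DESCRIPTION_* constant, the key being the stable identifier a metric is registered under and the description the human-readable label surfaced in the Spark UI. A minimal self-contained sketch of that pairing, with illustrative names rather than the real GpuMetric API:

// Illustrative sketch only: a key/description pair feeding a tiny registry,
// roughly how an exec node pairs the two constants when creating metrics.
object MetricSketch {
  val COPY_TO_HOST_TIME = "d2hMemCopyTime"
  val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"

  // A labeled nano-timing counter standing in for a SQLMetric.
  final case class Metric(description: String, var valueNanos: Long = 0L)

  def createNanoTimingMetric(description: String): Metric = Metric(description)

  val metrics: Map[String, Metric] =
    Map(COPY_TO_HOST_TIME -> createNanoTimingMetric(DESCRIPTION_COPY_TO_HOST_TIME))
}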
File 2 of 3:

@@ -32,9 +32,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 object GpuPartitioning {
   // The maximum size of an Array minus a bit for overhead for metadata
   val MaxCpuBatchSize = 2147483639L - 2048L
-
-  // The SQLMetric key for MemoryCopyFromDeviceToHost
-  val CopyToHostTime: String = "d2hMemCpyTime"
 }
 
 trait GpuPartitioning extends Partitioning {
@@ -129,16 +126,23 @@ trait GpuPartitioning extends Partitioning {
     val totalInputSize = GpuColumnVector.getTotalDeviceMemoryUsed(partitionColumns)
     val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize
 
-    val hostPartColumns = withResource(partitionColumns) { _ =>
-      withRetryNoSplit {
-        partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+    // We have to wrap the NvtxWithMetrics over both copyToHostAsync and corresponding CudaSync,
+    // because the copyToHostAsync calls above are not guaranteed to be asynchronous (e.g.: when
+    // the copy is from pageable memory, and we're not guaranteed to be using pinned memory).
+    val hostPartColumns = withResource(
+      new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
+      val hostColumns = withResource(partitionColumns) { _ =>
+        withRetryNoSplit {
+          partitionColumns.safeMap(_.copyToHostAsync(Cuda.DEFAULT_STREAM))
+        }
       }
-    }
-    withResource(hostPartColumns) { _ =>
-      withResource(new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, memCopyTime)) { _ =>
+      closeOnExcept(hostColumns) { _ =>
         Cuda.DEFAULT_STREAM.sync()
       }
+      hostColumns
+    }
 
+    withResource(hostPartColumns) { _ =>
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())
 
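The comment added in this hunk carries the core subtlety: copyToHostAsync may in fact run synchronously (for example when the source buffer is pageable rather than pinned), so the timed NVTX range has to enclose both the copy calls and the stream sync, or the copy cost lands outside the metric. A minimal sketch of the same timing discipline, with hypothetical helper names standing in for the RAPIDS types:

// Hypothetical sketch: `copyAsync` and `syncStream` stand in for
// copyToHostAsync(Cuda.DEFAULT_STREAM) and Cuda.DEFAULT_STREAM.sync().
// Timing only the sync would under-count pinned-memory copies; timing only
// the enqueue would under-count pageable copies that complete synchronously.
def timedCopyToHost[T](copyAsync: () => T, syncStream: () => Unit)
    (addNanos: Long => Unit): T = {
  val start = System.nanoTime()
  try {
    val result = copyAsync() // may or may not actually be asynchronous
    syncStream()             // after this, the copy is guaranteed complete
    result
  } finally {
    addNanos(System.nanoTime() - start) // one range covers enqueue + completion
  }
}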
@@ -248,13 +252,13 @@
     }
   }
 
-  private var memCopyTime: Option[GpuMetric] = None
+  private var memCopyTime: GpuMetric = NoopMetric
 
   /**
    * Setup sub-metrics for the performance debugging of GpuPartition. This method is expected to
    * be called at the query planning stage. Therefore, this method is NOT thread safe.
    */
   def setupDebugMetrics(metrics: Map[String, GpuMetric]): Unit = {
-    memCopyTime = metrics.get(GpuPartitioning.CopyToHostTime).getOrElse(NoopMetric)
+    metrics.get(GpuMetric.COPY_TO_HOST_TIME).foreach(memCopyTime = _)
   }
 }
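Switching memCopyTime from Option[GpuMetric] to a NoopMetric default is the null-object pattern: the field is always safe to call on the hot path, with no Option unwrapping per batch, and setupDebugMetrics only overwrites it when the metric was actually registered. A self-contained sketch under simplified, hypothetical types:

// Hypothetical simplified types; not the repository's GpuMetric hierarchy.
trait Metric { def add(nanos: Long): Unit }
object NoopMetric extends Metric { def add(nanos: Long): Unit = () }

final class Timer extends Metric {
  private var total = 0L
  def add(nanos: Long): Unit = total += nanos
  def value: Long = total
}

final class Partitioner {
  // Null-object default: always callable, never null, no Option on the hot path.
  private var memCopyTime: Metric = NoopMetric

  def setupDebugMetrics(metrics: Map[String, Metric]): Unit =
    metrics.get("d2hMemCopyTime").foreach(memCopyTime = _)

  def recordCopy(nanos: Long): Unit = memCopyTime.add(nanos)
}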
File 3 of 3:

@@ -208,7 +208,8 @@ abstract class GpuShuffleExchangeExecBase(
     PARTITION_SIZE -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_PARTITION_SIZE),
     NUM_PARTITIONS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_PARTITIONS),
     NUM_OUTPUT_ROWS -> createMetric(ESSENTIAL_LEVEL, DESCRIPTION_NUM_OUTPUT_ROWS),
-    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES)
+    NUM_OUTPUT_BATCHES -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_OUTPUT_BATCHES),
+    COPY_TO_HOST_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_COPY_TO_HOST_TIME)
   ) ++ additionalMetrics
 
   override def nodeName: String = "GpuColumnarExchange"
@@ -294,8 +295,6 @@ object GpuShuffleExchangeExecBase {
   val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time"
   val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
   val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time"
-  val METRIC_COPY_TO_HOST_TIME = GpuPartitioning.CopyToHostTime
-  val METRIC_DESC_COPY_TO_HOST_TIME = "RAPIDS shuffle DeviceToHost copy time"
 
   def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(
     // dataSize and dataReadSize are uncompressed, one is on write and the other on read
@@ -324,9 +323,7 @@
     METRIC_SHUFFLE_SER_COPY_HEADER_TIME ->
       gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME),
     METRIC_SHUFFLE_SER_COPY_BUFFER_TIME ->
-      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME),
-    METRIC_COPY_TO_HOST_TIME ->
-      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_COPY_TO_HOST_TIME)
+      gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME)
   )
 
   def prepareBatchShuffleDependency(
@@ -371,7 +368,7 @@ object GpuShuffleExchangeExecBase {
     // Inject debugging subMetrics, such as D2HTime before SliceOnCpu
     // The injected metrics will be serialized as the members of GpuPartitioning
     partitioner match {
-      case pt: GpuPartitioning => pt.setupDebugMetrics(additionalMetrics)
+      case pt: GpuPartitioning => pt.setupDebugMetrics(metrics)
       case _ =>
     }
     val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
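Taken together, the three files move the D2H metric's ownership: the key and description now live in GpuMetric, the exec node registers the timing metric in its main metric map, and this final hunk passes `metrics` (where the key now lives) instead of `additionalMetrics` (where it no longer does) into setupDebugMetrics. A runnable toy sketch of that wiring, with hypothetical simplified types:

// Hypothetical simplified types; illustrates only the map-wiring, not the real API.
object MetricWiringSketch {
  type Metrics = Map[String, Long => Unit]

  final class Partitioner {
    private var record: Long => Unit = _ => () // no-op default
    def setupDebugMetrics(metrics: Metrics): Unit =
      metrics.get("d2hMemCopyTime").foreach(record = _)
    def observe(nanos: Long): Unit = record(nanos)
  }

  def main(args: Array[String]): Unit = {
    var total = 0L
    val mainMetrics: Metrics = Map("d2hMemCopyTime" -> (total += _))
    val additionalMetrics: Metrics = Map.empty // the key no longer lives here
    val partitioner = new Partitioner
    partitioner.setupDebugMetrics(mainMetrics) // the fix: pass the map holding the key
    partitioner.observe(42L)
    assert(total == 42L)
  }
}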
