Skip to content

Commit

Permalink
d1
Browse files (browse the repository at this point in the history)
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed Jul 4, 2024
1 parent b52038e commit b4ea48f
Showing 1 changed file with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,42 @@ trait GpuPartitioning extends Partitioning {
def sliceInternalOnGpuAndClose(numRows: Int, partitionIndexes: Array[Int],
partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
val batches = if (numRows > 0) {
// The first index will always be 0, so we need to skip it.
val parts = partitionIndexes.slice(1, partitionIndexes.length)
closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits =>
val contiguousTables = withResource(partitionColumns) { _ =>
withResource(new Table(partitionColumns.map(_.getBase).toArray: _*)) { table =>
withRetryNoSplit(table.contiguousSplit(parts: _*))
}
}
GpuShuffleEnv.rapidsShuffleCodec match {
case Some(codec) =>
compressSplits(splits, codec, contiguousTables)
case None =>
// GpuPackedTableColumn takes ownership of the contiguous tables
closeOnExcept(contiguousTables) { cts =>
cts.foreach { ct => splits.append(GpuPackedTableColumn.from(ct)) }
withResource(new NvtxRange("Split Compress", NvtxColor.GREEN)) { _ =>
// The first index will always be 0, so we need to skip it.
val parts = partitionIndexes.slice(1, partitionIndexes.length)
closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits =>
val contiguousTables = withResource(partitionColumns) { _ =>
withResource(new Table(partitionColumns.map(_.getBase).toArray: _*)) { table =>
withRetryNoSplit(table.contiguousSplit(parts: _*))
}
}
GpuShuffleEnv.rapidsShuffleCodec match {
case Some(codec) =>
compressSplits(splits, codec, contiguousTables)
case None =>
// GpuPackedTableColumn takes ownership of the contiguous tables
closeOnExcept(contiguousTables) { cts =>
cts.foreach { ct => splits.append(GpuPackedTableColumn.from(ct)) }
}
}
// synchronize our stream to ensure we have caught up with contiguous split
// as downstream consumers (RapidsShuffleManager) will add hundreds of buffers
// to the spill framework, this makes it so here we synchronize once.
Cuda.DEFAULT_STREAM.sync()
splits.toArray
}
// synchronize our stream to ensure we have caught up with contiguous split
// as downstream consumers (RapidsShuffleManager) will add hundreds of buffers
// to the spill framework, this makes it so here we synchronize once.
Cuda.DEFAULT_STREAM.sync()
splits.toArray
}
} else {
Array[ColumnarBatch]()
}

if (_isSerdeOnGPU) {
closeOnExcept(moveToHostAndClose(batches)) { hostBatches =>
// All the data should be on host now for shuffle, leaving GPU for a while.
GpuSemaphore.releaseIfNecessary(TaskContext.get())
hostBatches
withResource(new NvtxRange("Buffering toHost", NvtxColor.BLUE)) { _ =>
closeOnExcept(moveToHostAndClose(batches)) { hostBatches =>
// All the data should be on host now for shuffle, leaving GPU for a while.
GpuSemaphore.releaseIfNecessary(TaskContext.get())
hostBatches
}
}
} else {
batches
Expand Down

0 comments on commit b4ea48f

Please sign in to comment.