Some small fixes
Signed-off-by: Firestarman <[email protected]>
firestarman committed Apr 22, 2024
1 parent 1e00d59 commit 7f61274
Showing 4 changed files with 29 additions and 26 deletions.
GpuOptimizeWriteExchangeExec.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimizeWriteExchange.scala
* in the Delta Lake project at https://github.com/delta-io/delta
@@ -97,8 +97,10 @@ case class GpuOptimizeWriteExchangeExec(
) ++ additionalMetrics
}

- private lazy val serializer: Serializer =
-   new GpuColumnarBatchSerializer(gpuLongMetric("dataSize"))
+ private lazy val sparkTypes: Array[DataType] = child.output.map(_.dataType).toArray
+
+ private lazy val serializer: Serializer = new GpuColumnarBatchSerializer(
+   gpuLongMetric("dataSize"), partitioning.serializingOnGPU, sparkTypes)

@transient lazy val inputRDD: RDD[ColumnarBatch] = child.executeColumnar()

@@ -116,7 +118,7 @@ case class GpuOptimizeWriteExchangeExec(
inputRDD,
child.output,
partitioning,
- child.output.map(_.dataType).toArray,
+ sparkTypes,
serializer,
useGPUShuffle=partitioning.usesGPUShuffle,
useMultiThreadedShuffle=partitioning.usesMultiThreadedShuffle,
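The change above hoists the repeated child.output.map(_.dataType).toArray expression into a single sparkTypes lazy val that both the serializer and the shuffle dependency reuse. Below is a minimal sketch of that caching pattern, assuming Spark is on the classpath; ExampleExchange and its methods are illustrative names, not the plugin's classes.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.DataType

case class ExampleExchange(child: SparkPlan) {
  // Computed once on first access and then shared by every call site,
  // instead of rebuilding the array each time it is needed.
  private lazy val sparkTypes: Array[DataType] = child.output.map(_.dataType).toArray

  def typesForSerializer: Array[DataType] = sparkTypes
  def typesForShuffleWrite: Array[DataType] = sparkTypes
}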
GpuColumnarBatchSerializer.scala
@@ -124,24 +124,24 @@ class SerializedBatchIterator(dIn: DataInputStream) extends Iterator[(Int, ColumnarBatch)]
*
* @note The RAPIDS shuffle does not use this code.
*/
- class GpuColumnarBatchSerializer(dataSize: GpuMetric, serializingOnGpu: Boolean = false,
+ class GpuColumnarBatchSerializer(dataSize: GpuMetric, isSerializedTable: Boolean = false,
sparkTypes: Array[DataType] = Array.empty) extends Serializer with Serializable {
override def newInstance(): SerializerInstance =
-   new GpuColumnarBatchSerializerInstance(dataSize, serializingOnGpu, sparkTypes)
+   new GpuColumnarBatchSerializerInstance(dataSize, isSerializedTable, sparkTypes)
override def supportsRelocationOfSerializedObjects: Boolean = true
}

private class GpuColumnarBatchSerializerInstance(
dataSize: GpuMetric,
-   serializingOnGpu: Boolean,
+   isSerializedTable: Boolean,
sparkTypes: Array[DataType]) extends SerializerInstance {

private lazy val tableSerializer = new SimpleTableSerializer(sparkTypes)

override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream {
private[this] val dOut = new DataOutputStream(new BufferedOutputStream(out))

- private def serializeBatchOnCPU(batch: ColumnarBatch): Unit = {
+ private def serializeCpuBatch(batch: ColumnarBatch): Unit = {
val numCols = batch.numCols()
if (numCols > 0) {
withResource(new ArrayBuffer[AutoCloseable]()) { toClose =>
@@ -173,7 +173,7 @@ private class GpuColumnarBatchSerializerInstance(
}
}

- private def serializeBatchOnGPU(batch: ColumnarBatch): Unit = {
+ private def serializeGpuBatch(batch: ColumnarBatch): Unit = {
if (batch.numCols() > 0) {
batch.column(0) match {
case packTable: GpuPackedTableColumn =>
@@ -190,10 +190,10 @@ private class GpuColumnarBatchSerializerInstance(
}
}

- private lazy val serializeBatch: ColumnarBatch => Unit = if (serializingOnGpu) {
-   serializeBatchOnGPU
+ private lazy val serializeBatch: ColumnarBatch => Unit = if (isSerializedTable) {
+   serializeGpuBatch
} else {
-   serializeBatchOnCPU
+   serializeCpuBatch
}

override def writeValue[T: ClassTag](value: T): SerializationStream = {
@@ -278,7 +278,8 @@ private class GpuColumnarBatchSerializerInstance(
private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) {

private val P_MAGIC_CUDF: Int = 0x43554446
- private val headerLen = 4 // the size in bytes of an Int
+ private val P_VERSION: Int = 0
+ private val headerLen = 8 // the size in bytes of two Ints for a header
private val tmpBuf = new Array[Byte](1024 * 64) // 64k

private def writeByteBufferToStream(bBuf: ByteBuffer, dOut: DataOutputStream): Unit = {
@@ -315,6 +316,7 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) {

private def writeProtocolHeader(dOut: DataOutputStream): Unit = {
dOut.writeInt(P_MAGIC_CUDF)
+ dOut.writeInt(P_VERSION)
}

def writeRowsOnlyToStream(numRows: Int, dOut: DataOutputStream): Long = {
@@ -328,7 +330,7 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) {
}

def writeToStream(table: ContiguousTable, dOut: DataOutputStream): Long = {
- // 1) header, now only a magic number, may add more as needed
+ // 1) header
writeProtocolHeader(dOut)
// 2) table metadata,
val tableMetaBuf = MetaUtils.buildTableMeta(0, table).getByteBuffer
@@ -343,10 +345,15 @@ private[rapids] class SimpleTableSerializer(sparkTypes: Array[DataType]) {
}

private def readProtocolHeader(dIn: DataInputStream): Unit = {
- val num = dIn.readInt()
- if (num != P_MAGIC_CUDF) {
+ val magicNum = dIn.readInt()
+ if (magicNum != P_MAGIC_CUDF) {
    throw new IllegalStateException(s"Expected magic number $P_MAGIC_CUDF for " +
-     s"table serializer, but got $num")
+     s"table serializer, but got $magicNum")
  }
+ val version = dIn.readInt()
+ if (version != P_VERSION) {
+   throw new IllegalStateException(s"Version mismatch: expected $P_VERSION for " +
+     s"table serializer, but got $version")
+ }
}

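The hunks above grow the SimpleTableSerializer protocol header from a single magic number to a magic number plus a version (hence headerLen moving from 4 to 8 bytes), both validated on read. A self-contained round-trip sketch of that handshake, using standalone names rather than the plugin's API:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object HeaderSketch {
  val P_MAGIC_CUDF: Int = 0x43554446 // the bytes of "CUDF" in ASCII
  val P_VERSION: Int = 0

  def writeHeader(dOut: DataOutputStream): Unit = {
    dOut.writeInt(P_MAGIC_CUDF) // 4 bytes
    dOut.writeInt(P_VERSION)    // 4 bytes, so the header is 8 bytes in total
  }

  def readHeader(dIn: DataInputStream): Unit = {
    val magicNum = dIn.readInt()
    if (magicNum != P_MAGIC_CUDF) {
      throw new IllegalStateException(s"Expected magic number $P_MAGIC_CUDF, but got $magicNum")
    }
    val version = dIn.readInt()
    if (version != P_VERSION) {
      throw new IllegalStateException(s"Version mismatch: expected $P_VERSION, but got $version")
    }
  }

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val dOut = new DataOutputStream(bytes)
    writeHeader(dOut)
    dOut.flush()
    // Round-trips cleanly; flipping either Int above makes readHeader throw.
    readHeader(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}

Versioning the header this way lets a future reader reject a stream written by an incompatible serializer instead of misparsing the table metadata that follows it.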
GpuShuffleEnv.scala
@@ -144,7 +144,8 @@ object GpuShuffleEnv extends Logging {
def serializingOnGpu(conf: RapidsConf): Boolean = {
// Serializing on GPU for CPU shuffle does not support compression yet.
conf.isSerializingOnGpu &&
-   conf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) == "none"
+   conf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) == "none" &&
+     (!useGPUShuffle(conf))
}

def getCatalog: ShuffleBufferCatalog = if (env == null) {
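The added condition means GPU-side serialization for the CPU shuffle is now also disabled when the RAPIDS GPU shuffle itself is in use, consistent with the @note in GpuColumnarBatchSerializer that the RAPIDS shuffle never goes through this serializer. A hedged sketch of the resulting gate, with stand-in config accessors rather than the real RapidsConf:

import java.util.Locale

// Stand-ins for the relevant config lookups; the field names are illustrative.
case class ConfSketch(
    isSerializingOnGpu: Boolean,
    shuffleCompressionCodec: String,
    usesGpuShuffle: Boolean)

object SerializingOnGpuSketch {
  def serializingOnGpu(conf: ConfSketch): Boolean =
    conf.isSerializingOnGpu &&
      conf.shuffleCompressionCodec.toLowerCase(Locale.ROOT) == "none" &&
      !conf.usesGpuShuffle

  // serializingOnGpu(ConfSketch(true, "none", false)) => true
  // serializingOnGpu(ConfSketch(true, "lz4", false))  => false (compression unsupported)
  // serializingOnGpu(ConfSketch(true, "none", true))  => false (RAPIDS shuffle has its own path)
}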
GpuShuffleExchangeExecBase.scala
@@ -183,13 +183,6 @@ abstract class GpuShuffleExchangeExecBase(
}
}

- private lazy val serializingOnGPU = {
-   gpuOutputPartitioning match {
-     case gpuPartitioning: GpuPartitioning => gpuPartitioning.serializingOnGPU
-     case _ => false
-   }
- }
-
// Shuffle produces a lot of small output batches that should be coalesced together.
// This coalesce occurs on the GPU and should always be done when using RAPIDS shuffle,
// when it is under UCX or CACHE_ONLY modes.
@@ -238,7 +231,7 @@ abstract class GpuShuffleExchangeExecBase(
// This value must be lazy because the child's output may not have been resolved
// yet in all cases.
private lazy val serializer: Serializer = new GpuColumnarBatchSerializer(
gpuLongMetric("dataSize"), serializingOnGPU, sparkTypes)
gpuLongMetric("dataSize"), gpuOutputPartitioning.serializingOnGPU, sparkTypes)

@transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar()

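The comment in the last hunk explains the lazy qualifier on serializer: it captures state derived from the child's output, which may not be resolved when the exec node is built, and lazy val defers that read to first use. A tiny illustration of the timing, with purely hypothetical classes:

class LateResolved {
  private var cols: Option[Seq[String]] = None
  def output: Seq[String] = cols.getOrElse(sys.error("output not resolved yet"))
  def resolve(names: Seq[String]): Unit = { cols = Some(names) }
}

class ExampleNode(child: LateResolved) {
  // With a plain val, child.output would be read at construction and throw;
  // lazy defers the read until outputTypes is first used.
  private lazy val outputTypes: Seq[String] = child.output
  def firstUse(): Seq[String] = outputTypes
}

// val child = new LateResolved
// val node = new ExampleNode(child) // safe: nothing evaluated yet
// child.resolve(Seq("a", "b"))
// node.firstUse()                   // Seq("a", "b")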
