From e14dc4cccf7f8405e0c95456a282782481dbba49 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Mon, 17 Jun 2024 13:10:05 +0800 Subject: [PATCH] Shuffle gpu serde (#21) * Support serializing packed tables directly for shuffle write --------- Signed-off-by: Firestarman * Disble GPU serde for the AQE tests Signed-off-by: Firestarman * Disable by default Signed-off-by: Firestarman * Fix a build error Signed-off-by: Firestarman * Address comments Signed-off-by: Firestarman * Support buffering small tables for Shuffle read Signed-off-by: Firestarman --------- Signed-off-by: Firestarman Co-authored-by: Liangcai Li --- .../delta/GpuOptimizeWriteExchangeExec.scala | 17 +- .../rapids/PackedTableHostColumnVector.java | 175 ++++++++++ .../spark/rapids/GpuCoalesceBatches.scala | 8 +- .../rapids/GpuColumnarBatchSerializer.scala | 162 +++++---- .../nvidia/spark/rapids/GpuPartitioning.scala | 62 +++- .../nvidia/spark/rapids/GpuTableSerde.scala | 330 ++++++++++++++++++ .../spark/rapids/GpuTransitionOverrides.scala | 11 +- .../rapids/NvcompLZ4CompressionCodec.scala | 29 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 16 + .../spark/sql/rapids/GpuShuffleEnv.scala | 4 + .../GpuShuffleExchangeExecBase.scala | 35 +- .../spark/rapids/GpuPartitioningSuite.scala | 117 +++++-- .../rapids/GpuSinglePartitioningSuite.scala | 35 +- .../rapids/ShufflePartitionerRetrySuite.scala | 9 +- 14 files changed, 838 insertions(+), 172 deletions(-) create mode 100644 sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala diff --git a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala index 1a9936ea808..dad939fa6b2 100644 --- a/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala +++ b/delta-lake/common/src/main/databricks/scala/org/apache/spark/sql/rapids/delta/GpuOptimizeWriteExchangeExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * This file was derived from OptimizeWriteExchange.scala * in the Delta Lake project at https://github.com/delta-io/delta @@ -26,7 +26,7 @@ import scala.concurrent.Future import scala.concurrent.duration.Duration import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf -import com.nvidia.spark.rapids.{GpuColumnarBatchSerializer, GpuExec, GpuMetric, GpuPartitioning, GpuRoundRobinPartitioning} +import com.nvidia.spark.rapids.{GpuColumnarBatchSerializer, GpuExec, GpuMetric, GpuPartitioning, GpuRoundRobinPartitioning, RapidsConf} import com.nvidia.spark.rapids.delta.RapidsDeltaSQLConf import org.apache.spark.{MapOutputStatistics, ShuffleDependency} @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionS import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD} +import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ThreadUtils @@ -84,6 +85,8 @@ case class GpuOptimizeWriteExchangeExec( createNanoTimingMetric(DEBUG_LEVEL, "rs. 
shuffle combine time"), "rapidsShuffleWriteIoTime" -> createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle write io time"), + "rapidsShufflePartitionTime" -> + createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle partition time"), "rapidsShuffleReadTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL, "rs. shuffle read time") ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) @@ -97,8 +100,12 @@ case class GpuOptimizeWriteExchangeExec( ) ++ additionalMetrics } - private lazy val serializer: Serializer = - new GpuColumnarBatchSerializer(gpuLongMetric("dataSize")) + private lazy val sparkTypes: Array[DataType] = child.output.map(_.dataType).toArray + + private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( + gpuLongMetric("dataSize"), allMetrics("rapidsShuffleSerializationTime"), + allMetrics("rapidsShuffleDeserializationTime"), partitioning.serdeOnGPU, + sparkTypes, new RapidsConf(conf).gpuTargetBatchSizeBytes) @transient lazy val inputRDD: RDD[ColumnarBatch] = child.executeColumnar() @@ -116,7 +123,7 @@ case class GpuOptimizeWriteExchangeExec( inputRDD, child.output, partitioning, - child.output.map(_.dataType).toArray, + sparkTypes, serializer, useGPUShuffle=partitioning.usesGPUShuffle, useMultiThreadedShuffle=partitioning.usesMultiThreadedShuffle, diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java new file mode 100644 index 00000000000..b1bc794d209 --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/PackedTableHostColumnVector.java @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids; + +import ai.rapids.cudf.ContiguousTable; +import ai.rapids.cudf.DeviceMemoryBuffer; +import ai.rapids.cudf.HostMemoryBuffer; +import com.nvidia.spark.rapids.format.TableMeta; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A column vector that tracks a packed (or compressed) table on host. Unlike a normal + * host column vector, the columnar data within cannot be accessed directly. + * This is intended to only be used during shuffle after the data is partitioned and + * before it is serialized. 
+ */ +public final class PackedTableHostColumnVector extends ColumnVector { + + private static final String BAD_ACCESS_MSG = "Column is packed"; + + private final TableMeta tableMeta; + private final HostMemoryBuffer tableBuffer; + + PackedTableHostColumnVector(TableMeta tableMeta, HostMemoryBuffer tableBuffer) { + super(DataTypes.NullType); + long rows = tableMeta.rowCount(); + int batchRows = (int) rows; + if (rows != batchRows) { + throw new IllegalStateException("Cannot support a batch larger that MAX INT rows"); + } + this.tableMeta = tableMeta; + this.tableBuffer = tableBuffer; + } + + private static ColumnarBatch from(TableMeta meta, DeviceMemoryBuffer devBuf) { + HostMemoryBuffer tableBuf; + try(HostMemoryBuffer buf = HostMemoryBuffer.allocate(devBuf.getLength())) { + buf.copyFromDeviceBuffer(devBuf); + buf.incRefCount(); + tableBuf = buf; + } + ColumnVector column = new PackedTableHostColumnVector(meta, tableBuf); + return new ColumnarBatch(new ColumnVector[] { column }, (int) meta.rowCount()); + } + + /** Both the input table and output batch should be closed. */ + public static ColumnarBatch from(CompressedTable table) { + return from(table.meta(), table.buffer()); + } + + /** Both the input table and output batch should be closed. */ + public static ColumnarBatch from(ContiguousTable table) { + return from(MetaUtils.buildTableMeta(0, table), table.getBuffer()); + } + + /** Returns true if this columnar batch uses a packed table on host */ + public static boolean isBatchPackedOnHost(ColumnarBatch batch) { + return batch.numCols() == 1 && batch.column(0) instanceof PackedTableHostColumnVector; + } + + public TableMeta getTableMeta() { + return tableMeta; + } + + public HostMemoryBuffer getTableBuffer() { + return tableBuffer; + } + + @Override + public void close() { + if (tableBuffer != null) { + tableBuffer.close(); + } + } + + @Override + public boolean hasNull() { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public int numNulls() { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public boolean isNullAt(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public boolean getBoolean(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public byte getByte(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public short getShort(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public int getInt(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public long getLong(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public float getFloat(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public double getDouble(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public ColumnarMap getMap(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public UTF8String getUTF8String(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public byte[] getBinary(int rowId) { + throw new IllegalStateException(BAD_ACCESS_MSG); + } + + @Override + public ColumnVector getChild(int ordinal) { + 
throw new IllegalStateException(BAD_ACCESS_MSG); + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index e6dc216d7e6..2d3d5dbcbf7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -462,7 +462,7 @@ abstract class AbstractGpuCoalesceIterator( // If we have reached the cuDF limit once, proactively filter batches // after that first limit is reached. GpuFilter.filterAndClose(cbFromIter, inputFilterTier.get, - NoopMetric, NoopMetric, opTime) + NoopMetric, NoopMetric, NoopMetric) } else { Iterator(cbFromIter) } @@ -499,7 +499,7 @@ abstract class AbstractGpuCoalesceIterator( var filteredBytes = 0L if (hasAnyToConcat) { val filteredDowIter = GpuFilter.filterAndClose(concatAllAndPutOnGPU(), - filterTier, NoopMetric, NoopMetric, opTime) + filterTier, NoopMetric, NoopMetric, NoopMetric) while (filteredDowIter.hasNext) { closeOnExcept(filteredDowIter.next()) { filteredDownCb => filteredNumRows += filteredDownCb.numRows() @@ -512,7 +512,7 @@ abstract class AbstractGpuCoalesceIterator( // filterAndClose takes ownership of CB so we should not close it on a failure // anymore... val filteredCbIter = GpuFilter.filterAndClose(cb.release, filterTier, - NoopMetric, NoopMetric, opTime) + NoopMetric, NoopMetric, NoopMetric) while (filteredCbIter.hasNext) { closeOnExcept(filteredCbIter.next()) { filteredCb => val filteredWouldBeRows = filteredNumRows + filteredCb.numRows() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index 049f3f21bcf..4a58402e494 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -25,16 +25,15 @@ import scala.reflect.ClassTag import ai.rapids.cudf.{HostColumnVector, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} import ai.rapids.cudf.JCudfSerialization.SerializedTableHeader import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import org.apache.spark.TaskContext import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} -import org.apache.spark.sql.types.NullType -import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.types.{DataType, NullType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector => SparkColumnVector} -class SerializedBatchIterator(dIn: DataInputStream) - extends Iterator[(Int, ColumnarBatch)] { +class SerializedBatchIterator(dIn: DataInputStream, deserTime: GpuMetric +) extends Iterator[(Int, ColumnarBatch)] { private[this] var nextHeader: Option[SerializedTableHeader] = None private[this] var toBeReturned: Option[ColumnarBatch] = None private[this] var streamClosed: Boolean = false @@ -90,14 +89,16 @@ class SerializedBatchIterator(dIn: DataInputStream) } override def hasNext: Boolean = { - tryReadNextHeader() + deserTime.ns(tryReadNextHeader()) nextHeader.isDefined } override def next(): (Int, ColumnarBatch) = { if (toBeReturned.isEmpty) { - tryReadNextHeader() - toBeReturned = tryReadNext() + deserTime.ns { + tryReadNextHeader() + toBeReturned = tryReadNext() + } if (nextHeader.isEmpty || toBeReturned.isEmpty) { throw new NoSuchElementException("Walked off of the end...") } @@ -108,9 +109,12 @@ class SerializedBatchIterator(dIn: DataInputStream) (0, ret) } } + /** - * Serializer for serializing `ColumnarBatch`s for use during normal shuffle. + * Serializer for serializing `ColumnarBatch`s for use during normal shuffle. It supports + * two types of batches as input. * + * Type 1 (isSerializedTable == false): * The serialization write path takes the cudf `Table` that is described by the `ColumnarBatch` * and uses cudf APIs to serialize the data into a sequence of bytes on the host. The data is * returned to the Spark shuffle code where it is compressed by the CPU and written to disk. @@ -122,69 +126,47 @@ * custom batches of [[SerializedTableColumn]]. [[GpuShuffleCoalesceExec]] coalesces the smaller * shuffle partitions into larger tables before placing them on the GPU for further processing. * + * Type 2 (isSerializedTable == true): + * The table inside an input ColumnarBatch has already been serialized and placed on the host + * by upstream operators, so the serializer writes it to the output stream directly, along + * with lightweight metadata (a serializable TableMeta). + * + * The deserialization path reads the metadata and the serialized table back and moves them + * to the device to rebuild the cudf table. During query planning, each GPU columnar shuffle + * exchange is followed by a [[GpuCoalesceBatches]] that expects to receive the returned + * batches, decompresses them when needed, and coalesces the smaller shuffle partitions into + * larger tables before sending them to downstream operators for further processing. + * + * @note The RAPIDS shuffle does not use this code.
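+ * For illustration, the GPU shuffle exchanges in this change build the serializer roughly as
+ * shown below (a sketch; the metric keys are the ones registered by the exchange in this patch):
+ * {{{
+ *   val serializer = new GpuColumnarBatchSerializer(
+ *     gpuLongMetric("dataSize"),
+ *     allMetrics("rapidsShuffleSerializationTime"),
+ *     allMetrics("rapidsShuffleDeserializationTime"),
+ *     isSerializedTable = gpuOutputPartitioning.serdeOnGPU,
+ *     sparkTypes = sparkTypes,
+ *     bundleSize = new RapidsConf(conf).gpuTargetBatchSizeBytes)
+ * }}}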
*/ -class GpuColumnarBatchSerializer(dataSize: GpuMetric) - extends Serializer with Serializable { +class GpuColumnarBatchSerializer(dataSize: GpuMetric, + serTime: GpuMetric = NoopMetric, + deserTime: GpuMetric = NoopMetric, + isSerializedTable: Boolean = false, + sparkTypes: Array[DataType] = Array.empty, + bundleSize: Long = 0L) extends Serializer with Serializable { override def newInstance(): SerializerInstance = - new GpuColumnarBatchSerializerInstance(dataSize) + new GpuColumnarBatchSerializerInstance(dataSize, serTime, deserTime, + isSerializedTable, sparkTypes, bundleSize) override def supportsRelocationOfSerializedObjects: Boolean = true } -private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends SerializerInstance { +private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric, serTime: GpuMetric, + deserTime: GpuMetric, isSerializedTable: Boolean, sparkTypes: Array[DataType], + bundleSize: Long) extends SerializerInstance { override def serializeStream(out: OutputStream): SerializationStream = new SerializationStream { - private[this] val dOut: DataOutputStream = - new DataOutputStream(new BufferedOutputStream(out)) + private[this] val dOut = new DataOutputStream(new BufferedOutputStream(out)) + private[this] val tableSerializer = new PackedTableSerializer() - override def writeValue[T: ClassTag](value: T): SerializationStream = { - val batch = value.asInstanceOf[ColumnarBatch] - val numColumns = batch.numCols() - val columns: Array[HostColumnVector] = new Array(numColumns) - val toClose = new ArrayBuffer[AutoCloseable]() - try { - var startRow = 0 - val numRows = batch.numRows() - if (batch.numCols() > 0) { - val firstCol = batch.column(0) - if (firstCol.isInstanceOf[SlicedGpuColumnVector]) { - // We don't have control over ColumnarBatch to put in the slice, so we have to do it - // for each column. In this case we are using the first column. 
- startRow = firstCol.asInstanceOf[SlicedGpuColumnVector].getStart - for (i <- 0 until numColumns) { - columns(i) = batch.column(i).asInstanceOf[SlicedGpuColumnVector].getBase - } - } else { - for (i <- 0 until numColumns) { - batch.column(i) match { - case gpu: GpuColumnVector => - val cpu = gpu.copyToHost() - toClose += cpu - columns(i) = cpu.getBase - case cpu: RapidsHostColumnVector => - columns(i) = cpu.getBase - } - } - } + private lazy val serializeBatch: ColumnarBatch => Unit = if (isSerializedTable) { + serializeGpuBatch + } else { + serializeCpuBatch + } - dataSize += JCudfSerialization.getSerializedSizeInBytes(columns, startRow, numRows) - val range = new NvtxRange("Serialize Batch", NvtxColor.YELLOW) - try { - JCudfSerialization.writeToStream(columns, dOut, startRow, numRows) - } finally { - range.close() - } - } else { - val range = new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW) - try { - JCudfSerialization.writeRowsToStream(dOut, numRows) - } finally { - range.close() - } - } - } finally { - toClose.safeClose() - } + override def writeValue[T: ClassTag](value: T): SerializationStream = { + serTime.ns(withResource(value.asInstanceOf[ColumnarBatch])(serializeBatch)) this } @@ -212,15 +194,65 @@ private class GpuColumnarBatchSerializerInstance(dataSize: GpuMetric) extends Se override def close(): Unit = { dOut.close() } - } + private def serializeCpuBatch(batch: ColumnarBatch): Unit = { + val numRows = batch.numRows() + val numCols = batch.numCols() + if (numCols > 0) { + withResource(new ArrayBuffer[AutoCloseable]()) { toClose => + var startRow = 0 + val cols = closeOnExcept(batch) { _ => + val toHostCol: SparkColumnVector => HostColumnVector = batch.column(0) match { + case sliced: SlicedGpuColumnVector => + // We don't have control over ColumnarBatch to put in the slice, so we have + // to do it for each column. In this case we are using the first column. 
+ startRow = sliced.getStart + col => col.asInstanceOf[SlicedGpuColumnVector].getBase + case _: GpuColumnVector => + col => { + val hCol = col.asInstanceOf[GpuColumnVector].copyToHost() + toClose += hCol + hCol.getBase + } + case _: RapidsHostColumnVector => + col => col.asInstanceOf[RapidsHostColumnVector].getBase + } + (0 until numCols).map(i => toHostCol(batch.column(i))).toArray + } + dataSize += JCudfSerialization.getSerializedSizeInBytes(cols, startRow, numRows) + withResource(new NvtxRange("Serialize Batch", NvtxColor.YELLOW)) { _ => + JCudfSerialization.writeToStream(cols, dOut, startRow, numRows) + } + } + } else { // Rows only batch + withResource(new NvtxRange("Serialize Row Only Batch", NvtxColor.YELLOW)) { _ => + JCudfSerialization.writeRowsToStream(dOut, numRows) + } + } + } + + private def serializeGpuBatch(batch: ColumnarBatch): Unit = { + val packedCol = if (batch.numCols() == 0) { + val tableMeta = MetaUtils.buildDegenerateTableMeta(batch) + new PackedTableHostColumnVector(tableMeta, null) + } else { + require(PackedTableHostColumnVector.isBatchPackedOnHost(batch)) + batch.column(0).asInstanceOf[PackedTableHostColumnVector] + } + dataSize += tableSerializer.writeToStream(packedCol, dOut) + } + } override def deserializeStream(in: InputStream): DeserializationStream = { new DeserializationStream { private[this] val dIn: DataInputStream = new DataInputStream(new BufferedInputStream(in)) override def asKeyValueIterator: Iterator[(Int, ColumnarBatch)] = { - new SerializedBatchIterator(dIn) + if (isSerializedTable) { + new PackedTableIterator(dIn, sparkTypes, bundleSize, deserTime) + } else { + new SerializedBatchIterator(dIn, deserTime) + } } override def asIterator: Iterator[Any] = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index c82e9a97656..4f54a29bf29 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -35,11 +35,13 @@ object GpuPartitioning { } trait GpuPartitioning extends Partitioning { - private[this] val (maxCompressionBatchSize, _useGPUShuffle, _useMultiThreadedShuffle) = { + private[this] val (maxCompressionBatchSize, _useGPUShuffle, _useMultiThreadedShuffle, + _isSerdeOnGPU) = { val rapidsConf = new RapidsConf(SQLConf.get) (rapidsConf.shuffleCompressionMaxBatchMemory, GpuShuffleEnv.useGPUShuffle(rapidsConf), - GpuShuffleEnv.useMultiThreadedShuffle(rapidsConf)) + GpuShuffleEnv.useMultiThreadedShuffle(rapidsConf), + GpuShuffleEnv.isSerdeOnGpu(rapidsConf)) } final def columnarEval(batch: ColumnarBatch): GpuColumnVector = { @@ -51,6 +53,30 @@ trait GpuPartitioning extends Partitioning { def usesMultiThreadedShuffle: Boolean = _useMultiThreadedShuffle + def serdeOnGPU: Boolean = _isSerdeOnGPU + + private lazy val toPackedBatch: ContiguousTable => ColumnarBatch = + if (_isSerdeOnGPU) { + table => + withResource(new NvtxRange("Table to Host", NvtxColor.BLUE)) { _ => + withResource(table) { _ => + PackedTableHostColumnVector.from(table) + } + } + } else { + GpuPackedTableColumn.from + } + + private lazy val toCompressedBatch: CompressedTable => ColumnarBatch = + if (_isSerdeOnGPU) { + table => + withResource(new NvtxRange("Table to Host", NvtxColor.BLUE)) { _ => + PackedTableHostColumnVector.from(table) + } + } else { + GpuCompressedColumnVector.from + } + def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, end: Int): ColumnarBatch = { var ret: ColumnarBatch = null val count = end - start @@ -63,24 +89,23 @@ trait GpuPartitioning extends Partitioning { def sliceInternalOnGpuAndClose(numRows: Int, partitionIndexes: Array[Int], partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = { - // The first index will always be 0, so we need to skip it. val batches = if (numRows > 0) { + // The first index will always be 0, so we need to skip it. val parts = partitionIndexes.slice(1, partitionIndexes.length) closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits => val contiguousTables = withResource(partitionColumns) { _ => withResource(new Table(partitionColumns.map(_.getBase).toArray: _*)) { table => - table.contiguousSplit(parts: _*) + withRetryNoSplit(table.contiguousSplit(parts: _*)) } } GpuShuffleEnv.rapidsShuffleCodec match { case Some(codec) => compressSplits(splits, codec, contiguousTables) case None => - // GpuPackedTableColumn takes ownership of the contiguous tables - closeOnExcept(contiguousTables) { cts => - cts.foreach { ct => splits.append(GpuPackedTableColumn.from(ct)) } - } + // ColumnarBatch takes ownership of the contiguous tables + closeOnExcept(contiguousTables)(_.foreach(ct => splits.append(toPackedBatch(ct)))) } + // synchronize our stream to ensure we have caught up with contiguous split // as downstream consumers (RapidsShuffleManager) will add hundreds of buffers // to the spill framework, this makes it so here we synchronize once. @@ -91,7 +116,10 @@ trait GpuPartitioning extends Partitioning { Array[ColumnarBatch]() } - GpuSemaphore.releaseIfNecessary(TaskContext.get()) + if (_isSerdeOnGPU) { + // All the data should be on host now for shuffle, leaving GPU for a while. 
+ GpuSemaphore.releaseIfNecessary(TaskContext.get()) + } batches } @@ -175,7 +203,7 @@ trait GpuPartitioning extends Partitioning { def sliceInternalGpuOrCpuAndClose(numRows: Int, partitionIndexes: Array[Int], partitionColumns: Array[GpuColumnVector]): Array[(ColumnarBatch, Int)] = { - val sliceOnGpu = usesGPUShuffle + val sliceOnGpu = usesGPUShuffle || _isSerdeOnGPU val nvtxRangeKey = if (sliceOnGpu) { "sliceInternalOnGpu" } else { @@ -211,8 +239,9 @@ trait GpuPartitioning extends Partitioning { // add each table either to the batch to be compressed or to the empty batch tracker contiguousTables.zipWithIndex.foreach { case (ct, i) => - if (ct.getRowCount == 0) { - emptyBatches.append((GpuPackedTableColumn.from(ct), i)) + // Buffer is empty when no rows or all rows are null, so check the buffer directly. + if (ct.getBuffer == null || ct.getBuffer.getLength == 0) { + emptyBatches.append((toPackedBatch(ct), i)) } else { compressor.addTableToCompress(ct) } @@ -226,18 +255,15 @@ trait GpuPartitioning extends Partitioning { // add any compressed batches that need to appear before the next empty batch val numCompressedToAdd = emptyOutputIndex - outputIndex (0 until numCompressedToAdd).foreach { _ => - val compressedTable = compressedTables(compressedTableIndex) - outputBatches.append(GpuCompressedColumnVector.from(compressedTable)) + outputBatches.append(toCompressedBatch(compressedTables(compressedTableIndex))) compressedTableIndex += 1 } outputBatches.append(emptyBatch) outputIndex = emptyOutputIndex + 1 } - // add any compressed batches that remain after the last empty batch (compressedTableIndex until compressedTables.length).foreach { i => - val ct = compressedTables(i) - outputBatches.append(GpuCompressedColumnVector.from(ct)) + outputBatches.append(toCompressedBatch(compressedTables(i))) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala new file mode 100644 index 00000000000..32099e24f5b --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTableSerde.scala @@ -0,0 +1,330 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids + +import java.io.{DataInputStream, DataOutputStream, EOFException} +import java.nio.ByteBuffer + +import scala.collection.mutable + +import ai.rapids.cudf.{DeviceMemoryBuffer, HostMemoryBuffer, NvtxColor, NvtxRange} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableColumn, AutoCloseableSeq} +import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion +import com.nvidia.spark.rapids.format.TableMeta + +import org.apache.spark.TaskContext +import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.vectorized.ColumnarBatch + +private sealed trait TableSerde { + protected val P_MAGIC_NUM: Int = 0x43554447 // "CUDF".asInt + 1 + protected val P_VERSION: Int = 0 + protected val headerLen = 8 // the size in bytes of two Ints for a header + + // buffers for reuse, so it should be only one instance of this trait per thread. + protected val tmpBuf = new Array[Byte](1024 * 64) // 64k +} + +private[rapids] class PackedTableSerializer extends TableSerde { + private def writeByteBufferToStream(bBuf: ByteBuffer, dOut: DataOutputStream): Unit = { + // Write the buffer size first + val bufLen = bBuf.capacity() + dOut.writeLong(bufLen.toLong) + if (bBuf.hasArray) { + dOut.write(bBuf.array()) + } else { // Probably a direct buffer + var leftLen = bufLen + while (leftLen > 0) { + val copyLen = Math.min(tmpBuf.length, leftLen) + bBuf.get(tmpBuf, 0, copyLen) + dOut.write(tmpBuf, 0, copyLen) + leftLen -= copyLen + } + } + } + + private def writeHostBufferToStream(hBuf: HostMemoryBuffer, dOut: DataOutputStream): Unit = { + // Write the buffer size first + val bufLen = hBuf.getLength + dOut.writeLong(bufLen) + var leftLen = bufLen + var hOffset = 0L + while (leftLen > 0L) { + val copyLen = Math.min(tmpBuf.length, leftLen) + hBuf.getBytes(tmpBuf, 0, hOffset, copyLen) + dOut.write(tmpBuf, 0, copyLen.toInt) + leftLen -= copyLen + hOffset += copyLen + } + } + + private def writeProtocolHeader(dOut: DataOutputStream): Unit = { + dOut.writeInt(P_MAGIC_NUM) + dOut.writeInt(P_VERSION) + } + + def writeToStream(hostTbl: PackedTableHostColumnVector, dOut: DataOutputStream): Long = { + withResource(new NvtxRange("Serialize Host Table", NvtxColor.RED)) { _ => + // In the order of 1) header, 2) table metadata, 3) table data on host + val metaBuf = hostTbl.getTableMeta.getByteBuffer + val dataBuf = hostTbl.getTableBuffer + var writtenLen = headerLen.toLong + metaBuf.capacity() + writeProtocolHeader(dOut) + writeByteBufferToStream(metaBuf, dOut) + if (dataBuf != null) { + writeHostBufferToStream(dataBuf, dOut) + writtenLen += dataBuf.getLength + } + writtenLen + } + } +} + +private[rapids] class PackedTableDeserializer extends TableSerde { + private def readProtocolHeader(dIn: DataInputStream): Unit = { + val magicNum = dIn.readInt() + if (magicNum != P_MAGIC_NUM) { + throw new IllegalStateException(s"Expected magic number $P_MAGIC_NUM for " + + s"table serializer, but got $magicNum") + } + val version = dIn.readInt() + if (version != P_VERSION) { + throw new IllegalStateException(s"Version mismatch: expected $P_VERSION for " + + s"table serializer, but got $version") + } + } + + private def readByteBufferFromStream(dIn: DataInputStream): ByteBuffer = { + val bufLen = dIn.readLong().toInt + val bufArray = new Array[Byte](bufLen) + var readLen = 0 + // A single call to read(bufArray) can not always read the expected length. So + // we do it here ourselves. 
+ do { + val ret = dIn.read(bufArray, readLen, bufLen - readLen) + if (ret < 0) { + throw new EOFException() + } + readLen += ret + } while (readLen < bufLen) + ByteBuffer.wrap(bufArray) + } + + private def readHostBufferFromStream(dIn: DataInputStream): HostMemoryBuffer = { + val bufLen = dIn.readLong() + closeOnExcept(HostMemoryBuffer.allocate(bufLen)) { hostBuf => + var leftLen = bufLen + var hOffset = 0L + while (leftLen > 0) { + val copyLen = Math.min(tmpBuf.length, leftLen) + val readLen = dIn.read(tmpBuf, 0, copyLen.toInt) + if (readLen < 0) { + throw new EOFException() + } + hostBuf.setBytes(hOffset, tmpBuf, 0, readLen) + hOffset += readLen + leftLen -= readLen + } + hostBuf + } + } + + def readFromStream(dIn: DataInputStream): PackedTableHostColumnVector = { + withResource(new NvtxRange("Read Host Table", NvtxColor.ORANGE)) { _ => + // 1) read and check header + readProtocolHeader(dIn) + // 2) read table metadata + val tableMeta = TableMeta.getRootAsTableMeta(readByteBufferFromStream(dIn)) + val hostDataBuf = if (tableMeta.packedMetaAsByteBuffer() == null) { + // no packed metadata, must be a table with zero columns, so no buffer + null + } else { + // 3) read table data + readHostBufferFromStream(dIn) + } + new PackedTableHostColumnVector(tableMeta, hostDataBuf) + } + } +} + +private[rapids] class PackedTableIterator(dIn: DataInputStream, sparkTypes: Array[DataType], + bundleSize: Long, deserTime: GpuMetric) extends Iterator[(Int, ColumnarBatch)] { + + private val tableDeserializer = new PackedTableDeserializer + private val bundleTargetSize = Math.max(bundleSize, 128 * 1024 * 1024L) // at least 128M + private val readTables: mutable.ListBuffer[PackedTableHostColumnVector] = + mutable.ListBuffer.empty + + private val curOffsetsAndMetas: mutable.Queue[(Long, TableMeta)] = mutable.Queue.empty + private var curTablesDeviceBuf: Option[DeviceMemoryBuffer] = None + private var closed = false + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + closeBuffer() + if (!closed) { + dIn.close() + } + } + } + + private def closeBuffer(): Unit = { + readTables.safeClose() + readTables.clear() + curTablesDeviceBuf.foreach(_.safeClose()) + curTablesDeviceBuf = None + } + + override def hasNext: Boolean = { + if (curOffsetsAndMetas.isEmpty) { + tryReadNextBundle() + } + curOffsetsAndMetas.nonEmpty + } + + override def next(): (Int, ColumnarBatch) = { + if (!hasNext) { + throw new NoSuchElementException() + } + (0, nextBatchFromBundle()) + } + + private def tryReadNextBundle(): Unit = { + if (closed) { + return + } + assert(curOffsetsAndMetas.isEmpty) + // IO operation is coming, so leave GPU for a while + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + var (accSize, accRows) = (0L, 0L) + readTables.foreach { p => + curOffsetsAndMetas.enqueue((accSize, p.getTableMeta)) + accSize += (if (p.getTableBuffer != null) p.getTableBuffer.getLength else 0L) + accRows += p.getTableMeta.rowCount() + } + try { + deserTime.ns { + while (!closed && accSize < bundleTargetSize && accRows < Int.MaxValue) { + val p = withResource(new NvtxRange("Read Table", NvtxColor.ORANGE)) { _ => + tableDeserializer.readFromStream(dIn) + } + // Always cache the read table to the queue, even the total size may + // go beyond the target size. But we stop reading the next one. + readTables.append(p) + val startPos = if (p.getTableBuffer != null) { + val preAccSize = accSize + accSize += p.getTableBuffer.getLength + preAccSize + } else { + -1L // Indicate a rows-only batch. 
Since 0 is a valid offset for an empty buffer. + } + accRows += p.getTableMeta.rowCount() + if (accSize <= bundleTargetSize && accRows <= Int.MaxValue) { + // Only add it to the current bundle when neither the size nor the row count overflows. + curOffsetsAndMetas.enqueue((startPos, p.getTableMeta)) + } + } + } + } catch { + case _: EOFException => // we reached the end + dIn.close() + closed = true + } + } + + // Merge host buffers in the current bundle into a single big contiguous buffer. + // It requires that the buffered tables are NOT rows-only. + private def getCurrentTablesHostBuf: HostMemoryBuffer = { + val numCurTables = curOffsetsAndMetas.length + withResource(readTables.take(numCurTables).toArray) { curTables => + readTables.remove(0, numCurTables) + if (curTables.length == 1) { + val ret = curTables.head.getTableBuffer + curTables(0) = null + ret + } else { + val totalSize = curTables.map(_.getTableBuffer.getLength).sum + closeOnExcept(HostMemoryBuffer.allocate(totalSize)) { bigHostBuf => + curTables.zipWithIndex.foreach { case (p, idx) => + withResource(p) { _ => + curTables(idx) = null + bigHostBuf.copyFromHostBuffer(curOffsetsAndMetas(idx)._1, p.getTableBuffer, + 0, p.getTableBuffer.getLength) + } + } + bigHostBuf + } + } + } + } + + private def nextBatchFromBundle(): ColumnarBatch = { + val (start, tableMeta) = curOffsetsAndMetas.head + if (start < 0) { + GpuSemaphore.acquireIfNecessary(TaskContext.get) + // Rows-only batches. The GPU semaphore is still acquired because the downstream + // operators expect the batch producer to already hold it, even when it only + // generates empty batches. + deserTime.ns { + val rowsNum = curOffsetsAndMetas.map(_._2.rowCount()).sum + curOffsetsAndMetas.clear() + new ColumnarBatch(Array.empty, rowsNum.toInt) + } + } else { + if (curTablesDeviceBuf.isEmpty) { + // Refresh the device buffer by lazily moving buffered small tables from host + // with a single copy.
+ curTablesDeviceBuf = withResource(getCurrentTablesHostBuf) { tablesHostBuf => + // Begin to use GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get) + deserTime.ns { + withResource(new NvtxRange("Table to Device", NvtxColor.RED)) { _ => + closeOnExcept(DeviceMemoryBuffer.allocate(tablesHostBuf.getLength)) { devBuf => + devBuf.copyFromHostBuffer(tablesHostBuf) + Some(devBuf) + } + } + } + } + } + assert(curTablesDeviceBuf.isDefined, "The device buffer holding tables is missing") + deserTime.ns { + curOffsetsAndMetas.dequeue() + val end = curOffsetsAndMetas.headOption.map(_._1) + .getOrElse(curTablesDeviceBuf.get.getLength) + val ret = withResource(curTablesDeviceBuf.get.slice(start, end - start)) { sliced => + withResource(new NvtxRange("Deserialize Table", NvtxColor.YELLOW)) { _ => + val bufferMeta = tableMeta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + MetaUtils.getBatchFromMeta(sliced, tableMeta, sparkTypes) + } else { + GpuCompressedColumnVector.from(sliced, tableMeta) + } + } + } + closeOnExcept(ret) { _ => + if (curOffsetsAndMetas.isEmpty) { + // All the tables on the current buffer are consumed, close the current buffer + curTablesDeviceBuf.foreach(_.safeClose()) + curTablesDeviceBuf = None + } + } + ret + } + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index eef083bb93d..8de42711fc8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -70,7 +70,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { /** Adds the appropriate coalesce after a shuffle depending on the type of shuffle configured */ private def addPostShuffleCoalesce(plan: SparkPlan): SparkPlan = { - if (GpuShuffleEnv.useGPUShuffle(rapidsConf)) { + if (GpuShuffleEnv.useGPUShuffle(rapidsConf) || GpuShuffleEnv.isSerdeOnGpu(rapidsConf)) { GpuCoalesceBatches(plan, TargetSize(rapidsConf.gpuTargetBatchSizeBytes)) } else { GpuShuffleCoalesceExec(plan, rapidsConf.gpuTargetBatchSizeBytes) @@ -520,8 +520,13 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { private def insertShuffleCoalesce(plan: SparkPlan): SparkPlan = plan match { case exec: GpuShuffleExchangeExecBase => // always follow a GPU shuffle with a shuffle coalesce - GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), - rapidsConf.gpuTargetBatchSizeBytes) + if (GpuShuffleEnv.isSerdeOnGpu(rapidsConf)) { + GpuCoalesceBatches(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), + TargetSize(rapidsConf.gpuTargetBatchSizeBytes)) + } else { + GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), + rapidsConf.gpuTargetBatchSizeBytes) + } case exec => exec.withNewChildren(plan.children.map(insertShuffleCoalesce)) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala index d83e706fcfb..d45bf4dbfc6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvcompLZ4CompressionCodec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ContiguousTable, Cuda, DeviceMemoryBuffer, NvtxColor, NvtxRange} import ai.rapids.cudf.nvcomp.{BatchedLZ4Compressor, BatchedLZ4Decompressor} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} -import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.format.{BufferMeta, CodecType} /** A table compression codec that uses nvcomp's LZ4-GPU codec */ @@ -79,6 +78,7 @@ class BatchedNvcompLZ4Decompressor(maxBatchMemory: Long, codecConfigs: TableCompressionCodecConfig, stream: Cuda.Stream) extends BatchedBufferDecompressor(maxBatchMemory, stream) { override val codecId: Byte = CodecType.NVCOMP_LZ4 + private val decompressor = new BatchedLZ4Decompressor(codecConfigs.lz4ChunkSize) override def decompressAsync( inputBuffers: Array[BaseDeviceMemoryBuffer], @@ -87,26 +87,17 @@ class BatchedNvcompLZ4Decompressor(maxBatchMemory: Long, require(inputBuffers.length == bufferMetas.length, s"number of input buffers (${inputBuffers.length}) does not equal number of metadata " + s"buffers (${bufferMetas.length}") - val outputBuffers = allocateOutputBuffers(inputBuffers, bufferMetas) - BatchedLZ4Decompressor.decompressAsync( - codecConfigs.lz4ChunkSize, + // Increase ref count to keep inputs alive since cudf decompressor will close the inputs. + val compressedBufs = DeviceBuffersUtils.incRefCount(inputBuffers) + val outputBuffers = closeOnExcept(compressedBufs) { _ => + withResource(new NvtxRange("alloc output bufs", NvtxColor.YELLOW)) { _ => + DeviceBuffersUtils.allocateBuffers(bufferMetas.map(_.uncompressedSize())) + } + } + decompressor.decompressAsync( inputBuffers, outputBuffers.asInstanceOf[Array[BaseDeviceMemoryBuffer]], stream) outputBuffers } - - private def allocateOutputBuffers( - inputBuffers: Array[BaseDeviceMemoryBuffer], - bufferMetas: Array[BufferMeta]): Array[DeviceMemoryBuffer] = { - withResource(new NvtxRange("alloc output bufs", NvtxColor.YELLOW)) { _ => - bufferMetas.zip(inputBuffers).safeMap { case (meta, input) => - // cudf decompressor guarantees that close will be called for 'inputBuffers' and will not - // throw before doing so, but this interface does not close inputs so we need to increment - // the ref count. - input.incRefCount() - DeviceMemoryBuffer.allocate(meta.uncompressedSize()) - } - } - } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index ff6b8724d75..1dfcd67f000 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1896,6 +1896,19 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression. 
.integerConf .createWithDefault(20) + private val SHUFFLE_SERDE_TYPES = Set("CPU", "GPU") + + val SHUFFLE_SERDE_TYPE = + conf("spark.rapids.shuffle.serde.type") + .doc("The type of serde (serialization and deserialization) used for the normal" + + " shuffle. Set it to GPU to serialize and deserialize the shuffle data as packed" + + " GPU tables instead of using the default CPU serialization.") + .internal() + .startupOnly() + .stringConf + .checkValue(v => SHUFFLE_SERDE_TYPES.contains(v.toUpperCase(java.util.Locale.ROOT)), + s"The shuffle serde type should be one of ${SHUFFLE_SERDE_TYPES.mkString(", ")}") + .createWithDefault("CPU") + // ALLUXIO CONFIGS val ALLUXIO_MASTER = conf("spark.rapids.alluxio.master") .doc("The Alluxio master hostname. If not set, read Alluxio master URL from " + @@ -2967,6 +2980,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shuffleMultiThreadedReaderThreads: Int = get(SHUFFLE_MULTITHREADED_READER_THREADS) + lazy val isGpuSerdeEnabled: Boolean = + get(SHUFFLE_SERDE_TYPE).toUpperCase(java.util.Locale.ROOT) == "GPU" + def isUCXShuffleManagerMode: Boolean = RapidsShuffleManagerMode .withName(get(SHUFFLE_MANAGER_MODE)) == RapidsShuffleManagerMode.UCX diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala index 1b0ee21d494..58f8c3c10ff 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala @@ -152,6 +152,10 @@ object GpuShuffleEnv extends Logging { isRapidsShuffleAvailable(conf) } + def isSerdeOnGpu(conf: RapidsConf): Boolean = { + conf.isGpuSerdeEnabled && (!useGPUShuffle(conf)) + } + def getCatalog: ShuffleBufferCatalog = if (env == null) { null } else { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 5323fc89019..851a6fe3b79 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,10 @@ import scala.concurrent.Future import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, ShuffleOriginUtil, SparkShimImpl} -import org.apache.spark.{MapOutputStatistics, ShuffleDependency} +import org.apache.spark.{MapOutputStatistics, ShuffleDependency, TaskContext} import org.apache.spark.rapids.shims.GpuShuffleExchangeExec import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -208,6 +209,8 @@ abstract class GpuShuffleExchangeExecBase( createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle combine time"), "rapidsShuffleWriteIoTime" -> createNanoTimingMetric(DEBUG_LEVEL,"rs. shuffle write io time"), + "rapidsShufflePartitionTime" -> + createNanoTimingMetric(DEBUG_LEVEL, "rs. shuffle partition time"), "rapidsShuffleReadTime" -> createNanoTimingMetric(ESSENTIAL_LEVEL,"rs.
shuffle read time") ) ++ GpuMetric.wrap(readMetrics) ++ GpuMetric.wrap(writeMetrics) @@ -231,7 +234,9 @@ abstract class GpuShuffleExchangeExecBase( // This value must be lazy because the child's output may not have been resolved // yet in all cases. private lazy val serializer: Serializer = new GpuColumnarBatchSerializer( - gpuLongMetric("dataSize")) + gpuLongMetric("dataSize"), allMetrics("rapidsShuffleSerializationTime"), + allMetrics("rapidsShuffleDeserializationTime"), gpuOutputPartitioning.serdeOnGPU, + sparkTypes, new RapidsConf(conf).gpuTargetBatchSizeBytes) @transient lazy val inputBatchRDD: RDD[ColumnarBatch] = child.executeColumnar() @@ -314,7 +319,8 @@ object GpuShuffleExchangeExecBase { } val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning) def getPartitioned: ColumnarBatch => Any = { - batch => partitioner.columnarEvalAny(batch) + val partitionMetric = metrics("rapidsShufflePartitionTime") + batch => partitionMetric.ns(partitioner.columnarEvalAny(batch)) } val rddWithPartitionIds: RDD[Product2[Int, ColumnarBatch]] = { newRdd.mapPartitions { iter => @@ -323,12 +329,17 @@ object GpuShuffleExchangeExecBase { private var partitioned : Array[(ColumnarBatch, Int)] = _ private var at = 0 private val mutablePair = new MutablePair[Int, ColumnarBatch]() - private def partNextBatch(): Unit = { - if (partitioned != null) { - partitioned.map(_._1).safeClose() - partitioned = null - at = 0 + Option(TaskContext.get()).foreach { tc => + onTaskCompletion(tc) { + if (partitioned != null) { + partitioned.drop(at).map(_._1).safeClose() + } } + } + + private def partNextBatch(): Unit = { + partitioned = null + at = 0 if (iter.hasNext) { var batch = iter.next() while (batch.numRows == 0 && iter.hasNext) { @@ -343,7 +354,6 @@ object GpuShuffleExchangeExecBase { metrics(GpuMetric.NUM_OUTPUT_ROWS) += batches._1.numRows() }) metrics(GpuMetric.NUM_OUTPUT_BATCHES) += partitioned.length - at = 0 } else { batch.close() } @@ -359,10 +369,7 @@ object GpuShuffleExchangeExecBase { } override def next(): Product2[Int, ColumnarBatch] = { - if (partitioned == null || at >= partitioned.length) { - partNextBatch() - } - if (partitioned == null || at >= partitioned.length) { + if (!hasNext) { throw new NoSuchElementException("Walked off of the end...") } val tup = partitioned(at) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala index 1e3c0f699da..11519b65505 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala @@ -19,8 +19,8 @@ package com.nvidia.spark.rapids import java.io.File import java.math.RoundingMode -import ai.rapids.cudf.{ColumnVector, Cuda, DType, Table} -import com.nvidia.spark.rapids.Arm.withResource +import ai.rapids.cudf.{ColumnVector, Cuda, DeviceMemoryBuffer, DType, Table} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.SparkConf @@ -45,6 +45,26 @@ class GpuPartitioningSuite extends AnyFunSuite { } } + private def decompressTable(batch: ColumnarBatch): Array[ColumnVector] = { + val dataCol = batch.column(0).asInstanceOf[GpuCompressedColumnVector] + val descr = dataCol.getTableMeta.bufferMeta.codecBufferDescrs(0) + val codec = TableCompressionCodec.getCodec( + descr.codec, TableCompressionCodec.makeCodecConfig(rapidsConf)) + 
withResource(codec.createBatchDecompressor(100 * 1024 * 1024L, + Cuda.DEFAULT_STREAM)) { decompressor => + dataCol.getTableBuffer.incRefCount() + decompressor.addBufferToDecompress(dataCol.getTableBuffer, + dataCol.getTableMeta.bufferMeta) + withResource(decompressor.finishAsync()) { outputBuffers => + val outputBuffer = outputBuffers.head + // There should be only one + withResource(MetaUtils.getTableFromMeta(outputBuffer, dataCol.getTableMeta)) { table => + (0 until table.getNumberOfColumns).map(i => table.getColumn(i).incRefCount()).toArray + } + } + } + } + /** * Retrieves the underlying column vectors for a batch. It increments the reference counts for * them if needed so the results need to be closed. @@ -56,22 +76,22 @@ class GpuPartitioningSuite extends AnyFunSuite { // The contiguous table is still responsible for closing these columns. (0 until table.getNumberOfColumns).map(i => table.getColumn(i).incRefCount()).toArray } else if (GpuCompressedColumnVector.isBatchCompressed(batch)) { - val compressedColumn = batch.column(0).asInstanceOf[GpuCompressedColumnVector] - val descr = compressedColumn.getTableMeta.bufferMeta.codecBufferDescrs(0) - val codec = TableCompressionCodec.getCodec( - descr.codec, TableCompressionCodec.makeCodecConfig(rapidsConf)) - withResource(codec.createBatchDecompressor(100 * 1024 * 1024L, - Cuda.DEFAULT_STREAM)) { decompressor => - compressedColumn.getTableBuffer.incRefCount() - decompressor.addBufferToDecompress(compressedColumn.getTableBuffer, - compressedColumn.getTableMeta.bufferMeta) - withResource(decompressor.finishAsync()) { outputBuffers => - val outputBuffer = outputBuffers.head - // There should be only one - withResource( - MetaUtils.getTableFromMeta(outputBuffer, compressedColumn.getTableMeta)) { table => - (0 until table.getNumberOfColumns).map(i => table.getColumn(i).incRefCount()).toArray - } + decompressTable(batch) + } else if (PackedTableHostColumnVector.isBatchPackedOnHost(batch)) { + val hostCol = batch.column(0).asInstanceOf[PackedTableHostColumnVector] + val tableMeta = hostCol.getTableMeta + val hostBuf = hostCol.getTableBuffer + val data = closeOnExcept(DeviceMemoryBuffer.allocate(hostBuf.getLength)) { devBuf => + devBuf.copyFromHostBuffer(hostBuf) + devBuf + } + withResource(data) { _ => + val bufferMeta = tableMeta.bufferMeta() + if (bufferMeta == null || bufferMeta.codecBufferDescrsLength == 0) { + val table = MetaUtils.getTableFromMeta(data, tableMeta) + (0 until table.getNumberOfColumns).map(i => table.getColumn(i)).toArray + } else { + decompressTable(GpuCompressedColumnVector.from(data, hostCol.getTableMeta)) } } } else { @@ -109,11 +129,14 @@ class GpuPartitioningSuite extends AnyFunSuite { } } - test("GPU partition") { + private def testGpuPartition(serdeType: String): Unit = { TrampolineUtil.cleanupAnyExistingSession() - val conf = new SparkConf().set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "none") + val conf = new SparkConf() + .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, "none") + .set(RapidsConf.SHUFFLE_SERDE_TYPE.key, serdeType) TestUtils.withGpuSparkSession(conf) { _ => - GpuShuffleEnv.init(new RapidsConf(conf), new RapidsDiskBlockManager(conf)) + val rapidsConf = new RapidsConf(conf) + GpuShuffleEnv.init(rapidsConf, new RapidsDiskBlockManager(conf)) val partitionIndices = Array(0, 2, 2) val gp = new GpuPartitioning { override val numPartitions: Int = partitionIndices.length @@ -135,7 +158,11 @@ class GpuPartitioningSuite extends AnyFunSuite { } val expectedRows = endRow - startRow 
assertResult(expectedRows)(partBatch.numRows) - assert(GpuPackedTableColumn.isBatchPacked(partBatch)) + if (rapidsConf.isGpuSerdeEnabled) { + assert(PackedTableHostColumnVector.isBatchPackedOnHost(partBatch)) + } else { + assert(GpuPackedTableColumn.isBatchPacked(partBatch)) + } withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch => compareBatches(expectedBatch, partBatch) } @@ -145,17 +172,10 @@ class GpuPartitioningSuite extends AnyFunSuite { } } - test("GPU partition with lz4 compression") { - testGpuPartitionWithCompression("lz4") - } - - test("GPU partition with zstd compression") { - testGpuPartitionWithCompression("zstd") - } - - private def testGpuPartitionWithCompression(codecName: String): Unit = { + private def testGpuPartitionWithCompression(serdeType: String, codecName: String): Unit = { val conf = new SparkConf() - .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, codecName) + .set(RapidsConf.SHUFFLE_COMPRESSION_CODEC.key, codecName) + .set(RapidsConf.SHUFFLE_SERDE_TYPE.key, serdeType) TestUtils.withGpuSparkSession(conf) { _ => GpuShuffleEnv.init(new RapidsConf(conf), new RapidsDiskBlockManager(conf)) val spillPriority = 7L @@ -197,6 +217,8 @@ class GpuPartitioningSuite extends AnyFunSuite { val rows = c.getContiguousTable.getRowCount assert(rows == 0) rows + case c: PackedTableHostColumnVector => + c.getTableMeta.rowCount case _ => throw new IllegalStateException("column should either be compressed or packed") } @@ -213,6 +235,17 @@ class GpuPartitioningSuite extends AnyFunSuite { } } } + } else if (PackedTableHostColumnVector.isBatchPackedOnHost(batch)) { + val pthc = columns.head.asInstanceOf[PackedTableHostColumnVector] + val hostBuffer = pthc.getTableBuffer + val handle = catalog.addBuffer(hostBuffer, pthc.getTableMeta, spillPriority) + withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch => + withResource(catalog.acquireBuffer(handle)) { buffer => + withResource(buffer.getColumnarBatch(sparkTypes)) { batch => + compareBatches(expectedBatch, batch) + } + } + } } } } @@ -220,6 +253,26 @@ class GpuPartitioningSuite extends AnyFunSuite { } } } + + test("GPU partition") { + testGpuPartition("GPU") + } + + test("GPU partition with CPU Serde") { + testGpuPartition("CPU") + } + + test("GPU partition with lz4 compression") { + testGpuPartitionWithCompression("GPU", "lz4") + } + + test("GPU partition with zstd compression") { + testGpuPartitionWithCompression("GPU", "zstd") + } + + test("GPU partition with lz4 compression and CPU Serde") { + testGpuPartitionWithCompression("CPU", "lz4") + } } case class MockRapidsBufferId(tableId: Int) extends RapidsBufferId { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala index 9211c32e142..5361127ccc7 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -18,8 +18,8 @@ package com.nvidia.spark.rapids import java.math.RoundingMode -import ai.rapids.cudf.Table -import com.nvidia.spark.rapids.Arm.withResource +import ai.rapids.cudf.{DeviceMemoryBuffer, Table} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.SparkConf @@ -62,13 +62,28 @@ class GpuSinglePartitioningSuite extends AnyFunSuite { assertResult(1)(result.length) assertResult(0)(result.head._2) val resultBatch = result.head._1 - // verify this is a contiguous split table - assert(GpuPackedTableColumn.isBatchPacked(resultBatch)) - val packedColumn = resultBatch.column(0).asInstanceOf[GpuPackedTableColumn] - val actual = packedColumn.getContiguousTable - assertResult(expected.getBuffer.getLength)(actual.getBuffer.getLength) - assertResult(expected.getMetadataDirectBuffer)(actual.getMetadataDirectBuffer) - TestUtils.compareTables(expected.getTable, actual.getTable) + if (GpuPackedTableColumn.isBatchPacked(resultBatch)) { + // verify this is a contiguous split table + val packedColumn = resultBatch.column(0).asInstanceOf[GpuPackedTableColumn] + val actual = packedColumn.getContiguousTable + assertResult(expected.getBuffer.getLength)(actual.getBuffer.getLength) + assertResult(expected.getMetadataDirectBuffer)(actual.getMetadataDirectBuffer) + TestUtils.compareTables(expected.getTable, actual.getTable) + } else { + assert(PackedTableHostColumnVector.isBatchPackedOnHost(resultBatch)) + val packedCol = resultBatch.column(0).asInstanceOf[PackedTableHostColumnVector] + assertResult(expected.getBuffer.getLength)(packedCol.getTableBuffer.getLength) + val hostBuf = packedCol.getTableBuffer + val data = closeOnExcept(DeviceMemoryBuffer.allocate(hostBuf.getLength)) { + devBuf => + devBuf.copyFromHostBuffer(hostBuf) + devBuf + } + val actualTable = withResource(data)( + MetaUtils.getTableFromMeta(_, packedCol.getTableMeta)) + withResource(actualTable)(TestUtils.compareTables(expected.getTable, _)) + } + } finally { result.foreach(_._1.close()) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ShufflePartitionerRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ShufflePartitionerRetrySuite.scala index fc9c85112e5..fc2faf3a6d4 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ShufflePartitionerRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ShufflePartitionerRetrySuite.scala @@ -22,6 +22,7 @@ import com.nvidia.spark.rapids.jni.RmmSpark import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, ExprId, SortOrder, SpecificInternalRow} +import org.apache.spark.sql.rapids.{GpuShuffleEnv, RapidsDiskBlockManager} import org.apache.spark.sql.types.{DataType, IntegerType, StringType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -36,7 +37,9 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase { } private def testRoundRobinPartitioner(partNum: Int) = { - TestUtils.withGpuSparkSession(new SparkConf()) { _ => + val conf = new SparkConf() + TestUtils.withGpuSparkSession(conf) { _ => + GpuShuffleEnv.init(new RapidsConf(conf), new RapidsDiskBlockManager(conf)) val rrp = GpuRoundRobinPartitioning(partNum) // batch will be closed within columnarEvalAny val batch = buildBatch @@ -55,7 +58,9 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase { } test("GPU range partition with retry") { - TestUtils.withGpuSparkSession(new SparkConf()) { _ => + val conf = new SparkConf() + 
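+ // GpuShuffleEnv is initialized inside the session below so the partitioner can look up
+ // shuffle settings such as the serde type and the compression codec.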
TestUtils.withGpuSparkSession(conf) { _ => + GpuShuffleEnv.init(new RapidsConf(conf), new RapidsDiskBlockManager(conf)) // Initialize range bounds val fieldTypes: Array[DataType] = Array(IntegerType) val bounds = new SpecificInternalRow(fieldTypes)