From 03b85b184e141849e414e90a83a4b589fe7e57fc Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Tue, 14 Jan 2025 14:18:36 -0600
Subject: [PATCH] Avoid concatenating multiple host buffers when reading
 Parquet (#11911)

Depends upon rapidsai/cudf#17673.

Updates the multithreaded Parquet reader to leverage the new multiple host
buffers reader interface. This removes the need to concatenate multiple host
buffers into a single buffer before decoding the data via the GPU. This also
makes it easier to accept "late arrivals" in the multithreaded combine reader
after waking up with the GPU semaphore, since we only need to fabricate a new
footer to accommodate the additional buffers in the read.

---------

Signed-off-by: Jason Lowe
---
 .../com/nvidia/spark/rapids/DumpUtils.scala   |  24 ++-
 .../spark/rapids/GpuMultiFileReader.scala     |  31 ++-
 .../com/nvidia/spark/rapids/GpuOrcScan.scala  |  25 +--
 .../nvidia/spark/rapids/GpuParquetScan.scala  | 201 ++++++++----------
 .../apache/spark/sql/rapids/GpuAvroScan.scala |  13 +-
 .../rapids/GpuMultiFileReaderSuite.scala      |   2 +-
 6 files changed, 144 insertions(+), 152 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
index 64b08162557..fcff6192d59 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -65,6 +65,28 @@ object DumpUtils extends Logging {
     }
   }

+  def dumpBuffer(
+      conf: Configuration,
+      data: Array[HostMemoryBuffer],
+      prefix: String,
+      suffix: String): String = {
+    try {
+      val (out, path) = FileUtils.createTempFile(conf, prefix, suffix)
+      withResource(out) { _ =>
+        data.foreach { hmb =>
+          withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in =>
+            IOUtils.copy(in, out)
+          }
+        }
+      }
+      path.toString
+    } catch {
+      case e: Exception =>
+        log.error(s"Error attempting to dump data", e)
+        s""
+    }
+  }
+
   /**
    * Debug utility to dump columnar batch to parquet file.
* It's running on GPU. Parquet column names are generated from columnar batch type info.
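The decode path described in the commit message boils down to handing cudf an array of host buffers instead of one concatenated buffer. The sketch below is not part of the patch; it is a minimal illustration in plain Scala, assuming the multi-buffer Table.readParquet overload introduced by rapidsai/cudf#17673 (the helper name readParquetBuffers is invented for this example, and ownership of the buffers is assumed to pass to the helper):

import ai.rapids.cudf.{HostMemoryBuffer, ParquetOptions, Table}
import com.nvidia.spark.rapids.Arm.withResource

// Hypothetical helper: decode several per-file host buffers in a single GPU read.
// Previously the buffers had to be copied into one contiguous HostMemoryBuffer first;
// with the multi-buffer overload the array is handed to cudf directly.
def readParquetBuffers(opts: ParquetOptions, buffers: Array[HostMemoryBuffer]): Table = {
  withResource(buffers) { _ =>         // sketch assumes the caller hands over ownership
    Table.readParquet(opts, buffers: _*)
  }
}

In the patch this pattern shows up in MakeParquetTableProducer and ParquetTableReader, where an Array[HostMemoryBuffer] replaces the old (buffer, offset, length) triple.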
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index 6d8ab11c60c..ffbf06e42bb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -29,7 +29,7 @@ import scala.language.implicitConversions import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Table} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric.{BUFFER_TIME, FILTER_TIME} -import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq +import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableArray, AutoCloseableProducingSeq} import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -53,13 +53,13 @@ import org.apache.spark.util.SerializableConfiguration * This contains a single HostMemoryBuffer along with other metadata needed * for combining the buffers before sending to GPU. */ -case class SingleHMBAndMeta(hmb: HostMemoryBuffer, bytes: Long, numRows: Long, +case class SingleHMBAndMeta(hmbs: Array[HostMemoryBuffer], bytes: Long, numRows: Long, blockMeta: Seq[DataBlockBase]) object SingleHMBAndMeta { // Contains no data but could have number of rows for things like count(). def empty(numRows: Long = 0): SingleHMBAndMeta = { - SingleHMBAndMeta(null.asInstanceOf[HostMemoryBuffer], 0, numRows, Seq.empty) + SingleHMBAndMeta(Array.empty, 0, numRows, Seq.empty) } } @@ -675,9 +675,7 @@ abstract class MultiFileCloudPartitionReaderBase( private def closeCurrentFileHostBuffers(): Unit = { currentFileHostBuffers.foreach { current => current.memBuffersAndSizes.foreach { hbInfo => - if (hbInfo.hmb != null) { - hbInfo.hmb.close() - } + hbInfo.hmbs.safeClose() } } currentFileHostBuffers = None @@ -692,9 +690,7 @@ abstract class MultiFileCloudPartitionReaderBase( tasks.asScala.foreach { task => if (task.isDone()) { task.get.memBuffersAndSizes.foreach { hmbInfo => - if (hmbInfo.hmb != null) { - hmbInfo.hmb.close() - } + hmbInfo.hmbs.safeClose() } } else { // Note we are not interrupting thread here so it @@ -1009,9 +1005,9 @@ abstract class MultiFileCoalescingPartitionReaderBase( if (currentChunkMeta.currentChunk.isEmpty) { CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { - val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk, + val dataBuffer = readPartFiles(currentChunkMeta.currentChunk, currentChunkMeta.clippedSchema) - if (dataSize == 0) { + if (dataBuffer.getLength == 0) { dataBuffer.close() CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { @@ -1020,8 +1016,8 @@ abstract class MultiFileCoalescingPartitionReaderBase( // We don't want to actually close the host buffer until we know that we don't // want to retry more, so offset the close for now. 
dataBuffer.incRefCount() - val tableReader = readBufferToTablesAndClose(dataBuffer, - dataSize, currentChunkMeta.clippedSchema, currentChunkMeta.readSchema, + val tableReader = readBufferToTablesAndClose(dataBuffer, dataBuffer.getLength, + currentChunkMeta.clippedSchema, currentChunkMeta.readSchema, currentChunkMeta.extraInfo) CachedGpuBatchIterator(tableReader, colTypes) } @@ -1042,12 +1038,11 @@ abstract class MultiFileCoalescingPartitionReaderBase( * Read all data blocks into HostMemoryBuffer * @param blocks a sequence of data blocks to be read * @param clippedSchema the clipped schema is used to calculate the estimated output size - * @return (HostMemoryBuffer, Long) - * the HostMemoryBuffer and its data size + * @return the HostMemoryBuffer */ private def readPartFiles( blocks: Seq[(Path, DataBlockBase)], - clippedSchema: SchemaBase): (HostMemoryBuffer, Long) = { + clippedSchema: SchemaBase): HostMemoryBuffer = { withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, metrics("bufferTime"))) { _ => @@ -1138,7 +1133,9 @@ abstract class MultiFileCoalescingPartitionReaderBase( } logDebug(s"$getFileFormatShortName Coalescing reading estimates the initTotalSize:" + s" $initTotalSize, and the true size: $finalBufferSize") - (finalBuffer, finalBufferSize) + withResource(finalBuffer) { _ => + finalBuffer.slice(0, finalBufferSize) + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 550f6bc5591..2199e6bd4db 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -2105,12 +2105,12 @@ class MultiFileCloudOrcPartitionReader( val (hostBuf, bufSize) = readPartFile(ctx, blocksToRead) val numRows = blocksToRead.map(_.infoBuilder.getNumberOfRows).sum val metas = blocksToRead.map(b => OrcDataStripe(OrcStripeWithMeta(b, ctx))) - hostBuffers += SingleHMBAndMeta(hostBuf, bufSize, numRows, metas) + hostBuffers += SingleHMBAndMeta(Array(hostBuf), bufSize, numRows, metas) } val bytesRead = fileSystemBytesRead() - startingBytesRead if (isDone) { // got close before finishing - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose() logDebug("Reader is closed, return empty buffer for the current read for " + s"file: ${partFile.filePath.toString}") HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema) @@ -2123,7 +2123,7 @@ class MultiFileCloudOrcPartitionReader( } } catch { case e: Throwable => - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose(e) throw e } val bufferTime = System.nanoTime() - bufferTimeStart @@ -2222,9 +2222,10 @@ class MultiFileCloudOrcPartitionReader( case buffer: HostMemoryBuffersWithMetaData => val memBuffersAndSize = buffer.memBuffersAndSizes val hmbInfo = memBuffersAndSize.head - val batchIter = readBufferToBatches(hmbInfo.hmb, hmbInfo.bytes, buffer.updatedReadSchema, - buffer.requestedMapping, filterHandler.isCaseSensitive, buffer.partitionedFile, - buffer.allPartValues) + require(hmbInfo.hmbs.length == 1) + val batchIter = readBufferToBatches(hmbInfo.hmbs.head, hmbInfo.bytes, + buffer.updatedReadSchema, buffer.requestedMapping, filterHandler.isCaseSensitive, + buffer.partitionedFile, buffer.allPartValues) if (memBuffersAndSize.length > 1) { val updatedBuffers = memBuffersAndSize.drop(1) currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers)) @@ -2314,9 +2315,10 @@ class 
MultiFileCloudOrcPartitionReader( toCombine.foreach { hmbWithMeta => hmbWithMeta.memBuffersAndSizes.foreach { buf => val dataCopyAmount = buf.blockMeta.map(_.getBlockSize).sum - if (dataCopyAmount > 0 && buf.hmb != null) { + if (dataCopyAmount > 0 && buf.hmbs.nonEmpty) { + require(buf.hmbs.length == 1) combinedBuf.copyFromHostBuffer( - offset, buf.hmb, OrcTools.ORC_MAGIC.length, dataCopyAmount) + offset, buf.hmbs.head, OrcTools.ORC_MAGIC.length, dataCopyAmount) } // update the offset for each stripe var stripeOffset = offset @@ -2325,9 +2327,7 @@ class MultiFileCloudOrcPartitionReader( stripeOffset += block.getBlockSize } offset += dataCopyAmount - if (buf.hmb != null) { - buf.hmb.close() - } + buf.hmbs.safeClose() allOutputStripes ++= buf.blockMeta.map(_.stripe) } } @@ -2364,7 +2364,8 @@ class MultiFileCloudOrcPartitionReader( // e: Create the new meta for the combined buffer val numRows = combinedMeta.allPartValues.map(_._1).sum - val combinedRet = SingleHMBAndMeta(maybeNewBuf, outStream.getPos, numRows, blockMetas) + val combinedRet = SingleHMBAndMeta(Array(maybeNewBuf), outStream.getPos, numRows, + blockMetas) val newHmbWithMeta = metaToUse.copy( memBuffersAndSizes = Array(combinedRet), allPartValues = Some(combinedMeta.allPartValues)) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index f37c00b8f5a..277e0c6b934 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -34,13 +34,12 @@ import ai.rapids.cudf._ import com.github.luben.zstd.ZstdDecompressCtx import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric._ -import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy} +import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy, PARQUET_MAGIC} import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.filecache.FileCache import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter} import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl} -import org.apache.commons.io.IOUtils import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, Path} @@ -1913,7 +1912,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics protected def readPartFile( blocks: Seq[BlockMetaData], clippedSchema: MessageType, - filePath: Path): (HostMemoryBuffer, Long, Seq[BlockMetaData]) = { + filePath: Path): (HostMemoryBuffer, Seq[BlockMetaData]) = { withResource(new NvtxRange("Parquet buffer file split", NvtxColor.YELLOW)) { _ => val estTotalSize = calculateParquetOutputSize(blocks, clippedSchema, false) closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb => @@ -1933,7 +1932,10 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics throw new QueryExecutionException(s"Calculated buffer size $estTotalSize is to " + s"small, actual written: ${out.getPos}") } - (hmb, out.getPos, outputBlocks) + val outputBuffer = withResource(hmb) { _ => + hmb.slice(0, out.getPos) + } + (outputBuffer, outputBlocks) } } } @@ 
-2250,7 +2252,7 @@ class MultiFileParquetPartitionReader( MakeParquetTableProducer(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, conf, currentTargetBatchSize, parseOpts, - dataBuffer, 0, dataSize, metrics, + Array(dataBuffer), metrics, extraInfo.dateRebaseMode, extraInfo.timestampRebaseMode, extraInfo.hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, splits, debugDumpPrefix, debugDumpAlways) @@ -2391,80 +2393,54 @@ class MultiFileCloudParquetPartitionReader( logDebug(s"Using Combine mode and actually combining, num files ${toCombineHmbs.size} " + s"files: ${toCombineHmbs.map(_.partitionedFile.filePath).mkString(",")}") val startCombineTime = System.currentTimeMillis() - // this size includes the written header and footer on each buffer so remove - // the size of those to get data size - val existingSizeUsed = toCombineHmbs.map { hbWithMeta => - hbWithMeta.memBuffersAndSizes.map(smb => Math.max(smb.bytes - PARQUET_META_SIZE, 0)).sum - }.sum // since we know not all of them are empty and we know all these have the same schema since // we already separated, just use the clippedSchema from metadata val schemaToUse = metaToUse.clippedSchema - val blocksAlreadyRead = toCombineHmbs.flatMap(_.memBuffersAndSizes.flatMap(_.blockMeta)) - val footerSize = calculateParquetFooterSize(blocksAlreadyRead.toSeq, schemaToUse) - // all will have same schema so same number of columns - val numCols = blocksAlreadyRead.head.getColumns().size() - val extraMemory = calculateExtraMemoryForParquetFooter(numCols, blocksAlreadyRead.size) - val initTotalSize = existingSizeUsed + footerSize + extraMemory - val combined = closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { combinedHmb => - var offset = withResource(new HostMemoryOutputStream(combinedHmb)) { out => - out.write(ParquetPartitionReader.PARQUET_MAGIC) - out.getPos - } + val combined = closeOnExcept(new ArrayBuffer[HostMemoryBuffer]) { buffers => + var offset: Long = PARQUET_MAGIC.size val allOutputBlocks = new ArrayBuffer[BlockMetaData]() - // copy the actual data + // zero-copy the data toCombineHmbs.map { hbWithMeta => hbWithMeta.memBuffersAndSizes.map { hmbInfo => - val copyAmount = hmbInfo.blockMeta.map { meta => + val columnDataSize = hmbInfo.blockMeta.map { meta => meta.getColumns.asScala.map(_.getTotalSize).sum }.sum - if (copyAmount > 0 && hmbInfo.hmb != null) { - combinedHmb.copyFromHostBuffer(offset, hmbInfo.hmb, - ParquetPartitionReader.PARQUET_MAGIC.size, copyAmount) + if (columnDataSize > 0 && hmbInfo.hmbs.nonEmpty) { + val bytesToSlice = if (buffers.isEmpty) { + columnDataSize + PARQUET_MAGIC.size + } else { + columnDataSize + } + val sliceOffset = if (buffers.isEmpty) 0 else PARQUET_MAGIC.size + require(hmbInfo.hmbs.length == 1) + buffers += hmbInfo.hmbs.head.slice(sliceOffset, bytesToSlice) } val outputBlocks = computeBlockMetaData(hmbInfo.blockMeta, offset) allOutputBlocks ++= outputBlocks - offset += copyAmount - if (hmbInfo.hmb != null) { - hmbInfo.hmb.close() - } + offset += columnDataSize + hmbInfo.hmbs.safeClose() + } + } + if (buffers.isEmpty) { + closeOnExcept(HostMemoryBuffer.allocate(PARQUET_MAGIC.size)) { hmb => + hmb.setBytes(0, PARQUET_MAGIC, 0, PARQUET_MAGIC.size); + buffers += hmb } } // using all of the actual combined output blocks meta calculate what the footer size // will really be val actualFooterSize = calculateParquetFooterSize(allOutputBlocks.toSeq, schemaToUse) - var buf: HostMemoryBuffer = combinedHmb - val totalBufferSize = if ((initTotalSize - offset) < 
actualFooterSize) { - val newBufferSize = offset + actualFooterSize + 4 + 4 - logWarning(s"The original estimated size $initTotalSize is too small, " + - s"reallocating and copying data to bigger buffer size: $newBufferSize") - // Copy the old buffer to a new allocated bigger buffer and close the old buffer - buf = withResource(combinedHmb) { _ => - withResource(new HostMemoryInputStream(combinedHmb, offset)) { in => - // realloc memory and copy - closeOnExcept(HostMemoryBuffer.allocate(newBufferSize)) { newhmb => - withResource(new HostMemoryOutputStream(newhmb)) { out => - IOUtils.copy(in, out) - } - newhmb - } - } - } - newBufferSize - } else { - initTotalSize + val footerBuf = HostMemoryBuffer.allocate(actualFooterSize + 8) + buffers += footerBuf + withResource(new HostMemoryOutputStream(footerBuf)) { footerOut => + writeFooter(footerOut, allOutputBlocks.toSeq, schemaToUse) + BytesUtils.writeIntLittleEndian(footerOut, footerOut.getPos.toInt) + footerOut.write(ParquetPartitionReader.PARQUET_MAGIC) + offset += footerOut.getPos } - - withResource(buf.slice(offset, (totalBufferSize - offset))) { footerHmbSlice => - withResource(new HostMemoryOutputStream(footerHmbSlice)) { footerOut => - writeFooter(footerOut, allOutputBlocks.toSeq, schemaToUse) - BytesUtils.writeIntLittleEndian(footerOut, footerOut.getPos.toInt) - footerOut.write(ParquetPartitionReader.PARQUET_MAGIC) - offset += footerOut.getPos - } - } - val newHmbBufferInfo = SingleHMBAndMeta(buf, offset, + val newHmbBufferInfo = SingleHMBAndMeta(buffers.toArray, offset, combinedMeta.allPartValues.map(_._1).sum, Seq.empty) val newHmbMeta = HostMemoryBuffersWithMetaData( metaToUse.partitionedFile, @@ -2686,16 +2662,16 @@ class MultiFileCloudParquetPartitionReader( while (blockChunkIter.hasNext) { val blocksToRead = populateCurrentBlockChunk(blockChunkIter, maxReadBatchSizeRows, maxReadBatchSizeBytes, fileBlockMeta.readSchema) - val (dataBuffer, dataSize, blockMeta) = + val (dataBuffer, blockMeta) = readPartFile(blocksToRead, fileBlockMeta.schema, filePath) val numRows = blocksToRead.map(_.getRowCount).sum.toInt - hostBuffers += SingleHMBAndMeta(dataBuffer, dataSize, + hostBuffers += SingleHMBAndMeta(Array(dataBuffer), dataBuffer.getLength, numRows, blockMeta) } val bytesRead = fileSystemBytesRead() - startingBytesRead if (isDone) { // got close before finishing - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose() HostMemoryEmptyMetaData(file, 0, bytesRead, fileBlockMeta.dateRebaseMode, fileBlockMeta.timestampRebaseMode, fileBlockMeta.hasInt96Timestamps, fileBlockMeta.schema, @@ -2711,7 +2687,7 @@ class MultiFileCloudParquetPartitionReader( } } catch { case e: Throwable => - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose(e) throw e } val bufferTime = System.nanoTime() - bufferStartTime @@ -2783,8 +2759,7 @@ class MultiFileCloudParquetPartitionReader( val hmbAndInfo = memBuffersAndSize.head val batchIter = readBufferToBatches(buffer.dateRebaseMode, buffer.timestampRebaseMode, buffer.hasInt96Timestamps, buffer.clippedSchema, - buffer.readSchema, buffer.partitionedFile, hmbAndInfo.hmb, hmbAndInfo.bytes, - buffer.allPartValues) + buffer.readSchema, buffer.partitionedFile, hmbAndInfo.hmbs, buffer.allPartValues) if (memBuffersAndSize.length > 1) { val updatedBuffers = memBuffersAndSize.drop(1) currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers)) @@ -2802,11 +2777,10 @@ class MultiFileCloudParquetPartitionReader( clippedSchema: MessageType, readDataSchema: 
StructType, partedFile: PartitionedFile, - hostBuffer: HostMemoryBuffer, - dataSize: Long, + hostBuffers: Array[HostMemoryBuffer], allPartValues: Option[Array[(Long, InternalRow)]]): Iterator[ColumnarBatch] = { - val parseOpts = closeOnExcept(hostBuffer) { _ => + val parseOpts = closeOnExcept(hostBuffers) { _ => getParquetOptions(readDataSchema, clippedSchema, useFieldId) } val colTypes = readDataSchema.fields.map(f => f.dataType) @@ -2814,34 +2788,36 @@ class MultiFileCloudParquetPartitionReader( // about to start using the GPU GpuSemaphore.acquireIfNecessary(TaskContext.get()) - RmmRapidsRetryIterator.withRetryNoSplit(hostBuffer) { _ => - // The MakeParquetTableProducer will close the input buffer, and that would be bad - // because we don't want to close it until we know that we are done with it - hostBuffer.incRefCount() - val tableReader = MakeParquetTableProducer(useChunkedReader, - maxChunkedReaderMemoryUsageSizeBytes, - conf, targetBatchSizeBytes, - parseOpts, - hostBuffer, 0, dataSize, metrics, - dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, - isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files, - debugDumpPrefix, debugDumpAlways) - - val batchIter = CachedGpuBatchIterator(tableReader, colTypes) - - if (allPartValues.isDefined) { - val allPartInternalRows = allPartValues.get.map(_._2) - val rowsPerPartition = allPartValues.get.map(_._1) - new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows, - rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes) - } else { - // this is a bit weird, we don't have number of rows when allPartValues isn't - // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator - batchIter.flatMap { batch => - // we have to add partition values here for this batch, we already verified that - // its not different for all the blocks in this batch - BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch, - partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes) + withResource(hostBuffers) { _ => + RmmRapidsRetryIterator.withRetryNoSplit { + // The MakeParquetTableProducer will close the input buffers, and that would be bad + // because we don't want to close them until we know that we are done with them. 
+ hostBuffers.foreach(_.incRefCount()) + val tableReader = MakeParquetTableProducer(useChunkedReader, + maxChunkedReaderMemoryUsageSizeBytes, + conf, targetBatchSizeBytes, + parseOpts, + hostBuffers, metrics, + dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, + isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files, + debugDumpPrefix, debugDumpAlways) + + val batchIter = CachedGpuBatchIterator(tableReader, colTypes) + + if (allPartValues.isDefined) { + val allPartInternalRows = allPartValues.get.map(_._2) + val rowsPerPartition = allPartValues.get.map(_._1) + new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows, + rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes) + } else { + // this is a bit weird, we don't have number of rows when allPartValues isn't + // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator + batchIter.flatMap { batch => + // we have to add partition values here for this batch, we already verified that + // its not different for all the blocks in this batch + BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch, + partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes) + } } } } @@ -2855,9 +2831,7 @@ object MakeParquetTableProducer extends Logging { conf: Configuration, chunkSizeByteLimit: Long, opts: ParquetOptions, - buffer: HostMemoryBuffer, - offset: Long, - len: Long, + buffers: Array[HostMemoryBuffer], metrics : Map[String, GpuMetric], dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode, @@ -2872,30 +2846,29 @@ object MakeParquetTableProducer extends Logging { ): GpuDataProducer[Table] = { debugDumpPrefix.foreach { prefix => if (debugDumpAlways) { - val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet") + val p = DumpUtils.dumpBuffer(conf, buffers, prefix, ".parquet") logWarning(s"Wrote data for ${splits.mkString(", ")} to $p") } } if (useChunkedReader) { ParquetTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, - opts, buffer, offset, - len, metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, + opts, buffers, metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, clippedParquetSchema, splits, debugDumpPrefix, debugDumpAlways) } else { - val table = withResource(buffer) { _ => + val table = withResource(buffers) { _ => try { RmmRapidsRetryIterator.withRetryNoSplit[Table] { withResource(new NvtxWithMetrics("Parquet decode", NvtxColor.DARK_GREEN, metrics(GPU_DECODE_TIME))) { _ => - Table.readParquet(opts, buffer, offset, len) + Table.readParquet(opts, buffers:_*) } } } catch { case e: Exception => val dumpMsg = debugDumpPrefix.map { prefix => if (!debugDumpAlways) { - val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet") + val p = DumpUtils.dumpBuffer(conf, buffers, prefix, ".parquet") s", data dumped to $p" } else { "" @@ -2927,9 +2900,7 @@ case class ParquetTableReader( chunkSizeByteLimit: Long, maxChunkedReaderMemoryUsageSizeBytes: Long, opts: ParquetOptions, - buffer: HostMemoryBuffer, - offset: Long, - len: Long, + buffers: Array[HostMemoryBuffer], metrics : Map[String, GpuMetric], dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode, @@ -2942,7 +2913,7 @@ case class ParquetTableReader( debugDumpPrefix: Option[String], debugDumpAlways: Boolean) extends GpuDataProducer[Table] with Logging { private[this] val reader = new ParquetChunkedReader(chunkSizeByteLimit, - 
maxChunkedReaderMemoryUsageSizeBytes, opts, buffer, offset, len) + maxChunkedReaderMemoryUsageSizeBytes, opts, buffers:_*) private[this] lazy val splitsString = splits.mkString("; ") @@ -2957,7 +2928,7 @@ case class ParquetTableReader( case e: Exception => val dumpMsg = debugDumpPrefix.map { prefix => if (!debugDumpAlways) { - val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet") + val p = DumpUtils.dumpBuffer(conf, buffers, prefix, ".parquet") s", data dumped to $p" } else { "" @@ -2982,7 +2953,7 @@ case class ParquetTableReader( override def close(): Unit = { reader.close() - buffer.close() + buffers.safeClose() } } @@ -3073,10 +3044,10 @@ class ParquetPartitionReader( CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { val parseOpts = getParquetOptions(readDataSchema, clippedParquetSchema, useFieldId) - val (dataBuffer, dataSize, _) = metrics(BUFFER_TIME).ns { + val (dataBuffer, _) = metrics(BUFFER_TIME).ns { readPartFile(currentChunkedBlocks, clippedParquetSchema, filePath) } - if (dataSize == 0) { + if (dataBuffer.getLength == 0) { dataBuffer.close() CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { @@ -3090,7 +3061,7 @@ class ParquetPartitionReader( val producer = MakeParquetTableProducer(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, conf, targetBatchSizeBytes, parseOpts, - dataBuffer, 0, dataSize, metrics, + Array(dataBuffer), metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index e81e2eb9821..d4a9da091db 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -663,7 +663,7 @@ class GpuMultiFileCloudAvroPartitionReader( val bufsAndSizes = buffer.memBuffersAndSizes val bufAndSizeInfo = bufsAndSizes.head val partitionValues = buffer.partitionedFile.partitionValues - val batchIter = if (bufAndSizeInfo.hmb == null) { + val batchIter = if (bufAndSizeInfo.hmbs.isEmpty) { // Not reading any data, but add in partition data if needed // Someone is going to process this data, even if it is just a row count GpuSemaphore.acquireIfNecessary(TaskContext.get()) @@ -671,7 +671,8 @@ class GpuMultiFileCloudAvroPartitionReader( BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(emptyBatch, partitionValues, partitionSchema, maxGpuColumnSizeBytes) } else { - val maybeBatch = sendToGpu(bufAndSizeInfo.hmb, bufAndSizeInfo.bytes, files) + require(bufAndSizeInfo.hmbs.length == 1) + val maybeBatch = sendToGpu(bufAndSizeInfo.hmbs.head, bufAndSizeInfo.bytes, files) // we have to add partition values here for this batch, we already verified that // it's not different for all the blocks in this batch maybeBatch match { @@ -844,18 +845,18 @@ class GpuMultiFileCloudAvroPartitionReader( // One batch is done optOut.foreach(out => hostBuffers += - (SingleHMBAndMeta(optHmb.get, out.getPos, batchRowsNum, Seq.empty))) + (SingleHMBAndMeta(Array(optHmb.get), out.getPos, batchRowsNum, Seq.empty))) totalRowsNum += batchRowsNum estBlocksSize -= batchSize } } // end of while val bufAndSize: Array[SingleHMBAndMeta] = if (readDataSchema.isEmpty) { - hostBuffers.foreach(_.hmb.safeClose(new Exception)) + hostBuffers.flatMap(_.hmbs).safeClose() Array(SingleHMBAndMeta.empty(totalRowsNum)) } else if (isDone) { // got close before finishing, 
return null buffer and zero size - hostBuffers.foreach(_.hmb.safeClose(new Exception)) + hostBuffers.flatMap(_.hmbs).safeClose() Array(SingleHMBAndMeta.empty()) } else { hostBuffers.toArray @@ -863,7 +864,7 @@ class GpuMultiFileCloudAvroPartitionReader( createBufferAndMeta(bufAndSize, startingBytesRead) } catch { case e: Throwable => - hostBuffers.foreach(_.hmb.safeClose(e)) + hostBuffers.flatMap(_.hmbs).safeClose(e) throw e } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala index ae1b5a47011..983031f4611 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala @@ -36,7 +36,7 @@ class GpuMultiFileReaderSuite extends AnyFunSuite { val conf = new Configuration(false) val membuffers = Array(SingleHMBAndMeta( - HostMemoryBuffer.allocate(0), 0L, 0, Seq.empty)) + Array(HostMemoryBuffer.allocate(0)), 0L, 0, Seq.empty)) val multiFileReader = new MultiFileCloudPartitionReaderBase( conf, inputFiles = Array.empty,
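For context on the combine path in MultiFileCloudParquetPartitionReader above: rather than reallocating and copying into one large buffer, the patch keeps zero-copy slices of the per-file buffers and fabricates only a new footer. The sketch below is a simplified, hypothetical rendering of that idea, not code from the patch; stitchParquetBuffers, footerSize, and writeFooterTo are invented names, and the real code additionally recomputes the block metadata offsets for the combined footer:

import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.Arm.closeOnExcept

// Hypothetical sketch: build a logical Parquet stream from existing buffers plus a new footer.
def stitchParquetBuffers(
    parts: Seq[(HostMemoryBuffer, Long)],    // (per-file buffer, column-chunk bytes in it)
    footerSize: Long,                        // assumed to come from a footer size estimate
    writeFooterTo: HostMemoryBuffer => Unit  // assumed to write the footer, its length, then "PAR1"
): Array[HostMemoryBuffer] = {
  val magicLen = 4L // length of the "PAR1" magic
  closeOnExcept(new ArrayBuffer[HostMemoryBuffer]) { buffers =>
    parts.foreach { case (hmb, dataLen) =>
      if (dataLen > 0) {
        // Zero-copy slices: the first slice keeps the leading magic, later ones skip it.
        if (buffers.isEmpty) {
          buffers += hmb.slice(0, magicLen + dataLen)
        } else {
          buffers += hmb.slice(magicLen, dataLen)
        }
      }
    }
    // Only the footer bytes are newly allocated and written.
    val footer = HostMemoryBuffer.allocate(footerSize)
    buffers += footer
    writeFooterTo(footer)
    buffers.toArray
  }
}

In the reader itself the resulting array becomes the hmbs field of SingleHMBAndMeta and is later passed straight to MakeParquetTableProducer, which is what lets late-arriving buffers be absorbed by fabricating a new footer rather than re-concatenating data.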