From 79882f881207e3d85887036863338506031b0fea Mon Sep 17 00:00:00 2001
From: Firestarman
Date: Mon, 11 Nov 2024 16:01:03 +0800
Subject: [PATCH] Some small refactors to improve the stability.

Signed-off-by: Firestarman
---
 .../spark/rapids/GpuAggregateExec.scala       |  2 +-
 .../spark/rapids/GpuCoalesceBatches.scala     |  2 +-
 .../nvidia/spark/rapids/GpuGenerateExec.scala | 31 ++++++-----
 .../sql/rapids/GpuFileFormatDataWriter.scala  | 51 +++++++++----------
 .../GpuBroadcastNestedLoopJoinExecBase.scala  | 21 ++++----
 5 files changed, 52 insertions(+), 55 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
index d5bbe15209d..e3ca330b409 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
@@ -716,7 +716,7 @@ object GpuAggregateIterator extends Logging {
       val dataTypes = (0 until numCols).map { c =>
         batchesToConcat.head.column(c).dataType
       }.toArray
-      withResource(batchesToConcat.map(GpuColumnVector.from)) { tbl =>
+      withResource(batchesToConcat.safeMap(GpuColumnVector.from)) { tbl =>
         withResource(cudf.Table.concatenate(tbl: _*)) { concatenated =>
           val cb = GpuColumnVector.from(concatenated, dataTypes)
           SpillableColumnarBatch(cb,
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
index cc1196d44e4..0af5baa90db 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
@@ -67,7 +67,7 @@ object ConcatAndConsumeAll {
     if (arrayOfBatches.length == 1) {
       arrayOfBatches(0)
     } else {
-      val tables = arrayOfBatches.map(GpuColumnVector.from)
+      val tables = arrayOfBatches.safeMap(GpuColumnVector.from)
       try {
         val combined = Table.concatenate(tables: _*)
         try {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
index 239b7a3d4c0..dbf2e95dbf0 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
@@ -677,27 +677,26 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
       outer: Boolean): Iterator[ColumnarBatch] = {
     val batchesToGenerate = inputSpillables.map(new BatchToGenerate(0, _))
     withRetry(batchesToGenerate, generateSplitSpillableInHalfByRows(generatorOffset)) { attempt =>
-      withResource(attempt.spillable.getColumnarBatch()) { inputBatch =>
+      val (exploded, schema) = withResource(attempt.spillable.getColumnarBatch()) { inputBatch =>
         require(inputBatch.numCols() - 1 == generatorOffset,
           s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.")
         val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)
-
         withResource(GpuColumnVector.from(inputBatch)) { table =>
-          withResource(
-            explodeFun(table, generatorOffset, outer, attempt.fixUpOffset)) { exploded =>
-            child.dataType match {
-              case _: ArrayType =>
-                GpuColumnVector.from(exploded, schema)
-              case MapType(kt, vt, _) =>
-                // We need to pull the key and value of of the struct column
-                withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
-                  GpuColumnVector.from(fixed, schema)
-                }
-              case other =>
-                throw new IllegalArgumentException(
-                  s"$other is not supported as explode input right now")
+          (explodeFun(table, generatorOffset, outer, attempt.fixUpOffset), schema)
+        }
+      }
+      withResource(exploded) { _ =>
+        child.dataType match {
+          case _: ArrayType =>
+            GpuColumnVector.from(exploded, schema)
+          case MapType(kt, vt, _) =>
+            // We need to pull the key and value of of the struct column
+            withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
+              GpuColumnVector.from(fixed, schema)
             }
-          }
+          case other =>
+            throw new IllegalArgumentException(
+              s"$other is not supported as explode input right now")
         }
       }
     }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
index be88e7a2937..6b6580df68f 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
@@ -78,34 +78,35 @@ object GpuFileFormatDataWriter {
    * The input batch is closed in case of error or in case we have to split it.
    * It is not closed if it wasn't split.
    *
-   * @param batch ColumnarBatch to split (and close)
+   * @param scb Spillable ColumnarBatch to split (and close)
    * @param maxRecordsPerFile max rowcount per file
    * @param recordsInFile row count in the file so far
    * @return array of SpillableColumnarBatch splits
    */
-  def splitToFitMaxRecordsAndClose(
-      batch: ColumnarBatch,
+  def splitToFitMaxRecordsAndCloseWithRetry(
+      scb: SpillableColumnarBatch,
       maxRecordsPerFile: Long,
       recordsInFile: Long): Array[SpillableColumnarBatch] = {
-    val (types, splitIndexes) = closeOnExcept(batch) { _ =>
-      val splitIndexes = getSplitIndexes(maxRecordsPerFile, recordsInFile, batch.numRows())
-      (GpuColumnVector.extractTypes(batch), splitIndexes)
+    val splitIndexes = closeOnExcept(scb) { _ =>
+      getSplitIndexes(maxRecordsPerFile, recordsInFile, scb.numRows())
     }
     if (splitIndexes.isEmpty) {
-      // this should never happen, as `splitToFitMaxRecordsAndClose` is called when
+      // this should never happen, as `splitToFitMaxRecordsAndCloseWithRetry` is called when
       // splits should already happen, but making it more efficient in that case
-      Array(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+      Array(scb)
     } else {
       // actually split it
-      val tbl = withResource(batch) { _ =>
-        GpuColumnVector.from(batch)
-      }
-      val cts = withResource(tbl) { _ =>
-        tbl.contiguousSplit(splitIndexes: _*)
-      }
-      withResource(cts) { _ =>
-        cts.safeMap(ct =>
-          SpillableColumnarBatch(ct, types, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+      withRetryNoSplit(scb) { _ =>
+        val tbl = withResource(scb.getColumnarBatch()) { batch =>
+          GpuColumnVector.from(batch)
+        }
+        val cts = withResource(tbl) { _ =>
+          tbl.contiguousSplit(splitIndexes: _*)
+        }
+        withResource(cts) { _ =>
+          cts.safeMap(ct =>
+            SpillableColumnarBatch(ct, scb.dataTypes, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+        }
       }
     }
   }
@@ -257,14 +258,13 @@ class GpuSingleDirectoryDataWriter(
   override def write(batch: ColumnarBatch): Unit = {
     val maxRecordsPerFile = description.maxRecordsPerFile
     val recordsInFile = currentWriterStatus.recordsInFile
+    val scb = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
     if (!shouldSplitToFitMaxRecordsPerFile(
-        maxRecordsPerFile, recordsInFile, batch.numRows())) {
-      writeUpdateMetricsAndClose(
-        SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY),
-        currentWriterStatus)
+        maxRecordsPerFile, recordsInFile, scb.numRows())) {
+      writeUpdateMetricsAndClose(scb, currentWriterStatus)
     } else {
-      val partBatches = splitToFitMaxRecordsAndClose(
-        batch, maxRecordsPerFile, recordsInFile)
+      val partBatches = splitToFitMaxRecordsAndCloseWithRetry(scb, maxRecordsPerFile,
+        recordsInFile)
       val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile
       closeOnExcept(partBatches) { _ =>
         partBatches.zipWithIndex.foreach { case (partBatch, partIx) =>
@@ -593,10 +593,7 @@ class GpuDynamicPartitionDataSingleWriter(
     if (!shouldSplitToFitMaxRecordsPerFile(maxRecordsPerFile, recordsInFile, scb.numRows())) {
       writeUpdateMetricsAndClose(scb, writerStatus)
     } else {
-      val batch = withRetryNoSplit(scb) { scb =>
-        scb.getColumnarBatch()
-      }
-      val splits = splitToFitMaxRecordsAndClose(batch, maxRecordsPerFile, recordsInFile)
+      val splits = splitToFitMaxRecordsAndCloseWithRetry(scb, maxRecordsPerFile, recordsInFile)
       withResource(splits) { _ =>
         val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile
         splits.zipWithIndex.foreach { case (part, partIx) =>
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala
index b939a8c4155..21166437d36 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExecBase.scala
@@ -700,17 +700,18 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
       val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
       withRetry(spillable, RmmRapidsRetryIterator.splitSpillableInHalfByRows) { spillBatch =>
         withResource(spillBatch.getColumnarBatch()) { batch =>
-          GpuColumnVector.incRefCounts(batch)
-          val newCols = new Array[ColumnVector](batch.numCols + 1)
-          (0 until newCols.length - 1).foreach { i =>
-            newCols(i) = batch.column(i)
-          }
-          val existsCol = withResource(Scalar.fromBool(exists)) { existsScalar =>
-            GpuColumnVector.from(cudf.ColumnVector.fromScalar(existsScalar, batch.numRows),
-              BooleanType)
+          closeOnExcept(GpuColumnVector.incRefCounts(batch)) { _ =>
+            val newCols = new Array[ColumnVector](batch.numCols + 1)
+            (0 until newCols.length - 1).foreach { i =>
+              newCols(i) = batch.column(i)
+            }
+            val existsCol = withResource(Scalar.fromBool(exists)) { existsScalar =>
+              GpuColumnVector.from(cudf.ColumnVector.fromScalar(existsScalar, batch.numRows),
+                BooleanType)
+            }
+            newCols(batch.numCols) = existsCol
+            new ColumnarBatch(newCols, batch.numRows)
           }
-          newCols(batch.numCols) = existsCol
-          new ColumnarBatch(newCols, batch.numRows)
         }
       }
     }