From df144b1ce945a707b7e829f426c6d7491f74fb71 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 12 Jan 2024 14:47:32 -0600 Subject: [PATCH] Fix explode with carry-along columns on GpuExplode single row retry handling (#10193) * Fixes bug where on split and retry the carry along columns were corrupted --------- Signed-off-by: Alessandro Bellina --- .../nvidia/spark/rapids/GpuGenerateExec.scala | 2 +- .../spark/rapids/GpuGenerateSuite.scala | 84 ++++++++++++------- 2 files changed, 54 insertions(+), 32 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala index a07b7e3464c..71841e6576c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala @@ -636,7 +636,7 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene colToReconstitute.makeListFromOffsets(1, offsets) } } else { - tbl.getColumn(0).incRefCount() + tbl.getColumn(col).incRefCount() } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala index f5ebf0e8606..d0f3339b425 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector, Table} import ai.rapids.cudf.HostColumnVector.{BasicType, ListType, StructType} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} @@ -88,7 +89,8 @@ class GpuGenerateSuite includeRepeatColumn: Boolean = true, 
includeNulls: Boolean = false, listSize: Int = 4, - allNulls: Boolean = false): (ColumnarBatch, Long) = { + allNulls: Boolean = false, + carryAlongColumnCount: Int = 1): (ColumnarBatch, Long) = { var inputSize: Long = 0L withResource(makeListColumn(numRows, listSize, includeNulls, allNulls)) { cvList => inputSize += @@ -96,18 +98,25 @@ class GpuGenerateSuite _.getDeviceMemorySize } val batch = if (includeRepeatColumn) { - val dt: Array[DataType] = Seq(IntegerType, ArrayType(IntegerType)).toArray - val secondColumn = (0 until numRows).map { x => - val i = Int.box(x) - if (allNulls || includeNulls && i % 2 == 0) { - null - } else { - i - } + val carryAlongColumnTypes = (0 until carryAlongColumnCount).map(_ => IntegerType) + val dt: Array[DataType] = (carryAlongColumnTypes ++ Seq(ArrayType(IntegerType))).toArray + val carryAlongColumns = (0 until carryAlongColumnCount).safeMap { c => + val elements = (0 until numRows).map { x => + // + c to add some differences per column + val i = Int.box(x + c) + if (allNulls || includeNulls && i % 2 == 0) { + null + } else { + i + } + }.toSeq + ColumnVector.fromBoxedInts(elements:_*) } - withResource(ColumnVector.fromBoxedInts(secondColumn: _*)) { repeatCol => - inputSize += listSize * repeatCol.getDeviceMemorySize - withResource(new Table(repeatCol, cvList)) { tbl => + withResource(carryAlongColumns) { _ => + carryAlongColumns.foreach { rc => + inputSize += listSize * rc.getDeviceMemorySize + } + withResource(new Table((carryAlongColumns :+ cvList): _*)) { tbl => GpuColumnVector.from(tbl, dt) } } @@ -126,7 +135,8 @@ class GpuGenerateSuite includeRepeatColumn: Boolean = true, includeNulls: Boolean = false, mapSize: Int = 4, - allNulls: Boolean = false): (ColumnarBatch, Long) = { + allNulls: Boolean = false, + carryAlongColumnCount: Int = 1): (ColumnarBatch, Long) = { var inputSize: Long = 0L withResource(makeMapColumn(numRows, mapSize, includeNulls, allNulls)) { cvList => inputSize += @@ -134,18 +144,26 @@ class GpuGenerateSuite 
_.getDeviceMemorySize } val batch = if (includeRepeatColumn) { - val dt: Array[DataType] = Seq(IntegerType, MapType(IntegerType, IntegerType)).toArray - val secondColumn = (0 until numRows).map { x => - val i = Int.box(x) - if (allNulls || includeNulls && i % 2 == 0) { - null - } else { - i - } + val carryAlongColumnTypes = (0 until carryAlongColumnCount).map(_ => IntegerType) + val dt: Array[DataType] = + (carryAlongColumnTypes ++ Seq(MapType(IntegerType, IntegerType))).toArray + val carryAlongColumns = (0 until carryAlongColumnCount).safeMap { c => + val elements = (0 until numRows).map { x => + // + c to add some differences per column + val i = Int.box(x + c) + if (allNulls || includeNulls && i % 2 == 0) { + null + } else { + i + } + }.toSeq + ColumnVector.fromBoxedInts(elements: _*) } - withResource(ColumnVector.fromBoxedInts(secondColumn: _*)) { repeatCol => - inputSize += repeatCol.getDeviceMemorySize - withResource(new Table(repeatCol, cvList)) { tbl => + withResource(carryAlongColumns) { _ => + carryAlongColumns.foreach { rc => + inputSize += mapSize * rc.getDeviceMemorySize + } + withResource(new Table((carryAlongColumns :+ cvList): _*)) { tbl => GpuColumnVector.from(tbl, dt) } } @@ -296,18 +314,21 @@ class GpuGenerateSuite generate: GpuGenerator, failingGenerate: TestGenerator, makeBatchFn: - (Int, Boolean) => (ColumnarBatch, Long) = makeBatch(_, includeRepeatColumn = true, _), + (Int, Boolean) => (ColumnarBatch, Long) = + makeBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2), outer: Boolean = false, includeNulls: Boolean = false) = { // numRows = 1: exercises the split code trying to save a 1-row scenario where // we are running OOM. 
// numRows = 2: is the regular split code + val generatorOffset = 2 // 0 and 1 are carry-along columns, 2 == collection to explode + (1 until 3).foreach { numRows => val (expected, _) = makeBatchFn(numRows, includeNulls) val itNoFailures = new GpuGenerateIterator( Seq(SpillableColumnarBatch(expected, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)), generator = generate, - generatorOffset = 1, + generatorOffset, outer, NoopMetric, NoopMetric, @@ -333,7 +354,7 @@ class GpuGenerateSuite val it = new GpuGenerateIterator( Seq(actualSpillable), generator = failingGenerate, - generatorOffset = 1, + generatorOffset, outer, NoopMetric, NoopMetric, @@ -364,6 +385,7 @@ class GpuGenerateSuite withResource(results) { _ => withResource(results.map(GpuColumnVector.from)) { resultTbls => + // toSeq is required for scala 2.13 here withResource(Table.concatenate(resultTbls.toSeq: _*)) { res => withResource(GpuColumnVector.from(expectedExploded)) { expectedTbl => TestUtils.compareTables(expectedTbl, res) @@ -424,7 +446,7 @@ class GpuGenerateSuite doGenerateWithSplitAndRetry( GpuExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), new TestExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), - makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _), + makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2), includeNulls = includeNulls ) } @@ -435,7 +457,7 @@ class GpuGenerateSuite doGenerateWithSplitAndRetry( GpuExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), new TestExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), - makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _), + makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2), outer = true, includeNulls = includeNulls ) @@ -447,7 +469,7 @@ class GpuGenerateSuite doGenerateWithSplitAndRetry( GpuPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), new 
TestPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), - makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _), + makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2), includeNulls = includeNulls ) } @@ -458,7 +480,7 @@ class GpuGenerateSuite doGenerateWithSplitAndRetry( GpuPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), new TestPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()), - makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _), + makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2), outer = true, includeNulls = includeNulls )