Skip to content

Commit

Permalink
Fix explode with carry-along columns on GpuExplode single row retry h…
Browse files Browse the repository at this point in the history
…andling (#10193)

* Fixes bug where on split and retry the carry along columns were corrupted

---------

Signed-off-by: Alessandro Bellina <[email protected]>
  • Loading branch information
abellina authored Jan 12, 2024
1 parent 65b0716 commit df144b1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
colToReconstitute.makeListFromOffsets(1, offsets)
}
} else {
tbl.getColumn(0).incRefCount()
tbl.getColumn(col).incRefCount()
}
}
}
Expand Down
84 changes: 53 additions & 31 deletions tests/src/test/scala/com/nvidia/spark/rapids/GpuGenerateSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector, Table}
import ai.rapids.cudf.HostColumnVector.{BasicType, ListType, StructType}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.jni.GpuSplitAndRetryOOM

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
Expand Down Expand Up @@ -88,26 +89,34 @@ class GpuGenerateSuite
includeRepeatColumn: Boolean = true,
includeNulls: Boolean = false,
listSize: Int = 4,
allNulls: Boolean = false): (ColumnarBatch, Long) = {
allNulls: Boolean = false,
carryAlongColumnCount: Int = 1): (ColumnarBatch, Long) = {
var inputSize: Long = 0L
withResource(makeListColumn(numRows, listSize, includeNulls, allNulls)) { cvList =>
inputSize +=
withResource(cvList.getChildColumnView(0)) {
_.getDeviceMemorySize
}
val batch = if (includeRepeatColumn) {
val dt: Array[DataType] = Seq(IntegerType, ArrayType(IntegerType)).toArray
val secondColumn = (0 until numRows).map { x =>
val i = Int.box(x)
if (allNulls || includeNulls && i % 2 == 0) {
null
} else {
i
}
val carryAlongColumnTypes = (0 until carryAlongColumnCount).map(_ => IntegerType)
val dt: Array[DataType] = (carryAlongColumnTypes ++ Seq(ArrayType(IntegerType))).toArray
val carryAlongColumns = (0 until carryAlongColumnCount).safeMap { c =>
val elements = (0 until numRows).map { x =>
// add the column index c so each carry-along column holds different values
val i = Int.box(x + c)
if (allNulls || includeNulls && i % 2 == 0) {
null
} else {
i
}
}.toSeq
ColumnVector.fromBoxedInts(elements:_*)
}
withResource(ColumnVector.fromBoxedInts(secondColumn: _*)) { repeatCol =>
inputSize += listSize * repeatCol.getDeviceMemorySize
withResource(new Table(repeatCol, cvList)) { tbl =>
withResource(carryAlongColumns) { _ =>
carryAlongColumns.foreach { rc =>
inputSize += listSize * rc.getDeviceMemorySize
}
withResource(new Table((carryAlongColumns :+ cvList): _*)) { tbl =>
GpuColumnVector.from(tbl, dt)
}
}
Expand All @@ -126,26 +135,35 @@ class GpuGenerateSuite
includeRepeatColumn: Boolean = true,
includeNulls: Boolean = false,
mapSize: Int = 4,
allNulls: Boolean = false): (ColumnarBatch, Long) = {
allNulls: Boolean = false,
carryAlongColumnCount: Int = 1): (ColumnarBatch, Long) = {
var inputSize: Long = 0L
withResource(makeMapColumn(numRows, mapSize, includeNulls, allNulls)) { cvList =>
inputSize +=
withResource(cvList.getChildColumnView(0)) {
_.getDeviceMemorySize
}
val batch = if (includeRepeatColumn) {
val dt: Array[DataType] = Seq(IntegerType, MapType(IntegerType, IntegerType)).toArray
val secondColumn = (0 until numRows).map { x =>
val i = Int.box(x)
if (allNulls || includeNulls && i % 2 == 0) {
null
} else {
i
}
val carryAlongColumnTypes = (0 until carryAlongColumnCount).map(_ => IntegerType)
val dt: Array[DataType] =
(carryAlongColumnTypes ++ Seq(MapType(IntegerType, IntegerType))).toArray
val carryAlongColumns = (0 until carryAlongColumnCount).safeMap { c =>
val elements = (0 until numRows).map { x =>
// add the column index c so each carry-along column holds different values
val i = Int.box(x + c)
if (allNulls || includeNulls && i % 2 == 0) {
null
} else {
i
}
}.toSeq
ColumnVector.fromBoxedInts(elements: _*)
}
withResource(ColumnVector.fromBoxedInts(secondColumn: _*)) { repeatCol =>
inputSize += repeatCol.getDeviceMemorySize
withResource(new Table(repeatCol, cvList)) { tbl =>
withResource(carryAlongColumns) { _ =>
carryAlongColumns.foreach { rc =>
inputSize += mapSize * rc.getDeviceMemorySize
}
withResource(new Table((carryAlongColumns :+ cvList): _*)) { tbl =>
GpuColumnVector.from(tbl, dt)
}
}
Expand Down Expand Up @@ -296,18 +314,21 @@ class GpuGenerateSuite
generate: GpuGenerator,
failingGenerate: TestGenerator,
makeBatchFn:
(Int, Boolean) => (ColumnarBatch, Long) = makeBatch(_, includeRepeatColumn = true, _),
(Int, Boolean) => (ColumnarBatch, Long) =
makeBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2),
outer: Boolean = false,
includeNulls: Boolean = false) = {
// numRows = 1: exercises the split code trying to save a 1-row scenario where
// we are running OOM.
// numRows = 2: is the regular split code
val generatorOffset = 2 // 0 and 1 are carry-along columns, 2 == collection to explode

(1 until 3).foreach { numRows =>
val (expected, _) = makeBatchFn(numRows, includeNulls)
val itNoFailures = new GpuGenerateIterator(
Seq(SpillableColumnarBatch(expected, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)),
generator = generate,
generatorOffset = 1,
generatorOffset,
outer,
NoopMetric,
NoopMetric,
Expand All @@ -333,7 +354,7 @@ class GpuGenerateSuite
val it = new GpuGenerateIterator(
Seq(actualSpillable),
generator = failingGenerate,
generatorOffset = 1,
generatorOffset,
outer,
NoopMetric,
NoopMetric,
Expand Down Expand Up @@ -364,6 +385,7 @@ class GpuGenerateSuite

withResource(results) { _ =>
withResource(results.map(GpuColumnVector.from)) { resultTbls =>
// toSeq is required for scala 2.13 here
withResource(Table.concatenate(resultTbls.toSeq: _*)) { res =>
withResource(GpuColumnVector.from(expectedExploded)) { expectedTbl =>
TestUtils.compareTables(expectedTbl, res)
Expand Down Expand Up @@ -424,7 +446,7 @@ class GpuGenerateSuite
doGenerateWithSplitAndRetry(
GpuExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
new TestExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2),
includeNulls = includeNulls
)
}
Expand All @@ -435,7 +457,7 @@ class GpuGenerateSuite
doGenerateWithSplitAndRetry(
GpuExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
new TestExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2),
outer = true,
includeNulls = includeNulls
)
Expand All @@ -447,7 +469,7 @@ class GpuGenerateSuite
doGenerateWithSplitAndRetry(
GpuPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
new TestPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2),
includeNulls = includeNulls
)
}
Expand All @@ -458,7 +480,7 @@ class GpuGenerateSuite
doGenerateWithSplitAndRetry(
GpuPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
new TestPosExplode(AttributeReference("foo", MapType(IntegerType, IntegerType))()),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _),
makeBatchFn = makeMapBatch(_, includeRepeatColumn = true, _, carryAlongColumnCount=2),
outer = true,
includeNulls = includeNulls
)
Expand Down

0 comments on commit df144b1

Please sign in to comment.