Skip to content

Commit

Permalink
Some small refactors to improve the stability.
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed Dec 10, 2024
1 parent 4fbecbc commit 79882f8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ object GpuAggregateIterator extends Logging {
val dataTypes = (0 until numCols).map {
c => batchesToConcat.head.column(c).dataType
}.toArray
withResource(batchesToConcat.map(GpuColumnVector.from)) { tbl =>
withResource(batchesToConcat.safeMap(GpuColumnVector.from)) { tbl =>
withResource(cudf.Table.concatenate(tbl: _*)) { concatenated =>
val cb = GpuColumnVector.from(concatenated, dataTypes)
SpillableColumnarBatch(cb,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object ConcatAndConsumeAll {
if (arrayOfBatches.length == 1) {
arrayOfBatches(0)
} else {
val tables = arrayOfBatches.map(GpuColumnVector.from)
val tables = arrayOfBatches.safeMap(GpuColumnVector.from)
try {
val combined = Table.concatenate(tables: _*)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,27 +677,26 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
outer: Boolean): Iterator[ColumnarBatch] = {
val batchesToGenerate = inputSpillables.map(new BatchToGenerate(0, _))
withRetry(batchesToGenerate, generateSplitSpillableInHalfByRows(generatorOffset)) { attempt =>
withResource(attempt.spillable.getColumnarBatch()) { inputBatch =>
val (exploded, schema) = withResource(attempt.spillable.getColumnarBatch()) { inputBatch =>
require(inputBatch.numCols() - 1 == generatorOffset,
s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)

withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(
explodeFun(table, generatorOffset, outer, attempt.fixUpOffset)) { exploded =>
child.dataType match {
case _: ArrayType =>
GpuColumnVector.from(exploded, schema)
case MapType(kt, vt, _) =>
// We need to pull the key and value of of the struct column
withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
GpuColumnVector.from(fixed, schema)
}
case other =>
throw new IllegalArgumentException(
s"$other is not supported as explode input right now")
(explodeFun(table, generatorOffset, outer, attempt.fixUpOffset), schema)
}
}
withResource(exploded) { _ =>
child.dataType match {
case _: ArrayType =>
GpuColumnVector.from(exploded, schema)
case MapType(kt, vt, _) =>
// We need to pull the key and value of of the struct column
withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
GpuColumnVector.from(fixed, schema)
}
}
case other =>
throw new IllegalArgumentException(
s"$other is not supported as explode input right now")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,34 +78,35 @@ object GpuFileFormatDataWriter {
* The input batch is closed in case of error or in case we have to split it.
* It is not closed if it wasn't split.
*
* @param batch ColumnarBatch to split (and close)
* @param scb Spillable ColumnarBatch to split (and close)
* @param maxRecordsPerFile max rowcount per file
* @param recordsInFile row count in the file so far
* @return array of SpillableColumnarBatch splits
*/
def splitToFitMaxRecordsAndClose(
batch: ColumnarBatch,
def splitToFitMaxRecordsAndCloseWithRetry(
scb: SpillableColumnarBatch,
maxRecordsPerFile: Long,
recordsInFile: Long): Array[SpillableColumnarBatch] = {
val (types, splitIndexes) = closeOnExcept(batch) { _ =>
val splitIndexes = getSplitIndexes(maxRecordsPerFile, recordsInFile, batch.numRows())
(GpuColumnVector.extractTypes(batch), splitIndexes)
val splitIndexes = closeOnExcept(scb) { _ =>
getSplitIndexes(maxRecordsPerFile, recordsInFile, scb.numRows())
}
if (splitIndexes.isEmpty) {
// this should never happen, as `splitToFitMaxRecordsAndClose` is called when
// this should never happen, as `splitToFitMaxRecordsAndCloseWithRetry` is called when
// splits should already happen, but making it more efficient in that case
Array(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
Array(scb)
} else {
// actually split it
val tbl = withResource(batch) { _ =>
GpuColumnVector.from(batch)
}
val cts = withResource(tbl) { _ =>
tbl.contiguousSplit(splitIndexes: _*)
}
withResource(cts) { _ =>
cts.safeMap(ct =>
SpillableColumnarBatch(ct, types, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
withRetryNoSplit(scb) { _ =>
val tbl = withResource(scb.getColumnarBatch()) { batch =>
GpuColumnVector.from(batch)
}
val cts = withResource(tbl) { _ =>
tbl.contiguousSplit(splitIndexes: _*)
}
withResource(cts) { _ =>
cts.safeMap(ct =>
SpillableColumnarBatch(ct, scb.dataTypes, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
}
}
}
}
Expand Down Expand Up @@ -257,14 +258,13 @@ class GpuSingleDirectoryDataWriter(
override def write(batch: ColumnarBatch): Unit = {
val maxRecordsPerFile = description.maxRecordsPerFile
val recordsInFile = currentWriterStatus.recordsInFile
val scb = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
if (!shouldSplitToFitMaxRecordsPerFile(
maxRecordsPerFile, recordsInFile, batch.numRows())) {
writeUpdateMetricsAndClose(
SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY),
currentWriterStatus)
maxRecordsPerFile, recordsInFile, scb.numRows())) {
writeUpdateMetricsAndClose(scb, currentWriterStatus)
} else {
val partBatches = splitToFitMaxRecordsAndClose(
batch, maxRecordsPerFile, recordsInFile)
val partBatches = splitToFitMaxRecordsAndCloseWithRetry(scb, maxRecordsPerFile,
recordsInFile)
val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile
closeOnExcept(partBatches) { _ =>
partBatches.zipWithIndex.foreach { case (partBatch, partIx) =>
Expand Down Expand Up @@ -593,10 +593,7 @@ class GpuDynamicPartitionDataSingleWriter(
if (!shouldSplitToFitMaxRecordsPerFile(maxRecordsPerFile, recordsInFile, scb.numRows())) {
writeUpdateMetricsAndClose(scb, writerStatus)
} else {
val batch = withRetryNoSplit(scb) { scb =>
scb.getColumnarBatch()
}
val splits = splitToFitMaxRecordsAndClose(batch, maxRecordsPerFile, recordsInFile)
val splits = splitToFitMaxRecordsAndCloseWithRetry(scb, maxRecordsPerFile, recordsInFile)
withResource(splits) { _ =>
val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile
splits.zipWithIndex.foreach { case (part, partIx) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,17 +700,18 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
withRetry(spillable, RmmRapidsRetryIterator.splitSpillableInHalfByRows) { spillBatch =>
withResource(spillBatch.getColumnarBatch()) { batch =>
GpuColumnVector.incRefCounts(batch)
val newCols = new Array[ColumnVector](batch.numCols + 1)
(0 until newCols.length - 1).foreach { i =>
newCols(i) = batch.column(i)
}
val existsCol = withResource(Scalar.fromBool(exists)) { existsScalar =>
GpuColumnVector.from(cudf.ColumnVector.fromScalar(existsScalar, batch.numRows),
BooleanType)
closeOnExcept(GpuColumnVector.incRefCounts(batch)) { _ =>
val newCols = new Array[ColumnVector](batch.numCols + 1)
(0 until newCols.length - 1).foreach { i =>
newCols(i) = batch.column(i)
}
val existsCol = withResource(Scalar.fromBool(exists)) { existsScalar =>
GpuColumnVector.from(cudf.ColumnVector.fromScalar(existsScalar, batch.numRows),
BooleanType)
}
newCols(batch.numCols) = existsCol
new ColumnarBatch(newCols, batch.numRows)
}
newCols(batch.numCols) = existsCol
new ColumnarBatch(newCols, batch.numRows)
}
}
}
Expand Down

0 comments on commit 79882f8

Please sign in to comment.