Some small improvements #11847

Merged: 1 commit, Dec 11, 2024
@@ -716,7 +716,7 @@ object GpuAggregateIterator extends Logging {
       val dataTypes = (0 until numCols).map {
         c => batchesToConcat.head.column(c).dataType
       }.toArray
-      withResource(batchesToConcat.map(GpuColumnVector.from)) { tbl =>
+      withResource(batchesToConcat.safeMap(GpuColumnVector.from)) { tbl =>
         withResource(cudf.Table.concatenate(tbl: _*)) { concatenated =>
           val cb = GpuColumnVector.from(concatenated, dataTypes)
           SpillableColumnarBatch(cb,
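
Why the `map` to `safeMap` change matters here: `GpuColumnVector.from` builds one GPU-backed `Table` per input batch, and with a plain `map` an exception on a later batch (a GPU OOM, for instance) leaves the tables already built unclosed. A `safeMap`-style helper closes everything it has produced so far before rethrowing. The sketch below is a minimal, self-contained illustration of that pattern over `AutoCloseable` values; `FakeTable` and this `safeMap` are stand-ins written for the example, not the spark-rapids implementations.

```scala
import scala.util.control.NonFatal
import scala.collection.mutable.ArrayBuffer

object SafeMapSketch {
  // Stand-in for a GPU-backed resource such as a cudf Table.
  final class FakeTable(val id: Int) extends AutoCloseable {
    override def close(): Unit = println(s"closed table $id")
  }

  // Like map, but if producing a later element throws, every element
  // produced so far is closed before the exception is rethrown.
  def safeMap[A, R <: AutoCloseable](in: Seq[A])(f: A => R): Seq[R] = {
    val out = ArrayBuffer.empty[R]
    try {
      in.foreach(a => out += f(a))
      out.toSeq
    } catch {
      case NonFatal(e) =>
        out.foreach { r =>
          try r.close() catch { case NonFatal(sup) => e.addSuppressed(sup) }
        }
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    // The third conversion fails; the two tables created before it still get closed.
    try {
      safeMap(Seq(1, 2, 3)) { i =>
        if (i == 3) throw new RuntimeException("simulated conversion failure")
        new FakeTable(i)
      }
    } catch {
      case NonFatal(e) => println(s"caught: ${e.getMessage}")
    }
  }
}
```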
@@ -67,7 +67,7 @@ object ConcatAndConsumeAll {
     if (arrayOfBatches.length == 1) {
       arrayOfBatches(0)
     } else {
-      val tables = arrayOfBatches.map(GpuColumnVector.from)
+      val tables = arrayOfBatches.safeMap(GpuColumnVector.from)
       try {
         val combined = Table.concatenate(tables: _*)
         try {
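
The same substitution is made here: each converted `Table` is a device-memory resource, so a failure partway through the conversion must not strand the tables created before it (see the `safeMap` sketch after the previous hunk).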
@@ -677,27 +677,26 @@ abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGene
       outer: Boolean): Iterator[ColumnarBatch] = {
     val batchesToGenerate = inputSpillables.map(new BatchToGenerate(0, _))
     withRetry(batchesToGenerate, generateSplitSpillableInHalfByRows(generatorOffset)) { attempt =>
-      withResource(attempt.spillable.getColumnarBatch()) { inputBatch =>
+      val (exploded, schema) = withResource(attempt.spillable.getColumnarBatch()) { inputBatch =>
         require(inputBatch.numCols() - 1 == generatorOffset,
           s"Internal Error ${getClass.getSimpleName} supports one and only one input attribute.")
         val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)

         withResource(GpuColumnVector.from(inputBatch)) { table =>
-          withResource(
-            explodeFun(table, generatorOffset, outer, attempt.fixUpOffset)) { exploded =>
-            child.dataType match {
-              case _: ArrayType =>
-                GpuColumnVector.from(exploded, schema)
-              case MapType(kt, vt, _) =>
-                // We need to pull the key and value out of the struct column
-                withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
-                  GpuColumnVector.from(fixed, schema)
-                }
-              case other =>
-                throw new IllegalArgumentException(
-                  s"$other is not supported as explode input right now")
-            }
-          }
+          (explodeFun(table, generatorOffset, outer, attempt.fixUpOffset), schema)
         }
       }
+      withResource(exploded) { _ =>
+        child.dataType match {
+          case _: ArrayType =>
+            GpuColumnVector.from(exploded, schema)
+          case MapType(kt, vt, _) =>
+            // We need to pull the key and value out of the struct column
+            withResource(convertMapOutput(exploded, generatorOffset, kt, vt, outer)) { fixed =>
+              GpuColumnVector.from(fixed, schema)
+            }
+          case other =>
+            throw new IllegalArgumentException(
+              s"$other is not supported as explode input right now")
+        }
+      }
     }
   }
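
This reshuffle narrows how long the input stays open: the old code kept `inputBatch` and the `Table` built from it alive while `explodeFun` and the ArrayType/MapType post-processing ran, whereas the new code returns `(exploded, schema)` out of the inner `withResource` scopes so the input can be released before the result is converted. The following sketch shows that scope-narrowing pattern in isolation; `Buf`, this `withResource`, and `postProcess` are illustrative names for the example only, not the project's utilities.

```scala
object ScopeNarrowingSketch {
  final class Buf(val name: String) extends AutoCloseable {
    override def close(): Unit = println(s"closed $name")
  }

  // Run body and always close the resource afterwards.
  def withResource[R <: AutoCloseable, T](r: R)(body: R => T): T =
    try body(r) finally r.close()

  // Old shape: everything nested inside the input's scope, so `input`
  // stays open while postProcess runs.
  def nested(): String =
    withResource(new Buf("input")) { input =>
      withResource(new Buf(s"derived-from-${input.name}")) { derived =>
        postProcess(derived)
      }
    }

  // New shape: produce `derived` inside the input's scope, return it,
  // and let `input` close before postProcess touches `derived`.
  def narrowed(): String = {
    val derived = withResource(new Buf("input")) { input =>
      new Buf(s"derived-from-${input.name}")
    }
    withResource(derived)(d => postProcess(d))
  }

  private def postProcess(b: Buf): String = s"result of ${b.name}"

  def main(args: Array[String]): Unit = {
    println(nested())   // "closed input" prints only after postProcess has run
    println(narrowed()) // "closed input" prints before postProcess runs
  }
}
```

Returning the derived resource out of the block trades the convenience of nesting for a shorter window in which both resources are live, which appears to be the point of the change.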
@@ -78,34 +78,35 @@ object GpuFileFormatDataWriter {
    * The input batch is closed in case of error or in case we have to split it.
    * It is not closed if it wasn't split.
    *
-   * @param batch ColumnarBatch to split (and close)
+   * @param scb Spillable ColumnarBatch to split (and close)
    * @param maxRecordsPerFile max rowcount per file
    * @param recordsInFile row count in the file so far
    * @return array of SpillableColumnarBatch splits
    */
-  def splitToFitMaxRecordsAndClose(
-      batch: ColumnarBatch,
+  def splitToFitMaxRecordsAndCloseWithRetry(
+      scb: SpillableColumnarBatch,
       maxRecordsPerFile: Long,
       recordsInFile: Long): Array[SpillableColumnarBatch] = {
-    val (types, splitIndexes) = closeOnExcept(batch) { _ =>
-      val splitIndexes = getSplitIndexes(maxRecordsPerFile, recordsInFile, batch.numRows())
-      (GpuColumnVector.extractTypes(batch), splitIndexes)
+    val splitIndexes = closeOnExcept(scb) { _ =>
+      getSplitIndexes(maxRecordsPerFile, recordsInFile, scb.numRows())
     }
     if (splitIndexes.isEmpty) {
-      // this should never happen, as `splitToFitMaxRecordsAndClose` is called when
+      // this should never happen, as `splitToFitMaxRecordsAndCloseWithRetry` is called when
      // splits should already happen, but making it more efficient in that case
-      Array(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+      Array(scb)
     } else {
       // actually split it
-      val tbl = withResource(batch) { _ =>
-        GpuColumnVector.from(batch)
-      }
-      val cts = withResource(tbl) { _ =>
-        tbl.contiguousSplit(splitIndexes: _*)
-      }
-      withResource(cts) { _ =>
-        cts.safeMap(ct =>
-          SpillableColumnarBatch(ct, types, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
-      }
+      withRetryNoSplit(scb) { _ =>
+        val tbl = withResource(scb.getColumnarBatch()) { batch =>
+          GpuColumnVector.from(batch)
+        }
+        val cts = withResource(tbl) { _ =>
+          tbl.contiguousSplit(splitIndexes: _*)
+        }
+        withResource(cts) { _ =>
+          cts.safeMap(ct =>
+            SpillableColumnarBatch(ct, scb.dataTypes, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+        }
+      }
     }
   }

@@ -257,14 +258,13 @@ class GpuSingleDirectoryDataWriter(
   override def write(batch: ColumnarBatch): Unit = {
     val maxRecordsPerFile = description.maxRecordsPerFile
     val recordsInFile = currentWriterStatus.recordsInFile
+    val scb = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
     if (!shouldSplitToFitMaxRecordsPerFile(
-        maxRecordsPerFile, recordsInFile, batch.numRows())) {
-      writeUpdateMetricsAndClose(
-        SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY),
-        currentWriterStatus)
+        maxRecordsPerFile, recordsInFile, scb.numRows())) {
+      writeUpdateMetricsAndClose(scb, currentWriterStatus)
     } else {
-      val partBatches = splitToFitMaxRecordsAndClose(
-        batch, maxRecordsPerFile, recordsInFile)
+      val partBatches = splitToFitMaxRecordsAndCloseWithRetry(scb, maxRecordsPerFile,
+        recordsInFile)
       val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile
       closeOnExcept(partBatches) { _ =>
         partBatches.zipWithIndex.foreach { case (partBatch, partIx) =>

@@ -593,10 +593,7 @@ class GpuDynamicPartitionDataSingleWriter(
     if (!shouldSplitToFitMaxRecordsPerFile(maxRecordsPerFile, recordsInFile, scb.numRows())) {
       writeUpdateMetricsAndClose(scb, writerStatus)
     } else {
-      val batch = withRetryNoSplit(scb) { scb =>
-        scb.getColumnarBatch()
-      }
-      val splits = splitToFitMaxRecordsAndClose(batch, maxRecordsPerFile, recordsInFile)
+      val splits = splitToFitMaxRecordsAndCloseWithRetry(scb, maxRecordsPerFile, recordsInFile)
       withResource(splits) { _ =>
         val needNewWriterForFirstPart = recordsInFile >= maxRecordsPerFile
         splits.zipWithIndex.foreach { case (part, partIx) =>
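
The renamed `splitToFitMaxRecordsAndCloseWithRetry` takes a `SpillableColumnarBatch` and performs the actual split inside `withRetryNoSplit`, so a GPU OOM during the split can be handled by spilling and trying again rather than failing the task; the split indexes are computed up front under `closeOnExcept` so the spillable batch is not leaked if that step throws. A rough sketch of the retry-around-a-spillable idea follows; `Spillable`, `FakeOom`, and the single-retry `withRetryNoSplitSketch` helper are simplifications invented for this example, whereas the real spark-rapids retry machinery is driven by RMM and can retry repeatedly.

```scala
object RetryWithSpillableSketch {
  // Simplified stand-in for a batch that can release GPU memory (spill)
  // between attempts; the real spark-rapids types are much richer.
  final class Spillable(val rows: Int) extends AutoCloseable {
    def spill(): Unit = println("spilled to host memory")
    override def close(): Unit = println("spillable closed")
  }

  final class FakeOom extends RuntimeException("simulated GPU OOM")

  // Retry `body` once after a simulated OOM, spilling the input in between.
  // The input is always closed when we are done, mirroring the "AndClose"
  // naming convention of the patched method.
  def withRetryNoSplitSketch[T](s: Spillable)(body: Spillable => T): T =
    try {
      try body(s)
      catch { case _: FakeOom => s.spill(); body(s) }
    } finally s.close()

  def main(args: Array[String]): Unit = {
    val scb = new Spillable(1000)
    var attempts = 0
    val parts = withRetryNoSplitSketch(scb) { batch =>
      attempts += 1
      if (attempts == 1) throw new FakeOom   // first attempt hits OOM
      Array(batch.rows / 2, batch.rows / 2)  // second attempt "splits" the rows
    }
    println(parts.mkString("splits: ", ", ", ""))
  }
}
```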
@@ -700,17 +700,18 @@ abstract class GpuBroadcastNestedLoopJoinExecBase(
     val spillable = SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
     withRetry(spillable, RmmRapidsRetryIterator.splitSpillableInHalfByRows) { spillBatch =>
       withResource(spillBatch.getColumnarBatch()) { batch =>
-        GpuColumnVector.incRefCounts(batch)
-        val newCols = new Array[ColumnVector](batch.numCols + 1)
-        (0 until newCols.length - 1).foreach { i =>
-          newCols(i) = batch.column(i)
-        }
-        val existsCol = withResource(Scalar.fromBool(exists)) { existsScalar =>
-          GpuColumnVector.from(cudf.ColumnVector.fromScalar(existsScalar, batch.numRows),
-            BooleanType)
-        }
-        newCols(batch.numCols) = existsCol
-        new ColumnarBatch(newCols, batch.numRows)
+        closeOnExcept(GpuColumnVector.incRefCounts(batch)) { _ =>
+          val newCols = new Array[ColumnVector](batch.numCols + 1)
+          (0 until newCols.length - 1).foreach { i =>
+            newCols(i) = batch.column(i)
+          }
+          val existsCol = withResource(Scalar.fromBool(exists)) { existsScalar =>
+            GpuColumnVector.from(cudf.ColumnVector.fromScalar(existsScalar, batch.numRows),
+              BooleanType)
+          }
+          newCols(batch.numCols) = existsCol
+          new ColumnarBatch(newCols, batch.numRows)
+        }
       }
     }
   }
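
Wrapping `GpuColumnVector.incRefCounts(batch)` in `closeOnExcept` covers the window between bumping the column reference counts and handing the new `ColumnarBatch` to the caller: if building the `exists` column throws, the extra references are released, while on success ownership passes to the returned batch untouched (unlike `withResource`, which closes on every path). Below is a small sketch of `closeOnExcept` semantics with a toy ref-counted resource; `RefCounted` and this `closeOnExcept` are illustrative stand-ins, not the project's own utilities.

```scala
import scala.util.control.NonFatal

object CloseOnExceptSketch {
  final class RefCounted(val name: String) extends AutoCloseable {
    var refs = 1
    def incRef(): this.type = { refs += 1; this }
    override def close(): Unit = { refs -= 1; println(s"$name refs now $refs") }
  }

  // Close `r` only if `body` throws; on success the caller keeps ownership.
  def closeOnExcept[R <: AutoCloseable, T](r: R)(body: R => T): T =
    try body(r)
    catch {
      case NonFatal(e) =>
        try r.close() catch { case NonFatal(sup) => e.addSuppressed(sup) }
        throw e
    }

  def main(args: Array[String]): Unit = {
    val col = new RefCounted("batch column").incRef() // refs = 2, like incRefCounts
    try {
      closeOnExcept(col) { _ =>
        // Simulate the exists-column allocation failing after the ref bump.
        throw new RuntimeException("allocation failed")
      }
    } catch {
      case NonFatal(e) => println(s"caught: ${e.getMessage}")
    }
    println(s"refs after failure: ${col.refs}") // the extra reference was released
  }
}
```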