address some comments for 11792
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
binmahone committed Dec 4, 2024
1 parent 738c8e3 commit d3d56ed
Showing 1 changed file with 36 additions and 17 deletions.
@@ -214,10 +214,12 @@ object AggregateUtils extends Logging {
       helper: AggHelper,
       hashKeys: Seq[GpuExpression],
       hashBucketNum: Int,
-      hashSeed: Int,
+      baseHashSeed: Int,
+      recursiveDepth: Int,
+      maxRecursiveDepth: Int,
       batchesByBucket: ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]
   ): Boolean = {

+    val hashSeed = baseHashSeed + recursiveDepth * 7
     var repartitionHappened = false

     def repartitionAndClose(batch: SpillableColumnarBatch): Unit = {
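The seed is now derived from the recursion depth instead of being incremented at each call site, so every level of repartitioning hashes with a different seed. A minimal sketch of that idea, using plain strings and MurmurHash3 as stand-ins for the GPU hash over hashKeys (bucketOf is illustrative, not part of this patch):

import scala.util.hashing.MurmurHash3

// Rows that collide in one bucket at depth d are re-split with a different
// seed at depth d + 1, so a hash collision cannot survive every level.
def bucketOf(row: String, baseHashSeed: Int, recursiveDepth: Int, hashBucketNum: Int): Int = {
  val hashSeed = baseHashSeed + recursiveDepth * 7 // same derivation as above
  Math.floorMod(MurmurHash3.stringHash(row, hashSeed), hashBucketNum)
}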
@@ -277,25 +279,32 @@ object AggregateUtils extends Logging {

     val newBuckets = batchesByBucket.flatMap(bucket => {
       if (needRepartitionAgain(bucket)) {
-        if (hashSeed + 7 > 200) {
-          log.warn("Too many times of repartition, may hit a bug? Size for each batch in " +
+        if (recursiveDepth >= maxRecursiveDepth) {
+          // Normally this should not happen: the data has already gone through
+          // one round of aggregation, so there should not be enough duplicate
+          // rows (duplication only happens across batches) to defeat
+          // repartitioning, especially since the seed changes at each level.
+          // However, it can happen in test cases with very small batch sizes,
+          // so we only log a warning here.
+          log.warn("The bucket is still too large after " + recursiveDepth +
+            " rounds of repartitioning, which may indicate a bug. Size for each batch in " +
             "current bucket: " + bucket.map(_.sizeInBytes).mkString(", ") + " rows: " +
             bucket.map(_.numRows()).mkString(", ") + " targetMergeBatchSize: "
             + targetMergeBatchSize)
-          ArrayBuffer.apply(bucket)
+          ArrayBuffer(bucket)
         } else {
           val nextLayerBuckets =
             ArrayBuffer.fill(hashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
           // Recursively merge and repartition the oversized bucket
           repartitionHappened =
             iterateAndRepartition(
               new CloseableBufferedIterator(bucket.iterator), metrics, targetMergeBatchSize,
-              helper, hashKeys, hashBucketNum, hashSeed + 7,
-              nextLayerBuckets) || repartitionHappened
+              helper, hashKeys, hashBucketNum, baseHashSeed, recursiveDepth + 1,
+              maxRecursiveDepth, nextLayerBuckets) || repartitionHappened
           nextLayerBuckets
         }
       } else {
-        ArrayBuffer.apply(bucket)
+        ArrayBuffer(bucket)
       }
     })
     batchesByBucket.clear()
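Taken together, the branch above implements a depth-bounded recursive re-split. A compact, self-contained sketch of the same pattern on in-memory strings (names are illustrative; the real code operates on SpillableColumnarBatch and hashes on the GPU):

import scala.util.hashing.MurmurHash3

// Split rows into buckets; re-split any bucket that is still too large using
// the next level's seed, giving up (like the warning branch above) once the
// depth budget is exhausted.
def repartition(
    rows: Seq[String],
    baseHashSeed: Int,
    recursiveDepth: Int,
    maxRecursiveDepth: Int,
    bucketNum: Int,
    targetBucketSize: Int): Seq[Seq[String]] = {
  val hashSeed = baseHashSeed + recursiveDepth * 7
  val buckets = rows.groupBy(r =>
    Math.floorMod(MurmurHash3.stringHash(r, hashSeed), bucketNum))
  buckets.values.toSeq.flatMap { bucket =>
    if (bucket.size <= targetBucketSize || recursiveDepth >= maxRecursiveDepth) {
      Seq(bucket) // small enough, or depth budget exhausted: keep as-is
    } else {
      repartition(bucket, baseHashSeed, recursiveDepth + 1, maxRecursiveDepth,
        bucketNum, targetBucketSize)
    }
  }
}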
@@ -920,7 +929,8 @@ class GpuMergeAggregateIterator(
   private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)

   private[this] val defaultHashBucketNum = 16
-  private[this] val defaultHashSeed = 107
+  private[this] val maxLevelsOfRepartition = 10 // max depth for recursive repartition
+  private[this] val baseHashSeed = 107 // seed used for the first level of repartition
   private[this] var batchesByBucket =
     ArrayBuffer.fill(defaultHashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
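A quick worked comparison of the two bounds, assuming the constants above: the old guard stopped recursing once hashSeed + 7 > 200, and with a base seed of 107 stepping by 7 per level (107, 114, ..., 193) that permitted roughly 13 levels; maxLevelsOfRepartition = 10 makes the budget explicit, slightly tighter, and independent of the seed arithmetic.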

@@ -952,7 +962,7 @@ class GpuMergeAggregateIterator(

   // Handle reduction-only aggregation
   if (isReductionOnly) {
-    val batches = ArrayBuffer.apply[SpillableColumnarBatch]()
+    val batches = ArrayBuffer[SpillableColumnarBatch]()
     while (firstPassIter.hasNext) {
       batches += firstPassIter.next()
     }
Expand Down Expand Up @@ -984,7 +994,7 @@ class GpuMergeAggregateIterator(
s"$firstPassReductionRatioEstimate")
// if so, skip any aggregation, return the origin batch directly

realIter = Some(ConcatIterator.apply(firstPassIter, configuredTargetBatchSize))
realIter = Some(ConcatIterator(firstPassIter, configuredTargetBatchSize))
metrics.numTasksSkippedAgg += 1
return realIter.get.next()
} else {
@@ -1009,12 +1019,12 @@ class GpuMergeAggregateIterator(
           configuredTargetBatchSize,
           concatAndMergeHelper)
         , metrics, targetMergeBatchSize, concatAndMergeHelper,
-        hashKeys, defaultHashBucketNum, defaultHashSeed, batchesByBucket)
+        hashKeys, defaultHashBucketNum, baseHashSeed, 0, maxLevelsOfRepartition, batchesByBucket)
       if (repartitionHappened) {
         metrics.numTasksRepartitioned += 1
       }

-      realIter = Some(ConcatIterator.apply(
+      realIter = Some(ConcatIterator(
         new CloseableBufferedIterator(buildBucketIterator()), configuredTargetBatchSize))
       realIter.get.next()
     }
@@ -1047,8 +1057,12 @@ class GpuMergeAggregateIterator(
     val spillCbs = ArrayBuffer[SpillableColumnarBatch]()
     var totalBytes = 0L
     closeOnExcept(spillCbs) { _ =>
-      while (input.hasNext && (spillCbs.isEmpty ||
-        (totalBytes + input.head.sizeInBytes) < targetSize)) {
+      while (input.hasNext && (
+        // In some test cases targetSize is too small to fit even one
+        // SpillableColumnarBatch; in that case we still take the first batch.
+        // See https://github.com/NVIDIA/spark-rapids/issues/11790 for examples.
+        spillCbs.isEmpty ||
+        (totalBytes + input.head.sizeInBytes) <= targetSize)) {
         val tmp = input.next
         totalBytes += tmp.sizeInBytes
         spillCbs += tmp
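The reworked loop condition is a greedy packing guard: always take the first element even when it alone exceeds the target, then keep taking while the running total stays within the target (note the change from < to <=). The following hunk applies the same guard when collecting buckets against targetMergeBatchSize. A small sketch of the pattern on plain byte sizes (packUpTo is illustrative, not the spark-rapids API):

// Take at least one element so the caller always makes progress, then pack
// greedily while the running total stays within targetSize.
def packUpTo(sizes: Iterator[Long], targetSize: Long): Seq[Long] = {
  val taken = scala.collection.mutable.ArrayBuffer[Long]()
  var total = 0L
  val buffered = sizes.buffered
  while (buffered.hasNext && (taken.isEmpty || total + buffered.head <= targetSize)) {
    val s = buffered.next()
    total += s
    taken += s
  }
  taken.toSeq
}

For example, packUpTo(Iterator(10L, 10L, 10L), 5L) yields Seq(10): the first batch is taken even though it exceeds the target, so a too-small targetSize cannot stall the iterator.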
@@ -1080,8 +1094,13 @@ class GpuMergeAggregateIterator(
     closeOnExcept(new ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]) {
       toAggregateBuckets =>
         var currentSize = 0L
-        while (batchesByBucket.nonEmpty && (toAggregateBuckets.isEmpty ||
-          batchesByBucket.last.size() + currentSize < targetMergeBatchSize)) {
+        while (batchesByBucket.nonEmpty &&
+          (
+            // In some test cases targetMergeBatchSize is too small to fit even
+            // one bucket; in that case we still take the first bucket.
+            // See https://github.com/NVIDIA/spark-rapids/issues/11790 for examples.
+            toAggregateBuckets.isEmpty ||
+            batchesByBucket.last.size() + currentSize <= targetMergeBatchSize)) {
           val bucket = batchesByBucket.remove(batchesByBucket.size - 1)
           currentSize += bucket.map(_.sizeInBytes).sum
           toAggregateBuckets += bucket
