diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
index d5bbe15209d..b8ee363f5aa 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
@@ -214,10 +214,12 @@ object AggregateUtils extends Logging {
       helper: AggHelper,
       hashKeys: Seq[GpuExpression],
       hashBucketNum: Int,
-      hashSeed: Int,
+      baseHashSeed: Int,
+      recursiveDepth: Int,
+      maxRecursiveDepth: Int,
       batchesByBucket: ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]
   ): Boolean = {
-
+    val hashSeed = baseHashSeed + recursiveDepth * 7
     var repartitionHappened = false

     def repartitionAndClose(batch: SpillableColumnarBatch): Unit = {
@@ -277,12 +279,19 @@ object AggregateUtils extends Logging {

     val newBuckets = batchesByBucket.flatMap(bucket => {
       if (needRepartitionAgain(bucket)) {
-        if (hashSeed + 7 > 200) {
-          log.warn("Too many times of repartition, may hit a bug? Size for each batch in " +
-            "current bucket: " + bucket.map(_.sizeInBytes).mkString(", ") + " rows: " +
-            bucket.map(_.numRows()).mkString(", ") + " targetMergeBatchSize: "
-            + targetMergeBatchSize)
-          ArrayBuffer.apply(bucket)
+        if (recursiveDepth >= maxRecursiveDepth) {
+          // Normally this should not happen: the data being repartitioned has already gone
+          // through the first round of aggregation, so there should not be enough duplicated
+          // rows (duplication only exists across different batches) to defeat repartitioning,
+          // especially since we change the hash seed for every repartition level. However,
+          // this can still happen for some test cases with a really small batch size, so we
+          // just log a warning here.
+          log.warn(s"The bucket is still too large after $recursiveDepth repartitions. " +
+            s"See https://github.com/NVIDIA/spark-rapids/issues/11834. " +
+            s"Sizes for each batch in current bucket: ${bucket.map(_.sizeInBytes).mkString(", ")}" +
+            s" rows: ${bucket.map(_.numRows()).mkString(", ")}" +
+            s" targetMergeBatchSize: $targetMergeBatchSize")
+          ArrayBuffer(bucket)
         } else {
           val nextLayerBuckets =
             ArrayBuffer.fill(hashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
@@ -290,12 +299,12 @@ object AggregateUtils extends Logging {
           repartitionHappened = iterateAndRepartition(
             new CloseableBufferedIterator(bucket.iterator), metrics, targetMergeBatchSize,
-            helper, hashKeys, hashBucketNum, hashSeed + 7,
-            nextLayerBuckets) || repartitionHappened
+            helper, hashKeys, hashBucketNum, baseHashSeed, recursiveDepth + 1,
+            maxRecursiveDepth, nextLayerBuckets) || repartitionHappened
           nextLayerBuckets
         }
       } else {
-        ArrayBuffer.apply(bucket)
+        ArrayBuffer(bucket)
       }
     })
     batchesByBucket.clear()
@@ -920,7 +929,8 @@ class GpuMergeAggregateIterator(
   private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)

   private[this] val defaultHashBucketNum = 16
-  private[this] val defaultHashSeed = 107
+  private[this] val maxLevelsOfRepartition = 10 // the max level allowed for recursive repartition
+  private[this] val baseHashSeed = 107 // the hash seed used for the first level of repartition
   private[this] var batchesByBucket =
     ArrayBuffer.fill(defaultHashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())

@@ -952,7 +962,7 @@ class GpuMergeAggregateIterator(

     // Handle reduction-only aggregation
     if (isReductionOnly) {
-      val batches = ArrayBuffer.apply[SpillableColumnarBatch]()
+      val batches = ArrayBuffer[SpillableColumnarBatch]()
       while (firstPassIter.hasNext) {
         batches += firstPassIter.next()
       }
@@ -984,7 +994,7 @@ class GpuMergeAggregateIterator(
           s"$firstPassReductionRatioEstimate")

         // if so, skip any aggregation, return the origin batch directly
-        realIter = Some(ConcatIterator.apply(firstPassIter, configuredTargetBatchSize))
+        realIter = Some(ConcatIterator(firstPassIter, configuredTargetBatchSize))
         metrics.numTasksSkippedAgg += 1
         return realIter.get.next()
       } else {
@@ -1009,12 +1019,12 @@ class GpuMergeAggregateIterator(
             configuredTargetBatchSize,
             concatAndMergeHelper)
           , metrics, targetMergeBatchSize, concatAndMergeHelper,
-          hashKeys, defaultHashBucketNum, defaultHashSeed, batchesByBucket)
+          hashKeys, defaultHashBucketNum, baseHashSeed, 0, maxLevelsOfRepartition, batchesByBucket)
         if (repartitionHappened) {
           metrics.numTasksRepartitioned += 1
         }
-        realIter = Some(ConcatIterator.apply(
+        realIter = Some(ConcatIterator(
           new CloseableBufferedIterator(buildBucketIterator()), configuredTargetBatchSize))
         realIter.get.next()
       }
     }
@@ -1047,8 +1057,12 @@
     val spillCbs = ArrayBuffer[SpillableColumnarBatch]()
     var totalBytes = 0L
     closeOnExcept(spillCbs) { _ =>
-      while (input.hasNext && (spillCbs.isEmpty ||
-        (totalBytes + input.head.sizeInBytes) < targetSize)) {
+      while (input.hasNext && (
+        // For some test cases targetSize is too small to fit any SpillableColumnarBatch;
+        // in that case we put the first SpillableColumnarBatch into spillCbs anyway.
+        // Refer to https://github.com/NVIDIA/spark-rapids/issues/11790 for examples.
+        spillCbs.isEmpty ||
+          (totalBytes + input.head.sizeInBytes) <= targetSize)) {
         val tmp = input.next
         totalBytes += tmp.sizeInBytes
         spillCbs += tmp
@@ -1080,8 +1094,13 @@
     closeOnExcept(new ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]) {
       toAggregateBuckets =>
         var currentSize = 0L
-        while (batchesByBucket.nonEmpty && (toAggregateBuckets.isEmpty ||
-          batchesByBucket.last.size() + currentSize < targetMergeBatchSize)) {
+        while (batchesByBucket.nonEmpty &&
+          (
+            // For some test cases targetMergeBatchSize is too small to fit any bucket;
+            // in that case we put the first bucket into toAggregateBuckets anyway.
+            // Refer to https://github.com/NVIDIA/spark-rapids/issues/11790 for examples.
+            toAggregateBuckets.isEmpty ||
+              batchesByBucket.last.size() + currentSize <= targetMergeBatchSize)) {
           val bucket = batchesByBucket.remove(batchesByBucket.size - 1)
           currentSize += bucket.map(_.sizeInBytes).sum
           toAggregateBuckets += bucket
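
For reviewers who want to see the control flow in isolation, here is a small standalone sketch of the pattern the first three hunks implement: derive the hash seed from the recursion depth (`baseSeed + depth * 7`), re-split any over-sized bucket, and stop with a warning once a maximum depth is reached instead of checking a magic seed threshold. All names here are hypothetical, `scala.util.hashing.MurmurHash3` stands in for the GPU hash, and plain `Seq[Int]` "batches" stand in for `SpillableColumnarBatch`; this is not the plugin code.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.hashing.MurmurHash3

object RepartitionSketch {
  // A "batch" is just a bag of integer keys here; the real code works on SpillableColumnarBatch.
  type Batch = Seq[Int]

  private def bucketSize(bucket: ArrayBuffer[Batch]): Int = bucket.map(_.size).sum

  // Split every over-sized bucket into `bucketNum` sub-buckets, deriving the hash seed from the
  // recursion depth (baseSeed + depth * 7) so each level partitions differently, and give up
  // with a warning once `maxDepth` levels have been tried.
  def repartition(
      buckets: ArrayBuffer[ArrayBuffer[Batch]],
      targetSize: Int,
      bucketNum: Int,
      baseSeed: Int,
      depth: Int,
      maxDepth: Int): ArrayBuffer[ArrayBuffer[Batch]] = {
    val seed = baseSeed + depth * 7
    buckets.flatMap { bucket =>
      if (bucketSize(bucket) <= targetSize) {
        ArrayBuffer(bucket) // small enough, keep as-is
      } else if (depth >= maxDepth) {
        // mirrors the log.warn branch above: stop recursing instead of looping forever
        println(s"bucket still too large after $depth repartitions, keeping it as-is")
        ArrayBuffer(bucket)
      } else {
        // re-split this bucket with the depth-derived seed, then recurse one level deeper
        val next = ArrayBuffer.fill(bucketNum)(ArrayBuffer.empty[Batch])
        bucket.foreach { batch =>
          batch.groupBy(k => Math.floorMod(MurmurHash3.stringHash(k.toString, seed), bucketNum))
            .foreach { case (i, part) => next(i) += part }
        }
        repartition(next, targetSize, bucketNum, baseSeed, depth + 1, maxDepth)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val initial = ArrayBuffer(ArrayBuffer[Batch](Seq(1, 2, 3, 4), Seq(5, 6, 7, 8)))
    val result = repartition(initial, targetSize = 3, bucketNum = 2,
      baseSeed = 107, depth = 0, maxDepth = 10)
    println(result.map(bucketSize).mkString(", ")) // each bucket is within the target, or warned
  }
}
```

A bucket full of identical keys never splits no matter how often it is rehashed, which is why the depth bound (and not a seed threshold) is the natural stopping condition.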
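The last two hunks change `<` to `<=` and rely on the `isEmpty ||` guard so that at least one batch (or bucket) is always accepted even when the target size is smaller than the first element; otherwise the loop would produce empty groups and make no progress. Below is a minimal standalone sketch of that packing loop, again with hypothetical names (`packUpTo`) and plain `Long` sizes instead of `SpillableColumnarBatch`.

```scala
import scala.collection.mutable.ArrayBuffer

object PackAtLeastOne {
  // Group a stream of sizes into chunks of at most `targetSize`, but never return an empty
  // chunk: the first element is always admitted, even when it alone exceeds the target.
  // Note the `<=` on the running total, matching the comparison change in the diff.
  def packUpTo(sizes: Iterator[Long], targetSize: Long): Iterator[ArrayBuffer[Long]] =
    new Iterator[ArrayBuffer[Long]] {
      private val buffered = sizes.buffered
      override def hasNext: Boolean = buffered.hasNext
      override def next(): ArrayBuffer[Long] = {
        val group = ArrayBuffer[Long]()
        var total = 0L
        while (buffered.hasNext && (group.isEmpty || total + buffered.head <= targetSize)) {
          val s = buffered.next()
          total += s
          group += s
        }
        group
      }
    }

  def main(args: Array[String]): Unit = {
    // The 5-byte item is larger than the 3-byte target, yet it still forms its own group
    // instead of stalling the iterator: prints List(ArrayBuffer(5), ArrayBuffer(1, 2))
    println(packUpTo(Iterator(5L, 1L, 2L), targetSize = 3L).toList)
  }
}
```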