[BUG] Fix issue 11790 #11792
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala

```diff
@@ -219,9 +219,6 @@ object AggregateUtils extends Logging {
   ): Boolean = {

     var repartitionHappened = false
-    if (hashSeed > 200) {
-      throw new IllegalStateException("Too many times of repartition, may hit a bug?")
-    }

     def repartitionAndClose(batch: SpillableColumnarBatch): Unit = {
```
```diff
@@ -280,15 +277,23 @@ object AggregateUtils extends Logging {

     val newBuckets = batchesByBucket.flatMap(bucket => {
       if (needRepartitionAgain(bucket)) {
-        val nextLayerBuckets =
-          ArrayBuffer.fill(hashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
-        // Recursively merge and repartition the over sized bucket
-        repartitionHappened =
-          iterateAndRepartition(
-            new CloseableBufferedIterator(bucket.iterator), metrics, targetMergeBatchSize,
-            helper, hashKeys, hashBucketNum, hashSeed + 7,
-            nextLayerBuckets) || repartitionHappened
-        nextLayerBuckets
+        if (hashSeed + 7 > 200) {
+          log.warn("Too many times of repartition, may hit a bug? Size for each batch in " +
+            "current bucket: " + bucket.map(_.sizeInBytes).mkString(", ") + " rows: " +
+            bucket.map(_.numRows()).mkString(", ") + " targetMergeBatchSize: "
+            + targetMergeBatchSize)
+          ArrayBuffer.apply(bucket)
+        } else {
+          val nextLayerBuckets =
+            ArrayBuffer.fill(hashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
+          // Recursively merge and repartition the over sized bucket
+          repartitionHappened =
+            iterateAndRepartition(
+              new CloseableBufferedIterator(bucket.iterator), metrics, targetMergeBatchSize,
+              helper, hashKeys, hashBucketNum, hashSeed + 7,
+              nextLayerBuckets) || repartitionHappened
+          nextLayerBuckets
+        }
       } else {
         ArrayBuffer.apply(bucket)
       }
```

Review thread on the new `log.warn` call:

- Please file an issue for the bug and link it here.
- hi @gerashegalov, there's no known bug here.
- I filed #11834. I agree that this is not a bug, but it is also not defensive, and the check requires tight coupling between the code that sets the hashSeed and this guard to work properly. I would like us to fix that.
- The decoupling should be fixed by #11816, please take a look.

Review thread on `ArrayBuffer.apply(bucket)`:

- nit: could use the syntactic sugar here and elsewhere: `ArrayBuffer(bucket)`.
- done
- But I still see the `ArrayBuffer.apply(bucket)`??? See sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala, line 285 at 738c8e3. Did you forget to check in your last changes before you merged this? Or is there a follow-on issue that you plan on doing?
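As background on that nit: in Scala, `ArrayBuffer(bucket)` is syntactic sugar that the compiler rewrites to `ArrayBuffer.apply(bucket)`, so the two spellings are interchangeable. A minimal standalone sketch (a plain `Seq` stands in for the plugin's bucket type; `ApplySugarDemo` is an invented name):

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal standalone sketch of the "syntactic sugar" nit above: in Scala,
// `ArrayBuffer(x)` desugars to `ArrayBuffer.apply(x)`, so both spellings
// build the same one-element buffer.
object ApplySugarDemo extends App {
  val bucket = Seq(1, 2, 3) // stand-in for an AutoClosableArrayBuffer

  val explicit = ArrayBuffer.apply(bucket) // spelling left in the PR
  val sugared  = ArrayBuffer(bucket)       // spelling the review suggests

  assert(explicit == sugared) // element-wise equal: same single-element buffer
  println(sugared)            // ArrayBuffer(List(1, 2, 3))
}
```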
```diff
@@ -1075,8 +1080,8 @@ class GpuMergeAggregateIterator(
     closeOnExcept(new ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]) {
       toAggregateBuckets =>
         var currentSize = 0L
-        while (batchesByBucket.nonEmpty &&
-          batchesByBucket.last.size() + currentSize < targetMergeBatchSize) {
+        while (batchesByBucket.nonEmpty && (toAggregateBuckets.isEmpty ||
+          batchesByBucket.last.size() + currentSize < targetMergeBatchSize)) {
           val bucket = batchesByBucket.remove(batchesByBucket.size - 1)
           currentSize += bucket.map(_.sizeInBytes).sum
           toAggregateBuckets += bucket
```
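The added `toAggregateBuckets.isEmpty ||` clause is what guarantees forward progress: when the next bucket alone is already at or above `targetMergeBatchSize`, the old condition was false on the first pass, nothing was consumed, and the caller could make no progress on `batchesByBucket`. A minimal standalone sketch of the fixed loop, under simplified assumptions (`Long` sizes stand in for spillable buckets; `packOne` and the sample sizes are invented for illustration):

```scala
import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the loop-condition fix above. `picked.isEmpty ||`
// plays the role of `toAggregateBuckets.isEmpty ||`: always take at least
// one bucket, then keep taking while the running total stays under target.
object GreedyPackDemo extends App {
  val targetMergeBatchSize = 100L

  def packOne(buckets: ArrayBuffer[Long]): ArrayBuffer[Long] = {
    val picked = ArrayBuffer[Long]()
    var currentSize = 0L
    while (buckets.nonEmpty && (picked.isEmpty ||
        buckets.last + currentSize < targetMergeBatchSize)) {
      val bucket = buckets.remove(buckets.size - 1)
      currentSize += bucket
      picked += bucket
    }
    picked
  }

  // The last bucket (150) alone exceeds the target. Without the
  // `picked.isEmpty ||` guard this returns an empty result and the caller
  // can never drain the buckets; with it, the oversized bucket is taken.
  println(packOne(ArrayBuffer(40L, 30L, 150L))) // ArrayBuffer(150)
}
```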
Review comment on this hunk:

- Could we please decouple the hashSeed from the number of repartition passes being done? I think the code would be much more readable if we could say how many times we tried to re-partition the data instead of "too many times".
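One way to do that decoupling, sketched under stated assumptions (the explicit `pass` counter, `maxRepartitionPasses`, and the toy splitting logic are all invented here; the actual follow-up is tracked in #11816): thread a repartition-pass count through the recursion and report it directly, while `hashSeed` keeps advancing independently for hashing.

```scala
// Toy sketch of decoupling: the pass counter, not hashSeed, decides when to
// give up, so the guard no longer depends on how hashSeed happens to step.
object RepartitionDepthDemo extends App {
  val maxRepartitionPasses = 29 // roughly what `hashSeed > 200` with step 7 implied

  // Split a "bucket" of sizes in half until each piece fits under `target`,
  // giving up (and saying how many passes were tried) past the limit.
  def repartition(bucket: Vector[Int], target: Int,
      hashSeed: Int, pass: Int): Vector[Vector[Int]] = {
    if (bucket.sum <= target || bucket.size <= 1) {
      Vector(bucket)
    } else if (pass > maxRepartitionPasses) {
      println(s"Giving up after $pass repartition passes; emitting oversized bucket")
      Vector(bucket)
    } else {
      val (left, right) = bucket.splitAt(bucket.size / 2)
      // hashSeed still advances for hashing in the real code; here it is only
      // threaded through to show the two values varying independently.
      Vector(left, right).flatMap(repartition(_, target, hashSeed + 7, pass + 1))
    }
  }

  println(repartition(Vector(60, 50, 40, 30), target = 100, hashSeed = 42, pass = 0))
  // Vector(Vector(60), Vector(50), Vector(40, 30))
}
```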