[BUG] Fix issue 11790 #11792

Merged: 1 commit, Nov 29, 2024. Changes from all commits.
@@ -219,9 +219,6 @@ object AggregateUtils extends Logging {
): Boolean = {

var repartitionHappened = false
if (hashSeed > 200) {
Collaborator: Could we please decouple the hashSeed from the number of repartition attempts? I think the code would be much more readable if it reported how many times we tried to repartition the data instead of "too many times".

throw new IllegalStateException("Too many times of repartition, may hit a bug?")
}

def repartitionAndClose(batch: SpillableColumnarBatch): Unit = {

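The review thread above asks to decouple the depth check from hashSeed. A minimal sketch of one way to do that (the names RepartitionGuard, MaxRepartitions, and checkDepth are hypothetical, not from this PR): pass an explicit attempt counter so the guard can say "tried N times" rather than compare a derived seed value.

```scala
// Hypothetical sketch: track repartition attempts explicitly instead of
// inferring them from hashSeed (which the diff steps by 7 up to ~200).
object RepartitionGuard {
  // Roughly 200 / 7 attempts, mirroring the old hashSeed-based bound.
  val MaxRepartitions = 28

  // Returns true if another repartition pass is allowed.
  def checkDepth(repartitionCount: Int): Boolean = {
    if (repartitionCount > MaxRepartitions) {
      // The PR replaces the old IllegalStateException with a warning,
      // keeping the oversized bucket instead of failing the query.
      System.err.println(
        s"Too many repartitions ($repartitionCount), keeping bucket as-is")
      false
    } else {
      true
    }
  }
}
```

An explicit counter also removes the coupling the reviewer objected to: the threshold no longer depends on knowing the seed's starting value and step size.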
@@ -280,15 +277,23 @@ object AggregateUtils extends Logging {

val newBuckets = batchesByBucket.flatMap(bucket => {
if (needRepartitionAgain(bucket)) {
val nextLayerBuckets =
ArrayBuffer.fill(hashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
// Recursively merge and repartition the over sized bucket
repartitionHappened =
iterateAndRepartition(
new CloseableBufferedIterator(bucket.iterator), metrics, targetMergeBatchSize,
helper, hashKeys, hashBucketNum, hashSeed + 7,
nextLayerBuckets) || repartitionHappened
nextLayerBuckets
if (hashSeed + 7 > 200) {
log.warn("Too many times of repartition, may hit a bug? Size for each batch in " +
Collaborator: Please file an issue for the bug and link it here.

Collaborator (Author): Hi @gerashegalov, there's no known bug here.

Collaborator: I filed #11834. I agree that this is not a bug, but it is also not defensive, and the check requires tight coupling between the code that sets the hashSeed and this check to work properly. I would like us to fix that.

Collaborator (Author): The decoupling should be fixed by #11816, please take a look.

"current bucket: " + bucket.map(_.sizeInBytes).mkString(", ") + " rows: " +
bucket.map(_.numRows()).mkString(", ") + " targetMergeBatchSize: "
+ targetMergeBatchSize)
ArrayBuffer.apply(bucket)
Collaborator: nit: could use the syntactic sugar here and elsewhere:

Suggested change:
ArrayBuffer.apply(bucket)
ArrayBuffer(bucket)

Collaborator (Author): done

Collaborator: But I still see the ArrayBuffer.apply(bucket)??? Did you forget to check in your last changes before you merged this? Or is there a follow-on issue that you plan on doing?

} else {
val nextLayerBuckets =
ArrayBuffer.fill(hashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
// Recursively merge and repartition the over sized bucket
repartitionHappened =
iterateAndRepartition(
new CloseableBufferedIterator(bucket.iterator), metrics, targetMergeBatchSize,
helper, hashKeys, hashBucketNum, hashSeed + 7,
nextLayerBuckets) || repartitionHappened
nextLayerBuckets
}
} else {
ArrayBuffer.apply(bucket)
}
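The hunk above recursively re-hashes an oversized bucket into hashBucketNum sub-buckets with a shifted seed. A simplified, self-contained model of one such pass (Int values stand in for SpillableColumnarBatch, and the XOR hash is an illustrative stand-in, not the PR's hash function):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of one repartition pass: spread an oversized bucket
// across hashBucketNum sub-buckets using a seed-perturbed hash. The real
// code hashes column data; Ints and XOR here are stand-ins.
object BucketRepartition {
  def repartition(
      bucket: ArrayBuffer[Int],
      hashBucketNum: Int,
      hashSeed: Int): ArrayBuffer[ArrayBuffer[Int]] = {
    val nextLayerBuckets =
      ArrayBuffer.fill(hashBucketNum)(new ArrayBuffer[Int]())
    bucket.foreach { v =>
      val idx = math.abs((v.hashCode() ^ hashSeed) % hashBucketNum)
      nextLayerBuckets(idx) += v
    }
    nextLayerBuckets
  }
}
```

Changing the seed on each recursion level (hashSeed + 7 in the diff) makes keys that collided into one bucket at a given level likely to split apart at the next.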
@@ -1075,8 +1080,8 @@ class GpuMergeAggregateIterator(
closeOnExcept(new ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]) {
toAggregateBuckets =>
var currentSize = 0L
while (batchesByBucket.nonEmpty &&
batchesByBucket.last.size() + currentSize < targetMergeBatchSize) {
while (batchesByBucket.nonEmpty && (toAggregateBuckets.isEmpty ||
batchesByBucket.last.size() + currentSize < targetMergeBatchSize)) {
val bucket = batchesByBucket.remove(batchesByBucket.size - 1)
currentSize += bucket.map(_.sizeInBytes).sum
toAggregateBuckets += bucket
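The loop-condition change in the last hunk guarantees progress: before the fix, a single remaining bucket already larger than targetMergeBatchSize would never satisfy the size check, so the loop could collect nothing. A minimal model of the fixed condition (plain Long values stand in for bucket sizes; collectBuckets is a hypothetical name):

```scala
import scala.collection.mutable.ArrayBuffer

// Model of the fixed while-condition: always take at least one bucket
// (picked.isEmpty), even if that bucket alone exceeds the target size.
def collectBuckets(
    buckets: ArrayBuffer[Long],
    targetMergeBatchSize: Long): ArrayBuffer[Long] = {
  val picked = ArrayBuffer[Long]()
  var currentSize = 0L
  while (buckets.nonEmpty && (picked.isEmpty ||
      buckets.last + currentSize < targetMergeBatchSize)) {
    val bucket = buckets.remove(buckets.size - 1)
    currentSize += bucket
    picked += bucket
  }
  picked
}
```

The extra `picked.isEmpty ||` clause is exactly what the diff adds with `toAggregateBuckets.isEmpty ||`: an oversized bucket is still aggregated alone rather than skipped forever.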