address some comments for 11792 #11816

Open
wants to merge 3 commits into base: branch-25.02
@@ -214,10 +214,12 @@ object AggregateUtils extends Logging {
helper: AggHelper,
hashKeys: Seq[GpuExpression],
hashBucketNum: Int,
hashSeed: Int,
baseHashSeed: Int,
recursiveDepth: Int,
maxRecursiveDepth: Int,
batchesByBucket: ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]
): Boolean = {

val hashSeed = baseHashSeed + recursiveDepth * 7
var repartitionHappened = false

def repartitionAndClose(batch: SpillableColumnarBatch): Unit = {
@@ -277,25 +279,32 @@

val newBuckets = batchesByBucket.flatMap(bucket => {
if (needRepartitionAgain(bucket)) {
if (hashSeed + 7 > 200) {
log.warn("Too many times of repartition, may hit a bug? Size for each batch in " +
"current bucket: " + bucket.map(_.sizeInBytes).mkString(", ") + " rows: " +
bucket.map(_.numRows()).mkString(", ") + " targetMergeBatchSize: "
+ targetMergeBatchSize)
ArrayBuffer.apply(bucket)
if (recursiveDepth >= maxRecursiveDepth) {
// Normally this should not happen, because we are repartitioning data that has
// already gone through the first round of aggregation, so there shouldn't be too
// many duplicated rows (duplication only happens across different batches) to
// prevent the repartitioning from working (considering we change the seed each
// time we repartition). However, for some test cases with a really small batch
// size, this can happen. So we just log some warnings here.

Collaborator:
Can we set the max to a higher number and throw?

@binmahone (Collaborator, Author), Dec 17, 2024:
I'm afraid not. Even if we change the max repartition number from 10 to 20, there will still be failing test cases because of this. Actually, given a small enough batch size, we can always create a contrived case where it "cannot repartition out".

Collaborator:
I have been thinking about this a little more and I think we do need to throw an exception, but we might need to fix some other issues first. We should be doing some debugging to understand fully why the tests are outputting this warning/failing.

Our tests are a little crazy because we can set the batch size to be very small, which can expose some bad cases. But if we do the math this should not be that big of a problem. By default we split the input data 16 ways on each pass, and we set the target batch size to 250 bytes in some extreme tests. That means if we can repartition the data up to 9 times before we get a warning, we can cover 250 * 16^9 bytes. That is 15.625 TiB of data, assuming that there are no hash collisions.
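
As a quick sanity check on that arithmetic (a sketch assuming the 16-way split and 250-byte target batch size above, and perfectly even hashing):

val bytesCoverable = 250L * math.pow(16, 9).toLong // 17,179,869,184,000 bytes
val coveredTiB = bytesCoverable.toDouble / math.pow(1024, 4) // ~15.6 TiB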

That said, we should be able to know how many partitions we need up front, because we know the size, row count, and batch count for all of the input data. We used 16 on the join because we didn't know how much data would be processed yet.

I think what is more likely happening is that because we have lots of small batches, after a single pass of the aggregate no output batches have been reduced in size. So then we start trying to partition, but one or two rows for the same key might be large enough that we cannot hit that 250-byte batch size. Because of that it does not matter how many times we re-partition the data; it will never be small enough.

That is a live-lock situation I want to avoid. So we either need to throw an exception when we hit this case, or we need to just give up trying to make the data smaller, cross our fingers, and hope that we have enough memory to make it work.

But this is just me speculating. Someone needs to debug what is happening with the failing tests and understand how we can fix them.

@binmahone (Collaborator, Author), Dec 18, 2024:

OK, I can look into it further. But it might take a while and this PR will be pending longer; is that OK with you (considering this is follow-up work for #11792)? @revans2

Collaborator:
I did some debugging of my own and I understand what is happening. I reproduced the issue by running

env TEST_PARALLEL=0 MAX_PARALLEL=20 DATAGEN_SEED=1732770215 ./run_pyspark_from_build.sh -k 'test_hash_multiple_mode_query' -s 2>&1 | tee log.txt

against Spark 3.4.0. Not all of the tests failed, but two of them did. I also added in a lot of debugging.

It turns out that the problem is because of how Spark handles NULL in a hash. A NULL is a no-op and returns the seed unchanged. This can lead to a high number of hash collisions. So if we are hashing two columns, (NaN, NULL) and (NULL, NaN) in this case, then the two rows have the same hash but different values.
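
A minimal sketch of that collision, using a simplified column-by-column hash where a NULL just passes the seed through (for illustration only, not the actual Spark or cuDF code):

import scala.util.hashing.MurmurHash3

// Hash a row column by column; a NULL column is a no-op that leaves the
// running hash (the seed) unchanged, as described above.
def hashRow(cols: Seq[Option[Double]], seed: Int = 42): Int =
  cols.foldLeft(seed) { (acc, col) =>
    col match {
      case None    => acc // NULL: the seed passes through unchanged
      case Some(d) => MurmurHash3.mix(acc, d.hashCode)
    }
  }

// Different rows, same hash: the NULL contributes nothing in either position.
assert(hashRow(Seq(Some(Double.NaN), None)) == hashRow(Seq(None, Some(Double.NaN))))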

So if we have a very small batch size (like in the tests) and NULLs show up in the data we can run into situations like this.

    // Deal with the over sized buckets
    def needRepartitionAgain(bucket: AutoClosableArrayBuffer[SpillableColumnarBatch]) = {
      bucket.map(_.sizeInBytes).sum > targetMergeBatchSize &&
        bucket.size() != 1 &&
        !bucket.forall(_.numRows() == 1) // this is for test
    }

needRepartitionAgain has a check to see if all of the batches have a single row in them to avoid issues with the tests, but technically this is flawed because our hash function is flawed. Using the CUDF implementation instead of the Spark one might fix the issue, but there could be other issues hiding in the hash function. As such I am fine with how the code is now, but I would like a follow-on issue where we explore the idea of doing the re-partition once, because we know the size of the input data.

@binmahone (Collaborator, Author), Dec 19, 2024:

Hi Bobby, I don't think we know the size of the input data? In the previous sort-based agg implementation, all input batches were fetched before doing the sort, but in the current repartition-based agg implementation, we repartition each input batch before the next input batch is fetched. We did so to avoid unnecessary spills.

@revans2 (Collaborator), Dec 19, 2024:

That is true for the first pass. But if a second pass is needed, we know the size of the data because we have processed the entire input at that point and made it spillable. We know a lot at that point:

  1. The size of the input data in bytes
  2. The number of rows total and per batch
  3. The number of input batches (although my debugging showed that we are putting a lot of empty batches in here and should probably filter them out)

Along with this, we know that all of the rows in a batch have unique keys (because each batch has passed the merge stage).

From this we can derive that the number of unique keys in the bucket is between aggregateBatches.map(_.numRows).max() and aggregateBatches.map(_.numRows).sum.

That means if we have a good hash implementation, which we do not but can probably fix, then we should be able to split the input for the second pass so that we have

val minCardinality = aggregateBatches.map(_.numRows).max()
val targetBatchesBySize = aggregateBatches.map(_.size).sum() / targetBatchSize
val numBuckets = min(minCardinality, targetBatchesBySize)
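
Spelled out as a self-contained sketch (BatchStats and planBucketCount are hypothetical names for illustration, not the PR's API):

// Stats we already have for every spilled batch after the first pass.
final case class BatchStats(sizeInBytes: Long, numRows: Long)

def planBucketCount(batches: Seq[BatchStats], targetBatchSize: Long): Int = {
  // Each batch holds unique keys (it has passed the merge stage), so the key
  // cardinality is at least the row count of the largest batch.
  val minCardinality = batches.map(_.numRows).max
  // Enough buckets, rounding up, that each bucket's bytes fit the target size.
  val bucketsBySize =
    math.ceil(batches.map(_.sizeInBytes).sum.toDouble / targetBatchSize).toLong
  math.min(minCardinality, bucketsBySize).toInt
}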

If a bucket is still too large after re-partitioning it a second time, then we have probably done the best we could possibly have done, and we should just try to make the aggregation work. This should make the worst-case situation much better, because we will only go through the data 3 times at most instead of the 11 times max that we have today:

  1. to read it in and repartition it once
  2. to read in repartitioned data and repartition it a second time
  3. to read in second order repartitioned data and do the merge aggregation.

The second optimization that I would make (especially if we want to reduce spilling) is to not re-partition everything in one go.

The order of operations should be something like the following. Note there is a lot of hand waving here.

val buckets = repartition_pass_1
val remaining = mergeSmallEnoughBucketsAndReturnFromIterator(buckets)
remaining.foreach { tooLargeBucket =>
  val buckets = repartition_pass_2(tooLargeBucket)
  mergeBucketsAndReturnFromIterator(buckets)
}

This way we will have released as much spilled data as we can after the first repartition pass instead of holding on to it just so we can finish repartitioning everything. It should not reduce the maximum memory used, but it should reduce how long we are at that maximum memory used mark.
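
As a toy, self-contained version of that ordering (operating on batch byte sizes only; Bucket, repartition, and mergeInTwoPasses are invented for illustration):

// A bucket for this sketch is just the byte sizes of the batches it holds.
final case class Bucket(batchSizes: Vector[Long]) {
  def sizeInBytes: Long = batchSizes.sum
}

// Stand-in repartition: deal the batches round-robin into `ways` buckets.
def repartition(bucket: Bucket, ways: Int): Vector[Bucket] =
  Vector.tabulate(ways) { i =>
    Bucket(bucket.batchSizes.zipWithIndex.collect { case (s, j) if j % ways == i => s })
  }

// Emit buckets that are already small enough right after pass 1 so their
// spilled data can be released; only the oversized ones get a second pass,
// and after that we emit whatever we have and let the merge do its best.
def mergeInTwoPasses(passOne: Vector[Bucket], target: Long, ways: Int): Vector[Bucket] =
  passOne.flatMap { b =>
    if (b.sizeInBytes <= target) Vector(b)
    else repartition(b, ways)
  }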

I'm not sure if this is going to help in practice. A good hash algorithm should spread the data fairly evenly, so we should not see much skew or many hash collisions.

log.warn(s"The bucket is still too large after $recursiveDepth repartitions. " +
s"See https://github.com/NVIDIA/spark-rapids/issues/11834. " +
s"Sizes for each batch in current bucket: ${bucket.map(_.sizeInBytes).mkString(", ")}"
+ s" rows: ${bucket.map(_.numRows()).mkString(", ")}" +
s" targetMergeBatchSize: $targetMergeBatchSize")
ArrayBuffer(bucket)
} else {
val nextLayerBuckets =
ArrayBuffer.fill(hashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())
// Recursively merge and repartition the over sized bucket
repartitionHappened =
iterateAndRepartition(
new CloseableBufferedIterator(bucket.iterator), metrics, targetMergeBatchSize,
helper, hashKeys, hashBucketNum, hashSeed + 7,
nextLayerBuckets) || repartitionHappened
helper, hashKeys, hashBucketNum, baseHashSeed, recursiveDepth + 1,
maxRecursiveDepth, nextLayerBuckets) || repartitionHappened
nextLayerBuckets
}
} else {
ArrayBuffer.apply(bucket)
ArrayBuffer(bucket)
}
})
batchesByBucket.clear()
@@ -920,7 +929,8 @@ class GpuMergeAggregateIterator(
private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)

private[this] val defaultHashBucketNum = 16
private[this] val defaultHashSeed = 107
private[this] val maxLevelsOfRepartition = 10 // this is the max level for recursive repartition
private[this] val baseHashSeed = 107 // this is the seed used for first level for repartition
private[this] var batchesByBucket =
ArrayBuffer.fill(defaultHashBucketNum)(new AutoClosableArrayBuffer[SpillableColumnarBatch]())

@@ -952,7 +962,7 @@ class GpuMergeAggregateIterator(

// Handle reduction-only aggregation
if (isReductionOnly) {
val batches = ArrayBuffer.apply[SpillableColumnarBatch]()
val batches = ArrayBuffer[SpillableColumnarBatch]()
while (firstPassIter.hasNext) {
batches += firstPassIter.next()
}
@@ -984,7 +994,7 @@
s"$firstPassReductionRatioEstimate")
// if so, skip any aggregation, return the origin batch directly

realIter = Some(ConcatIterator.apply(firstPassIter, configuredTargetBatchSize))
realIter = Some(ConcatIterator(firstPassIter, configuredTargetBatchSize))
metrics.numTasksSkippedAgg += 1
return realIter.get.next()
} else {
@@ -1009,12 +1019,12 @@
configuredTargetBatchSize,
concatAndMergeHelper)
, metrics, targetMergeBatchSize, concatAndMergeHelper,
hashKeys, defaultHashBucketNum, defaultHashSeed, batchesByBucket)
hashKeys, defaultHashBucketNum, baseHashSeed, 0, maxLevelsOfRepartition, batchesByBucket)
if (repartitionHappened) {
metrics.numTasksRepartitioned += 1
}

realIter = Some(ConcatIterator.apply(
realIter = Some(ConcatIterator(
new CloseableBufferedIterator(buildBucketIterator()), configuredTargetBatchSize))
realIter.get.next()
}
@@ -1047,8 +1057,12 @@
val spillCbs = ArrayBuffer[SpillableColumnarBatch]()
var totalBytes = 0L
closeOnExcept(spillCbs) { _ =>
while (input.hasNext && (spillCbs.isEmpty ||
(totalBytes + input.head.sizeInBytes) < targetSize)) {
while (input.hasNext && (
// for some test cases targetSize is too small to fit any SpillableColumnarBatch,
// in this case we put the first SpillableColumnarBatch into spillCbs anyway
// refer to https://github.com/NVIDIA/spark-rapids/issues/11790 for examples
spillCbs.isEmpty ||
(totalBytes + input.head.sizeInBytes) <= targetSize)) {
val tmp = input.next
totalBytes += tmp.sizeInBytes
spillCbs += tmp
@@ -1080,8 +1094,13 @@
closeOnExcept(new ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]) {
toAggregateBuckets =>
var currentSize = 0L
while (batchesByBucket.nonEmpty && (toAggregateBuckets.isEmpty ||
batchesByBucket.last.size() + currentSize < targetMergeBatchSize)) {
while (batchesByBucket.nonEmpty &&
(
// for some test cases targetMergeBatchSize is too small to fit any bucket,
// in this case we put the first bucket into toAggregateBuckets anyway
// refer to https://github.com/NVIDIA/spark-rapids/issues/11790 for examples
toAggregateBuckets.isEmpty ||
batchesByBucket.last.size() + currentSize <= targetMergeBatchSize)) {
val bucket = batchesByBucket.remove(batchesByBucket.size - 1)
currentSize += bucket.map(_.sizeInBytes).sum
toAggregateBuckets += bucket