[FEA] try to right size hash repartition count for aggregates #11901
Labels: `? - Needs Triage`, `feature request`, `performance`, `reliability`
Is your feature request related to a problem? Please describe.
In the current hash aggregation code, if the intermediate data grows too large, we repartition it into smaller pieces that can be merged to produce a final result.
The code as of #11816 will recursively repartition outputs that are too large 16 ways, up to 10 times, which could produce up to 16^10 partitions. Even if every partition held just a single 1-byte row, that would cover 1 TiB of input, far more than we would ever need. Getting there would also mean running on the order of 16^9 repartition kernels, and the output would require an unreasonable amount of host memory just to hold the metadata. That case is unrealistic, but it is still worth optimizing: needing a second round of repartitioning is probably not that uncommon, and if we ever do hit a case that needs a depth of 3 we would be wasting a lot of resources and likely spilling much more than necessary.
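To make the scale concrete, here is the arithmetic behind that worst case as a small runnable check. Only the fan-out of 16 and the depth of 10 come from the description above; the object and value names are purely illustrative.

```scala
object WorstCaseRepartition {
  // Fan-out and depth quoted above: 16 ways, up to 10 recursive passes.
  val fanOut   = 16L
  val maxDepth = 10

  // Leaf partitions after the final pass: 16^10 = 2^40 ~= 1.1e12.
  val maxLeafPartitions: Long = math.pow(fanOut.toDouble, maxDepth).toLong

  // Even at a single 1-byte row per leaf partition, that covers 2^40 bytes,
  // i.e. 1 TiB of input, which is far more than we would ever need.
  val coveredBytes: Long = maxLeafPartitions * 1L

  // Repartition invocations at the level above the leaves alone: 16^9 ~= 6.9e10.
  val kernelsAtDepth9: Long = math.pow(fanOut.toDouble, maxDepth - 1).toLong
}
```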
We don't have a lot of information as we do the first pass of repartitioning, because we want to avoid spilling, so we don't know how much data there is in total until we are done reading it. For this pass we probably still want to repartition 16 ways, but as soon as we have read all of the data we know how much data ended up in each bucket.
Along with this, we know that all of the rows within a batch have unique keys (because each batch has passed the initial aggregation stage).
From this we can derive that the number of unique keys (cardinality) in a bucket is between
`aggregateBatches.map(_.numRows).max`
and
`aggregateBatches.map(_.numRows).sum`
With this we can use a fairly simple algorithm to decide how many pieces we need to split the data into.
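A minimal sketch of what that decision could look like, assuming a hypothetical `AggBatchInfo(numRows, sizeInBytes)` descriptor for each buffered batch and a `targetBatchSize` limit; neither name is the actual plugin API, this just applies the cardinality bounds above.

```scala
// Hypothetical sketch, not the actual plugin code: once all batches of a
// bucket have been read, pick a repartition count for the second pass
// instead of always fanning out a fixed 16 ways.
case class AggBatchInfo(numRows: Long, sizeInBytes: Long)

def choosePartitionCount(
    aggregateBatches: Seq[AggBatchInfo],
    targetBatchSize: Long): Int = {
  // Rows within each batch are already unique keys, so the cardinality of the
  // fully merged result is bounded below by the largest batch and above by
  // the total row count across all batches.
  val minCardinality = aggregateBatches.map(_.numRows).max
  val maxCardinality = aggregateBatches.map(_.numRows).sum

  val totalBytes = aggregateBatches.map(_.sizeInBytes).sum

  // Pessimistic size estimate: assume the true cardinality is at the upper
  // bound, so no keys merge away and the merged output is as large as
  // everything buffered. A smarter estimate could interpolate between
  // minCardinality and maxCardinality.
  val estimatedMergedBytes = totalBytes

  // Split just enough ways that each merged piece should fit the target size.
  math.max(1, math.ceil(estimatedMergedBytes.toDouble / targetBatchSize).toInt)
}
```

The point of the sketch is that the second-pass fan-out is derived from measured sizes and cardinality bounds rather than being hard-coded to 16.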
If after repartitioning a second time we still have batches that are too large, it probably means that we have a lot of duplicate values and we can just merge the batches to output a result.
In addition to this we should be more aggressive about releasing batches that are small enough to process instead of waiting for all of the data to be repartitioned. It is not that uncommon to have skewed keys in an aggregation. It is unlikely to be that skewed after an initial aggregation pass, but it is not impossible. So if we do a first pass repartition and some of the batches are small enough we should just merge them and release them to be processed instead of trying to repartition the data fully.
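As a rough illustration of that early-release idea, reusing the hypothetical `AggBatchInfo` from the sketch above (again, not the actual plugin API), a bucket whose batches already fit the target size could be handed straight to the merge stage:

```scala
// Hypothetical sketch: after the first-pass repartition, separate buckets
// that are already small enough to merge and release immediately from the
// oversized ones that still need a right-sized second repartition.
def splitReadyVsOversized(
    buckets: Seq[Seq[AggBatchInfo]],
    targetBatchSize: Long): (Seq[Seq[AggBatchInfo]], Seq[Seq[AggBatchInfo]]) = {
  buckets.partition(b => b.map(_.sizeInBytes).sum <= targetBatchSize)
}

// Usage: merge and emit `ready` right away (freeing memory and reducing
// spill), and only run the second repartition pass on `oversized`.
// val (ready, oversized) = splitReadyVsOversized(buckets, targetBatchSize)
```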