RFC: Spill Hash Aggregation #89
Conversation
The design looks good to me. 👍

I am thinking about another approach: instead of adapting the HashAgg/HashJoin to be spill-able by using partitioning, we might just replace the in-memory hash table with some disk-based structure, such as a B-Tree (e.g. sled), an LSM-Tree (e.g. RocksDB), or an on-disk hash table (e.g. odht). The major benefit of this disk-based approach is that it doesn't consume any memory except some page cache, which is always safe for the OS to evict, so the memory manager doesn't need to worry about batch memory consumption at all.

I do agree that the hybrid hash agg/join (this proposal) is the most commonly used approach in the database area, which implies it may have the best performance when memory is not a bottleneck. But I am also curious about the performance gap between it and the disk-based approach. If the gap is small, the disk-based approach might be a good fit for RisingWave's case.
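A minimal sketch of what that alternative could look like, assuming the aggregation state is keyed by a serialized group key and using `sled` as the disk-backed store (the key/value encoding and the `count`-only state are illustrative assumptions, not a proposed implementation):

```rust
// Sketch only: replace HashAgg's in-memory hash table with a disk-backed
// B-tree (the `sled` crate). Keys are serialized group keys; values are the
// serialized aggregation state (here just a `count` as a little-endian u64).
fn update_count(db: &sled::Db, group_key: &[u8]) -> sled::Result<()> {
    let current = db
        .get(group_key)?
        .map(|v| u64::from_le_bytes(v.as_ref().try_into().unwrap()))
        .unwrap_or(0);
    db.insert(group_key, (current + 1).to_le_bytes().to_vec())?;
    Ok(())
}

fn main() -> sled::Result<()> {
    let db = sled::open("/tmp/agg_state")?;
    for key in [b"a".as_slice(), b"b".as_slice(), b"a".as_slice()] {
        update_count(&db, key)?;
    }
    // Emit the final aggregation result by scanning the tree in key order.
    for entry in db.iter() {
        let (key, value) = entry?;
        let count = u64::from_le_bytes(value.as_ref().try_into().unwrap());
        println!("{:?} -> {}", key, count);
    }
    Ok(())
}
```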
Some drawbacks I can think of when using a disk-based structure:
## Unresolved questions
The above algorithm relies on the aggregation state being serializable. As far as I know, if the agg state is `AggregateState::Any`, we can't encode the state well, so this algorithm is only applicable to `AggregateState::Datum`. I think the most common aggregation functions, e.g. `count`, `sum`, `min`, `max`, belong to `AggregateState::Datum`, so it is fine. Any improvements later are welcome.
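For illustration, a hedged sketch of that constraint; the `AggState` enum below only mirrors the idea of `AggregateState::Datum` vs `AggregateState::Any` and is not the real type:

```rust
// Hypothetical mirror of the two kinds of aggregation state mentioned above;
// not the real RisingWave `AggregateState` type.
enum AggState {
    // A plain value (count/sum/min/max): has a stable value encoding, so it
    // can be written to a spill file.
    Datum(Option<i64>),
    // An opaque in-memory state (e.g. a boxed object): no stable encoding,
    // so the operator cannot spill it.
    Any(Box<dyn std::any::Any>),
}

// Only `Datum` states are encodable; returning `None` signals that the
// executor must fall back to the non-spilling path.
fn try_encode(state: &AggState) -> Option<Vec<u8>> {
    match state {
        AggState::Datum(None) => Some(vec![0]), // null flag
        AggState::Datum(Some(v)) => {
            let mut buf = vec![1];
            buf.extend_from_slice(&v.to_le_bytes());
            Some(buf)
        }
        AggState::Any(_) => None,
    }
}
```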
I think we may convert the `AggregateState`s (of multiple groups, together) into some form of `StreamChunk`s first, and then reuse the serialization of `StreamChunk`, so that it can benefit from the columnar format of `StreamChunk`.
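Roughly, that idea could look like the following sketch; `ColumnarStates`, the tuple layout, and the chosen aggregates are placeholders rather than the real `StreamChunk`/`AggregateState` APIs:

```rust
// Sketch: gather the per-group states into columns first, so the existing
// columnar chunk serialization could be reused instead of encoding each
// group's state individually. The types here are placeholders.
struct ColumnarStates {
    group_keys: Vec<Vec<u8>>, // one row per group
    counts: Vec<u64>,         // state column of a `count` aggregate
    sums: Vec<i64>,           // state column of a `sum` aggregate
}

fn to_columns(groups: &[(Vec<u8>, u64, i64)]) -> ColumnarStates {
    let mut cols = ColumnarStates {
        group_keys: Vec::with_capacity(groups.len()),
        counts: Vec::with_capacity(groups.len()),
        sums: Vec::with_capacity(groups.len()),
    };
    for (key, count, sum) in groups {
        cols.group_keys.push(key.clone());
        cols.counts.push(*count);
        cols.sums.push(*sum);
    }
    // In the real executor these columns would be turned into a chunk and go
    // through the chunk serializer; here they are just plain vectors.
    cols
}
```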
The spill file layout:

```
[proto_len]
[proto_bytes]
```
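That layout reads as a length-prefixed protobuf record. A minimal writer/reader sketch under that assumption (the 4-byte little-endian length prefix is a guess, not necessarily the RFC's exact encoding):

```rust
use std::io::{Read, Write};

// Write one spilled record as [proto_len][proto_bytes].
fn write_record<W: Write>(w: &mut W, proto_bytes: &[u8]) -> std::io::Result<()> {
    w.write_all(&(proto_bytes.len() as u32).to_le_bytes())?;
    w.write_all(proto_bytes)
}

// Read back one record, returning None at a clean end of file.
fn read_record<R: Read>(r: &mut R) -> std::io::Result<Option<Vec<u8>>> {
    let mut len_buf = [0u8; 4];
    if let Err(e) = r.read_exact(&mut len_buf) {
        if e.kind() == std::io::ErrorKind::UnexpectedEof {
            return Ok(None);
        }
        return Err(e);
    }
    let len = u32::from_le_bytes(len_buf) as usize;
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(Some(buf))
}
```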
I am not very sure: should we add a CRC here? Personally, I tend to add a CRC for any persisted data.
Sounds good, we can add a CRC at the end of the file.
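For example, a trailing checksum could be implemented with the `crc32fast` crate; treating the last 4 bytes of the file as a little-endian CRC32 of everything before them is an assumption made for this sketch:

```rust
// Compute CRC32 with the `crc32fast` crate.
fn crc32(data: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(data);
    hasher.finalize()
}

// Append a 4-byte little-endian CRC32 of the whole payload as the file footer.
fn append_crc(payload: &mut Vec<u8>) {
    let crc = crc32(payload.as_slice());
    payload.extend_from_slice(&crc.to_le_bytes());
}

// Verify the footer when reading the spill file back.
fn verify_crc(file: &[u8]) -> bool {
    if file.len() < 4 {
        return false;
    }
    let (payload, footer) = file.split_at(file.len() - 4);
    let expected = u32::from_le_bytes(footer.try_into().unwrap());
    crc32(payload) == expected
}
```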
### Partitions
First, we need to choose a partition number to partition the hash table and input chunks. After partitioning, theoretically, each partition would only contain 1/partition_num of the original data. If this size fits in memory, we can process the HashAgg partition by partition. If a partition is still too large to fit in memory, we need to recursively apply the spill algorithm. When recursively applying the spill algorithm, we need to make sure each round uses a different hash function to avoid data skew.
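One way to get the "different hash functions" requirement is to mix the recursion level into the partition hash, as in this sketch (the hasher and the partition count of 20 are illustrative):

```rust
use std::hash::{Hash, Hasher};

const PARTITION_NUM: u64 = 20; // illustrative default

// Pick a partition for a group key. Mixing the recursion level into the hash
// makes each spill round behave like a different hash function, so an
// oversized partition is re-split instead of landing in one bucket again.
fn partition_of(group_key: &[u8], spill_level: u64) -> u64 {
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    spill_level.hash(&mut hasher);
    group_key.hash(&mut hasher);
    hasher.finish() % PARTITION_NUM
}
```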
> When recursively applying the spill algorithm
Could you elaborate on this? Why would we need to recursively apply the spill algorithm? Don't we just spill the entire partition to disk each time?
Why not just use vnode, instead of a separate partition strategy?
> Why would we need to recursively apply the spill algorithm?
Because we don't know how much data the input has. Even if we have 20 partitions by default, a single partition's data could still be too large to fit in memory, so we need to spill that partition again, i.e. recursively apply the algorithm.
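Sketched as control flow, with `SpilledPartition` and the helper functions being placeholders rather than real executor APIs:

```rust
// Placeholder representation of one spilled partition (hypothetical).
struct SpilledPartition {
    bytes: u64, // size on disk
}

const MEMORY_BUDGET: u64 = 64 << 20; // 64 MiB, illustrative

fn fits_in_memory(p: &SpilledPartition) -> bool {
    p.bytes <= MEMORY_BUDGET
}

fn agg_in_memory(_p: SpilledPartition) {
    // Build the in-memory hash table and emit results (omitted).
}

fn repartition_to_disk(p: SpilledPartition, _level: u64) -> Vec<SpilledPartition> {
    // Split the partition with the level-seeded hash (omitted); here we just
    // pretend it splits evenly in two.
    vec![
        SpilledPartition { bytes: p.bytes / 2 },
        SpilledPartition { bytes: p.bytes / 2 },
    ]
}

// Aggregate a spilled partition in memory if it fits, otherwise split it
// again with the next-level hash function and recurse.
fn process_partition(partition: SpilledPartition, level: u64) {
    if fits_in_memory(&partition) {
        agg_in_memory(partition);
    } else {
        for sub in repartition_to_disk(partition, level + 1) {
            process_partition(sub, level + 1);
        }
    }
}
```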
> Why not just use vnode, instead of a separate partition strategy?
For batch queries, we don't always have a vnode. A vnode is only associated with the data within a table, but batch hash join input could be intermediate data.
Co-authored-by: Noel Kwan <[email protected]>
Tracking issue: risingwavelabs/risingwave#16615