RFC: Spill Hash Aggregation #89

Merged · 3 commits · Dec 4, 2024
Conversation

@chenzl25 (Contributor) commented on May 13, 2024

@fuyufjh (Member) commented on May 13, 2024:

The design looks good to me. 👍

I am thinking about another approach. Instead of adapting the HashAgg/HashJoin to be spill-able by using partitioning, we might just replace the in-memory hash table with some disk-based structure, such as a B-Tree (e.g. sled), an LSM-Tree (e.g. RocksDB), or an on-disk hash table (e.g. odht). The major benefit of this disk-based approach is that it doesn't consume any memory except some page cache, which the OS can always safely evict, so the memory manager doesn't need to worry about batch memory consumption at all.

I do agree that a hybrid hash agg/join (this proposal) is the most commonly used approach in the database area, which implies it probably has the best performance when memory is not a bottleneck. But I am also curious about the performance gap between it and the disk-based approach. If the gap is small, the disk-based approach might be a good fit for RisingWave's case.
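For illustration, a minimal sketch of what this disk-based alternative could look like with sled as the backing store. The encoding of keys/states and the file path are assumptions for the sketch, not RisingWave APIs:

```rust
// Hypothetical sketch: replace the in-memory hash table with an on-disk
// B-Tree (sled). Group keys and aggregation states live on disk as bytes,
// so the working set is absorbed by the OS page cache rather than the
// query's memory budget.
fn disk_based_hash_agg(
    rows: impl Iterator<Item = (Vec<u8>, i64)>, // (encoded group key, value)
) -> sled::Result<sled::Db> {
    let db = sled::open("/tmp/agg_state_sketch")?; // on-disk B-Tree
    for (group_key, value) in rows {
        // Read-modify-write the per-group state (here: a simple i64 sum).
        let current = db
            .get(&group_key)?
            .map(|bytes| i64::from_le_bytes(bytes.as_ref().try_into().unwrap()))
            .unwrap_or(0);
        db.insert(group_key, (current + value).to_le_bytes().to_vec())?;
    }
    db.flush()?;
    Ok(db)
}
```

Note that every group lookup here may turn into a random read, which is the concern raised in the reply below.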

@chenzl25 (Contributor, Author) replied:

> I am thinking about another approach. Instead of adapting the HashAgg/HashJoin to be spill-able by using partitioning, we might just replace the in-memory hash table with some disk-based structure, such as a B-Tree (e.g. sled), an LSM-Tree (e.g. RocksDB), or an on-disk hash table (e.g. odht).

Some drawbacks I can think of for a disk-based structure:

  1. We can't precisely monitor the memory consumed by such a structure, and monitoring memory consumption in batch queries is important, because the memory monitor decides whether to spill or to return an OOM error to users.
  2. When the data size of the current query is too large to be resident in memory, IO becomes the bottleneck, so whether we issue sequential IO or random IO matters. A disk-based structure would potentially introduce random IOs. The OS page cache could mitigate this, but when memory is already insufficient, the page cache is unlikely to help much.
  3. Monitoring disk IO is also important once spilling is enabled. With a disk-based structure, it might be hard to measure the actual IO.


## Unresolved questions

The above algorithm relies on the aggregation state being serializable. As far as I know, if the agg state is `AggregateState::Any`, we can't encode the state well, so this algorithm is only applicable to `AggregateState::Datum`. The most common aggregation functions, e.g. `count`, `sum`, `min`, `max`, belong to `AggregateState::Datum`, so this is fine. Any later improvements are welcome.
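As a rough illustration of this constraint, here is a sketch with simplified, hypothetical stand-ins for the real types: a `Datum` state is a plain scalar with an obvious byte encoding, while an `Any` state wraps arbitrary in-memory state with no stable serialized form.

```rust
// Simplified, hypothetical stand-ins; not the actual RisingWave definitions.
enum Datum {
    Null,
    Int64(i64),
}

trait DynAggState {} // opaque, in-memory-only state (e.g. for custom aggregates)

enum AggregateState {
    Datum(Datum),              // count/sum/min/max: a single scalar, easy to encode
    Any(Box<dyn DynAggState>), // arbitrary state: no stable serialized form
}

/// Try to encode a state for spilling; only `Datum` states are supported.
fn encode_for_spill(state: &AggregateState) -> Option<Vec<u8>> {
    match state {
        AggregateState::Datum(Datum::Null) => Some(vec![0]),
        AggregateState::Datum(Datum::Int64(v)) => {
            let mut buf = vec![1];
            buf.extend_from_slice(&v.to_le_bytes());
            Some(buf)
        }
        // `Any` states cannot be encoded, so spilling stays disabled for such plans.
        AggregateState::Any(_) => None,
    }
}
```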
@fuyufjh (Member) commented on May 15, 2024:

I think we may convert the AggregateState (of multiple groups, together) into some form of StreamChunk first, and then reuse the serialization of StreamChunk, so that it can benefit from the columnar format of StreamChunk.
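A hedged sketch of that idea, where the chunk type below is a hypothetical stand-in rather than the actual StreamChunk API: gather the group keys and their `Datum` states into per-column vectors, then serialize the whole batch column by column.

```rust
// Hypothetical columnar layout: one column for the group key and one column
// per aggregate's Datum state. Encoding column-at-a-time mimics the benefit
// of reusing StreamChunk's columnar serialization instead of going row by row.
struct SpillChunk {
    group_keys: Vec<i64>,         // column 0: group key
    agg_states: Vec<Option<i64>>, // column 1: e.g. a nullable sum state
}

impl SpillChunk {
    fn from_groups(groups: &[(i64, Option<i64>)]) -> Self {
        Self {
            group_keys: groups.iter().map(|(k, _)| *k).collect(),
            agg_states: groups.iter().map(|(_, s)| *s).collect(),
        }
    }

    /// Column-at-a-time encoding: row count, all keys, then a validity byte
    /// followed by the value for each state.
    fn serialize(&self) -> Vec<u8> {
        let mut buf = Vec::new();
        buf.extend_from_slice(&(self.group_keys.len() as u32).to_le_bytes());
        for k in &self.group_keys {
            buf.extend_from_slice(&k.to_le_bytes());
        }
        for s in &self.agg_states {
            match s {
                Some(v) => {
                    buf.push(1);
                    buf.extend_from_slice(&v.to_le_bytes());
                }
                None => buf.push(0),
            }
        }
        buf
    }
}
```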

```
[proto_len]
[proto_bytes]
```
A Member commented:

I am not very sure, should we add a CRC here? Personally, I tend to add a CRC to any persisted data.

@chenzl25 (Contributor, Author) replied:
Sounds good, we can add a CRC at the end of the file.
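A small sketch of what that file layout could look like with the CRC appended at the end, using the crc32fast crate; the exact layout and function names are illustrative, not the final design:

```rust
use crc32fast::Hasher;
use std::io::Write;

// Each chunk is written as [proto_len: u32 LE][proto_bytes], and a CRC32 of
// everything written so far is appended once at the end of the file.
fn write_spill_file<W: Write>(mut out: W, chunks: &[Vec<u8>]) -> std::io::Result<()> {
    let mut hasher = Hasher::new();
    for proto_bytes in chunks {
        let len = (proto_bytes.len() as u32).to_le_bytes();
        hasher.update(&len);
        hasher.update(proto_bytes);
        out.write_all(&len)?;
        out.write_all(proto_bytes)?;
    }
    // Trailing CRC over all [proto_len][proto_bytes] pairs.
    out.write_all(&hasher.finalize().to_le_bytes())?;
    Ok(())
}
```

A reader can then recompute the CRC while decoding and compare it against the trailing 4 bytes to detect a corrupted or truncated spill file.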


### Partitions

First, we need to choose a partition number and partition both the hash table and the input chunks by it. After partitioning, each partition theoretically contains about 1/partition_num of the original data. If that fits in memory, we can process the HashAgg partition by partition. If a partition is still too large to fit in memory, we need to apply the spill algorithm recursively. When applying it recursively, we need to make sure each level uses a different hash function; otherwise all rows of a partition would hash into the same sub-partition again and the data would stay skewed.
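A minimal sketch of the partition assignment, assuming the hash is salted with the recursion level so that each round of repartitioning effectively uses a different hash function; the names here are illustrative:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assign a row to one of `partition_num` spill partitions. Salting the hash
/// with `spill_level` makes each recursion level behave like a different hash
/// function, so a partition that is spilled again is actually re-split.
fn spill_partition<K: Hash>(group_key: &K, partition_num: u64, spill_level: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    spill_level.hash(&mut hasher); // the "different hash function" per level
    group_key.hash(&mut hasher);
    hasher.finish() % partition_num
}
```

With e.g. 20 partitions, level 0 splits the input once; if one of those partitions is still too large, level 1 re-splits it into 20 new partitions instead of mapping everything back to a single one.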
@kwannoel (Contributor) commented on Jun 14, 2024:

> When recursively applying the spill algorithm

Could you elaborate on this? Why would we need to recursively apply the spill algorithm? Don't we just spill the entire partition to disk each time?

A Contributor commented:

Why not just use vnode, instead of a separate partition strategy?

@chenzl25 (Contributor, Author) replied:

> Why would we need to recursively apply the spill algorithm?

Because we don't know how much data the input has. Even with 20 partitions by default, a single partition could still be too large to fit in memory, so we need to spill that partition again, i.e. apply the algorithm recursively.
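To make the recursion concrete, a minimal sketch of the control flow, reusing the hypothetical `spill_partition` helper from the Partitions sketch above; the row type and the memory check are simplified stand-ins for whatever the real executor uses:

```rust
use std::collections::HashMap;

const MEM_LIMIT_ROWS: usize = 100_000; // stand-in for the real memory check
const PARTITION_NUM: u64 = 20;

// Illustrative recursion: if the data fits, aggregate in memory; otherwise
// split it with a level-salted hash ("spill" each bucket) and aggregate each
// bucket independently, recursing when a bucket is still too big.
fn hash_agg_with_spill(rows: Vec<(i64, i64)>, spill_level: u64) -> HashMap<i64, i64> {
    if rows.len() <= MEM_LIMIT_ROWS {
        // In-memory hash aggregation (here: sum per group key).
        let mut agg = HashMap::new();
        for (k, v) in rows {
            *agg.entry(k).or_insert(0) += v;
        }
        return agg;
    }
    let mut buckets: Vec<Vec<(i64, i64)>> = vec![Vec::new(); PARTITION_NUM as usize];
    for (k, v) in rows {
        let p = spill_partition(&k, PARTITION_NUM, spill_level) as usize;
        buckets[p].push((k, v));
    }
    // Partitions are disjoint by key, so their results can simply be merged.
    let mut result = HashMap::new();
    for bucket in buckets {
        result.extend(hash_agg_with_spill(bucket, spill_level + 1));
    }
    result
}
```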

@chenzl25 (Contributor, Author) replied:

> Why not just use vnode, instead of a separate partition strategy?

For batch queries, we don't always have a vnode. A vnode is only associated with the data within a table, but the input of a batch hash join could be intermediate data.

@fuyufjh merged commit 24e8e6e into main on Dec 4, 2024