
[POC] Experimental parquet decoder with first-class filter pushdown support #6921

Draft: wants to merge 17 commits into main

Conversation

@XiangpengHao (Contributor) commented Dec 30, 2024

DO NOT MERGE.

I'll send breakdown PRs once I've nailed down the designs.

Which issue does this PR close?

Many long-standing issues in DataFusion and parquet-rs; todo: find and link them.

Closes #.

Rationale for this change

Filter pushdown is great in concept, but the current straightforward implementation actually slows down queries. The goal for a super fast filter-pushdown parquet reader is described by @alamb in #5523 (comment):

is that evaluating predicates in ArrowFilter (aka pushed down predicates) is never worse than decoding the columns first and then filtering them with the filter kernel

I initially thought we could find simple/smart tricks to work around some of the issues (#5523 (comment)) in the current implementation. After thinking more carefully about the design space and its implications, I believe the only way to reach that goal is to restructure the parquet reading pipeline, while reusing as much of the existing implementation as possible.

Problems with the current implementation

We currently follow a two-phase decoding process:
Phase 1: Build selectors by evaluating each predicate

Empty selector -> decode column 1 -> selection 1
Selection 1 -> decode column 2 -> selection 2
…
Selection K

Phase 2: Decode parquet data using the selector

Selection K -> decode column 1
Selection K -> decode column 2
…
Selection K -> decode column N

The first problem is that we have to decode each predicate column twice. For example, if column 1 is filtered on, we first decode column 1 while evaluating the predicate, then decode it again to build the output array.

The second problem is that we need to decode and evaluate all the predicates before we can emit the first record batch. This not only causes high first-record-batch latency, but also makes caching decoded values challenging: we would need to cache the entire column to avoid double-decoding.

I have described other issues in #5523 (comment), but they are relatively straightforward to fix.

Proposed decoding pipeline

We can't pre-build the filter once and then use it to decode the data; instead, we need to build the filter batch by batch, interleaved with decoding. The pipeline looks like this:

Load predicate columns for batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1
Load predicate columns for batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2
...
Load predicate columns for batch N -> evaluate predicates -> filter N -> load & emit batch N

Once we have this pipeline, we can cache the predicate columns for batch N and reuse them when we load & emit batch N, which avoids double decoding.
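To make this concrete, here is a conceptual sketch of the per-batch loop. Every name below (read_predicate_columns, evaluate_predicates, and so on) is an illustrative placeholder, not the actual API in this PR:

// Conceptual per-batch pipeline; all identifiers are placeholders.
loop {
    // 1. Decode only the predicate columns for the next batch of rows.
    let Some(predicate_batch) = read_predicate_columns(batch_size)? else {
        break; // no more rows
    };
    // 2. Evaluate the pushed-down predicates to produce a boolean filter.
    let filter = evaluate_predicates(&predicate_batch)?;
    // 3. Decode the remaining columns for the same rows and apply the filter.
    let rest = read_remaining_columns(batch_size, &filter)?;
    // 4. Reuse the already-decoded predicate columns instead of decoding
    //    them a second time, then emit the combined, filtered batch.
    emit(combine(filter_batch(&predicate_batch, &filter)?, rest)?);
}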
However, the key challenge is handling nested types. The predicate columns are not a flat list of arrays but a tree, and the same is true of the result columns. So the problem is not just intersecting two column lists, but reconstructing the predicate column tree into the result column tree.

A workaround is to cache the decompressed pages rather than the decoded arrow arrays. As some research suggests (todo: cite myself), decompressing pages costs up to twice as much as decoding arrow; if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough. Caching decompressed pages is also much simpler to implement, as we can reuse the current array_readers and just implement a new PageReader.

This PR implements caching decompressed pages.
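For intuition, the shape of such a reader might look like the following; all names and fields here are assumptions for illustration, not the PR's actual types:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative only: a page reader that consults a shared cache of
// decompressed pages before falling back to the real reader, and that
// populates the cache on a miss. Pages are keyed by their byte offset
// in the Parquet file.
struct CachedPageReader<R> {
    inner: R,                                   // the real, decompressing page reader
    cache: Arc<Mutex<HashMap<usize, Vec<u8>>>>, // page offset -> decompressed bytes
    col_id: usize,                              // which leaf column this reader serves
}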

What changes are included in this PR?

Are there any user-facing changes?

@github-actions bot added the parquet (Changes to the parquet crate) and arrow (Changes to the arrow crate) labels on Dec 30, 2024
@XiangpengHao (Contributor, Author)

It would be great if @alamb and @tustvold could take a look and let me know your thoughts!

@alamb (Contributor) commented Dec 30, 2024

It would be great if @alamb and @tustvold could take a look and let me know your thoughts!

😍 -- looks amazing. I plan to work on arrow-rs reviews tomorrow. I'll put this one on the top of the list.

@XiangpengHao (Contributor, Author)

Implemented some more optimizations and tuning; here are the ClickBench numbers on my machine. TL;DR: about 15% total-time reduction.

We first compare no-pushdown vs. our new pushdown implementation. Only Q27 shows a meaningful slowdown; the other queries are either similar or much faster.

Fixing Q27 requires switching to a boolean mask-based selector implementation, like the one in #6624.

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ no-pushdown ┃ new-pushdown ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │      0.47ms │       0.43ms │ +1.10x faster │
│ QQuery 1     │     51.10ms │      50.10ms │     no change │
│ QQuery 2     │     68.23ms │      64.49ms │ +1.06x faster │
│ QQuery 3     │     90.68ms │      86.73ms │     no change │
│ QQuery 4     │    458.93ms │     458.59ms │     no change │
│ QQuery 5     │    522.06ms │     478.50ms │ +1.09x faster │
│ QQuery 6     │     49.84ms │      49.94ms │     no change │
│ QQuery 7     │     55.09ms │      55.77ms │     no change │
│ QQuery 8     │    565.26ms │     556.95ms │     no change │
│ QQuery 9     │    575.83ms │     575.05ms │     no change │
│ QQuery 10    │    164.56ms │     178.23ms │  1.08x slower │
│ QQuery 11    │    177.20ms │     191.32ms │  1.08x slower │
│ QQuery 12    │    591.05ms │     569.92ms │     no change │
│ QQuery 13    │    861.06ms │     848.59ms │     no change │
│ QQuery 14    │    596.20ms │     580.73ms │     no change │
│ QQuery 15    │    554.96ms │     548.77ms │     no change │
│ QQuery 16    │   1175.08ms │    1146.07ms │     no change │
│ QQuery 17    │   1150.45ms │    1121.49ms │     no change │
│ QQuery 18    │   2634.75ms │    2494.07ms │ +1.06x faster │
│ QQuery 19    │     90.15ms │      89.24ms │     no change │
│ QQuery 20    │    620.15ms │     591.67ms │     no change │
│ QQuery 21    │    782.38ms │     703.15ms │ +1.11x faster │
│ QQuery 22    │   1927.94ms │    1404.35ms │ +1.37x faster │
│ QQuery 23    │   8104.11ms │    3610.76ms │ +2.24x faster │
│ QQuery 24    │    360.79ms │     330.55ms │ +1.09x faster │
│ QQuery 25    │    290.61ms │     252.54ms │ +1.15x faster │
│ QQuery 26    │    395.18ms │     362.72ms │ +1.09x faster │
│ QQuery 27    │    891.76ms │     959.39ms │  1.08x slower │
│ QQuery 28    │   4059.54ms │    4137.37ms │     no change │
│ QQuery 29    │    235.88ms │     228.99ms │     no change │
│ QQuery 30    │    564.22ms │     584.65ms │     no change │
│ QQuery 31    │    741.20ms │     757.87ms │     no change │
│ QQuery 32    │   2652.48ms │    2574.19ms │     no change │
│ QQuery 33    │   2373.71ms │    2327.10ms │     no change │
│ QQuery 34    │   2391.00ms │    2342.15ms │     no change │
│ QQuery 35    │    700.79ms │     694.51ms │     no change │
│ QQuery 36    │    151.51ms │     152.93ms │     no change │
│ QQuery 37    │    108.18ms │      86.03ms │ +1.26x faster │
│ QQuery 38    │    114.64ms │     106.22ms │ +1.08x faster │
│ QQuery 39    │    260.80ms │     239.13ms │ +1.09x faster │
│ QQuery 40    │     60.74ms │      73.29ms │  1.21x slower │
│ QQuery 41    │     58.75ms │      67.85ms │  1.15x slower │
│ QQuery 42    │     65.49ms │      68.11ms │     no change │
└──────────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (no-pushdown)    │ 38344.79ms │
│ Total Time (new-pushdown)   │ 32800.50ms │
│ Average Time (no-pushdown)  │   891.74ms │
│ Average Time (new-pushdown) │   762.80ms │
│ Queries Faster              │         13 │
│ Queries Slower              │          5 │
│ Queries with No Change      │         25 │
└─────────────────────────────┴────────────┘

Now we compare our new implementation with the old pushdown implementation -- only Q23 is a bit slower; the others are either faster or similar.

We do need some extra work to get optimal performance on Q23; nonetheless, we are still faster than no-pushdown. I believe fixing Q23 does not require fundamental changes to the existing decoding pipeline.

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃  pushdown ┃ new-pushdown ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    0.48ms │       0.43ms │ +1.12x faster │
│ QQuery 1     │   51.49ms │      50.10ms │     no change │
│ QQuery 2     │   67.83ms │      64.49ms │     no change │
│ QQuery 3     │   89.68ms │      86.73ms │     no change │
│ QQuery 4     │  469.88ms │     458.59ms │     no change │
│ QQuery 5     │  523.97ms │     478.50ms │ +1.10x faster │
│ QQuery 6     │   50.37ms │      49.94ms │     no change │
│ QQuery 7     │   56.89ms │      55.77ms │     no change │
│ QQuery 8     │  560.69ms │     556.95ms │     no change │
│ QQuery 9     │  583.14ms │     575.05ms │     no change │
│ QQuery 10    │  155.75ms │     178.23ms │  1.14x slower │
│ QQuery 11    │  170.31ms │     191.32ms │  1.12x slower │
│ QQuery 12    │  723.13ms │     569.92ms │ +1.27x faster │
│ QQuery 13    │ 1181.34ms │     848.59ms │ +1.39x faster │
│ QQuery 14    │  736.95ms │     580.73ms │ +1.27x faster │
│ QQuery 15    │  551.74ms │     548.77ms │     no change │
│ QQuery 16    │ 1171.99ms │    1146.07ms │     no change │
│ QQuery 17    │ 1152.34ms │    1121.49ms │     no change │
│ QQuery 18    │ 2555.82ms │    2494.07ms │     no change │
│ QQuery 19    │   84.20ms │      89.24ms │  1.06x slower │
│ QQuery 20    │  606.77ms │     591.67ms │     no change │
│ QQuery 21    │  704.86ms │     703.15ms │     no change │
│ QQuery 22    │ 1633.53ms │    1404.35ms │ +1.16x faster │
│ QQuery 23    │ 2691.84ms │    3610.76ms │  1.34x slower │
│ QQuery 24    │  528.09ms │     330.55ms │ +1.60x faster │
│ QQuery 25    │  465.38ms │     252.54ms │ +1.84x faster │
│ QQuery 26    │  562.40ms │     362.72ms │ +1.55x faster │
│ QQuery 27    │ 1121.76ms │     959.39ms │ +1.17x faster │
│ QQuery 28    │ 4455.16ms │    4137.37ms │ +1.08x faster │
│ QQuery 29    │  234.18ms │     228.99ms │     no change │
│ QQuery 30    │  596.22ms │     584.65ms │     no change │
│ QQuery 31    │  754.21ms │     757.87ms │     no change │
│ QQuery 32    │ 2570.52ms │    2574.19ms │     no change │
│ QQuery 33    │ 2357.37ms │    2327.10ms │     no change │
│ QQuery 34    │ 2377.89ms │    2342.15ms │     no change │
│ QQuery 35    │  703.78ms │     694.51ms │     no change │
│ QQuery 36    │  162.29ms │     152.93ms │ +1.06x faster │
│ QQuery 37    │  129.96ms │      86.03ms │ +1.51x faster │
│ QQuery 38    │   90.79ms │     106.22ms │  1.17x slower │
│ QQuery 39    │  220.71ms │     239.13ms │  1.08x slower │
│ QQuery 40    │   72.87ms │      73.29ms │     no change │
│ QQuery 41    │   70.04ms │      67.85ms │     no change │
│ QQuery 42    │   68.17ms │      68.11ms │     no change │
└──────────────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (pushdown)       │ 34116.80ms │
│ Total Time (new-pushdown)   │ 32800.50ms │
│ Average Time (pushdown)     │   793.41ms │
│ Average Time (new-pushdown) │   762.80ms │
│ Queries Faster              │         13 │
│ Queries Slower              │          6 │
│ Queries with No Change      │         24 │
└─────────────────────────────┴────────────┘

@XiangpengHao (Contributor, Author) commented Jan 1, 2025

The implementation of course still lacks tons of tests (I tried to manually verify the ClickBench results). If the high-level design looks good, I'll send breakdown PRs that include tests and documentation.

Like most performance-related PRs, some of the code changes can be very non-intuitive; please ask, and I'll do my best to explain why some code has to be implemented that way.

@alamb (Contributor) commented Jan 1, 2025

Starting to check it out

@alamb (Contributor) left a review:

Thank you @XiangpengHao -- TL;DR: I think this POC looks really nice and the overall structure makes sense to me. I am willing to help review this PR as it moves closer to reality.

There are obvious ways to break this PR up into pieces, which is a nice bonus -- the core caching logic is fairly localized

cc @thinkharderdev @tustvold @Dandandan @etseidl for your comments / reviews as well

I also think the description on the PR is quite good and easy to follow -- thank you for that

(todo: cite myself)

😆 my favorite part of the description

if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough.

We can also consider caching arrow data as a follow-on PR / project. If this initial PR effectively avoids decompressing each page twice (though it still decodes each page to arrow twice), that still seems better than the current main branch, which decompresses and decodes twice.

@@ -69,6 +69,7 @@ paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }
simdutf8 = "0.1.5"
Yes it's the same (slightly different for error handling).

@@ -307,6 +309,9 @@ impl ByteViewArrayDecoderPlain {
num_values: Option<usize>,
validate_utf8: bool,
) -> Self {
// Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
// Then we convert `arrow_buffer::Bytes` into `arrow_buffer::Buffer`, which is also zero copy
Maybe it would be helpful to add an explicit API to create an arrow_buffer::Buffer directly from bytes::Bytes, to make it clearer how to do so (and that it is zero copy).
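Something like the sketch below, assuming the two zero-copy conversions mentioned in the code comment above; the helper name is made up, and the exact constructor names should be double-checked against arrow-buffer:

use arrow_buffer::Buffer;

/// Hypothetical convenience API: build a `Buffer` from `bytes::Bytes`
/// without copying the underlying allocation.
fn buffer_from_bytes(data: bytes::Bytes) -> Buffer {
    // `bytes::Bytes` -> `arrow_buffer::Bytes` keeps the original allocation
    // alive (zero copy), and `arrow_buffer::Bytes` -> `Buffer` just wraps
    // it, so no bytes are moved at any step.
    Buffer::from_bytes(data.into())
}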

@@ -210,6 +210,44 @@ impl ProjectionMask {
pub fn leaf_included(&self, leaf_idx: usize) -> bool {
self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
}

/// Union two projection masks
These are nice additions -- it would be fairly straightforward to split them into their own PR.
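For reference, given that leaf_included above treats a None mask as "all leaves included", the union could plausibly look like this standalone sketch (not the PR's code):

// Standalone sketch of a union over ProjectionMask-style masks, where
// `None` means "all leaves are included" (as in `leaf_included` above).
fn union_masks(a: Option<&[bool]>, b: Option<&[bool]>) -> Option<Vec<bool>> {
    match (a, b) {
        // If either mask already includes every leaf, so does the union.
        (None, _) | (_, None) => None,
        (Some(a), Some(b)) => {
            Some(a.iter().zip(b.iter()).map(|(x, y)| *x || *y).collect())
        }
    }
}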

@@ -722,6 +747,8 @@ struct InMemoryRowGroup<'a> {
offset_index: Option<&'a [OffsetIndexMetaData]>,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
cache: Arc<PageCache>,
this seems like the key change -- to cache the pages


// Approach 1 has the drawback of extra overhead of coalesce batch, which can be painful to be efficient.
// Code below implements approach 2, where we keep consuming the selection until we select at least 3/4 of the batch size.
// It boils down to leveraging array_reader's ability to collect large batches natively,
I think this rationale makes sense
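A self-contained sketch of that loop (approach 2), with a stand-in type for the PR's row-selector queue:

use std::collections::VecDeque;

/// Stand-in for parquet's RowSelector: a run of rows to select or skip.
#[derive(Clone, Copy)]
struct Selector {
    row_count: usize,
    skip: bool,
}

/// Keep consuming selectors until at least 3/4 of `batch_size` rows are
/// selected, so the array readers can collect one large batch natively
/// instead of emitting many small batches that need coalescing afterwards.
fn take_next_selection(queue: &mut VecDeque<Selector>, batch_size: usize) -> Vec<Selector> {
    let mut taken = Vec::new();
    let mut selected = 0;
    while selected < batch_size * 3 / 4 {
        let Some(s) = queue.pop_front() else { break };
        if !s.skip {
            selected += s.row_count;
        }
        taken.push(s);
    }
    taken
}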

/// A simple cache for decompressed pages.
/// We cache only one dictionary page and one data page per column
pub(crate) struct PageCache {
inner: RwLock<PageCacheInner>,
it would help me understand how this works if we could add some comments about how col_id was assigned (is it derived from some Parquet data or is it assigned by the cache / reader somehow) 🤔

Reply from @XiangpengHao:
Just added much more documentation to explain what's going on in the PageCache!

}

struct CachedPage {
dict: Option<(usize, Page)>,
I think these are (offset, Page) tuples which might be nice to explain in comments
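Concretely, the comments could read something like this; note the data field is inferred from the "one dictionary page and one data page per column" doc comment above, so treat it as an assumption:

struct CachedPage {
    /// The cached dictionary page, tagged with its byte offset in the
    /// Parquet file (the offset is what uniquely identifies a page).
    dict: Option<(usize, Page)>,
    /// The cached data page, tagged the same way (assumed field; only
    /// `dict` is visible in the snippet above).
    data: Option<(usize, Page)>,
}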

}

impl PageCache {
const CAPACITY: usize = 16;
This might be clearer if it was called INITIAL_CAPACITY or something as I didn't see code that limits the overall size of the cache

@thinkharderdev (Contributor) left a review:

very nice @XiangpengHao. I think this makes a lot of sense.

/// ## How to identify a page
/// We use the page offset (the offset to the Parquet file) to uniquely identify a page.
pub(crate) struct PredicatePageCache {
inner: RwLock<PredicatePageCacheInner>,
Why do we need an RwLock here? Seems like we should have a mut reference to this guy when needed? I see that CachedPageReader takes an Arc<PredicatePageCache> but not sure I understand why we need an Arc there?

Edit: Ok I see now, seems like we would need to change RowGroups::column_chunks to take a mutable reference.

Reply from @XiangpengHao:

Yes, RowGroups::column_chunks only takes an immutable &self.
Also, a RefCell won't work either, because the PageReader needs to be Send.
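The Send constraint can be seen in a tiny sketch: RefCell is not Sync, so an Arc<RefCell<...>> shared between readers would not be Send:

use std::sync::{Arc, RwLock};

fn assert_send<T: Send>() {}

fn main() {
    assert_send::<Arc<RwLock<u8>>>(); // ok: RwLock<u8> is Sync, so the Arc is Send
    // assert_send::<Arc<std::cell::RefCell<u8>>>(); // compile error:
    // RefCell<u8> is not Sync, so Arc<RefCell<u8>> is not Send
}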

return Ok(None);
};

let page = self.cache.get_page(self.col_id, offset);
Probably not a big deal, but this should always be uncontended, so it seems like it would be better on net to wrap PredicatePageCacheInner in a Mutex instead of an RwLock and grab a mut reference here for the duration, so we don't need to acquire the lock again in case of a cache miss.
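In code, the suggestion amounts to something like this sketch (generic over the page type; all names are illustrative, not the PR's API):

use std::collections::HashMap;
use std::sync::Mutex;

/// Hold one Mutex guard across both the lookup and the miss path, so a
/// cache miss never has to re-acquire the lock.
fn get_or_read_page<P: Clone>(
    cache: &Mutex<HashMap<(usize, usize), P>>, // keyed by (col_id, page offset)
    col_id: usize,
    offset: usize,
    read_and_decompress: impl FnOnce() -> P,
) -> P {
    let mut inner = cache.lock().unwrap();
    inner
        .entry((col_id, offset))
        .or_insert_with(read_and_decompress)
        .clone()
}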

Reply from @XiangpengHao:
Yes, will do soon
