[POC] Experimental parquet decoder with first-class filter pushdown support #6921
Conversation
Implemented some more optimizations and tuning; here are ClickBench numbers on my machine. TLDR: about 15% total time reduction. We first compare no-pushdown vs our new pushdown implementation. Only Q27 has a meaningful slowdown; other queries are either similar or much faster. The fix for Q27 requires us to actually switch to a boolean mask-based selector implementation, like the one in #6624
Now we compare our new implementation with the old pushdown implementation -- only Q23 is a bit slower, others are either faster or similar. We need some extra work to get optimal performance on Q23. Nonetheless, we are still faster than no-pushdown. I believe fixing Q23 does not require fundamental changes to the existing decoding pipeline.
The implementation of course lacks tons of tests (I tried to manually verify the ClickBench results). If the high-level stuff looks good, I'll try to send breakdown PRs that have tests and documentation. Like most performance-related PRs, some of the code changes can be very non-intuitive; please let me know and I'll try my best to explain why some code has to be implemented that way.
Starting to check it out
Thank you @XiangpengHao -- TLDR I think this POC looks really nice and the overall structure makes sense to me. I am willing to help review this PR as it moves closer to reality
There are obvious ways to break this PR up into pieces, which is a nice bonus -- the core caching logic is fairly localized
cc @thinkharderdev @tustvold @Dandandan @etseidl for your comments / reviews as well
I also think the description on the PR is quite good and easy to follow -- thank you for that
(todo: cite myself)
😆 my favorite part of the description
if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough.
We can also consider caching arrow as a follow on PR / project. If this initial PR effectively avoids decompressing each page twice (though it still decodes each page to arrow twice) that still seems better than the current main branch, which decompresses and decodes twice.
@@ -69,6 +69,7 @@ paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }
simdutf8 = "0.1.5"
Is this the same as "Faster parquet utf8 validation using simdjson" #6668 from @Dandandan?
Yes it's the same (slightly different for error handling).
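For reference, a minimal sketch of SIMD-accelerated validation with the `simdutf8` crate added above (not the exact code in this PR): the `basic` API only reports whether the input is valid, without an error position, which is the error-handling difference mentioned.

```rust
// Minimal sketch: validate a byte slice with simdutf8's `basic` API.
// Unlike `std::str::from_utf8`, `basic::from_utf8` carries no error position.
fn is_valid_utf8(bytes: &[u8]) -> bool {
    simdutf8::basic::from_utf8(bytes).is_ok()
}
```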
@@ -307,6 +309,9 @@ impl ByteViewArrayDecoderPlain {
    num_values: Option<usize>,
    validate_utf8: bool,
) -> Self {
    // Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
    // Then we convert `arrow_buffer::Bytes` into `arrow_buffer::Buffer`, which is also zero copy
@kylebarron found something similar in "Add doctest example for Buffer::from_bytes" #6920
Maybe it would be helpful to make an explicit API to create an arrow_buffer::Buffer directly from bytes::Bytes, to make it clearer how to do so (and that it is zero copy).
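As a rough illustration of the two zero-copy conversions the diff comment describes; this assumes the `From<bytes::Bytes>` impl on `arrow_buffer::Bytes` and the `Buffer::from_bytes` constructor discussed in #6920:

```rust
use arrow_buffer::Buffer;

// Sketch of the zero-copy path described above; both conversions share the
// underlying allocation instead of copying it.
fn bytes_to_buffer(data: bytes::Bytes) -> Buffer {
    // `bytes::Bytes` -> `arrow_buffer::Bytes` (zero copy)
    let arrow_bytes: arrow_buffer::Bytes = data.into();
    // `arrow_buffer::Bytes` -> `arrow_buffer::Buffer` (also zero copy)
    Buffer::from_bytes(arrow_bytes)
}
```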
@@ -210,6 +210,44 @@ impl ProjectionMask {
    pub fn leaf_included(&self, leaf_idx: usize) -> bool {
        self.mask.as_ref().map(|m| m[leaf_idx]).unwrap_or(true)
    }

    /// Union two projection masks
These are nice additions -- it would be fairly straightforward to split them into their own PR.
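A hypothetical sketch of what such a union could look like inside `impl ProjectionMask`, assuming the `Option<Vec<bool>>` representation implied by `leaf_included` above (`None` meaning every leaf is included); not necessarily what the PR implements:

```rust
/// Union two projection masks: a leaf is included if either mask includes it.
/// (Hypothetical sketch, intended to live inside `impl ProjectionMask`.)
pub fn union(&self, other: &Self) -> Self {
    let mask = match (&self.mask, &other.mask) {
        // If either side selects all leaves, the union selects all leaves.
        (None, _) | (_, None) => None,
        (Some(a), Some(b)) => Some(a.iter().zip(b).map(|(l, r)| *l || *r).collect()),
    };
    Self { mask }
}
```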
@@ -722,6 +747,8 @@ struct InMemoryRowGroup<'a> {
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
    row_count: usize,
    cache: Arc<PageCache>,
this seems like the key change -- to cache the pages
// Approach 1 has the drawback of the extra overhead of coalescing batches, which is hard to make efficient.
// The code below implements approach 2, where we keep consuming the selection until we select at least 3/4 of the batch size.
// It boils down to leveraging the array_reader's ability to collect large batches natively,
I think this rationale makes sense
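A rough sketch of that strategy (illustrative names, not the PR's exact code): keep popping `RowSelector`s until the selected rows cover at least 3/4 of the batch size, so the array reader can decode one large range natively instead of many small ones.

```rust
use std::collections::VecDeque;

use parquet::arrow::arrow_reader::RowSelector;

// Illustrative sketch: consume selectors until we have selected at least
// 3/4 of `batch_size` rows (or the selection is exhausted).
fn take_next_selection(
    selection: &mut VecDeque<RowSelector>,
    batch_size: usize,
) -> Option<Vec<RowSelector>> {
    let mut taken = Vec::new();
    let mut selected = 0;
    while let Some(selector) = selection.pop_front() {
        if !selector.skip {
            selected += selector.row_count;
        }
        taken.push(selector);
        // Stop once the selected rows cover at least 3/4 of a batch.
        if selected >= batch_size * 3 / 4 {
            break;
        }
    }
    (!taken.is_empty()).then_some(taken)
}
```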
/// A simple cache for decompressed pages.
/// We cache only one dictionary page and one data page per column
pub(crate) struct PageCache {
    inner: RwLock<PageCacheInner>,
it would help me understand how this works if we could add some comments about how col_id was assigned (is it derived from some Parquet data or is it assigned by the cache / reader somehow) 🤔
Just added much more documentation to explain what's going on in the PageCache!
}

struct CachedPage {
    dict: Option<(usize, Page)>,
I think these are (offset, Page) tuples, which might be nice to explain in comments
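Putting both review comments together, a hypothetical annotated version of these structs might look like this (field shapes are taken from the diff above; the `data` field and the keying by `col_id` are my reading of the doc comment, not the exact PR code):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

use parquet::column::page::Page;

// Hypothetical commented sketch of the cache layout shown in the diff above.
struct CachedPage {
    // (page offset within the Parquet file, decompressed dictionary page)
    dict: Option<(usize, Page)>,
    // (page offset within the Parquet file, decompressed data page)
    data: Option<(usize, Page)>,
}

struct PageCacheInner {
    // Keyed by `col_id` (however it is assigned -- see the discussion above);
    // at most one dictionary page and one data page are cached per column.
    pages: HashMap<usize, CachedPage>,
}

pub(crate) struct PageCache {
    inner: RwLock<PageCacheInner>,
}
```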
}

impl PageCache {
    const CAPACITY: usize = 16;
This might be clearer if it was called INITIAL_CAPACITY or something, as I didn't see code that limits the overall size of the cache
very nice @XiangpengHao. I think this makes a lot of sense.
/// ## How to identify a page
/// We use the page offset (the offset within the Parquet file) to uniquely identify a page.
pub(crate) struct PredicatePageCache {
    inner: RwLock<PredicatePageCacheInner>,
Why do we need an RwLock here? Seems like we should have a mut reference to this guy when needed? I see that CachedPageReader takes an Arc<PredicatePageCache> but not sure I understand why we need an Arc there?
Edit: Ok I see now, seems like we would need to change RowGroups::column_chunks to take a mutable reference.
Yes, RowGroups::column_chunks only takes an immutable self. Also RefCell won't work either, because the PageReader needs to be Send.
    return Ok(None);
};

let page = self.cache.get_page(self.col_id, offset);
Probably not a big deal, but this should always be uncontended, so it seems like it would be better on net to wrap PredicatePageCacheInner in a Mutex instead of an RwLock and grab a mut reference here for the duration, so we don't need to acquire the lock again in case of a cache miss.
Yes, will do soon
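A small sketch of that suggestion, with hypothetical types keyed by `(col_id, page offset)` as described above; holding the `Mutex` guard across the whole lookup-and-insert means the lock is acquired once even on a cache miss:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use parquet::column::page::Page;

// Hypothetical sketch of the Mutex-based variant suggested above.
struct PredicatePageCacheInner {
    // (col_id, page offset within the file) -> decompressed page
    pages: HashMap<(usize, usize), Page>,
}

struct PredicatePageCache {
    inner: Mutex<PredicatePageCacheInner>,
}

impl PredicatePageCache {
    // `load` decompresses the page on a cache miss; the guard is held for the
    // whole lookup-and-insert, so the lock is taken only once either way.
    fn get_or_load(&self, col_id: usize, offset: usize, load: impl FnOnce() -> Page) -> Page {
        let mut inner = self.inner.lock().unwrap();
        inner
            .pages
            .entry((col_id, offset))
            .or_insert_with(load)
            .clone()
    }
}
```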
DO NOT MERGE.
I'll send breakdown PRs once I've nailed down the designs.
Which issue does this PR close?
Many long-standing issues in DataFusion and parquet-rs, todo: find some
Closes #.
Rationale for this change
Filter pushdown is great in concept, but the current straightforward implementation actually slows down queries. The goal of a super fast filter pushdown parquet reader is described by @alamb in #5523 (comment):
I initially thought we could find simple/smart tricks to work around some of the issues of the current implementation (#5523 (comment)). After thinking more carefully about the design space and its implications, I believe the only way to reach that goal is to restructure the parquet reading pipeline, while reusing as much of the existing implementation as possible.
Problems of current implementation
We follow a two-phase decoding:
Phase 1: Build selectors on each predicate
Phase 2: Decode parquet data using the selector
The first problem is that we have to decode the predicate column twice: for example, if column 1 is being filtered, we first decode column 1 while evaluating the predicate, then decode it again to build the output array.
The second problem is that we need to decode and evaluate all the predicates before we can emit the first record batch. This not only causes high first-record-batch latency, but also makes caching decoded values challenging -- we would need to cache the entire column to avoid double-decoding.
I have described other issues in #5523 (comment), but they are relatively straightforward to fix.
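For context, this is roughly how filter pushdown is driven through the public reader API today (the file name, column index, and threshold are placeholders, and the `arrow_array`/`arrow_ord` crates are assumed as dependencies): the column referenced by the predicate is decoded once while evaluating the `RowFilter` and again when building the output batches.

```rust
use std::fs::File;

use arrow_array::Int64Array;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::ProjectionMask;

// Placeholder example of enabling filter pushdown via the public API.
fn read_with_pushdown() -> parquet::errors::Result<()> {
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // Predicate over leaf column 0 (placeholder): `col > 10`.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        arrow_ord::cmp::gt(batch.column(0), &Int64Array::new_scalar(10))
    });
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    for batch in reader {
        let _batch = batch?;
    }
    Ok(())
}
```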
Proposed decoding pipeline
We can't pre-build the filter once and use it to decode data; instead, we need to build the filter batch-by-batch along with decoding. The pipeline looks like this:
Once we have this pipeline, we can cache the predicate columns for batch N and reuse them when we load & emit batch N; this avoids double decoding.
However, the key challenge is handling nested types. The predicate columns are not a flat list of arrays but a tree, and the same is true for the result columns. So the problem is not just to intersect two column lists, but to reconstruct the predicate column tree into the result column tree.
A workaround is to cache the decompressed pages rather than decoded arrow arrays. As some research suggests (todo: cite myself), decompressing pages costs up to twice as much as decoding arrow, so if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough. Caching decompressed pages is also much simpler to implement, as we can reuse the current array_readers and just implement a new PageReader.
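As a toy model of that shape (plain vectors stand in for decoded Arrow arrays; none of these names are parquet-rs APIs): each batch is decoded, filtered, and emitted before moving on, instead of evaluating every predicate over the whole row group first.

```rust
// Toy model: `decode_batch` stands in for "decode rows [start, start + len)".
struct Batch {
    predicate_col: Vec<i64>,
    output_col: Vec<i64>,
}

fn decode_batch(start: usize, len: usize) -> Batch {
    Batch {
        predicate_col: (start..start + len).map(|v| v as i64).collect(),
        output_col: (start..start + len).map(|v| v as i64 * 10).collect(),
    }
}

// Evaluate the predicate and emit the filtered rows in the same pass over the
// batch, instead of building a full selection for the row group up front.
fn scan(total_rows: usize, batch_size: usize, predicate: impl Fn(i64) -> bool) -> Vec<Vec<i64>> {
    let mut emitted = Vec::new();
    let mut start = 0;
    while start < total_rows {
        let len = batch_size.min(total_rows - start);
        let batch = decode_batch(start, len);
        let selected = batch
            .predicate_col
            .iter()
            .zip(&batch.output_col)
            .filter(|(p, _)| predicate(**p))
            .map(|(_, o)| *o)
            .collect();
        emitted.push(selected);
        start += len;
    }
    emitted
}
```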
This PR implements caching decompressed pages.
What changes are included in this PR?
Are there any user-facing changes?