Add a 'prefetch' option to ParquetRecordBatchStream to load the next row group while decoding
#6676
base: main
Conversation
commit 28b2bf1
Author: Mason Hall <[email protected]>
Date:   Fri Oct 18 15:49:31 2024 -0400

    Cleaned up prefetch and added a test

commit 3d7e018
Author: Mason Hall <[email protected]>
Date:   Fri Oct 18 13:32:22 2024 -0400

    prefetch working
I'll try to find some time over the next week to look into this, but I'm afraid it may be a while before I can sit down with it. The logic here is rather subtle, and breakages can be very hard to spot / detect.
    },
}
StreamState::Prefetch(batch_reader, f) => {
    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
What is the rationale for doing this?
I wanted to avoid any potential overhead from using the real context when polling the future here. Since we're always returning Poll::Ready out of this state (or transitioning to another state), we don't need to rely on the real context to wake the main stream future.
I'm not an expert at async Rust, though, so if it would make more sense to do something else here, I'm happy to make that change.
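For reference, here is the pattern being discussed, pulled out into a standalone helper (poll_once is a hypothetical name, purely for illustration): polling with a no-op waker means a Pending result never registers a wakeup with the reactor, so the caller itself must keep polling.

    use std::future::Future;
    use std::task::{Context, Poll};

    use futures::task::noop_waker_ref;
    use futures::FutureExt;

    // Poll a future exactly once with a waker that does nothing. If the
    // future returns Pending, nothing is scheduled to wake the task later;
    // the caller has to poll again on its own.
    fn poll_once<F: Future + Unpin>(f: &mut F) -> Poll<F::Output> {
        let mut noop_cx = Context::from_waker(noop_waker_ref());
        f.poll_unpin(&mut noop_cx)
    }

This is why the stream has to keep returning to the Prefetch state: nothing else will ever drive the prefetch future to completion.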
The concern I have here is that this effectively decouples polling the underlying IO operations from the reactor. We end up polling the future more or less continuously (assuming the consumer is just polling the stream in a loop during decoding, which is likely the case).
A different way to handle this would be to have a Prefetcher which can spawn the prefetch op in the background and let the runtime poll it. You can just "assume tokio", in which case the prefetcher is just tokio::spawn, or try to abstract over the async runtime behind a trait.
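A minimal sketch of what that might look like; the shape of the Prefetcher trait and the TokioPrefetcher type here are my own illustration, not code from this PR:

    use std::future::Future;
    use std::pin::Pin;

    // Hypothetical abstraction over the async runtime: hand a future to the
    // runtime so it is polled in the background, and get back a handle that
    // resolves to the future's output.
    pub trait Prefetcher: Send + Sync {
        fn spawn_prefetch<T: Send + 'static>(
            &self,
            fut: Pin<Box<dyn Future<Output = T> + Send>>,
        ) -> Pin<Box<dyn Future<Output = T> + Send>>;
    }

    // The "assume tokio" version: prefetching is just tokio::spawn.
    pub struct TokioPrefetcher;

    impl Prefetcher for TokioPrefetcher {
        fn spawn_prefetch<T: Send + 'static>(
            &self,
            fut: Pin<Box<dyn Future<Output = T> + Send>>,
        ) -> Pin<Box<dyn Future<Output = T> + Send>> {
            let handle = tokio::spawn(fut);
            // A JoinError only occurs if the task panics or is cancelled;
            // unwrapping keeps the sketch simple.
            Box::pin(async move { handle.await.expect("prefetch task panicked") })
        }
    }

With this shape the stream would await the returned handle instead of polling the raw future with a no-op waker, so the reactor stays in charge of driving the IO.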
This is still on my radar to review; I just haven't had a chance.
Sorry that this PR is languishing -- I think the logic is tricky enough that it needs a good, thorough review, and the other maintainers haven't been able to find the time 😢. @Dandandan or @thinkharderdev is there any chance someone from one of your teams has the bandwidth to review this PR? I suspect it will likely be an improvement but needs some careful review.
I apologize @masonh22 for the delay
Hi, I like this PR. I'm trying to find a way to enable splitting IO-bound and CPU-bound jobs. For a query engine like Databend, we run IO tasks on an async runtime and CPU-bound jobs on dedicated threads, and we schedule them using our pipeline system. cc @alamb: I'm willing to start a new issue if you think this PR is not the right place.
FWIW I did have some ideas about exposing lower level APIs in #5522 that would allow finer-grained control of decode, but I've not had time, and am unlikely to have time, to write up a concrete proposal. I agree that we should provide a way to concurrently fetch IO and do parallel decode; it is unclear whether this can be made to work within the RecordBatchStream interface, or if it would need a new API as proposed in #5522. On a historical note, RecordBatchStream was itself a compromise: I originally wanted a push-based decoder (#1605), but in the interests of time I went with the simpler, but more constraining, option.
I imagine that part of the reason this PR has stalled is that the design isn't great. It adds a lot of complexity to a part of the code that should be kept simple. It is, however, a very convenient interface for solving the specific problem I was running into. @Xuanwo that sounds like a much better design IMO, but I think it will be tricky to work it into ParquetRecordBatchStream. Maybe this should be broken out into something new.
I can try and find some time to review this week. Have been thinking about something along these lines as well so happy to see someone is working on it :)
Hi @tustvold, Thank you for the information—it's really helpful! I'll take a look and try out some of the ideas.
Agreed. I will continue the discussion at #5522 or #1605 instead.
Thank you very much @thinkharderdev
Hi, after building #6907, I realized it can also be applied to the use case we were aiming for here. Users can wrap their own ...
}
StreamState::Prefetch(batch_reader, f) => {
    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
    match f.poll_unpin(&mut noop_cx) {
When I have tried to think about this problem at various points, I get hung up at this part. Theoretically, polling the next reader here should just be driving IO (and hence be cheap). But reading the row group includes both IO and evaluating row filters, which can involve non-trivial compute and memory, since we evaluate the row filter over the entire row group in one shot.
To really implement prefetch I think we need finer-grained control over the pipelining of IO and CPU operations, e.g. prefetch should only perform the next IO operation.
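To make that concrete, here is a rough sketch of the pipelining being suggested; fetch_bytes, decode, and pipelined_read are hypothetical stand-ins, not APIs from this PR or the parquet crate:

    use bytes::Bytes;

    // IO half: fetch the raw column chunk bytes for one row group (e.g.
    // object store range requests). Cheap to poll, nothing decoded.
    async fn fetch_bytes(_row_group: usize) -> Bytes {
        Bytes::new()
    }

    // CPU half: evaluate row filters and decode pages. Potentially expensive.
    fn decode(_bytes: Bytes) {}

    // Overlap the IO for row group N+1 with the CPU work for row group N.
    async fn pipelined_read(num_row_groups: usize) {
        if num_row_groups == 0 {
            return;
        }
        let mut next_io = Some(tokio::spawn(fetch_bytes(0)));
        for rg in 0..num_row_groups {
            let bytes = next_io.take().unwrap().await.unwrap();
            // Start the next row group's IO before decoding this one, so
            // the runtime drives it while decode() runs.
            if rg + 1 < num_row_groups {
                next_io = Some(tokio::spawn(fetch_bytes(rg + 1)));
            }
            // Run the CPU-bound half off the reactor threads.
            tokio::task::spawn_blocking(move || decode(bytes))
                .await
                .unwrap();
        }
    }

Under this split, only fetch_bytes would run in the Prefetch state; filter evaluation and decoding stay on the consumer's side of the pipeline.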
Which issue does this PR close?
Closes #6559.
Rationale for this change
This improves performance when reading from filesystems with high latency and/or low bandwidth.
What changes are included in this PR?
This adds an option to ParquetRecordBatchStream to load the next row group while decoding the current one. This adds a new state called Prefetch to the stream state. In this state, a future for the next row group is polled before returning data from the current ParquetRecordBatchReader.
Are there any user-facing changes?
There is a new option for ParquetRecordBatchStreamBuilder called prefetch that is set by a method called with_prefetch.
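For orientation, a rough usage sketch of the proposed option. with_prefetch is the method this PR adds (I've assumed it takes a bool); the rest is the existing ParquetRecordBatchStreamBuilder API, and the file path is illustrative:

    use futures::TryStreamExt;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;
    use tokio::fs::File;

    async fn read_with_prefetch() -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open("data.parquet").await?;
        let stream = ParquetRecordBatchStreamBuilder::new(file)
            .await?
            .with_prefetch(true) // overlap fetching row group N+1 with decoding N
            .build()?;
        let batches: Vec<_> = stream.try_collect().await?;
        println!("read {} record batches", batches.len());
        Ok(())
    }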