Add a 'prefetch' option to ParquetRecordBatchStream to load the next row group while decoding #6676

Open · wants to merge 3 commits into main

Conversation

@masonh22 commented Nov 3, 2024

Which issue does this PR close?

Closes #6559.

Rationale for this change

This improves performance when reading from filesystems with high latency and/or low bandwidth.

What changes are included in this PR?

This adds an option to ParquetRecordBatchStream to load the next row group while decoding the current one. It introduces a new Prefetch variant in the stream state; while in this state, the future for the next row group is polled before data is returned from the current ParquetRecordBatchReader.
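For orientation, here is a simplified sketch of that state-machine change, based only on the description above and the diff hunks quoted later in this thread; the exact variant contents in the PR may differ:

```rust
use futures::future::BoxFuture;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;

// `NextRowGroup` stands in for whatever the in-flight fetch resolves to.
enum StreamState<NextRowGroup> {
    /// Decoding the current row group with no outstanding IO.
    Decoding(ParquetRecordBatchReader),
    /// New in this PR: decoding the current row group while the next one is fetched.
    Prefetch(ParquetRecordBatchReader, BoxFuture<'static, NextRowGroup>),
    // ...the stream's other states (init, reading, error) elided
}
```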

Are there any user-facing changes?

Yes: ParquetRecordBatchStreamBuilder has a new prefetch option, set via the with_prefetch method.
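A minimal usage sketch, assuming the with_prefetch method proposed here takes a boolean and the rest of the builder API is unchanged (this is not a released interface):

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn read_with_prefetch(file: tokio::fs::File) -> Result<(), Box<dyn std::error::Error>> {
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        // Assumed signature from this PR: opt in to fetching row group N+1
        // while row group N is still being decoded.
        .with_prefetch(true)
        .build()?;

    // Batches are pulled as usual; prefetching happens inside the stream.
    while let Some(batch) = stream.try_next().await? {
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}
```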

commit 28b2bf1
Author: Mason Hall <[email protected]>
Date:   Fri Oct 18 15:49:31 2024 -0400

    Cleaned up prefetch and added a test

commit 3d7e018
Author: Mason Hall <[email protected]>
Date:   Fri Oct 18 13:32:22 2024 -0400

    prefetch working
The github-actions bot added the parquet (Changes to the parquet crate) label on Nov 3, 2024.
@tustvold (Contributor) left a comment

I'll try to find some time over the next week to look into this, but I'm afraid it may be a while before I have time to sit down with it; the logic here is rather subtle and breakages can be very hard to spot or detect.

                },
            }
            StreamState::Prefetch(batch_reader, f) => {
                let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
Contributor:

What is the rationale for doing this?

@masonh22 (Author):

I wanted to avoid any potential overhead from using the real context when polling the future here. Since we're always returning Poll::Ready out of this state (or transitioning to another state), we don't need to rely on the real context to wake the main stream future.

I'm not an expert at async Rust, though, so if it would make more sense to do something else here I'm happy to make that change.

Contributor:

The concern I have here is that this effectively decouples polling the underlying IO operations from the reactor. We end up polling the future more or less continuously (assuming the consumer is just polling the stream in a loop during decoding, which is likely the case).

A different way to handle this would be to have a Prefetcher which can spawn the prefetch op in the background and let the runtime poll it. You can just "assume tokio", in which case the prefetcher is just tokio::spawn, or try to abstract over the async runtime behind a trait.
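For illustration, a minimal sketch of such a Prefetcher, assuming tokio (the names are invented for this example, not part of the PR): the fetch is handed to the runtime so the reactor drives its IO while the caller keeps decoding the current row group.

```rust
use std::future::Future;
use tokio::task::JoinHandle;

/// Illustrative helper: owns at most one in-flight background fetch.
struct Prefetcher<T> {
    inflight: Option<JoinHandle<T>>,
}

impl<T: Send + 'static> Prefetcher<T> {
    fn new() -> Self {
        Self { inflight: None }
    }

    /// Start fetching the next row group; the runtime polls it, not the stream.
    fn spawn<F>(&mut self, fut: F)
    where
        F: Future<Output = T> + Send + 'static,
    {
        self.inflight = Some(tokio::spawn(fut));
    }

    /// Await the prefetched result once the current row group is exhausted.
    async fn take(&mut self) -> Option<T> {
        match self.inflight.take() {
            Some(handle) => handle.await.ok(),
            None => None,
        }
    }
}
```

Abstracting over async runtimes would then mostly amount to putting the tokio::spawn call behind a trait.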

@alamb (Contributor) commented Nov 15, 2024

This is still on my radar to review; I just haven't had a chance.

@alamb (Contributor) commented Dec 17, 2024

Sorry that this PR is languishing -- I think it is tricky enough logic that it needs a good, thorough review (and the other maintainers haven't been able to find the time 😢)

@Dandandan or @thinkharderdev is there any chance someone from one of your teams has the bandwidth to review this PR? I suspect it will likely be an improvement but needs some careful review

@alamb (Contributor) commented Dec 17, 2024

I apologize @masonh22 for the delay

@Xuanwo (Member) commented Dec 18, 2024

Hi, I like this PR. I'm trying to find a way to enable splitting IO-bound and CPU-bound jobs. For a query engine like Databend, we run IO tasks on an async runtime and CPU-bound jobs on dedicated threads. We schedule them using our pipeline system.

Adding fetch seems like a good starting point for me. Is it possible to take it a step further by allowing users to call fetch().await and then perform next() in a separate blocking manner? The challenge might lie in designing an API that is sufficiently well-structured.
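To make the idea concrete, one possible API shape is sketched below; every name here is hypothetical and not part of the parquet crate. The point is only that the IO step and the decode step become separately callable:

```rust
use arrow_array::RecordBatch;
use parquet::errors::Result;

/// Hypothetical: performs only the IO needed for the next row group.
trait RowGroupFetcher {
    type Decoder: RowGroupDecoder;

    /// Runs on the async runtime; `None` means there are no more row groups.
    async fn fetch(&mut self) -> Result<Option<Self::Decoder>>;
}

/// Hypothetical: pure-CPU decoding over bytes that were already fetched,
/// suitable for a dedicated (blocking) thread.
trait RowGroupDecoder: Iterator<Item = Result<RecordBatch>> + Send + 'static {}
```

A caller could then fetch().await on the async runtime and hand the returned decoder to a CPU thread (or spawn_blocking), which matches the IO/CPU split described above.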

cc @alamb, I'm willing to start a new issue if you think this PR is not the right place.

@tustvold (Contributor) commented:

FWIW I did have some ideas about exposing lower-level APIs in #5522 that would allow finer-grained control of decode, but I've not had time, and am unlikely to have time, to write up a concrete proposal.

I agree that we should provide a way to concurrently fetch IO and do parallel decode; it is unclear whether this can be made to work in the RecordBatchStream interface, or whether it would need a new API as proposed in #5522.

On a historical note, RecordBatchStream was itself a compromise: I originally wanted a push-based decoder (#1605), but in the interests of time I went with the simpler, more constraining option.

@masonh22 (Author) commented:

I imagine that a part of the reason why this PR has stalled is that the design isn't great. It adds a lot of complexity to a part of the code that should be kept simple. It is, however, a very convenient interface for solving the specific problem I was running into.

@Xuanwo that sounds like a much better design IMO, but I think it will be tricky to work that into ParquetRecordBatchStream. Maybe this should be broken out into something new.

@thinkharderdev (Contributor) commented:

> Sorry that this PR is languishing -- I think it is tricky enough logic that it needs a good, thorough review (and the other maintainers haven't been able to find the time 😢)
>
> @Dandandan or @thinkharderdev is there any chance someone from one of your teams has the bandwidth to review this PR? I suspect it will likely be an improvement but needs some careful review

I can try and find some time to review this week. Have been thinking about something along these lines as well so happy to see someone is working on it :)

@Xuanwo (Member) commented Dec 19, 2024

> FWIW I did have some ideas about exposing lower-level APIs in #5522 that would allow finer-grained control of decode, but I've not had time, and am unlikely to have time, to write up a concrete proposal.
>
> I agree that we should provide a way to concurrently fetch IO and do parallel decode; it is unclear whether this can be made to work in the RecordBatchStream interface, or whether it would need a new API as proposed in #5522.

Hi @tustvold,

Thank you for the information—it's really helpful! I'll take a look and try out some of the ideas.

> Maybe this should be broken out into something new.

Agreed. I will continue the discussion at #5522 or #1605 instead.

@alamb (Contributor) commented Dec 19, 2024

> > Sorry that this PR is languishing -- I think it is tricky enough logic that it needs a good, thorough review (and the other maintainers haven't been able to find the time 😢)
> > @Dandandan or @thinkharderdev is there any chance someone from one of your teams has the bandwidth to review this PR? I suspect it will likely be an improvement but needs some careful review
>
> I can try and find some time to review this week. Have been thinking about something along these lines as well so happy to see someone is working on it :)

Thank you very much @thinkharderdev

@Xuanwo (Member) commented Dec 20, 2024

Hi, after building #6907, I realized it can also be applied to the use case we were aiming for here. Users can wrap their own Stream<Item=Result<RecordBatch>> based on the new API we provide.
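As a rough sketch of that wrapping idea (assuming tokio and tokio-stream; with_prefetch_buffer is a name invented for this example, not an API from #6907): the inner stream is driven on a spawned task, and a bounded channel provides the prefetch buffering.

```rust
use arrow_array::RecordBatch;
use futures::{Stream, StreamExt};
use parquet::errors::Result;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

/// Wrap any stream of batches so the runtime polls it in the background;
/// `capacity` bounds how many batches may be buffered ahead of the consumer.
fn with_prefetch_buffer<S>(inner: S, capacity: usize) -> impl Stream<Item = Result<RecordBatch>>
where
    S: Stream<Item = Result<RecordBatch>> + Send + 'static,
{
    let (tx, rx) = mpsc::channel(capacity);
    tokio::spawn(async move {
        futures::pin_mut!(inner);
        while let Some(item) = inner.next().await {
            // Stop if the consumer side has been dropped.
            if tx.send(item).await.is_err() {
                break;
            }
        }
    });
    ReceiverStream::new(rx)
}
```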

            }
            StreamState::Prefetch(batch_reader, f) => {
                let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
                match f.poll_unpin(&mut noop_cx) {
Contributor:

When I have tried to think about this problem at various points, I get hung up at this part. Theoretically, polling the next reader here should just be driving IO (and hence be cheap). But reading the row group includes both IO and evaluating row filters, which can involve non-trivial compute and memory, since we evaluate the row filter over the entire row group in one shot.

To really implement prefetch, I think we need finer-grained control over the pipelining of IO and CPU operations -- e.g. prefetch should only perform the next IO operation.


Labels: parquet (Changes to the parquet crate)
Projects: None yet
Development: merging this pull request may close "ParquetRecordBatchStream API to fetch the next row group while decoding"
5 participants