Only create one native plan for a query on an executor #1204

Open
viirya opened this issue Dec 29, 2024 · 9 comments · May be fixed by #1203
Labels
enhancement New feature or request

Comments

@viirya
Member

viirya commented Dec 29, 2024

What is the problem the feature request solves?

Currently we create a native plan per task for a query. Even when those tasks run on the same executor, each one still has its own native plan. One problem with this approach is the added cost of deserializing the same query plan multiple times, plus the extra memory needed to store the duplicate native plans.
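
To make the idea concrete, here is a minimal sketch of an executor-wide cache that deserializes a plan once and hands the same instance to every task. It is only an illustration: `NativePlan`, `PlanCache`, and `get_or_create` are hypothetical names and not Comet APIs, and the "deserialization" is a placeholder that copies the bytes into a string.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a deserialized native plan.
#[derive(Debug)]
pub struct NativePlan {
    pub description: String,
}

/// Hypothetical per-executor cache holding one shared plan per plan id.
#[derive(Default)]
pub struct PlanCache {
    plans: Mutex<HashMap<u64, Arc<NativePlan>>>,
    deserializations: AtomicUsize,
}

impl PlanCache {
    /// Returns the cached plan for `plan_id`, deserializing `bytes` only on first use.
    pub fn get_or_create(&self, plan_id: u64, bytes: &[u8]) -> Arc<NativePlan> {
        let mut plans = self.plans.lock().unwrap();
        if let Some(plan) = plans.get(&plan_id) {
            // Later tasks on the same executor reuse the existing plan.
            return Arc::clone(plan);
        }
        // Placeholder for real plan deserialization (an assumption of this sketch).
        self.deserializations.fetch_add(1, Ordering::SeqCst);
        let plan = Arc::new(NativePlan {
            description: String::from_utf8_lossy(bytes).into_owned(),
        });
        plans.insert(plan_id, Arc::clone(&plan));
        plan
    }

    /// Number of times a plan was actually deserialized.
    pub fn deserialization_count(&self) -> usize {
        self.deserializations.load(Ordering::SeqCst)
    }
}
```

With a cache like this, two tasks asking for the same plan id would get the same `Arc`, so the plan is deserialized and stored once per executor rather than once per task.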

Describe the potential solution

No response

Additional context

No response

@viirya viirya added the enhancement New feature or request label Dec 29, 2024
@andygrove
Member

One of the challenges related to this is that we start executing ScanExec during construction, as part of query planning. If we want to share one plan per executor (which I agree would be good), then we will need to postpone execution until the execute function is called.
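
As a rough illustration of what postponing execution means here, the sketch below uses a scan whose constructor reads nothing and whose input is pulled only from `execute`. `CountingSource` and `LazyScan` are hypothetical types invented for this example; they do not mirror the real ScanExec, which reads Arrow batches from the JVM.

```rust
use std::collections::VecDeque;

/// Hypothetical batch source that counts how often it is polled.
pub struct CountingSource {
    batches: VecDeque<Vec<i32>>,
    pulls: usize,
}

impl CountingSource {
    pub fn new(batches: Vec<Vec<i32>>) -> Self {
        Self { batches: batches.into(), pulls: 0 }
    }

    /// Returns the next batch, or `None` when the source is exhausted.
    fn next_batch(&mut self) -> Option<Vec<i32>> {
        self.pulls += 1;
        self.batches.pop_front()
    }
}

/// Hypothetical scan operator that performs no reads during construction.
pub struct LazyScan {
    source: CountingSource,
}

impl LazyScan {
    /// Construction only stores the source, so the plan can be built and shared first.
    pub fn new(source: CountingSource) -> Self {
        Self { source }
    }

    /// Number of times the source has been polled so far.
    pub fn pulls(&self) -> usize {
        self.source.pulls
    }

    /// Input is read only here, when the plan is actually executed.
    pub fn execute(&mut self) -> Vec<Vec<i32>> {
        let mut out = Vec::new();
        while let Some(batch) = self.source.next_batch() {
            out.push(batch);
        }
        out
    }
}
```

The point of the split is that building a `LazyScan` has no side effects on the input, so a plan containing it could be constructed once and executed later.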

@viirya
Member Author

viirya commented Dec 31, 2024

Okay, I forgot this point. It is probably the cause of many of the test failures in the draft PR #1203. I think this is a good direction to go in, not just for performance but also for a feature I would like to build, which is only possible when there is a single native plan per executor.

@viirya
Member Author

viirya commented Dec 31, 2024

I will think about whether there is a way to work around the issue.

@andygrove
Member

I've been considering proposing a change in DataFusion to relax the rules when creating record batches in an operator. Rather than requiring the same physical types for each batch, it would be better just to enforce the same logical type and then coerce the physical types when needed. For example, with a string column, we would then be able to use either Utf8 or Dictionary<_, Utf8>.

I'm not sure how large a change it would be, but I think that would solve our problem.
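
To sketch the difference between the two rules, the toy model below compares a strict physical-type check with a relaxed logical-type check. The enums and function names are made up for this example and are not DataFusion or Arrow types; only the Utf8 versus dictionary-encoded Utf8 case comes from the discussion above.

```rust
/// Toy physical types (hypothetical, not Arrow's `DataType`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhysicalType {
    Utf8,
    DictionaryUtf8,
    Int32,
}

/// Toy logical types that several physical encodings can share.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogicalType {
    String,
    Int32,
}

impl PhysicalType {
    /// Maps a physical encoding to the logical type it represents.
    pub fn logical(self) -> LogicalType {
        match self {
            PhysicalType::Utf8 | PhysicalType::DictionaryUtf8 => LogicalType::String,
            PhysicalType::Int32 => LogicalType::Int32,
        }
    }
}

/// Strict rule: every batch must use exactly the planned physical type.
pub fn strict_compatible(planned: PhysicalType, actual: PhysicalType) -> bool {
    planned == actual
}

/// Relaxed rule: only the logical type must match; the physical
/// encoding would be coerced when an operator needs a specific one.
pub fn relaxed_compatible(planned: PhysicalType, actual: PhysicalType) -> bool {
    planned.logical() == actual.logical()
}
```

Under the relaxed rule a plan that expects `Utf8` would accept a `DictionaryUtf8` batch, while a mismatch such as `Utf8` versus `Int32` would still be rejected.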

@viirya
Member Author

viirya commented Dec 31, 2024

This seems to be a fundamental rule of DataFusion physical plans. In many places, the physical schema of the child operators is used. So if the physical schema seen during planning differs from the actual physical schema during execution, an error occurs. Those places might need added type coercions, and possibly some array casting during execution as well.

@andygrove
Member

With the new Parquet POC 1 & 2, we will use ParquetExec instead of the current ScanExec, so at least for that case the schema will already be known and we will no longer need to fetch the first batch to determine it.

This doesn't help with other uses of ScanExec though.

@viirya
Member Author

viirya commented Jan 6, 2025

If ScanExec will rarely be used and ParquetExec is used most of the time, maybe I can just add an internal cast to ScanExec for when the schema differs. It might hurt performance a little, but that should be acceptable for a rare case.

@andygrove
Member

We'll still use ScanExec for the shuffle reader, though. The main reason for the initial batch scan is to determine whether strings are dictionary-encoded. We then cast all batches to match the first batch (either unpacking dictionaries or forcing dictionary encoding). We always unpack dictionaries (in CopyExec) before a Sort or a Join anyway, so maybe we should just unpack them directly in ScanExec if there is no performance impact. I experimented with this before; I do not remember the exact performance impact, but I think it was small.
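
For anyone unfamiliar with what unpacking a dictionary involves, here is a small self-contained sketch. `StringColumn` and `unpack` are hypothetical names for this example, not Arrow or Comet types, and the model ignores nulls and assumes every key is a valid index into the dictionary values.

```rust
/// Toy string column: either plain values or dictionary-encoded.
#[derive(Debug, Clone, PartialEq)]
pub enum StringColumn {
    /// One owned string per row.
    Plain(Vec<String>),
    /// Each row stores a key that indexes into `values`.
    Dictionary { keys: Vec<usize>, values: Vec<String> },
}

/// Normalizes a column to plain strings so every batch has the same layout.
/// Assumes all keys are in range; an out-of-range key would panic.
pub fn unpack(column: StringColumn) -> Vec<String> {
    match column {
        // Plain columns pass through unchanged.
        StringColumn::Plain(rows) => rows,
        // Dictionary columns are expanded by looking up each key.
        StringColumn::Dictionary { keys, values } => {
            keys.into_iter().map(|key| values[key].clone()).collect()
        }
    }
}
```

If the scan always produced the plain layout like this, the physical type would be known without looking at the first batch, at the cost of copying the dictionary values out for each row.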

@viirya
Member Author

viirya commented Jan 6, 2025

Okay. Then it seems we can get rid of the first-batch fetch in ScanExec and assign the scan schema from Spark. I will give it a try.
