Only create one native plan for a query on an executor #1204
Comments
One of the challenges related to this is that we start executing
Okay, I forgot this point. It is probably the cause of many of the test failures in the draft PR #1203. I think this is a good direction to go in, not just for performance but also for a feature I would like to build (the feature is only possible when there is a single native plan on an executor).
I will think about whether there is a way to work around the issue.
I've been considering proposing a change in DataFusion to relax the rules when creating record batches in an operator. Rather than requiring the same physical types for each batch, it would be better to enforce only the same logical type and coerce the physical types when needed. For example, with a string column, we would then be able to use either I'm not sure how large a change it would be, but I think that would solve our problem.
It seems to be a fundamental rule in DataFusion physical plans. In many places, the physical schema of the child operators is used, so if the physical schema seen during planning differs from the actual physical schema during execution, an error occurs. Those places would need type coercion added, and that may also require casting arrays during execution.
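To make the distinction in the two comments above concrete, here is a minimal sketch of a strict physical-schema check next to a relaxed logical-type check. It is a toy model in plain Python, not DataFusion's API: `ColumnType`, `same_physical_schema`, and `same_logical_schema` are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ColumnType:
    """Hypothetical stand-in for a column's type (not a DataFusion type)."""
    logical: str              # e.g. "string", "int64"
    dictionary: bool = False  # physical detail: dictionary-encoded or not


def same_physical_schema(planned: List[ColumnType], actual: List[ColumnType]) -> bool:
    # Strict rule: every column must match exactly, encoding included.
    return planned == actual


def same_logical_schema(planned: List[ColumnType], actual: List[ColumnType]) -> bool:
    # Relaxed rule: only the logical type has to match; the physical
    # encoding may differ and would be coerced when needed.
    return len(planned) == len(actual) and all(
        p.logical == a.logical for p, a in zip(planned, actual)
    )


# Schema assumed at planning time: a plain string column.
planned = [ColumnType("string")]
# Schema of a batch seen at execution time: a dictionary-encoded string column.
actual = [ColumnType("string", dictionary=True)]

strict_ok = same_physical_schema(planned, actual)
relaxed_ok = same_logical_schema(planned, actual)
```

Under the strict rule the dictionary-encoded batch is rejected, while the relaxed rule accepts it and leaves the encoding difference to a later coercion step.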
With the new Parquet POC 1 & 2, we will use ParquetExec instead of the current ScanExec, so at least for that case the schema will already be known and we will no longer need to fetch the first batch to determine it. This doesn't help with other uses of ScanExec though.
If ScanExec will rarely be used and we will use ParquetExec most of the time, maybe I can just add an internal cast to ScanExec for when the schema is different. It might hurt performance a little, but if it only affects a rare case, that should be acceptable.
We'll still use ScanExec for the shuffle reader though. The main reason for the initial batch scan is to determine whether strings are dictionary-encoded or not. We then cast all batches to match the first batch (either unpacking dictionaries or forcing dictionary encoding). We always unpack dictionaries (in CopyExec) before a Sort or a Join anyway, so maybe we should just unpack them directly in ScanExec if there is no performance impact. I did experiment with this before. I do not remember exactly what the performance impact was, but I think it was small.
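As a rough illustration of what "unpacking dictionaries" means here, the sketch below models a string column in plain Python and always returns the plain form, so every batch ends up with the same representation without looking at the first batch. `PlainStrings`, `DictStrings`, and `unpack` are hypothetical names, not Comet or Arrow APIs.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class PlainStrings:
    """Hypothetical plain string column: one value per row."""
    values: List[Optional[str]]


@dataclass
class DictStrings:
    """Hypothetical dictionary-encoded string column: row keys into a dictionary."""
    keys: List[Optional[int]]
    dictionary: List[str]


def unpack(column: Union[PlainStrings, DictStrings]) -> PlainStrings:
    # Always produce the plain representation, whatever the input encoding.
    if isinstance(column, DictStrings):
        return PlainStrings(
            [None if k is None else column.dictionary[k] for k in column.keys]
        )
    return column


# Two batches of the same logical column, with different encodings.
batch_1 = DictStrings(keys=[0, 1, None, 0], dictionary=["a", "b"])
batch_2 = PlainStrings(values=["c", None])

unpacked = [unpack(batch_1), unpack(batch_2)]
```

Because the output type no longer depends on the data, the scan schema can be fixed up front. The cost is the extra copy when a batch arrives dictionary-encoded.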
Okay. Then it seems we can get rid of the first batch fetch in ScanExec and assign the scan schema from Spark. I will give it a try.
What is the problem the feature request solves?
Currently we create a native plan per task for a query. Even when those tasks run on the same executor, they still have separate native plans. One problem with this approach is the additional cost of deserializing the same query plan multiple times, plus the extra memory needed to store duplicate native plans.
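One way to picture the request is a per-executor cache keyed by the serialized plan, so that tasks sharing a query deserialize it only once. The sketch below is a plain Python illustration under that assumption, not Comet's implementation: `PlanCache`, `get_or_create`, and `fake_deserialize` are hypothetical names, and it ignores the harder question of sharing execution state between tasks.

```python
import hashlib
import threading
from typing import Any, Callable, Dict


class PlanCache:
    """Hypothetical per-executor cache: one deserialized plan per distinct query."""

    def __init__(self, deserialize: Callable[[bytes], Any]) -> None:
        self._deserialize = deserialize
        self._lock = threading.Lock()
        self._plans: Dict[str, Any] = {}

    def get_or_create(self, serialized_plan: bytes) -> Any:
        # Key on the plan bytes so tasks running the same query share one entry.
        key = hashlib.sha256(serialized_plan).hexdigest()
        with self._lock:
            plan = self._plans.get(key)
            if plan is None:
                plan = self._deserialize(serialized_plan)
                self._plans[key] = plan
            return plan


calls = []


def fake_deserialize(data: bytes) -> dict:
    # Stand-in for the real (expensive) plan deserialization.
    calls.append(data)
    return {"plan": data.decode()}


cache = PlanCache(fake_deserialize)
first = cache.get_or_create(b"query-1")   # first task: deserializes
second = cache.get_or_create(b"query-1")  # second task: reuses the cached plan
other = cache.get_or_create(b"query-2")   # different query: new entry
```

Holding the lock during deserialization keeps the sketch simple and guarantees a single deserialization per plan, at the cost of serializing concurrent first requests for different queries.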
Describe the potential solution
No response
Additional context
No response