Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add partitioning scheme for unresolved shuffle and shuffle reader exec #1144

Merged
merged 3 commits into from
Dec 19, 2024

Conversation

onursatici
Copy link
Contributor

Which issue does this PR close?

Closes #16.

Rationale for this change

unresolved shuffle and shuffle reader exec does not have partitioning info. This becomes a problem when a stage has InterleaveExec, which requires all its children to have hash partitioning on the same expressions. Currently even though the partitioning satisfies InterleaveExec's requirements, because children show UnknownPartitioning planning such plans into stages fail.

What changes are included in this PR?

Carry partitioning scheme through shuffles

Are there any user-facing changes?

@@ -291,8 +294,11 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
)?))
}
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
let default_codec =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to make default_codec property BallistaPhysicalExtensionCodec, so it can be overridden if needed? BallistaLogicalExtensionCodec does similar thing. (also there is few other places where DefaultPhysicalCodec is created)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, thanks!

@milenkovicm
Copy link
Contributor

@Dandandan would you be able to have a look, please

@Dandandan
Copy link
Contributor

Changes I think look good. Can we add a test for it?

@onursatici
Copy link
Contributor Author

@Dandandan should be ready for another look

@@ -50,14 +50,15 @@ message ShuffleWriterExecNode {
message UnresolvedShuffleExecNode {
uint32 stage_id = 1;
datafusion_common.Schema schema = 2;
uint32 output_partition_count = 4;
datafusion.Partitioning partitioning = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just one small note, is there any reason we can't use 3 instead of 5

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought there was a field with field number 3 in the past that got deleted, so putting 3 for the new field would violate back compatibility. I don't think we need to preserve back compat here though, not sure if there are users that do rolling updates to their ballista scheduler and executors, but wanted to be safe.

https://protobuf.dev/programming-guides/proto3/#consequences

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would assume that its not used much

@Dandandan
Copy link
Contributor

Thanks @onursatici

@Dandandan Dandandan merged commit f840585 into apache:main Dec 19, 2024
24 checks passed
@onursatici onursatici deleted the os/shuffle-partitioning-info branch December 25, 2024 15:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ballista: UnresolvedShuffleExec and ShuffleReaderExec should show correct partitioning scheme
4 participants