feat: add schema evolution to merge statement #3136

Open
wants to merge 11 commits into
base: main

JustinRush80

Description

Add schema evolution (merge mode only) to the MERGE statement. New columns are added based on the column predicates in the MERGE operations (e.g. target.id = source.id). Using when_not_matched_insert_all and when_matched_update_all will add any new column to the target schema.
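The column-picking rule described above can be sketched with plain Rust types. This is a simplified illustration, not the code from this PR: `SimpleField`, `evolve_schema`, and the `assignments` map (target column name to source expression) are hypothetical stand-ins for the Arrow schema and the merge operations.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified stand-in for a schema field.
#[derive(Debug, Clone, PartialEq)]
struct SimpleField {
    name: String,
    nullable: bool,
}

/// Extend the target schema with every column that an update or insert
/// assigns to but that the target does not have yet.
/// `assignments` maps a target column name to a source expression.
fn evolve_schema(
    target: &[SimpleField],
    source: &[SimpleField],
    assignments: &BTreeMap<String, String>,
) -> Vec<SimpleField> {
    let mut evolved = target.to_vec();
    for column in assignments.keys() {
        // Columns the target already has are left untouched.
        if evolved.iter().any(|f| &f.name == column) {
            continue;
        }
        // New columns take their definition from the source and are made
        // nullable, because existing rows have no value for them.
        if let Some(field) = source.iter().find(|f| &f.name == column) {
            evolved.push(SimpleField {
                name: field.name.clone(),
                nullable: true,
            });
        }
    }
    evolved
}
```

Because only the assigned target names are inspected, an assignment such as `id = source.ID` does not add an `ID` column when `id` already exists in the target.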

Related Issue(s)

Documentation

@github-actions bot added the binding/python (Issues for the Python package) and binding/rust (Issues for the Rust crate) labels Jan 16, 2025
@JustinRush80 JustinRush80 marked this pull request as ready for review January 16, 2025 14:16
@ion-elgreco
Collaborator

@JustinRush80 can you rebase your branch against main, or allow us to rebase it?

I will do a thorough review tomorrow then :)

@ion-elgreco
Collaborator

@JustinRush80 could you rebase again? Something went wrong, since the number of files changed is huge.


codecov bot commented Jan 18, 2025

Codecov Report

Attention: Patch coverage is 94.85792% with 38 lines in your changes missing coverage. Please review.

Project coverage is 72.12%. Comparing base (523c6d7) to head (dbebc49).

Files with missing lines                   Patch %   Lines
crates/core/src/operations/merge/mod.rs    95.63%    2 Missing and 30 partials ⚠️
python/src/merge.rs                        0.00%     4 Missing ⚠️
python/src/lib.rs                          0.00%     2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3136      +/-   ##
==========================================
+ Coverage   71.73%   72.12%   +0.38%     
==========================================
  Files         138      138              
  Lines       44362    45087     +725     
  Branches    44362    45087     +725     
==========================================
+ Hits        31825    32520     +695     
- Misses      10496    10504       +8     
- Partials     2041     2063      +22     


Collaborator

@ion-elgreco left a comment

Thanks a lot for picking this up! Just a couple modifications required but looks good so far!

python/deltalake/table.py (outdated, resolved)
python/tests/test_merge.py (resolved)
)?;
let schema = Arc::new(schema_bulider.finish());
new_schema = Some(schema.clone());
if schema != snapshot.input_schema()? {
Collaborator

This might give false positives. A while ago I made merge_arrow_schema pass through the large/view types, but input_schema() will actually have the small types.

I think we should do the not_eq comparison on the Delta schema (StructType) instead. Can you also add a test where merge_schema is True and we write Large or View types to a table without any new columns? The result then shouldn't have a new schema action in the log history.
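To illustrate the false positive, here is a minimal sketch that compares schemas only after mapping physical Arrow types to logical Delta types. The enums and `schema_changed` are hypothetical, simplified stand-ins, not the real arrow or delta-rs types; the assumption is that large and view variants map to the same Delta primitive as their small counterparts.

```rust
/// Hypothetical subset of physical Arrow types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ArrowType {
    Utf8,
    LargeUtf8,
    Utf8View,
    Binary,
    LargeBinary,
    BinaryView,
    Int64,
}

/// Hypothetical subset of logical Delta primitive types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DeltaType {
    String,
    Binary,
    Long,
}

/// Collapse small, large, and view variants onto one logical type.
fn to_delta(t: ArrowType) -> DeltaType {
    match t {
        ArrowType::Utf8 | ArrowType::LargeUtf8 | ArrowType::Utf8View => DeltaType::String,
        ArrowType::Binary | ArrowType::LargeBinary | ArrowType::BinaryView => DeltaType::Binary,
        ArrowType::Int64 => DeltaType::Long,
    }
}

/// Convert a physical schema into its logical form.
fn logical(schema: &[(&str, ArrowType)]) -> Vec<(String, DeltaType)> {
    schema
        .iter()
        .map(|(name, ty)| (name.to_string(), to_delta(*ty)))
        .collect()
}

/// Report a change only when the logical schemas differ.
fn schema_changed(current: &[(&str, ArrowType)], merged: &[(&str, ArrowType)]) -> bool {
    logical(current) != logical(merged)
}
```

With this comparison, writing `LargeUtf8` data into a column stored as `Utf8` is not reported as a schema change, while an extra column still is.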

Author

@ion-elgreco is metrics.num_target_files_added the right field to check for new schema actions? Or is there another way to see added actions?

Collaborator

You would want to read the commit file, I think. Something like this should work:
let actions = crate::logstore::get_actions(version, self.read_commit_entry(version).await).await;

{
if target_schema.field_from_column(columns).is_err() {
let new_fields = source_schema.field_with_unqualified_name(columns.name())?;
ending_schema.push(new_fields.to_owned().with_nullable(true));
Collaborator

Before we insert the new fields in the schema, we should actually do some safety checks on the metadata. We cannot add new fields that would enable generated columns by carrying generation expressions. You can look at the recent generated-columns PR to see how this is prevented.

Author

@JustinRush80 Jan 23, 2025

Like returning an error message when the source data has a generated column and the end user wants to add it via schema evolution?

Collaborator

@JustinRush80 exactly
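The check agreed on in this thread could look roughly like the sketch below. It is not the implementation from the generated-columns PR: `SourceField` and `check_new_field` are hypothetical names, and the sketch assumes the generation expression is stored in the field metadata under the `delta.generationExpression` key.

```rust
use std::collections::HashMap;

/// Field metadata key that marks a generated column (assumed here).
const GENERATION_EXPRESSION_KEY: &str = "delta.generationExpression";

/// Hypothetical, simplified stand-in for a source schema field.
struct SourceField {
    name: String,
    metadata: HashMap<String, String>,
}

/// Reject a field that would introduce a generated column through
/// schema evolution; accept every other field.
fn check_new_field(field: &SourceField) -> Result<(), String> {
    if field.metadata.contains_key(GENERATION_EXPRESSION_KEY) {
        return Err(format!(
            "cannot add generated column '{}' via schema evolution",
            field.name
        ));
    }
    Ok(())
}
```

Running this check before pushing a field into the evolved schema would fail the merge early instead of writing metadata the table is not set up for.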

.filter(|ops| matches!(ops.r#type, OperationType::Update | OperationType::Insert))
.flat_map(|ops| ops.operations.keys())
{
if target_schema.field_from_column(columns).is_err() {
Collaborator

How about schema evolution in nested types such as structs? I think field_from_column only looks at top-level fields, doesn't it?

Author

I believe so, but I will add a unit test and refactor if needed!

Author

Yes, this process works for structs!

Collaborator

Also new fields within an existing struct?

Because above we only do this snapshot.schema().index_of(name).is_none()?

Author

ohh yea let me check that!
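The gap raised here, a new field inside an existing struct, is invisible to a check on top-level names alone. A recursive comparison along the following lines would surface it; `FieldType` and `new_field_paths` are hypothetical, simplified stand-ins and not part of the PR.

```rust
/// Hypothetical, simplified field type: a primitive or a struct of named fields.
#[derive(Debug, Clone, PartialEq)]
enum FieldType {
    Primitive(String),
    Struct(Vec<(String, FieldType)>),
}

/// Collect dotted paths of fields that exist in `source` but not in `target`,
/// descending into structs that exist on both sides.
fn new_field_paths(
    target: &[(String, FieldType)],
    source: &[(String, FieldType)],
    prefix: &str,
    out: &mut Vec<String>,
) {
    for (name, source_type) in source {
        let path = if prefix.is_empty() {
            name.clone()
        } else {
            format!("{prefix}.{name}")
        };
        match target.iter().find(|(n, _)| n == name) {
            // The field is missing entirely: report it.
            None => out.push(path),
            // Both sides have a struct here: compare their children.
            Some((_, FieldType::Struct(target_children))) => {
                if let FieldType::Struct(source_children) = source_type {
                    new_field_paths(target_children, source_children, &path, out);
                }
            }
            // The field exists and is not a struct: nothing to add.
            Some(_) => {}
        }
    }
}
```

A top-level check would report only `price` for a source that adds both `price` and `address.zip`; the recursive version reports both.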

crates/core/src/operations/merge/mod.rs (outdated, resolved)
crates/core/src/operations/merge/mod.rs (outdated, resolved)
Signed-off-by: JustinRush80 <[email protected]>
// merge_arrow_schema is used to tell whether the two schemas can be merged, but we use the operation statement to pick new columns.
// This avoids the side effect of adding unnecessary columns: with target.id = source.ID, "ID" will not be added, since "id" exists in the target and the end user intended it to be "id".
let mut new_schema = None;
let mut schema_actions: Vec<Action> = vec![];
Collaborator

Super small nit, but can we make this an Option instead?

Author

The schema_actions? That makes sense.

Collaborator

Yeah the schema action
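As a sketch of the suggested change: since a merge produces at most one schema action, an `Option` can replace the `Vec`. The `CommitAction` enum and `commit_actions` function below are hypothetical, simplified stand-ins and not the delta-rs `Action` type.

```rust
/// Hypothetical, simplified stand-in for commit actions.
#[derive(Debug, Clone, PartialEq)]
enum CommitAction {
    Metadata(String),
    Add(String),
}

/// Combine the file actions with an optional schema (metadata) action.
/// `Option` implements `IntoIterator`, so `extend` appends zero or one item.
fn commit_actions(
    schema_action: Option<CommitAction>,
    file_actions: Vec<CommitAction>,
) -> Vec<CommitAction> {
    let mut actions = file_actions;
    actions.extend(schema_action);
    actions
}
```

The type now states that there is never more than one schema action, and the call site needs no special case for the unchanged-schema path.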

@ion-elgreco
Collaborator

@JustinRush80 this is shaping up really nice!

Labels
binding/python Issues for the Python package binding/rust Issues for the Rust crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Schema evolution on upsert (merge)
2 participants