feat(sink): support deltalake sink with rust sdk #13600
Conversation
We added arrow_copy_with_arrow46.rs to resolve conflict issues.
This doesn't look very cool. Why not upgrade deltalake's arrow version instead? (A fork can be used if upstream won't accept the upgrade and release a version soon.)
OK. I will work on unifying the versions of both.
Cargo.toml (Outdated)

arrow-select = "48.0.1"
arrow-ord = "48.0.1"
arrow-row = "48.0.1"
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "72505449e9538371fe5fda35d545dbd662facd07", features = ["s3"] }
Is there a particular reason for downgrading arrow?
Because both Delta.rs and IceLake need to use Arrow, version 48.0.1 is the maximum version they both support.
Rest LGTM.
src/connector/src/sink/deltalake.rs (Outdated)

fn covert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
Typo: covert -> convert.

Also, this method looks like a general utils method. Is this method implemented in the delta lake SDK?
We only found the conversion method for the data type, so we need to write the field-iteration loop ourselves.
I just found that we can use deltalake::arrow::datatypes::Schema::try_from(dl_table.get_schema()?) where we currently call convert_schema.
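To make the contrast concrete, here is a self-contained sketch with toy stand-in types (the real types are delta-rs's StructType and deltalake::arrow::datatypes::Schema; everything below is illustrative, not the SDK's actual API): a hand-written field loop like the PR's convert_schema, plus the TryFrom-based form suggested above.

```rust
// Toy stand-ins for the delta schema and the arrow schema types.
#[derive(Debug)]
pub struct DeltaField { pub name: String, pub data_type: String }
pub struct DeltaSchema { pub fields: Vec<DeltaField> }

#[derive(Debug, PartialEq)]
pub struct ArrowField { pub name: String, pub data_type: String }
#[derive(Debug, PartialEq)]
pub struct ArrowSchema { pub fields: Vec<ArrowField> }

// Hand-written field-iteration loop, in the spirit of convert_schema.
fn convert_schema(schema: &DeltaSchema) -> ArrowSchema {
    let fields = schema
        .fields
        .iter()
        .map(|f| ArrowField { name: f.name.clone(), data_type: f.data_type.clone() })
        .collect();
    ArrowSchema { fields }
}

// The TryFrom-based alternative suggested in the review: call sites can
// then write ArrowSchema::try_from(&delta_schema)? instead of a helper.
impl TryFrom<&DeltaSchema> for ArrowSchema {
    type Error = String;
    fn try_from(schema: &DeltaSchema) -> Result<Self, Self::Error> {
        Ok(convert_schema(schema))
    }
}
```

In the real code, the delta-rs crate already provides the TryFrom impl, so the hand-written loop can be dropped entirely.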
Can we include some e2e tests in e2e_test/?
Cargo.toml (Outdated)

arrow-select = "49"
arrow-ord = "49"
arrow-row = "49"
arrow-array = "48.0.1"
What compile error will we get if we don't use the arrow version of delta.rs? Does the compile error come from being unable to use the to_record_batch_with_schema we implemented?
Yes. If we use 49, Cargo will pull in a separate 48 on the dependency chain for delta, and the output of our to_record_batch_with_schema method will then be a 49 type. It cannot be passed into delta's write, because the parameter accepted by write is a 48 type. So we need to ensure that the arrow versions relied upon by icelake and delta are the same.
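An alternative to forcing a single arrow version is Cargo's dependency-renaming feature, which lets two semver-incompatible majors coexist in one crate under different names. A minimal illustrative Cargo.toml fragment (the renamed-dependency name and versions are assumptions for illustration, not this PR's actual manifest):

```toml
[dependencies]
# Regular arrow used by most of the codebase.
arrow-array = "49"
# A second, renamed copy pinned to the major that delta-rs expects.
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
```

With this, code can refer to `arrow_array` and `arrow_array_deltalake` as distinct crates and convert between them explicitly at the sink boundary.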
See #13600 (comment).

In a new commit c890167, I just tried to introduce a separate arrow version for deltalake so that the arrow version of the current code will not be downgraded. This is achieved by manually setting the arrow version of icelake in Cargo.lock. The current code for arrow datachunk conversion is moved to a macro.

The reason delta-rs depends on an older version of arrow (i.e. 48.0.1) is that the datafusion version it depends on uses 48.0.1, while the latest datafusion already uses 49.0.0. So after delta-rs upgrades its datafusion to one that uses 49.0.0, we can let delta-rs use the same arrow version as our current code. FYI, I tried forking delta-rs and upgrading its datafusion version, but it cannot compile without some extra effort because there are breaking changes in the upgrade. We can open an issue to track their version upgrade.

@xxhZs @wangrunji0408 @xxchan Could you take a look at whether the current hack looks good to you?
Using macros to reuse the two copies of code is acceptable, and we may need to keep it that way permanently, because there will always be times when icelake and delta.rs depend on inconsistent arrow versions.
Not only deltalake and icelake use arrow, but also our internal UDF. In the future, if we upgrade the arrow version used by UDF, it doesn't make sense to be blocked by the arrow version of some external connectors. I think it's acceptable to support several versions of arrow at the same time as long as we don't have to duplicate the code.
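The macro approach described above can be sketched in miniature: one macro body generates the same conversion function once per arrow flavor, so the logic is written only once even though the target types are incompatible. The types and names below are toy stand-ins, not the PR's actual macro.

```rust
// Toy stand-ins for array types from two incompatible arrow majors:
// they look identical but are distinct types, as two semver-incompatible
// crate versions would be.
#[derive(Debug, PartialEq)]
pub struct ArrayV48(pub Vec<i64>);
impl ArrayV48 {
    pub fn from_vec(v: Vec<i64>) -> Self { Self(v) }
}

#[derive(Debug, PartialEq)]
pub struct ArrayV49(pub Vec<i64>);
impl ArrayV49 {
    pub fn from_vec(v: Vec<i64>) -> Self { Self(v) }
}

// One macro instantiates the same conversion body for each target type,
// mirroring how a single copy of the datachunk-conversion code can be
// reused across several arrow versions.
macro_rules! define_to_array {
    ($fn_name:ident, $arrow_ty:ty) => {
        pub fn $fn_name(data: &[i64]) -> $arrow_ty {
            <$arrow_ty>::from_vec(data.to_vec())
        }
    };
}

define_to_array!(to_array_v48, ArrayV48);
define_to_array!(to_array_v49, ArrayV49);
```

Each connector then calls the instantiation matching the arrow version its SDK expects, and no conversion logic is duplicated by hand.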
license-eye has totally checked 4509 files.
Valid | Invalid | Ignored | Fixed
---|---|---|---
1982 | 3 | 2524 | 0

Invalid file list:
- src/common/src/array/arrow/arrow_common.rs
- src/common/src/array/arrow/arrow_deltalake.rs
- src/common/src/array/arrow/mod.rs
I just changed to reload the same
In the outer mod we rename the arrow packages to
LGTM! Thanks for the PR.
Unit tests time out in CI because we added new test cases in this PR. We may increase the timeout a bit to pass CI.
This is expected because most changes are introduced by the delta lake libraries. I had a test locally on the main branch and this branch to test building the

It looks like the great number of changes in Cargo.lock is introduced by datafusion and DynamoDB. The datafusion dependency was introduced by accident in the dev-dependencies and can be temporarily removed. The DynamoDB dependency is bundled in the delta-rs SDK when we enable its s3 feature. I have forked the repo and added support for conditionally removing the dependency on DynamoDB so that we can avoid introducing the DynamoDB SDK dependency. After removing the dependencies on datafusion and DynamoDB, the compile time and binary size are greatly reduced, and now only slightly increase compared to main.

Besides, I had a weird observation: the datafusion dependency is introduced in dev-dependencies and only used in unit tests, yet Cargo.lock still includes datafusion as a dependency of risingwave_connector, which causes the increase in compile time.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
support deltalake sink with rust sdk.
resolve #13152
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.