
feat(sink): support deltalake sink with rust sdk #13600

Merged
merged 35 commits into from
Dec 14, 2023

Conversation

Contributor

@xxhZs xxhZs commented Nov 22, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Support the deltalake sink with the rust SDK.

resolve #13152

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@xxhZs xxhZs linked an issue Nov 22, 2023 that may be closed by this pull request
@xxhZs xxhZs marked this pull request as ready for review November 23, 2023 06:34
@xxhZs xxhZs requested a review from a team as a code owner November 23, 2023 06:34
Member

@xxchan xxchan Nov 23, 2023


> We added arrow_copy_with_arrow46.rs to resolve conflict issues.

This doesn't look very cool. Why not upgrade deltalake's arrow version instead? (We can use a fork if upstream won't accept the upgrade and release a version soon.)

Contributor Author

OK. I will work on unifying the two versions.

modify cargo lock

fix
Cargo.toml Outdated
arrow-select = "48.0.1"
arrow-ord = "48.0.1"
arrow-row = "48.0.1"
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "72505449e9538371fe5fda35d545dbd662facd07", features = ["s3"] }
Contributor

Is there a particular reason for downgrading arrow?

Contributor Author

Because both delta-rs and icelake depend on arrow, and 48.0.1 is the highest version they both support.
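As an illustration of this constraint, the pin could be expressed at the workspace level roughly as follows (a sketch only: the crate list is abbreviated and the `[workspace.dependencies]` placement is an assumption; the versions match the Cargo.toml hunk above):

```toml
# Pin the arrow crates to 48.0.1, the highest version both icelake and
# delta-rs accept, so cargo resolves a single shared copy of arrow.
[workspace.dependencies]
arrow-array = "48.0.1"
arrow-schema = "48.0.1"
arrow-select = "48.0.1"
arrow-ord = "48.0.1"
arrow-row = "48.0.1"
```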

Contributor

@wenym1 wenym1 left a comment


Rest LGTM.

}
}

fn covert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
Contributor

typo: covert -> convert.

This looks like a general utility method. Is it already implemented in the delta lake sdk?

Contributor Author

We only found a conversion method for individual datatypes, so we need to write the loop over the fields ourselves.
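As a rough illustration of that hand-written loop (toy stand-in types, not the real deltalake/arrow APIs), the field-by-field schema conversion looks like:

```rust
// Toy sketch: the SDK provides a per-datatype conversion, but the
// schema-level loop over fields has to be written by hand.
#[derive(Debug, PartialEq)]
enum DeltaType { String, Long, Boolean }

#[derive(Debug, PartialEq)]
enum ArrowType { Utf8, Int64, Boolean }

struct DeltaField { name: String, ty: DeltaType }
struct DeltaStructType { fields: Vec<DeltaField> }

#[derive(Debug, PartialEq)]
struct ArrowField { name: String, ty: ArrowType }

// Stand-in for the datatype conversion that the SDK already provides.
fn convert_type(ty: &DeltaType) -> ArrowType {
    match ty {
        DeltaType::String => ArrowType::Utf8,
        DeltaType::Long => ArrowType::Int64,
        DeltaType::Boolean => ArrowType::Boolean,
    }
}

// The hand-written part: iterate over every field of the struct type.
fn convert_schema(schema: &DeltaStructType) -> Vec<ArrowField> {
    schema
        .fields
        .iter()
        .map(|f| ArrowField { name: f.name.clone(), ty: convert_type(&f.ty) })
        .collect()
}

fn main() {
    let schema = DeltaStructType {
        fields: vec![
            DeltaField { name: "id".into(), ty: DeltaType::Long },
            DeltaField { name: "name".into(), ty: DeltaType::String },
        ],
    };
    let arrow = convert_schema(&schema);
    assert_eq!(arrow.len(), 2);
    assert_eq!(arrow[0].ty, ArrowType::Int64);
    println!("{:?} {:?}", arrow[0].ty, arrow[1].ty);
}
```

(The follow-up comment notes that deltalake's `Schema::try_from` already implements this loop in the SDK.)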

Contributor

I just found that we can use deltalake::arrow::datatypes::Schema::try_from(dl_table.get_schema()?) where we currently call convert_schema.

Contributor

@wenym1 wenym1 left a comment


Can we include some e2e test in e2e_test/?

Cargo.toml Outdated
arrow-select = "49"
arrow-ord = "49"
arrow-row = "49"
arrow-array = "48.0.1"
Contributor

What compile error will we get if we don't use delta-rs's arrow version? Does the compile error come from being unable to use the to_record_batch_with_schema we implemented?

Contributor Author

@xxhZs xxhZs Nov 30, 2023


Yes. If we use 49, cargo will pull in a second copy of arrow 48 along delta's dependency chain, and the output of our to_record_batch_with_schema method will be a 49 type. It cannot be passed into delta's write, because the parameter write accepts is the 48 type.
So we need to ensure that the arrow versions relied upon by icelake and delta are the same.
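The mismatch can be shown with a minimal sketch (two local modules stand in for the two crate versions; this is not real arrow or delta-rs code): rustc treats the 48 and 49 copies of arrow as unrelated crates, so their RecordBatch types never unify.

```rust
// Two versions of a crate are distinct crates to rustc, even when their
// type definitions are identical; two modules model that here.
mod arrow_v48 {
    pub struct RecordBatch(pub u32);
}
mod arrow_v49 {
    #[allow(dead_code)]
    pub struct RecordBatch(pub u32);
}

// Stand-in for delta's `write`, which only accepts the v48 type.
fn write(batch: arrow_v48::RecordBatch) -> u32 {
    batch.0
}

fn main() {
    // A v49 RecordBatch would be rejected at compile time:
    // write(arrow_v49::RecordBatch(7)); // error[E0308]: mismatched types
    let rows = write(arrow_v48::RecordBatch(7));
    assert_eq!(rows, 7);
    println!("{rows}");
}
```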


@wenym1
Contributor

wenym1 commented Dec 1, 2023

In a new commit c890167, I tried to introduce a separate arrow version for deltalake so that the arrow version of the current code will not be downgraded. This is achieved by manually setting the arrow version of icelake in Cargo.lock.

The current arrow-datachunk conversion code is moved into a macro gen_code, so that the conversion code can be applied to both versions.

The reason delta-rs depends on an older version (i.e. 48.0.1) of arrow is that the datafusion version it depends on uses arrow 48.0.1. However, the latest version of datafusion already uses 49.0.0. So after delta-rs upgrades its datafusion to one that uses 49.0.0, we can let delta-rs use the same arrow version as our current code. FYI, I tried forking delta-rs and upgrading its datafusion version, but it cannot compile without some extra effort because there are some breaking changes in the upgrade. We can open an issue to track their version upgrade @xxhZs

@wangrunji0408 @xxchan Could you take a look whether the current hack looks good to you?

@xxhZs
Contributor Author

xxhZs commented Dec 1, 2023

> In a new commit c890167, I tried to introduce a separate arrow version for deltalake so that the arrow version of the current code will not be downgraded. This is achieved by manually setting the arrow version of icelake in Cargo.lock.
>
> The current arrow-datachunk conversion code is moved into a macro gen_code, so that the conversion code can be applied to both versions.
>
> The reason delta-rs depends on an older version (i.e. 48.0.1) of arrow is that the datafusion version it depends on uses arrow 48.0.1. However, the latest version of datafusion already uses 49.0.0. So after delta-rs upgrades its datafusion to one that uses 49.0.0, we can let delta-rs use the same arrow version as our current code. FYI, I tried forking delta-rs and upgrading its datafusion version, but it cannot compile without some extra effort because there are some breaking changes in the upgrade. We can open an issue to track their version upgrade @xxhZs
>
> @wangrunji0408 @xxchan Could you take a look whether the current hack looks good to you?

Using macros to reuse the two copies of code is acceptable, and we may need to keep it that way indefinitely, because there will likely always be times when icelake and delta-rs depend on inconsistent arrow versions.
If we want to eventually unify their arrow dependencies, i.e. have only one version of arrow in our system, I think we can simply use 48 in this PR, since no component in our system seems to require 49. After that, we just need to wait for both to depend on one version (in fact, only delta-rs needs to upgrade, since icelake's requirement is >48 and newer versions are supported by default), and then update the arrow version.

@wenym1
Contributor

wenym1 commented Dec 1, 2023

> Using macros to reuse the two copies of code is acceptable, and we may need to keep it that way indefinitely, because there will likely always be times when icelake and delta-rs depend on inconsistent arrow versions.
> If we want to eventually unify their arrow dependencies, i.e. have only one version of arrow in our system, I think we can simply use 48 in this PR, since no component in our system seems to require 49. After that, we just need to wait for both to depend on one version (in fact, only delta-rs needs to upgrade, since icelake's requirement is >48 and newer versions are supported by default), and then update the arrow version.

Not only deltalake and icelake use arrow, but also our internal udf. If we upgrade the arrow version of udf in the future, it doesn't make sense to be blocked by the arrow version of some external connectors. I think it's acceptable to support several versions of arrow at the same time as long as we don't have to duplicate the code.

Contributor

@github-actions github-actions bot left a comment


license-eye has totally checked 4509 files.

| Valid | Invalid | Ignored | Fixed |
| ----- | ------- | ------- | ----- |
| 1982  | 3       | 2524    | 0     |
Invalid file list:
  • src/common/src/array/arrow/arrow_common.rs
  • src/common/src/array/arrow/arrow_deltalake.rs
  • src/common/src/array/arrow/mod.rs

@wenym1
Contributor

wenym1 commented Dec 1, 2023

I just changed it to load the same arrow.rs twice, once for each arrow version. Now we don't even need to use a macro.

pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema;
use {
    arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
    arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
};

#[allow(clippy::duplicate_mod)]
#[path = "./arrow.rs"]
mod arrow_impl;

In the outer mod we rename the arrow packages to arrow_xxx, and inside arrow.rs we use super::arrow_xxx so the code picks up whichever arrow version the outer mod specifies.
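One way the arrow_xxx_deltalake names above can exist alongside the default ones is Cargo's dependency renaming; a sketch of what that might look like (the exact crate list and versions are assumptions, the renaming mechanism itself is standard Cargo):

```toml
# Two semver-incompatible versions of the same crate can coexist when one
# of them is imported under a renamed package.
[dependencies]
arrow-array = "49"
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-schema = "49"
arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
```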

@xxhZs xxhZs force-pushed the 13152-reimplement-delta-lake-sink-with-rust-sdk branch 4 times, most recently from de70454 to 8c41c0d Compare December 7, 2023 10:02
fix ci

fix ci

fix

fix ci

fix ci

fix
@xxhZs xxhZs force-pushed the 13152-reimplement-delta-lake-sink-with-rust-sdk branch from 8c41c0d to 5a492a6 Compare December 7, 2023 11:36
Contributor

@wenym1 wenym1 left a comment


LGTM! Thanks for the PR.

Unit tests time out in CI because we added a new test case in this PR. We may need to increase the timeout a bit to pass CI.

Collaborator

@hzxa21 hzxa21 left a comment


There seem to be many Cargo.lock changes. Is this expected? Also, will this PR increase the compile time and binary size significantly, given that many new dependencies are added? cc @xxhZs @wenym1

@wenym1
Contributor

wenym1 commented Dec 13, 2023

> There seem to be many Cargo.lock changes. Is this expected? Also, will this PR increase the compile time and binary size significantly, given that many new dependencies are added?

This is expected because most changes are introduced by the delta lake libraries.

I ran a local test on the main branch and this branch, building risingwave_cmd_all in release mode.

|                                           | # of deps | compile time | binary size |
| ----------------------------------------- | --------- | ------------ | ----------- |
| current commit                            | 1222      | 19m 36s      | 381,807,328 |
| current commit w/o datafusion             | 1204      | 16m 47s      | 340,898,016 |
| current commit w/o datafusion + dynamo db | 1178      | 16m 20s      | 332,395,424 |
| main                                      | 1148      | 16m 05s      | 316,990,384 |
| main + arrow 0.48                         | 1154      | 16m 32s      | 317,187,120 |

It looks like the great number of changes in Cargo.lock is introduced by datafusion and dynamodb. datafusion was introduced by accident as a dev-dependency and can be temporarily removed. The dynamodb dependency is bundled into the delta-rs SDK when we enable its s3 feature. I have forked the repo and added support for conditionally removing the dynamodb dependency, so that we can avoid introducing the dynamodb SDK.

After removing the dependencies on datafusion and dynamodb, the compile time and binary size are greatly reduced, and are only slightly increased compared to main.

Besides, I had a weird observation: the datafusion dependency is declared as a dev-dependency and only used in unit tests, yet Cargo.lock still includes datafusion as a dependency of risingwave_connector, which causes the increase in compile time.

@xxchan xxchan mentioned this pull request Dec 13, 2023
2 tasks
@xxhZs xxhZs added this pull request to the merge queue Dec 14, 2023
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Dec 14, 2023
@xxhZs xxhZs enabled auto-merge December 14, 2023 03:01
@xxhZs xxhZs added this pull request to the merge queue Dec 14, 2023
Merged via the queue into main with commit cd04aaa Dec 14, 2023
29 of 30 checks passed
@xxhZs xxhZs deleted the 13152-reimplement-delta-lake-sink-with-rust-sdk branch December 14, 2023 03:44
Development

Successfully merging this pull request may close these issues.

Reimplement delta lake sink with rust sdk
6 participants