
feat: support expression to compute iceberg sink partition value #14470

Merged

merged 5 commits into main from zj/partition_compute_expr on Jan 15, 2024

Conversation

ZENOTME
Contributor

@ZENOTME ZENOTME commented Jan 10, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR is the first part of #13898. For the whole picture, you can refer to the draft PR: #14135.

This PR introduces two expressions to compute the partition value:

  1. construct struct expression: takes multiple expressions and combines their results into a struct value
  2. iceberg expression: computes the partition value, spec: https://iceberg.apache.org/spec/#partition-transforms. This expression uses the transform functions in icelake for the computation.

For now, the transform functions in icelake only support taking an arrow array as input. In the future, we will refactor them into functions that take a single value as input, so that we can implement these external functions using the expression macro in expr/impl, like:

#[iceberg_bucket(i32, i32) -> i32]
fn iceberg_bucket(val: i32, n: i32) -> i32 {
    icelake::transform_function::bucket(val, n)
}
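As a concrete illustration of one of the partition transforms in the linked spec, here is a self-contained sketch of the integer truncate transform (my own simplification for illustration, not the icelake implementation):

```rust
// Iceberg's `truncate` transform for integers, per the spec:
// truncate(W, v) = v - (v mod W), where `mod` is the floor modulus,
// so negative values round toward negative infinity.
fn iceberg_truncate_i32(width: i32, v: i32) -> i32 {
    v - v.rem_euclid(width)
}

fn main() {
    // Examples from the spec: with W = 10, 1 -> 0 and -1 -> -10.
    assert_eq!(iceberg_truncate_i32(10, 1), 0);
    assert_eq!(iceberg_truncate_i32(10, -1), -10);
}
```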

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

Cargo.lock Outdated
Member

Hmmm, why does Cargo.lock have so many updates? Especially, why are syn and serde updated?

Member

I think it's because avro pins the minor version (and uses dependabot) and thus forces downstream to update.

Member

Not a blocker, but I want to take a look at the CHANGELOGs of syn, serde, and thiserror.

// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Member

Can you add mod comments to describe what "external" means?

Comment on lines 30 to 31
/// This module contains the iceberg expression for computing the partition value.
/// spec ref: <https://iceberg.apache.org/spec/#partition-transforms>
Member

You can use //! and put it at the beginning of the file.

proto/expr.proto Outdated
Comment on lines 280 to 287

// EXTERNAL
ICEBERG_BUCKET = 2201;
ICEBERG_TRUNCATE = 2202;
ICEBERG_YEAR = 2203;
ICEBERG_MONTH = 2204;
ICEBERG_DAY = 2205;
ICEBERG_HOUR = 2206;
Member

It's a little surprising that this many expressions are needed

Member

Can you explain a bit why a single function, maybe something like iceberg_partition_value, isn't enough?

Member

Oh, is it because the "input" isn't exprs, but iceberg "transform"s, so it's not easy to construct? Hmmm 🧐

Contributor Author

Oh, is it because the "input" isn't exprs, but iceberg "transform"s, so it's not easy to construct? Hmmm 🧐

Yes. Another reason is that different transform functions have different signatures, e.g.:
bucket(input, n) -> i32, where input can be int, long, decimal, date, time, timestamp, timestamptz, string, uuid, fixed, or binary.
truncate(input, w) -> input, where input can be int, long, decimal, or string.

Actually, we could also use a single function like 🤔
iceberg_transform(type, metadata, input) -> any
But then we would need to add a lot of check logic inside that single function, like the following, and I think this would make the function too complex:

fn iceberg_transform<T>(transform_type: i32, metadata: T, input: ...) {
    match transform_type {
        bucket => ...,
        truncate => ...,
    }
}
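The complexity trade-off described above can be sketched as runnable Rust (a hypothetical simplification with placeholder transform bodies, not the icelake API — the real bucket transform uses a Murmur3 hash, per the Iceberg spec):

```rust
// Hypothetical sketch of the single-function dispatch discussed above.
// Each transform carries its own metadata and needs its own input-type
// checks, which is what makes one catch-all function grow complex.
#[derive(Debug)]
enum Transform {
    Bucket(i32),
    Truncate(i32),
}

fn iceberg_transform(transform: &Transform, input: i32) -> i32 {
    match transform {
        // bucket: hash-based in the real spec; a trivial modulus placeholder here.
        Transform::Bucket(n) => input.rem_euclid(*n),
        // truncate: floor the value to a multiple of the width.
        Transform::Truncate(w) => input - input.rem_euclid(*w),
    }
}

fn main() {
    assert_eq!(iceberg_transform(&Transform::Truncate(10), 17), 10);
}
```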

Contributor Author

Oh, I found a way to avoid this, let me have a try. 🥵

Contributor Author

The new implementation avoids it now.

@xxchan xxchan changed the title feat: support expression to compute paritition value feat: support expression to compute iceberg sink partition value Jan 10, 2024
Contributor

@wangrunji0408 wangrunji0408 left a comment

Could you add some unit tests and e2e tests for these functions?

src/expr/impl/src/scalar/construct_struct.rs Outdated
src/expr/core/src/expr/external/iceberg.rs Outdated
src/expr/core/src/expr/build.rs Outdated
@ZENOTME ZENOTME force-pushed the zj/partition_compute_expr branch from 6d55b5c to c6455cc on January 10, 2024 08:59
@ZENOTME
Contributor Author

ZENOTME commented Jan 10, 2024

Could you add some unit tests and e2e tests for these functions?

I have added the unit tests. These expressions aren't expected to be called by users directly, so we don't support them in the frontend. (But maybe that would be an interesting feature. 😜) The e2e test will be postponed to the next PR, in the sink test.

Contributor

@liurenjie1024 liurenjie1024 left a comment

Generally LGTM

}

async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> {
unimplemented!()
Contributor

This will panic. I would suggest returning a feature-not-supported error instead.
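A minimal sketch of that suggestion, with a hypothetical error type (RisingWave's actual ExprError variants may differ):

```rust
// Sketch: return a "not supported" error from the row-based path instead
// of panicking, since this expression only supports chunk-based evaluation.
// `ExprError` and `Datum` here are simplified stand-ins.
#[derive(Debug)]
enum ExprError {
    Unsupported(&'static str),
}

type Datum = Option<i64>;

fn eval_row() -> Result<Datum, ExprError> {
    Err(ExprError::Unsupported(
        "iceberg_transform: row-based evaluation is not supported",
    ))
}

fn main() {
    // The caller gets an error it can report, rather than a panic.
    assert!(eval_row().is_err());
}
```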

use risingwave_expr::{build_function, Result};

#[derive(Debug)]
pub enum IcebergTransformType {
Contributor

Is it possible to reuse the Transform enum in icelake so that we don't need to maintain two sets of enums?

Contributor

I guess defining a new enum is because we need a fixed mapping between transform types and integers?

Contributor

I guess defining a new enum is because we need a fixed mapping between transform types and integers?

In that case, maybe a single function would be enough?

Contributor Author

impl<'a> ToString for &'a Transform {
    fn to_string(&self) -> String {
        match self {
            Transform::Identity => "identity".to_string(),
            Transform::Year => "year".to_string(),
            Transform::Month => "month".to_string(),
            Transform::Day => "day".to_string(),
            Transform::Hour => "hour".to_string(),
            Transform::Void => "void".to_string(),
            Transform::Bucket(length) => format!("bucket[{}]", length),
            Transform::Truncate(width) => format!("truncate[{}]", width),
        }
    }
}

I find that we can use a string, so we don't even need an extra function meta param (all the info is contained in the string).
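The string encoding idea can be sketched as a simplified round-trip (a hypothetical parser loosely mirroring icelake's Transform, not its actual code):

```rust
use std::str::FromStr;

// Simplified mirror of the Transform enum, serialized as strings such as
// "bucket[16]" so all metadata travels inside a single varchar argument.
#[derive(Debug, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
}

impl Transform {
    fn to_string(&self) -> String {
        match self {
            Transform::Identity => "identity".to_string(),
            Transform::Bucket(n) => format!("bucket[{}]", n),
            Transform::Truncate(w) => format!("truncate[{}]", w),
        }
    }
}

impl FromStr for Transform {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Parse "name" or "name[param]".
        if let Some((name, rest)) = s.split_once('[') {
            let param: u32 = rest
                .strip_suffix(']')
                .ok_or("missing closing bracket")?
                .parse()
                .map_err(|_| "invalid parameter")?;
            match name {
                "bucket" => Ok(Transform::Bucket(param)),
                "truncate" => Ok(Transform::Truncate(param)),
                _ => Err(format!("unknown transform: {name}")),
            }
        } else if s == "identity" {
            Ok(Transform::Identity)
        } else {
            Err(format!("unknown transform: {s}"))
        }
    }
}

fn main() {
    let t: Transform = "bucket[16]".parse().unwrap();
    assert_eq!(t, Transform::Bucket(16));
    assert_eq!(t.to_string(), "bucket[16]");
}
```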

Contributor

Yes, it's just serialization/deserialization.

Contributor

@wangrunji0408 wangrunji0408 left a comment

Generally LGTM

src/expr/impl/src/lib.rs Outdated
@ZENOTME ZENOTME force-pushed the zj/partition_compute_expr branch from c6455cc to 5647507 on January 11, 2024 05:25
Cargo.lock Outdated
Member

LGTM

let datum = children[0].eval_const()?.unwrap();
let str = datum.as_utf8();
Transform::from_str(str).map_err(|_| ExprError::InvalidParam {
name: "transform type in icberg_transform",
Contributor

Suggested change
name: "transform type in icberg_transform",
name: "Invalid transform type in icberg_transform",

@ZENOTME
Contributor Author

ZENOTME commented Jan 11, 2024

For now, the signature is #[build_function("iceberg_transform(varchar, any) -> any")]. According to #12262, the macro will generate a type_infer that treats these two any types as the same. But in this function, the two any types can be different, so I guess the generated type_infer will produce a wrong type. 🤔 @wangrunji0408

oh, I guess we can just use new_unchecked to bypass type inference. But maybe we should have a way to customize type_infer. 🤔

Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
IcebergTransform::check_time(&return_type, &children)?
}
Transform::Identity | Transform::Void => {
Contributor

Why are identity and void illegal? Identity means the user has calculated the partition value for us, so I think it should be valid. Void is a little more complex: if all partition fields are void, the table should be treated as unpartitioned, but if only one field is void, we should return null for it.

Contributor Author

@ZENOTME ZENOTME Jan 11, 2024

For identity and void, we can represent them directly in the project as an InputRef and a const null. That avoids extra expression computation and data conversion, but I'm not sure whether it's good practice...

Contributor

You mean doing this in the optimizer? Sounds reasonable to me, but please add some comments here.

}

impl IcebergTransform {
fn check_bucket(return_type: &DataType, children: &Vec<BoxedExpression>) -> Result<()> {
Contributor

I would suggest using the return_type API in Transform to do the verification so that we keep consistency.

@wangrunji0408
Contributor

For now, the signature is #[build_function("iceberg_transform(varchar, any) -> any")]. According to #12262, the macro will generate a type_infer that treats these two any types as the same. But in this function, the two any types can be different, so I guess the generated type_infer will produce a wrong type. 🤔 @wangrunji0408

oh, I guess we can just use new_unchecked to bypass type inference. But maybe we should have a way to customize type_infer. 🤔

Yes, the auto-generated type_infer function is not correct. You can customize it using:

#[build_function("...", type_infer = "...")]

But at present we don't call type_infer for scalar functions. So I think it's safe to leave it as is.

Contributor

@tabVersion tabVersion left a comment

generally LGTM

@ZENOTME ZENOTME force-pushed the zj/partition_compute_expr branch from 5647507 to 3d218ce on January 12, 2024 07:15
@ZENOTME
Contributor Author

ZENOTME commented Jan 12, 2024

For now, the signature is #[build_function("iceberg_transform(varchar, any) -> any")]. According to #12262, the macro will generate a type_infer that treats these two any types as the same. But in this function, the two any types can be different, so I guess the generated type_infer will produce a wrong type. 🤔 @wangrunji0408
oh, I guess we can just use new_unchecked to bypass type inference. But maybe we should have a way to customize type_infer. 🤔

Yes, the auto-generated type_infer function is not correct. You can customize it using:

#[build_function("...", type_infer = "...")]

But at present we don't call type_infer for scalar functions. So I think it's safe to leave it as is.

To avoid using type_infer, I marked it as type_infer = "panic".

Contributor

@liurenjie1024 liurenjie1024 left a comment

LGTM, thanks!

// Get the child array
let array = self.child.eval(data_chunk).await?;
// Convert to arrow array
let arrow_array = array.as_ref().try_into().unwrap();
Contributor

Why unwrap here?

Contributor Author

I will convert it to an error in the next PR.

@liurenjie1024
Contributor

cc @wangrunji0408 @xxchan Any other comments?

Contributor

@wangrunji0408 wangrunji0408 left a comment

LGTM

@liurenjie1024 liurenjie1024 added this pull request to the merge queue Jan 15, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 15, 2024
@liurenjie1024 liurenjie1024 added this pull request to the merge queue Jan 15, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 15, 2024
@liurenjie1024 liurenjie1024 added this pull request to the merge queue Jan 15, 2024
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 15, 2024
@liurenjie1024 liurenjie1024 added this pull request to the merge queue Jan 15, 2024
Merged via the queue into main with commit 9162478 Jan 15, 2024
29 of 30 checks passed
@liurenjie1024 liurenjie1024 deleted the zj/partition_compute_expr branch January 15, 2024 15:03
Little-Wallace pushed a commit that referenced this pull request Jan 20, 2024