Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support expression to compute iceberg sink partition value #14470

Merged
merged 5 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 135 additions & 165 deletions Cargo.lock
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, why does Cargo.lock have so many updates? In particular, why are syn and serde updated?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's because avro specified the minor number (and uses dependabot) and thus forces downstream to update.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker. But want to take a look at the CHANGELOG of syn, serde, thiserror

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [
icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [
"prometheus",
] }
arrow-array = "49"
Expand Down
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ message ExprNode {
PG_GET_INDEXDEF = 2400;
COL_DESCRIPTION = 2401;
PG_GET_VIEWDEF = 2402;

// EXTERNAL
ICEBERG_TRANSFORM = 2201;
}
// Only use this field for function call. For other types of expression, it should be UNSPECIFIED.
Type function_type = 1;
Expand Down
2 changes: 2 additions & 0 deletions src/expr/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ normal = ["workspace-hack", "ctor"]
[dependencies]
aho-corasick = "1"
anyhow = "1"
arrow-schema = { workspace = true }
async-trait = "0.1"
auto_enums = "0.8"
chrono = { version = "0.4", default-features = false, features = [
Expand All @@ -28,6 +29,7 @@ fancy-regex = "0.13"
futures-async-stream = { workspace = true }
futures-util = "0.3"
hex = "0.4"
icelake = { workspace = true }
itertools = "0.12"
jsonbb = "0.1.2"
md5 = "0.7"
Expand Down
33 changes: 33 additions & 0 deletions src/expr/impl/src/scalar/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,36 @@ fn array(row: impl Row, ctx: &Context) -> ListValue {
fn row_(row: impl Row) -> StructValue {
    // Materialize each datum of the row as an owned value and pack the
    // resulting fields into a struct value.
    let owned_fields = row.iter().map(|datum| datum.to_owned_datum()).collect();
    StructValue::new(owned_fields)
}

#[cfg(test)]
mod tests {
    use risingwave_common::array::DataChunk;
    use risingwave_common::row::Row;
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_common::types::ToOwnedDatum;
    use risingwave_common::util::iter_util::ZipEqDebug;
    use risingwave_expr::expr::build_from_pretty;

    /// Checks that the `row` expression packs three int4 columns into a
    /// struct, both chunk-wise (`eval`) and row-wise (`eval_row`).
    #[tokio::test]
    async fn test_row_expr() {
        let expr = build_from_pretty("(row:struct<a_int4,b_int4,c_int4> $0:int4 $1:int4 $2:int4)");
        let (input, expected) = DataChunk::from_pretty(
            "i i i <i,i,i>
             1 2 3 (1,2,3)
             4 2 1 (4,2,1)
             9 1 3 (9,1,3)
             1 1 1 (1,1,1)",
        )
        .split_column_at(3);

        // Chunk-wise evaluation must reproduce the expected struct column.
        let actual = expr.eval(&input).await.unwrap();
        assert_eq!(&actual, expected.column_at(0));

        // Row-wise evaluation must agree with the chunk-wise result.
        for (row, expected_row) in input.rows().zip_eq_debug(expected.rows()) {
            let datum = expr.eval_row(&row.to_owned_row()).await.unwrap();
            assert_eq!(datum, expected_row.datum_at(0).to_owned_datum());
        }
    }
}
197 changes: 197 additions & 0 deletions src/expr/impl/src/scalar/external/iceberg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the expression for computing the iceberg partition value.
//! spec ref: <https://iceberg.apache.org/spec/#partition-transforms>
use std::fmt::Formatter;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
use arrow_schema::DataType as ArrowDataType;
use icelake::types::{
create_transform_function, Any as IcelakeDataType, BoxedTransformFunction, Transform,
};
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::ensure;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::expr::BoxedExpression;
use risingwave_expr::{build_function, ExprError, Result};

/// Expression that computes an iceberg partition value by applying an iceberg
/// partition transform (e.g. `bucket`, `truncate`, `year`) to the output of a
/// child expression.
pub struct IcebergTransform {
    // Child expression producing the column to be transformed.
    child: BoxedExpression,
    // The icelake transform function applied to the child's output
    // (converted to arrow) during `eval`.
    transform: BoxedTransformFunction,
    // Declared return type of the transformed value.
    return_type: DataType,
}

impl std::fmt::Debug for IcebergTransform {
    /// Manual `Debug` impl; the `transform` field is intentionally omitted
    /// from the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("IcebergTransform");
        builder.field("child", &self.child);
        builder.field("return_type", &self.return_type);
        builder.finish()
    }
}

#[async_trait::async_trait]
impl risingwave_expr::expr::Expression for IcebergTransform {
fn return_type(&self) -> DataType {
self.return_type.clone()
}

async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> {
// Get the child array
let array = self.child.eval(data_chunk).await?;
// Convert to arrow array
let arrow_array = array.as_ref().try_into().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why unwrap here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will convert it as an error in the next PR.

// Transform
let res_array = self.transform.transform(arrow_array).unwrap();
// Convert back to array ref and return it
Ok(Arc::new((&res_array).try_into().unwrap()))
}

async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> {
Err(ExprError::Internal(anyhow!(
"eval_row in iceberg_transform is not supported yet"
)))
}
}

/// Builds an `iceberg_transform` expression.
///
/// `children[0]` must be a non-null constant varchar naming the transform
/// (e.g. `bucket[16]`, `truncate[3]`, `year`); `children[1]` is the input
/// column to transform. Validates that the input type is transformable and
/// that the declared return type matches the transform's result type.
#[build_function("iceberg_transform(varchar, any) -> any", type_infer = "panic")]
fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<BoxedExpression> {
    let transform_type = {
        // Reject a NULL transform type with an error instead of panicking.
        let datum = children[0]
            .eval_const()?
            .ok_or_else(|| ExprError::InvalidParam {
                name: "transform type in iceberg_transform",
                reason: "transform type must not be null".into(),
            })?;
        let str = datum.as_utf8();
        Transform::from_str(str).map_err(|_| ExprError::InvalidParam {
            name: "transform type in iceberg_transform",
            reason: format!("Fail to parse {str} as iceberg transform type").into(),
        })?
    };

    // For Identity and Void transform, we will use `InputRef` and const null in frontend,
    // so it should not reach here.
    assert!(!matches!(
        transform_type,
        Transform::Identity | Transform::Void
    ));

    // Check type:
    // 1. input type can be transformed successfully
    // 2. return type is the same as the result type
    let input_type = IcelakeDataType::try_from(ArrowDataType::try_from(children[1].return_type())?)
        .map_err(|err| ExprError::InvalidParam {
            name: "input type in iceberg_transform",
            reason: format!("Failed to convert input type to icelake type, got error: {err}",)
                .into(),
        })?;
    let expect_res_type = transform_type.result_type(&input_type).map_err(
        |err| ExprError::InvalidParam {
            name: "input type in iceberg_transform",
            reason: format!(
                "Failed to get result type for transform type {:?} and input type {:?}, got error: {}",
                transform_type, input_type, err
            )
            .into()
        })?;
    let actual_res_type = IcelakeDataType::try_from(ArrowDataType::try_from(return_type.clone())?)
        .map_err(|err| ExprError::InvalidParam {
            name: "return type in iceberg_transform",
            reason: format!("Failed to convert return type to icelake type, got error: {err}",)
                .into(),
        })?;
    ensure!(
        expect_res_type == actual_res_type,
        ExprError::InvalidParam {
            name: "return type in iceberg_transform",
            reason: format!(
                "Expect return type {:?} but got {:?}",
                expect_res_type, actual_res_type
            )
            .into()
        }
    );

    Ok(Box::new(IcebergTransform {
        child: children.remove(1),
        transform: create_transform_function(&transform_type)
            .map_err(|err| ExprError::Internal(err.into()))?,
        return_type,
    }))
}

#[cfg(test)]
mod test {
    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_expr::expr::build_from_pretty;

    /// `bucket[2017]` on int4: expected value taken from the iceberg spec's
    /// bucket-transform example (bucket of 34 with N=2017 is 1373).
    #[tokio::test]
    async fn test_bucket() {
        let (input, expected) = DataChunk::from_pretty(
            "i   i
            34  1373",
        )
        .split_column_at(1);
        let expr = build_from_pretty("(iceberg_transform:int4 bucket[2017]:varchar $0:int)");
        let res = expr.eval(&input).await.unwrap();
        assert_eq!(res, *expected.column_at(0));
    }

    /// `truncate[3]` on varchar keeps the first 3 characters.
    #[tokio::test]
    async fn test_truncate() {
        let (input, expected) = DataChunk::from_pretty(
            "T         T
            iceberg    ice
            risingwave ris
            delta      del",
        )
        .split_column_at(1);
        let expr = build_from_pretty("(iceberg_transform:varchar truncate[3]:varchar $0:varchar)");
        let res = expr.eval(&input).await.unwrap();
        assert_eq!(res, *expected.column_at(0));
    }

    /// Date-part transforms on timestamptz: each expected column is the
    /// number of years/months/days/hours elapsed since the epoch
    /// (1970-01-01T00:00:00Z), per the iceberg partition-transform spec.
    #[tokio::test]
    async fn test_year_month_day_hour() {
        let (input, expected) = DataChunk::from_pretty(
            "TZ                                    i i i i
            1970-01-01T00:00:00.000000000+00:00    0 0 0 0
            1971-02-01T01:00:00.000000000+00:00    1 13 396 9505
            1972-03-01T02:00:00.000000000+00:00    2 26 790 18962
            1970-05-01T06:00:00.000000000+00:00    0 4 120 2886
            1970-06-01T07:00:00.000000000+00:00    0 5 151 3631",
        )
        .split_column_at(1);

        // year
        let expr = build_from_pretty("(iceberg_transform:int4 year:varchar $0:timestamptz)");
        let res = expr.eval(&input).await.unwrap();
        assert_eq!(res, *expected.column_at(0));

        // month
        let expr = build_from_pretty("(iceberg_transform:int4 month:varchar $0:timestamptz)");
        let res = expr.eval(&input).await.unwrap();
        assert_eq!(res, *expected.column_at(1));

        // day
        let expr = build_from_pretty("(iceberg_transform:int4 day:varchar $0:timestamptz)");
        let res = expr.eval(&input).await.unwrap();
        assert_eq!(res, *expected.column_at(2));

        // hour
        let expr = build_from_pretty("(iceberg_transform:int4 hour:varchar $0:timestamptz)");
        let res = expr.eval(&input).await.unwrap();
        assert_eq!(res, *expected.column_at(3));
    }
}
16 changes: 16 additions & 0 deletions src/expr/impl/src/scalar/external/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains expressions for external systems, such as partition-compute expressions for external sinks.
pub mod iceberg;
1 change: 1 addition & 0 deletions src/expr/impl/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mod to_char;
mod to_jsonb;
mod vnode;
pub use to_jsonb::*;
mod external;
mod to_timestamp;
mod translate;
mod trigonometric;
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ impl ExprVisitor for ImpureAnalyzer {
| expr_node::Type::Greatest
| expr_node::Type::Least
| expr_node::Type::ConvertFrom
| expr_node::Type::ConvertTo =>
| expr_node::Type::ConvertTo
| expr_node::Type::IcebergTransform =>
// expression output is deterministic(same result for the same input)
{
func_call
Expand Down
7 changes: 5 additions & 2 deletions src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ aws-smithy-runtime = { version = "1", default-features = false, features = ["cli
aws-smithy-types = { version = "1", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "hyper-0-14-x", "rt-tokio"] }
axum = { version = "0.6" }
base64 = { version = "0.21", features = ["alloc"] }
bigdecimal = { version = "0.4" }
bit-vec = { version = "0.6" }
bitflags = { version = "2", default-features = false, features = ["serde", "std"] }
byteorder = { version = "1" }
Expand Down Expand Up @@ -64,7 +65,7 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper = { version = "0.14", features = ["full"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["serde", "std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.11" }
itertools = { version = "0.10" }
jni = { version = "0.21", features = ["invocation"] }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
lexical-core = { version = "0.8", features = ["format"] }
Expand Down Expand Up @@ -148,6 +149,8 @@ url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["fast-rng", "serde", "v4"] }
whoami = { version = "1" }
zeroize = { version = "1" }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }

[build-dependencies]
Expand All @@ -166,7 +169,7 @@ frunk_core = { version = "0.4", default-features = false, features = ["std"] }
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.11" }
itertools = { version = "0.10" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }
Expand Down
Loading