diff --git a/Cargo.lock b/Cargo.lock index a3075f4163410..b2e68b0047216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5051,7 +5051,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=cc27aa20d34b1d252bd7b8aaadee64c34140e687#cc27aa20d34b1d252bd7b8aaadee64c34140e687" +source = "git+https://github.com/icelake-io/icelake?rev=b4f4ca3c6d29092bd331925ead0bcceaa38bdd57#b4f4ca3c6d29092bd331925ead0bcceaa38bdd57" dependencies = [ "anyhow", "apache-avro 0.17.0", @@ -9113,6 +9113,7 @@ version = "1.5.0-alpha" dependencies = [ "aho-corasick", "anyhow", + "arrow-schema 49.0.0", "async-trait", "auto_enums", "chrono", diff --git a/Cargo.toml b/Cargo.toml index ffc2ce1af3af4..685fcbdc299f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "cc27aa20d34b1d252bd7b8aaadee64c34140e687", features = [ +icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [ "prometheus", ] } arrow-array = "49" diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index 7ff6b10f44d02..bee2e4eb10c04 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -18,6 +18,7 @@ normal = ["workspace-hack", "ctor"] [dependencies] aho-corasick = "1" anyhow = "1" +arrow-schema = { workspace = true } async-trait = "0.1" auto_enums = "0.8" chrono = { version = "0.4", default-features = false, features = [ diff --git a/src/expr/impl/src/scalar/external/iceberg.rs b/src/expr/impl/src/scalar/external/iceberg.rs index 5576a6418e986..3973efee559d6 100644 --- a/src/expr/impl/src/scalar/external/iceberg.rs +++ b/src/expr/impl/src/scalar/external/iceberg.rs @@ -19,7 +19,10 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::anyhow; -use icelake::types::{create_transform_function, BoxedTransformFunction, Transform}; +use arrow_schema::DataType as ArrowDataType; +use icelake::types::{ + create_transform_function, Any as IcelakeDataType, BoxedTransformFunction, Transform, +}; use risingwave_common::array::{ArrayRef, DataChunk}; use risingwave_common::ensure; use risingwave_common::row::OwnedRow; @@ -66,7 +69,7 @@ impl risingwave_expr::expr::Expression for IcebergTransform { } } -#[build_function("iceberg_transform(varchar, any) -> any")] +#[build_function("iceberg_transform(varchar, any) -> any", type_infer = "panic")] fn build(return_type: DataType, mut children: Vec) -> Result { let transform_type = { let datum = children[0].eval_const()?.unwrap(); @@ -77,19 +80,48 @@ fn build(return_type: DataType, mut children: Vec) -> Result IcebergTransform::check_bucket(&return_type, &children)?, - Transform::Truncate(_) => IcebergTransform::check_truncate(&return_type, &children)?, - Transform::Year | Transform::Month | Transform::Day | Transform::Hour => { - IcebergTransform::check_time(&return_type, &children)? + // For Identity and Void transform, we will use `InputRef` and const null in frontend, + // so it should not reach here. + assert!(!matches!( + transform_type, + Transform::Identity | Transform::Void + )); + + // Check type: + // 1. input type can be transform successfully + // 2. return type is the same as the result type + let input_type = IcelakeDataType::try_from(ArrowDataType::try_from(children[1].return_type())?) + .map_err(|err| ExprError::InvalidParam { + name: "input type in iceberg_transform", + reason: format!("Failed to convert input type to icelake type, got error: {err}",) + .into(), + })?; + let expect_res_type = transform_type.result_type(&input_type).map_err( + |err| ExprError::InvalidParam { + name: "input type in iceberg_transform", + reason: format!( + "Failed to get result type for transform type {:?} and input type {:?}, got error: {}", + transform_type, input_type, err + ) + .into() + })?; + let actual_res_type = IcelakeDataType::try_from(ArrowDataType::try_from(return_type.clone())?) + .map_err(|err| ExprError::InvalidParam { + name: "return type in iceberg_transform", + reason: format!("Failed to convert return type to icelake type, got error: {err}",) + .into(), + })?; + ensure!( + expect_res_type == actual_res_type, + ExprError::InvalidParam { + name: "return type in iceberg_transform", + reason: format!( + "Expect return type {:?} but got {:?}", + expect_res_type, actual_res_type + ) + .into() } - Transform::Identity | Transform::Void => { - return Err(ExprError::Internal(anyhow!( - "identity or void type should not be used in iceberg_transform" - ))) - } - } + ); Ok(Box::new(IcebergTransform { child: children.remove(1), @@ -99,43 +131,6 @@ fn build(return_type: DataType, mut children: Vec) -> Result) -> Result<()> { - ensure!(matches!( - children[1].return_type(), - DataType::Int32 - | DataType::Int64 - | DataType::Decimal - | DataType::Date - | DataType::Time - | DataType::Timestamp - | DataType::Timestamptz - | DataType::Varchar - | DataType::Bytea - )); - ensure!(*return_type == DataType::Int32); - Ok(()) - } - - fn check_truncate(return_type: &DataType, children: &Vec) -> Result<()> { - ensure!(matches!( - children[1].return_type(), - DataType::Int32 | DataType::Int64 | DataType::Decimal | DataType::Varchar - )); - ensure!(*return_type == children[1].return_type()); - Ok(()) - } - - fn check_time(return_type: &DataType, children: &Vec) -> Result<()> { - ensure!(matches!( - children[1].return_type(), - DataType::Date | DataType::Timestamp | DataType::Timestamptz - )); - ensure!(*return_type == DataType::Int32); - Ok(()) - } -} - #[cfg(test)] mod test { use risingwave_common::array::{DataChunk, DataChunkTestExt};