Skip to content

Commit

Permalink
refine check
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jan 12, 2024
1 parent ae6fd30 commit 3d218ce
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 53 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "cc27aa20d34b1d252bd7b8aaadee64c34140e687", features = [
icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [
"prometheus",
] }
arrow-array = "49"
Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ normal = ["workspace-hack", "ctor"]
[dependencies]
aho-corasick = "1"
anyhow = "1"
arrow-schema = { workspace = true }
async-trait = "0.1"
auto_enums = "0.8"
chrono = { version = "0.4", default-features = false, features = [
Expand Down
97 changes: 46 additions & 51 deletions src/expr/impl/src/scalar/external/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
use icelake::types::{create_transform_function, BoxedTransformFunction, Transform};
use arrow_schema::DataType as ArrowDataType;
use icelake::types::{
create_transform_function, Any as IcelakeDataType, BoxedTransformFunction, Transform,
};
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::ensure;
use risingwave_common::row::OwnedRow;
Expand Down Expand Up @@ -66,7 +69,7 @@ impl risingwave_expr::expr::Expression for IcebergTransform {
}
}

#[build_function("iceberg_transform(varchar, any) -> any")]
#[build_function("iceberg_transform(varchar, any) -> any", type_infer = "panic")]
fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<BoxedExpression> {
let transform_type = {
let datum = children[0].eval_const()?.unwrap();
Expand All @@ -77,19 +80,48 @@ fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<Bo
})?
};

// Check type
match &transform_type {
Transform::Bucket(_) => IcebergTransform::check_bucket(&return_type, &children)?,
Transform::Truncate(_) => IcebergTransform::check_truncate(&return_type, &children)?,
Transform::Year | Transform::Month | Transform::Day | Transform::Hour => {
IcebergTransform::check_time(&return_type, &children)?
// The frontend lowers the Identity transform to an `InputRef` and the Void transform
// to a constant null, so neither of them should ever reach this function.
assert!(!matches!(
transform_type,
Transform::Identity | Transform::Void
));

// Check types:
// 1. the input type can be transformed successfully
// 2. the declared return type is the same as the transform's result type
let input_type = IcelakeDataType::try_from(ArrowDataType::try_from(children[1].return_type())?)
.map_err(|err| ExprError::InvalidParam {
name: "input type in iceberg_transform",
reason: format!("Failed to convert input type to icelake type, got error: {err}",)
.into(),
})?;
let expect_res_type = transform_type.result_type(&input_type).map_err(
|err| ExprError::InvalidParam {
name: "input type in iceberg_transform",
reason: format!(
"Failed to get result type for transform type {:?} and input type {:?}, got error: {}",
transform_type, input_type, err
)
.into()
})?;
let actual_res_type = IcelakeDataType::try_from(ArrowDataType::try_from(return_type.clone())?)
.map_err(|err| ExprError::InvalidParam {
name: "return type in iceberg_transform",
reason: format!("Failed to convert return type to icelake type, got error: {err}",)
.into(),
})?;
ensure!(
expect_res_type == actual_res_type,
ExprError::InvalidParam {
name: "return type in iceberg_transform",
reason: format!(
"Expect return type {:?} but got {:?}",
expect_res_type, actual_res_type
)
.into()
}
Transform::Identity | Transform::Void => {
return Err(ExprError::Internal(anyhow!(
"identity or void type should not be used in iceberg_transform"
)))
}
}
);

Ok(Box::new(IcebergTransform {
child: children.remove(1),
Expand All @@ -99,43 +131,6 @@ fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<Bo
}))
}

impl IcebergTransform {
    /// Validates the argument and return types of a `Transform::Bucket` call.
    ///
    /// `children[1]` is the expression being transformed; its type must be one of
    /// `Int32`, `Int64`, `Decimal`, `Date`, `Time`, `Timestamp`, `Timestamptz`,
    /// `Varchar` or `Bytea`, and `return_type` must be `Int32`.
    ///
    /// # Errors
    ///
    /// Fails through `ensure!` when either type check does not hold.
    ///
    /// # Panics
    ///
    /// Panics if `children` has fewer than two elements.
    fn check_bucket(return_type: &DataType, children: &[BoxedExpression]) -> Result<()> {
        // Takes a slice rather than `&Vec<_>`; `&Vec<BoxedExpression>` arguments still
        // coerce to it at existing call sites.
        let input_type = children[1].return_type();
        ensure!(matches!(
            input_type,
            DataType::Int32
                | DataType::Int64
                | DataType::Decimal
                | DataType::Date
                | DataType::Time
                | DataType::Timestamp
                | DataType::Timestamptz
                | DataType::Varchar
                | DataType::Bytea
        ));
        ensure!(*return_type == DataType::Int32);
        Ok(())
    }

    /// Validates the argument and return types of a `Transform::Truncate` call.
    ///
    /// The type of `children[1]` must be one of `Int32`, `Int64`, `Decimal` or
    /// `Varchar`, and `return_type` must be equal to that input type.
    ///
    /// # Errors
    ///
    /// Fails through `ensure!` when either type check does not hold.
    ///
    /// # Panics
    ///
    /// Panics if `children` has fewer than two elements.
    fn check_truncate(return_type: &DataType, children: &[BoxedExpression]) -> Result<()> {
        let input_type = children[1].return_type();
        ensure!(matches!(
            input_type,
            DataType::Int32 | DataType::Int64 | DataType::Decimal | DataType::Varchar
        ));
        ensure!(*return_type == input_type);
        Ok(())
    }

    /// Validates the argument and return types of the time-based transforms
    /// (`Year`, `Month`, `Day`, `Hour`).
    ///
    /// The type of `children[1]` must be one of `Date`, `Timestamp` or
    /// `Timestamptz`, and `return_type` must be `Int32`.
    ///
    /// # Errors
    ///
    /// Fails through `ensure!` when either type check does not hold.
    ///
    /// # Panics
    ///
    /// Panics if `children` has fewer than two elements.
    fn check_time(return_type: &DataType, children: &[BoxedExpression]) -> Result<()> {
        let input_type = children[1].return_type();
        ensure!(matches!(
            input_type,
            DataType::Date | DataType::Timestamp | DataType::Timestamptz
        ));
        ensure!(*return_type == DataType::Int32);
        Ok(())
    }
}

#[cfg(test)]
mod test {
use risingwave_common::array::{DataChunk, DataChunkTestExt};
Expand Down

0 comments on commit 3d218ce

Please sign in to comment.