feat: support expression to compute iceberg sink partition value (#14470)

Co-authored-by: ZENOTME <[email protected]>
Co-authored-by: Renjie Liu <[email protected]>
3 people authored and Little-Wallace committed Jan 20, 2024
1 parent 5500c70 commit 1d6df36
Showing 10 changed files with 395 additions and 169 deletions.
300 changes: 135 additions & 165 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" }
prost = { version = "0.12" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [
"prometheus",
] }
arrow-array = "49"
3 changes: 3 additions & 0 deletions proto/expr.proto
@@ -275,6 +275,9 @@ message ExprNode {
PG_GET_INDEXDEF = 2400;
COL_DESCRIPTION = 2401;
PG_GET_VIEWDEF = 2402;

+// EXTERNAL
+ICEBERG_TRANSFORM = 2201;
}
// Only use this field for function call. For other types of expression, it should be UNSPECIFIED.
Type function_type = 1;
2 changes: 2 additions & 0 deletions src/expr/impl/Cargo.toml
@@ -18,6 +18,7 @@ normal = ["workspace-hack", "ctor"]
[dependencies]
aho-corasick = "1"
anyhow = "1"
+arrow-schema = { workspace = true }
async-trait = "0.1"
auto_enums = "0.8"
chrono = { version = "0.4", default-features = false, features = [
@@ -28,6 +29,7 @@ fancy-regex = "0.13"
futures-async-stream = { workspace = true }
futures-util = "0.3"
hex = "0.4"
+icelake = { workspace = true }
itertools = "0.12"
jsonbb = "0.1.2"
md5 = "0.7"
33 changes: 33 additions & 0 deletions src/expr/impl/src/scalar/array.rs
@@ -27,3 +27,36 @@ fn array(row: impl Row, ctx: &Context) -> ListValue {
fn row_(row: impl Row) -> StructValue {
StructValue::new(row.iter().map(|d| d.to_owned_datum()).collect())
}

#[cfg(test)]
mod tests {
use risingwave_common::array::DataChunk;
use risingwave_common::row::Row;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::build_from_pretty;

#[tokio::test]
async fn test_row_expr() {
let expr = build_from_pretty("(row:struct<a_int4,b_int4,c_int4> $0:int4 $1:int4 $2:int4)");
let (input, expected) = DataChunk::from_pretty(
"i i i <i,i,i>
1 2 3 (1,2,3)
4 2 1 (4,2,1)
9 1 3 (9,1,3)
1 1 1 (1,1,1)",
)
.split_column_at(3);

// test eval
let output = expr.eval(&input).await.unwrap();
assert_eq!(&output, expected.column_at(0));

// test eval_row
for (row, expected) in input.rows().zip_eq_debug(expected.rows()) {
let result = expr.eval_row(&row.to_owned_row()).await.unwrap();
assert_eq!(result, expected.datum_at(0).to_owned_datum());
}
}
}
197 changes: 197 additions & 0 deletions src/expr/impl/src/scalar/external/iceberg.rs
@@ -0,0 +1,197 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains the expression for computing the Iceberg partition value.
//! spec ref: <https://iceberg.apache.org/spec/#partition-transforms>
use std::fmt::Formatter;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::anyhow;
use arrow_schema::DataType as ArrowDataType;
use icelake::types::{
create_transform_function, Any as IcelakeDataType, BoxedTransformFunction, Transform,
};
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::ensure;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::expr::BoxedExpression;
use risingwave_expr::{build_function, ExprError, Result};

pub struct IcebergTransform {
child: BoxedExpression,
transform: BoxedTransformFunction,
return_type: DataType,
}

impl std::fmt::Debug for IcebergTransform {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IcebergTransform")
.field("child", &self.child)
.field("return_type", &self.return_type)
.finish()
}
}

#[async_trait::async_trait]
impl risingwave_expr::expr::Expression for IcebergTransform {
fn return_type(&self) -> DataType {
self.return_type.clone()
}

async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> {
// Get the child array
let array = self.child.eval(data_chunk).await?;
// Convert to arrow array
let arrow_array = array.as_ref().try_into().unwrap();
// Transform
let res_array = self.transform.transform(arrow_array).unwrap();
// Convert back to array ref and return it
Ok(Arc::new((&res_array).try_into().unwrap()))
}

async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> {
Err(ExprError::Internal(anyhow!(
"eval_row in iceberg_transform is not supported yet"
)))
}
}

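// `iceberg_transform(varchar, any) -> any` registers the function with dynamic
// argument and return types. `type_infer = "panic"` is assumed to mean that the
// frontend always supplies the concrete return type, so no inference function is
// provided; `build` verifies that type against the transform's result type below.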
#[build_function("iceberg_transform(varchar, any) -> any", type_infer = "panic")]
fn build(return_type: DataType, mut children: Vec<BoxedExpression>) -> Result<BoxedExpression> {
let transform_type = {
let datum = children[0].eval_const()?.unwrap();
let str = datum.as_utf8();
Transform::from_str(str).map_err(|_| ExprError::InvalidParam {
name: "transform type in icberg_transform",
reason: format!("Fail to parse {str} as iceberg transform type").into(),
})?
};

// For the Identity and Void transforms, the frontend uses `InputRef` and a
// constant null respectively, so they should never reach here.
assert!(!matches!(
transform_type,
Transform::Identity | Transform::Void
));

// Check types:
// 1. The input type can be transformed successfully.
// 2. The return type matches the transform's result type.
let input_type = IcelakeDataType::try_from(ArrowDataType::try_from(children[1].return_type())?)
.map_err(|err| ExprError::InvalidParam {
name: "input type in iceberg_transform",
reason: format!("Failed to convert input type to icelake type, got error: {err}",)
.into(),
})?;
let expect_res_type = transform_type.result_type(&input_type).map_err(
|err| ExprError::InvalidParam {
name: "input type in iceberg_transform",
reason: format!(
"Failed to get result type for transform type {:?} and input type {:?}, got error: {}",
transform_type, input_type, err
)
.into()
})?;
let actual_res_type = IcelakeDataType::try_from(ArrowDataType::try_from(return_type.clone())?)
.map_err(|err| ExprError::InvalidParam {
name: "return type in iceberg_transform",
reason: format!("Failed to convert return type to icelake type, got error: {err}",)
.into(),
})?;
ensure!(
expect_res_type == actual_res_type,
ExprError::InvalidParam {
name: "return type in iceberg_transform",
reason: format!(
"Expect return type {:?} but got {:?}",
expect_res_type, actual_res_type
)
.into()
}
);

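// Child 0, the transform-type literal, was already consumed as a constant
// above; only the input expression (child 1) remains a runtime child.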
Ok(Box::new(IcebergTransform {
child: children.remove(1),
transform: create_transform_function(&transform_type)
.map_err(|err| ExprError::Internal(err.into()))?,
return_type,
}))
}

#[cfg(test)]
mod test {
use risingwave_common::array::{DataChunk, DataChunkTestExt};
use risingwave_expr::expr::build_from_pretty;

#[tokio::test]
async fn test_bucket() {
let (input, expected) = DataChunk::from_pretty(
"i i
34 1373",
)
.split_column_at(1);
let expr = build_from_pretty("(iceberg_transform:int4 bucket[2017]:varchar $0:int)");
let res = expr.eval(&input).await.unwrap();
assert_eq!(res, *expected.column_at(0));
}

#[tokio::test]
async fn test_truncate() {
let (input, expected) = DataChunk::from_pretty(
"T T
iceberg ice
risingwave ris
delta del",
)
.split_column_at(1);
let expr = build_from_pretty("(iceberg_transform:varchar truncate[3]:varchar $0:varchar)");
let res = expr.eval(&input).await.unwrap();
assert_eq!(res, *expected.column_at(0));
}

#[tokio::test]
async fn test_year_month_day_hour() {
let (input, expected) = DataChunk::from_pretty(
"TZ i i i i
1970-01-01T00:00:00.000000000+00:00 0 0 0 0
1971-02-01T01:00:00.000000000+00:00 1 13 396 9505
1972-03-01T02:00:00.000000000+00:00 2 26 790 18962
1970-05-01T06:00:00.000000000+00:00 0 4 120 2886
1970-06-01T07:00:00.000000000+00:00 0 5 151 3631",
)
.split_column_at(1);

// year
let expr = build_from_pretty("(iceberg_transform:int4 year:varchar $0:timestamptz)");
let res = expr.eval(&input).await.unwrap();
assert_eq!(res, *expected.column_at(0));

// month
let expr = build_from_pretty("(iceberg_transform:int4 month:varchar $0:timestamptz)");
let res = expr.eval(&input).await.unwrap();
assert_eq!(res, *expected.column_at(1));

// day
let expr = build_from_pretty("(iceberg_transform:int4 day:varchar $0:timestamptz)");
let res = expr.eval(&input).await.unwrap();
assert_eq!(res, *expected.column_at(2));

// hour
let expr = build_from_pretty("(iceberg_transform:int4 hour:varchar $0:timestamptz)");
let res = expr.eval(&input).await.unwrap();
assert_eq!(res, *expected.column_at(3));
}
}
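
For reference, the transforms exercised by these tests are defined by the Iceberg partition-transform spec linked in the module doc. The sketch below mirrors those semantics and reproduces the test vectors above. It is illustrative only: it assumes the murmur3 (0.5) and chrono (0.4) crates rather than icelake, whose create_transform_function is what the production code path uses.

use std::io::Cursor;

use chrono::{DateTime, Datelike};

// Iceberg `bucket[n]`: 32-bit Murmur3 (seed 0) of the value's 8-byte
// little-endian long encoding, masked non-negative, then modulo `n`.
fn bucket_int(v: i32, n: u32) -> u32 {
    let bytes = (v as i64).to_le_bytes();
    let hash = murmur3::murmur3_32(&mut Cursor::new(&bytes[..]), 0).unwrap();
    (hash & i32::MAX as u32) % n
}

// Iceberg `truncate[w]` on strings: keep the first `w` characters.
fn truncate_str(s: &str, w: usize) -> String {
    s.chars().take(w).collect()
}

// Iceberg time transforms: whole years / months / days / hours elapsed
// since the Unix epoch.
fn time_transforms(ts: &str) -> (i32, i32, i64, i64) {
    let dt = DateTime::parse_from_rfc3339(ts).unwrap();
    let years = dt.year() - 1970;
    let months = years * 12 + dt.month0() as i32;
    let days = dt.timestamp().div_euclid(86400);
    let hours = dt.timestamp().div_euclid(3600);
    (years, months, days, hours)
}

fn main() {
    // Matches `test_bucket`: bucket[2017](34) == 1373.
    assert_eq!(bucket_int(34, 2017), 1373);
    // Matches `test_truncate`: truncate[3]("iceberg") == "ice".
    assert_eq!(truncate_str("iceberg", 3), "ice");
    // Matches the second row of `test_year_month_day_hour`.
    assert_eq!(
        time_transforms("1971-02-01T01:00:00+00:00"),
        (1, 13, 396, 9505)
    );
}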
16 changes: 16 additions & 0 deletions src/expr/impl/src/scalar/external/mod.rs
@@ -0,0 +1,16 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module contains expressions for external systems, such as the partition compute expression for the external sink.
pub mod iceberg;
1 change: 1 addition & 0 deletions src/expr/impl/src/scalar/mod.rs
@@ -78,6 +78,7 @@ mod to_char;
mod to_jsonb;
mod vnode;
pub use to_jsonb::*;
+mod external;
mod to_timestamp;
mod translate;
mod trigonometric;
3 changes: 2 additions & 1 deletion src/frontend/src/expr/pure.rs
@@ -225,7 +225,8 @@ impl ExprVisitor for ImpureAnalyzer {
| expr_node::Type::Greatest
| expr_node::Type::Least
| expr_node::Type::ConvertFrom
-| expr_node::Type::ConvertTo =>
+| expr_node::Type::ConvertTo
+| expr_node::Type::IcebergTransform =>
// expression output is deterministic(same result for the same input)
{
func_call
7 changes: 5 additions & 2 deletions src/workspace-hack/Cargo.toml
@@ -30,6 +30,7 @@ aws-smithy-runtime = { version = "1", default-features = false, features = ["cli
aws-smithy-types = { version = "1", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "hyper-0-14-x", "rt-tokio"] }
axum = { version = "0.6" }
base64 = { version = "0.21", features = ["alloc"] }
+bigdecimal = { version = "0.4" }
bit-vec = { version = "0.6" }
bitflags = { version = "2", default-features = false, features = ["serde", "std"] }
byteorder = { version = "1" }
@@ -64,7 +65,7 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper = { version = "0.14", features = ["full"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["serde", "std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.11" }
itertools = { version = "0.10" }
jni = { version = "0.21", features = ["invocation"] }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
lexical-core = { version = "0.8", features = ["format"] }
@@ -148,6 +149,8 @@ url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["fast-rng", "serde", "v4"] }
whoami = { version = "1" }
zeroize = { version = "1" }
+zstd = { version = "0.13" }
+zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }

[build-dependencies]
@@ -166,7 +169,7 @@ frunk_core = { version = "0.4", default-features = false, features = ["std"] }
generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.11" }
itertools = { version = "0.10" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }
