-
Notifications
You must be signed in to change notification settings - Fork 593
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add external/iceberg and construct_struct expression
ZENOTME
committed
Jan 10, 2024
1 parent
f3b0650
commit 6d55b5c
Showing
13 changed files
with
702 additions
and
171 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,399 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::fmt::{Debug, Formatter}; | ||
use std::sync::Arc; | ||
|
||
use icelake::types::{ | ||
Bucket as BucketTransform, Day as DayTransform, Hour as HourTransform, Month as MonthTransform, | ||
TransformFunction, Truncate as TruncateTransform, Year as YearTransform, | ||
}; | ||
use risingwave_common::array::{ArrayRef, DataChunk}; | ||
use risingwave_common::ensure; | ||
use risingwave_common::row::OwnedRow; | ||
use risingwave_common::types::{DataType, Datum}; | ||
use risingwave_expr::expr::{get_children_and_return_type_for_func_call, BoxedExpression, Build}; | ||
use risingwave_expr::Result; | ||
use risingwave_pb::expr::ExprNode; | ||
|
||
/// This module contains the iceberg expression for computing the partition value. | ||
/// spec ref: <https://iceberg.apache.org/spec/#partition-transforms> | ||
pub struct Bucket { | ||
child: BoxedExpression, | ||
n: i32, | ||
transform: BucketTransform, | ||
} | ||
|
||
impl Debug for Bucket { | ||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
write!(f, "Iceberg_Bucket({})", self.n) | ||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl risingwave_expr::expr::Expression for Bucket { | ||
fn return_type(&self) -> DataType { | ||
DataType::Int32 | ||
} | ||
|
||
async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> { | ||
// Get the child array | ||
let array = self.child.eval(data_chunk).await?; | ||
// Convert to arrow array | ||
let arrow_array = array.as_ref().try_into().unwrap(); | ||
// Transform | ||
let res_array = self.transform.transform(arrow_array).unwrap(); | ||
// Convert back to array ref and return it | ||
Ok(Arc::new((&res_array).try_into().unwrap())) | ||
} | ||
|
||
async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> { | ||
unimplemented!() | ||
} | ||
} | ||
|
||
impl Build for Bucket { | ||
fn build( | ||
prost: &ExprNode, | ||
build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>, | ||
) -> Result<Self> { | ||
let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; | ||
|
||
// Check expression | ||
ensure!(children.len() == 2); | ||
ensure!(matches!( | ||
children[0].get_return_type().unwrap().into(), | ||
DataType::Int32 | ||
| DataType::Int64 | ||
| DataType::Decimal | ||
| DataType::Date | ||
| DataType::Time | ||
| DataType::Timestamp | ||
| DataType::Timestamptz | ||
| DataType::Varchar | ||
| DataType::Bytea | ||
)); | ||
ensure!(DataType::Int32 == children[1].get_return_type().unwrap().into()); | ||
ensure!(res_type == DataType::Int32); | ||
|
||
// Get the second child as const param | ||
let literal = build_child(&children[1])?; | ||
let n = *literal.eval_const()?.unwrap().as_int32(); | ||
|
||
// Build the child | ||
let child = build_child(&children[0])?; | ||
Ok(Bucket { | ||
child, | ||
n, | ||
transform: BucketTransform::new(n), | ||
}) | ||
} | ||
} | ||
|
||
/// Iceberg `truncate[W]` partition transform expression. | ||
/// | ||
/// Evaluates `child` and feeds the resulting column to icelake's `Truncate` transform. | ||
/// The reported return type is the child's return type. | ||
/// | ||
/// spec ref: <https://iceberg.apache.org/spec/#partition-transforms> | ||
pub struct Truncate { | ||
    /// Expression producing the values to truncate. | ||
    child: BoxedExpression, | ||
    /// Truncation width `W`; also shown in the `Debug` output. | ||
    w: i32, | ||
    /// icelake transform constructed from `w`. | ||
    transform: TruncateTransform, | ||
} | ||
|
||
impl Debug for Truncate { | ||
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
        write!(f, "Iceberg_Truncate({})", self.w) | ||
    } | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl risingwave_expr::expr::Expression for Truncate { | ||
    fn return_type(&self) -> DataType { | ||
        self.child.return_type() | ||
    } | ||
|
||
    /// Evaluates the child on `data_chunk` and applies the truncate transform to the result. | ||
    /// | ||
    /// # Panics | ||
    /// | ||
    /// Panics if the conversion to or from the arrow representation fails, or if the | ||
    /// transform itself returns an error (all three steps are `unwrap`ped). | ||
    async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> { | ||
        // Get the child array | ||
        let array = self.child.eval(data_chunk).await?; | ||
        // Convert to arrow array | ||
        let arrow_array = array.as_ref().try_into().unwrap(); | ||
        // Transform | ||
        let res_array = self.transform.transform(arrow_array).unwrap(); | ||
        // Convert back to array ref and return it | ||
        Ok(Arc::new((&res_array).try_into().unwrap())) | ||
    } | ||
|
||
    /// Row-at-a-time evaluation is not implemented; calling this always panics. | ||
    async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> { | ||
        unimplemented!() | ||
    } | ||
} | ||
|
||
impl Build for Truncate { | ||
    /// Builds a [`Truncate`] from a function-call node with two children: the value | ||
    /// expression and a constant `Int32` width. | ||
    /// | ||
    /// # Errors | ||
    /// | ||
    /// Fails when the arity or the argument/return types do not match, when building or | ||
    /// constant-evaluating a child fails, or when the width is NULL or not strictly | ||
    /// positive. | ||
    /// | ||
    /// # Panics | ||
    /// | ||
    /// Panics if a child node carries no return type (`get_return_type().unwrap()`). | ||
    fn build( | ||
        prost: &ExprNode, | ||
        build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>, | ||
    ) -> Result<Self> { | ||
        let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; | ||
|
||
        // Check expression | ||
        ensure!(children.len() == 2); | ||
        ensure!(matches!( | ||
            children[0].get_return_type().unwrap().into(), | ||
            DataType::Int32 | DataType::Int64 | DataType::Decimal | DataType::Varchar | ||
        )); | ||
        ensure!(DataType::Int32 == children[1].get_return_type().unwrap().into()); | ||
        ensure!(res_type == children[0].get_return_type().unwrap().into()); | ||
|
||
        // Get the second child as const param | ||
        let literal = build_child(&children[1])?; | ||
        let datum = literal.eval_const()?; | ||
        // A NULL width is invalid input: report it instead of panicking on `unwrap`. | ||
        ensure!(datum.is_some()); | ||
        let w = *datum.expect("checked to be non-NULL above").as_int32(); | ||
        // A truncation width only makes sense when it is strictly positive. | ||
        ensure!(w > 0); | ||
|
||
        // Build the child | ||
        let child = build_child(&children[0])?; | ||
        Ok(Truncate { | ||
            child, | ||
            w, | ||
            transform: TruncateTransform::new(w), | ||
        }) | ||
    } | ||
} | ||
|
||
/// Iceberg `year` partition transform expression. | ||
/// | ||
/// Runs icelake's `Year` transform over the child's output; the return type is `Int32`. | ||
pub struct Year { | ||
    child: BoxedExpression, | ||
    transform: YearTransform, | ||
} | ||
|
||
impl Debug for Year { | ||
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
        f.write_str("Iceberg_Year") | ||
    } | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl risingwave_expr::expr::Expression for Year { | ||
    fn return_type(&self) -> DataType { | ||
        DataType::Int32 | ||
    } | ||
|
||
    /// Evaluates the child, then round-trips the column through arrow to apply the | ||
    /// transform. Panics if either conversion or the transform fails. | ||
    async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> { | ||
        let input = self.child.eval(data_chunk).await?; | ||
        let transformed = self | ||
            .transform | ||
            .transform(input.as_ref().try_into().unwrap()) | ||
            .unwrap(); | ||
        Ok(Arc::new((&transformed).try_into().unwrap())) | ||
    } | ||
|
||
    /// Not implemented: always panics. | ||
    async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> { | ||
        unimplemented!() | ||
    } | ||
} | ||
|
||
impl Build for Year { | ||
    /// Builds a [`Year`] from a function-call node with a single `Date`, `Timestamp` or | ||
    /// `Timestamptz` child and an `Int32` return type. | ||
    fn build( | ||
        prost: &ExprNode, | ||
        build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>, | ||
    ) -> Result<Self> { | ||
        let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; | ||
|
||
        // Validate arity, input type and output type before building anything. | ||
        ensure!(children.len() == 1); | ||
        ensure!(matches!( | ||
            children[0].get_return_type().unwrap().into(), | ||
            DataType::Date | DataType::Timestamp | DataType::Timestamptz | ||
        )); | ||
        ensure!(res_type == DataType::Int32); | ||
|
||
        Ok(Self { | ||
            child: build_child(&children[0])?, | ||
            transform: YearTransform {}, | ||
        }) | ||
    } | ||
} | ||
|
||
/// Iceberg `month` partition transform expression. | ||
/// | ||
/// Runs icelake's `Month` transform over the child's output; the return type is `Int32`. | ||
pub struct Month { | ||
    child: BoxedExpression, | ||
    transform: MonthTransform, | ||
} | ||
|
||
impl Debug for Month { | ||
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
        f.write_str("Iceberg_Month") | ||
    } | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl risingwave_expr::expr::Expression for Month { | ||
    fn return_type(&self) -> DataType { | ||
        DataType::Int32 | ||
    } | ||
|
||
    /// Evaluates the child, then round-trips the column through arrow to apply the | ||
    /// transform. Panics if either conversion or the transform fails. | ||
    async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> { | ||
        let input = self.child.eval(data_chunk).await?; | ||
        let transformed = self | ||
            .transform | ||
            .transform(input.as_ref().try_into().unwrap()) | ||
            .unwrap(); | ||
        Ok(Arc::new((&transformed).try_into().unwrap())) | ||
    } | ||
|
||
    /// Not implemented: always panics. | ||
    async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> { | ||
        unimplemented!() | ||
    } | ||
} | ||
|
||
impl Build for Month { | ||
    /// Builds a [`Month`] from a function-call node with a single `Date`, `Timestamp` or | ||
    /// `Timestamptz` child and an `Int32` return type. | ||
    fn build( | ||
        prost: &ExprNode, | ||
        build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>, | ||
    ) -> Result<Self> { | ||
        let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; | ||
|
||
        // Validate arity, input type and output type before building anything. | ||
        ensure!(children.len() == 1); | ||
        ensure!(matches!( | ||
            children[0].get_return_type().unwrap().into(), | ||
            DataType::Date | DataType::Timestamp | DataType::Timestamptz | ||
        )); | ||
        ensure!(res_type == DataType::Int32); | ||
|
||
        Ok(Self { | ||
            child: build_child(&children[0])?, | ||
            transform: MonthTransform {}, | ||
        }) | ||
    } | ||
} | ||
|
||
/// Iceberg `day` partition transform expression. | ||
/// | ||
/// Runs icelake's `Day` transform over the child's output; the return type is `Int32`. | ||
pub struct Day { | ||
    child: BoxedExpression, | ||
    transform: DayTransform, | ||
} | ||
|
||
impl Debug for Day { | ||
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
        f.write_str("Iceberg_Day") | ||
    } | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl risingwave_expr::expr::Expression for Day { | ||
    fn return_type(&self) -> DataType { | ||
        DataType::Int32 | ||
    } | ||
|
||
    /// Evaluates the child, then round-trips the column through arrow to apply the | ||
    /// transform. Panics if either conversion or the transform fails. | ||
    async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> { | ||
        let input = self.child.eval(data_chunk).await?; | ||
        let transformed = self | ||
            .transform | ||
            .transform(input.as_ref().try_into().unwrap()) | ||
            .unwrap(); | ||
        Ok(Arc::new((&transformed).try_into().unwrap())) | ||
    } | ||
|
||
    /// Not implemented: always panics. | ||
    async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> { | ||
        unimplemented!() | ||
    } | ||
} | ||
|
||
impl Build for Day { | ||
    /// Builds a [`Day`] from a function-call node with a single `Date`, `Timestamp` or | ||
    /// `Timestamptz` child and an `Int32` return type. | ||
    fn build( | ||
        prost: &ExprNode, | ||
        build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>, | ||
    ) -> Result<Self> { | ||
        let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; | ||
|
||
        // Validate arity, input type and output type before building anything. | ||
        ensure!(children.len() == 1); | ||
        ensure!(matches!( | ||
            children[0].get_return_type().unwrap().into(), | ||
            DataType::Date | DataType::Timestamp | DataType::Timestamptz | ||
        )); | ||
        ensure!(res_type == DataType::Int32); | ||
|
||
        Ok(Self { | ||
            child: build_child(&children[0])?, | ||
            transform: DayTransform {}, | ||
        }) | ||
    } | ||
} | ||
|
||
/// Iceberg `hour` partition transform expression. | ||
/// | ||
/// Runs icelake's `Hour` transform over the child's output; the return type is `Int32`. | ||
pub struct Hour { | ||
    child: BoxedExpression, | ||
    transform: HourTransform, | ||
} | ||
|
||
impl Debug for Hour { | ||
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||
        f.write_str("Iceberg_Hour") | ||
    } | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl risingwave_expr::expr::Expression for Hour { | ||
    fn return_type(&self) -> DataType { | ||
        DataType::Int32 | ||
    } | ||
|
||
    /// Evaluates the child, then round-trips the column through arrow to apply the | ||
    /// transform. Panics if either conversion or the transform fails. | ||
    async fn eval(&self, data_chunk: &DataChunk) -> Result<ArrayRef> { | ||
        let input = self.child.eval(data_chunk).await?; | ||
        let transformed = self | ||
            .transform | ||
            .transform(input.as_ref().try_into().unwrap()) | ||
            .unwrap(); | ||
        Ok(Arc::new((&transformed).try_into().unwrap())) | ||
    } | ||
|
||
    /// Not implemented: always panics. | ||
    async fn eval_row(&self, _row: &OwnedRow) -> Result<Datum> { | ||
        unimplemented!() | ||
    } | ||
} | ||
|
||
impl Build for Hour { | ||
    /// Builds an [`Hour`] from a function-call node with a single `Timestamp` or | ||
    /// `Timestamptz` child and an `Int32` return type. | ||
    fn build( | ||
        prost: &ExprNode, | ||
        build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>, | ||
    ) -> Result<Self> { | ||
        let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; | ||
|
||
        // Validate arity, input type and output type before building anything. | ||
        ensure!(children.len() == 1); | ||
        ensure!(matches!( | ||
            children[0].get_return_type().unwrap().into(), | ||
            DataType::Timestamp | DataType::Timestamptz | ||
        )); | ||
        ensure!(res_type == DataType::Int32); | ||
|
||
        Ok(Self { | ||
            child: build_child(&children[0])?, | ||
            transform: HourTransform {}, | ||
        }) | ||
    } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
pub mod iceberg; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
// Copyright 2024 RisingWave Labs | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use std::sync::Arc; | ||
|
||
use risingwave_common::array::{ArrayImpl, ArrayRef, DataChunk, StructArray}; | ||
use risingwave_common::row::OwnedRow; | ||
use risingwave_common::types::{DataType, Datum, ScalarImpl, StructValue}; | ||
use risingwave_common::util::iter_util::ZipEqFast; | ||
use risingwave_expr::expr::{BoxedExpression, Expression}; | ||
use risingwave_expr::{build_function, Result}; | ||
|
||
/// Expression that evaluates each child and packs the results into one struct value. | ||
#[derive(Debug)] | ||
pub struct ConstructStructExpression { | ||
    /// Struct type produced by this expression. | ||
    return_type: DataType, | ||
    /// One expression per struct field, in field order. | ||
    children: Vec<BoxedExpression>, | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Expression for ConstructStructExpression { | ||
    fn return_type(&self) -> DataType { | ||
        self.return_type.clone() | ||
    } | ||
|
||
    /// Evaluates every child on `input` and assembles the columns into a struct array. | ||
    /// The chunk's visibility is passed as the third argument of `StructArray::new`. | ||
    async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> { | ||
        let mut fields = Vec::with_capacity(self.children.len()); | ||
        for expr in &self.children { | ||
            fields.push(expr.eval(input).await?); | ||
        } | ||
        let packed = StructArray::new( | ||
            self.return_type.as_struct().clone(), | ||
            fields, | ||
            input.visibility().clone(), | ||
        ); | ||
        Ok(Arc::new(ArrayImpl::Struct(packed))) | ||
    } | ||
|
||
    /// Evaluates every child on `input` and wraps the datums in a non-NULL struct value. | ||
    async fn eval_row(&self, input: &OwnedRow) -> Result<Datum> { | ||
        let mut fields = Vec::with_capacity(self.children.len()); | ||
        for expr in &self.children { | ||
            fields.push(expr.eval_row(input).await?); | ||
        } | ||
        let packed = StructValue::new(fields); | ||
        Ok(Some(ScalarImpl::Struct(packed))) | ||
    } | ||
} | ||
|
||
/// Builds a [`ConstructStructExpression`] for `construct_struct`. | ||
/// | ||
/// Panics if `return_type` is not a struct or if a child's return type differs from the | ||
/// corresponding struct field type. | ||
#[build_function("construct_struct(...) -> struct", type_infer = "panic")] | ||
fn build(return_type: DataType, children: Vec<BoxedExpression>) -> Result<BoxedExpression> { | ||
    assert!(return_type.is_struct()); | ||
    // Pair every field type with the child that produces it and check that they agree. | ||
    for (ty, child) in return_type | ||
        .as_struct() | ||
        .types() | ||
        .zip_eq_fast(children.iter()) | ||
    { | ||
        assert_eq!(*ty, child.return_type()); | ||
    } | ||
|
||
    let expr = ConstructStructExpression { | ||
        return_type, | ||
        children, | ||
    }; | ||
    Ok(Box::new(expr)) | ||
} | ||
|
||
// Unit tests for the `construct_struct` expression. | ||
#[cfg(test)] | ||
mod tests { | ||
    use risingwave_common::array::DataChunk; | ||
    use risingwave_common::row::Row; | ||
    use risingwave_common::test_prelude::DataChunkTestExt; | ||
    use risingwave_common::types::ToOwnedDatum; | ||
    use risingwave_common::util::iter_util::ZipEqDebug; | ||
    use risingwave_expr::expr::build_from_pretty; | ||
|
||
    // Builds `construct_struct` over three int4 input columns and checks that both the | ||
    // chunk-level and the row-level evaluation produce the expected struct values. | ||
    #[tokio::test] | ||
    async fn test_construct_struct_expr() { | ||
        let expr = build_from_pretty( | ||
            "(construct_struct:struct<a_int4,b_int4,c_int4> $0:int4 $1:int4 $2:int4)", | ||
        ); | ||
        // The first three columns are the inputs; the fourth is the expected struct column. | ||
        let (input, expected) = DataChunk::from_pretty( | ||
            "i i i <i,i,i> | ||
1 2 3 (1,2,3) | ||
4 2 1 (4,2,1) | ||
9 1 3 (9,1,3) | ||
1 1 1 (1,1,1)", | ||
        ) | ||
        .split_column_at(3); | ||
|
||
        // test eval | ||
        let output = expr.eval(&input).await.unwrap(); | ||
        assert_eq!(&output, expected.column_at(0)); | ||
|
||
        // test eval_row | ||
        for (row, expected) in input.rows().zip_eq_debug(expected.rows()) { | ||
            let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); | ||
            assert_eq!(result, expected.datum_at(0).to_owned_datum()); | ||
        } | ||
    } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters