From 56e6fc4899423db6ab0cf9ee39c233a6a54e3956 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 25 Oct 2023 15:33:36 +0800 Subject: [PATCH] fix merge issue --- src/expr/core/src/expr/expr_udf.rs | 69 +++++++------------ .../core/src/table_function/user_defined.rs | 5 +- 2 files changed, 27 insertions(+), 47 deletions(-) diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index a7867f49ec392..9d694f4c4fda1 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -85,6 +85,7 @@ impl Expression for UdfExpression { self.eval_inner(columns, vis).await } + #[cfg_or_panic(not(madsim))] async fn eval_row(&self, input: &OwnedRow) -> Result { let mut columns = Vec::with_capacity(self.children.len()); for child in &self.children { @@ -146,7 +147,6 @@ impl UdfExpression { vis.len(), ); } - let data_chunk = DataChunk::try_from(&output).expect("failed to convert UDF output to DataChunk"); let output = data_chunk.uncompact(vis.clone()); @@ -154,6 +154,14 @@ impl UdfExpression { let Some(array) = output.columns().get(0) else { bail!("UDF returned no columns"); }; + if !array.data_type().equals_datatype(&self.return_type) { + bail!( + "UDF returned {:?}, but expected {:?}", + array.data_type(), + self.return_type, + ); + } + Ok(array.clone()) } } @@ -167,21 +175,6 @@ impl Build for UdfExpression { let return_type = DataType::from(prost.get_return_type().unwrap()); let udf = prost.get_rex_node().unwrap().as_udf().unwrap(); - let arg_schema = Arc::new(Schema::new( - udf.arg_types - .iter() - .map::, _>(|t| { - Ok(Field::new( - "", - DataType::from(t) - .try_into() - .map_err(risingwave_udf::Error::Unsupported)?, - true, - )) - }) - .try_collect::()?, - )); - let imp = match &udf.extra { None | Some(PbExtra::External(PbExternalUdfExtra {})) => UdfImpl::External { client: get_or_create_flight_client(&udf.link)?, @@ -203,6 +196,21 @@ impl Build for UdfExpression { } }; + let arg_schema = Arc::new(Schema::new( + udf.arg_types + .iter() + .map::, _>(|t| { + Ok(Field::new( + "", + DataType::from(t) + .try_into() + .map_err(risingwave_udf::Error::Unsupported)?, + true, + )) + }) + .try_collect::()?, + )); + Ok(Self { children: udf.children.iter().map(build_child).try_collect()?, arg_types: udf.arg_types.iter().map(|t| t.into()).collect(), @@ -227,35 +235,8 @@ pub(crate) fn get_or_create_flight_client(link: &str) -> Result DataType { - self.return_type.clone() - } - - async fn eval(&self, input: &DataChunk) -> Result { - panic!("UDF is not supported in simulation yet"); - } - - async fn eval_row(&self, input: &OwnedRow) -> Result { - panic!("UDF is not supported in simulation yet"); - } -} - -#[cfg(madsim)] -impl<'a> TryFrom<&'a ExprNode> for UdfExpression { - type Error = ExprError; - - fn try_from(prost: &'a ExprNode) -> Result { - panic!("UDF is not supported in simulation yet"); - } -} diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index be391ca224bc3..88d3252338fed 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -132,9 +132,6 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result Result()?, )); + // connect to UDF service + let client = crate::expr::expr_udf::get_or_create_flight_client(&udtf.link)?; Ok(UserDefinedTableFunction { children: prost.args.iter().map(expr_build_from_prost).try_collect()?,