From 2637f8678c520c84207e58722e5feaf0cd40dd18 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 19 Sep 2023 14:59:57 +0800 Subject: [PATCH 1/3] refactor(udf): use `cfg_or_panic` for UDF implementations Signed-off-by: Bugen Zhao --- Cargo.lock | 2 ++ src/expr/Cargo.toml | 1 + src/expr/src/expr/expr_udf.rs | 31 +++---------------- src/expr/src/table_function/user_defined.rs | 25 ++-------------- src/udf/Cargo.toml | 1 + src/udf/src/external.rs | 33 ++------------------- 6 files changed, 14 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a47ade465d8d4..4cb27cf9cd3ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6886,6 +6886,7 @@ dependencies = [ "async-trait", "auto_enums", "await-tree", + "cfg-or-panic", "chrono", "chrono-tz", "criterion", @@ -7589,6 +7590,7 @@ dependencies = [ "arrow-flight", "arrow-schema", "arrow-select", + "cfg-or-panic", "futures-util", "madsim-tokio", "madsim-tonic", diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index 3736cb15700d7..91c68999fd6ac 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -22,6 +22,7 @@ arrow-schema = { workspace = true } async-trait = "0.1" auto_enums = "0.8" await-tree = { workspace = true } +cfg-or-panic = "0.1" chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } chrono-tz = { version = "0.8", features = ["case-insensitive"] } ctor = "0.2" diff --git a/src/expr/src/expr/expr_udf.rs b/src/expr/src/expr/expr_udf.rs index d0ecf58c3af98..6a5e56bbdba52 100644 --- a/src/expr/src/expr/expr_udf.rs +++ b/src/expr/src/expr/expr_udf.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, LazyLock, Mutex, Weak}; use arrow_schema::{Field, Fields, Schema, SchemaRef}; use await_tree::InstrumentAwait; +use cfg_or_panic::cfg_or_panic; use risingwave_common::array::{ArrayImpl, ArrayRef, DataChunk}; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; @@ -39,13 +40,13 @@ pub struct UdfExpression { span: await_tree::Span, } -#[cfg(not(madsim))] #[async_trait::async_trait] impl Expression for UdfExpression { fn return_type(&self) -> DataType { self.return_type.clone() } + #[cfg_or_panic(not(madsim))] async fn eval(&self, input: &DataChunk) -> Result { let vis = input.vis().to_bitmap(); let mut columns = Vec::with_capacity(self.children.len()); @@ -56,6 +57,7 @@ impl Expression for UdfExpression { self.eval_inner(columns, vis).await } + #[cfg_or_panic(not(madsim))] async fn eval_row(&self, input: &OwnedRow) -> Result { let mut columns = Vec::with_capacity(self.children.len()); for child in &self.children { @@ -114,7 +116,7 @@ impl UdfExpression { } } -#[cfg(not(madsim))] +#[cfg_or_panic(not(madsim))] impl<'a> TryFrom<&'a ExprNode> for UdfExpression { type Error = ExprError; @@ -171,28 +173,3 @@ pub(crate) fn get_or_create_client(link: &str) -> Result DataType { - self.return_type.clone() - } - - async fn eval(&self, input: &DataChunk) -> Result { - panic!("UDF is not supported in simulation yet"); - } - - async fn eval_row(&self, input: &OwnedRow) -> Result { - panic!("UDF is not supported in simulation yet"); - } -} - -#[cfg(madsim)] -impl<'a> TryFrom<&'a ExprNode> for UdfExpression { - type Error = ExprError; - - fn try_from(prost: &'a ExprNode) -> Result { - panic!("UDF is not supported in simulation yet"); - } -} diff --git a/src/expr/src/table_function/user_defined.rs b/src/expr/src/table_function/user_defined.rs index 813cf23504482..7ce3b878bace1 100644 --- a/src/expr/src/table_function/user_defined.rs +++ b/src/expr/src/table_function/user_defined.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; +use cfg_or_panic::cfg_or_panic; use futures_util::stream; use risingwave_common::array::{DataChunk, I32Array}; use risingwave_common::bail; @@ -35,13 +36,13 @@ pub struct UserDefinedTableFunction { chunk_size: usize, } -#[cfg(not(madsim))] #[async_trait::async_trait] impl TableFunction for UserDefinedTableFunction { fn return_type(&self) -> DataType { self.return_type.clone() } + #[cfg_or_panic(not(madsim))] async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result> { self.eval_inner(input) } @@ -124,7 +125,7 @@ impl UserDefinedTableFunction { } } -#[cfg(not(madsim))] +#[cfg_or_panic(not(madsim))] pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result { let Some(udtf) = &prost.udtf else { bail!("expect UDTF"); @@ -157,23 +158,3 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result DataType { - panic!("UDF is not supported in simulation yet"); - } - - async fn eval<'a>(&'a self, input: &'a DataChunk) -> BoxStream<'a, Result> { - panic!("UDF is not supported in simulation yet"); - } -} - -#[cfg(madsim)] -pub fn new_user_defined( - _prost: &PbTableFunction, - _chunk_size: usize, -) -> Result { - panic!("UDF is not supported in simulation yet"); -} diff --git a/src/udf/Cargo.toml b/src/udf/Cargo.toml index bad8f46a4c62d..cf75500437768 100644 --- a/src/udf/Cargo.toml +++ b/src/udf/Cargo.toml @@ -15,6 +15,7 @@ arrow-array = { workspace = true } arrow-flight = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } +cfg-or-panic = "0.1.1" futures-util = "0.3.28" static_assertions = "1" thiserror = "1" diff --git a/src/udf/src/external.rs b/src/udf/src/external.rs index 585adc7ebec5b..e9ff8652f31ec 100644 --- a/src/udf/src/external.rs +++ b/src/udf/src/external.rs @@ -19,6 +19,7 @@ use arrow_flight::error::FlightError; use arrow_flight::flight_service_client::FlightServiceClient; use arrow_flight::{FlightData, FlightDescriptor}; use arrow_schema::Schema; +use cfg_or_panic::cfg_or_panic; use futures_util::{stream, Stream, StreamExt, TryStreamExt}; use tonic::transport::Channel; @@ -30,7 +31,8 @@ pub struct ArrowFlightUdfClient { client: FlightServiceClient, } -#[cfg(not(madsim))] +// TODO: support UDF in simulation +#[cfg_or_panic(not(madsim))] impl ArrowFlightUdfClient { /// Connect to a UDF service. pub async fn connect(addr: &str) -> Result { @@ -129,35 +131,6 @@ impl ArrowFlightUdfClient { } } -// TODO: support UDF in simulation -#[cfg(madsim)] -impl ArrowFlightUdfClient { - /// Connect to a UDF service. - pub async fn connect(_addr: &str) -> Result { - panic!("UDF is not supported in simulation yet") - } - - /// Check if the function is available. - pub async fn check(&self, _id: &str, _args: &Schema, _returns: &Schema) -> Result<()> { - panic!("UDF is not supported in simulation yet") - } - - /// Call a function. - pub async fn call(&self, _id: &str, _input: RecordBatch) -> Result { - panic!("UDF is not supported in simulation yet") - } - - /// Call a function with streaming input and output. - pub async fn call_stream( - &self, - _id: &str, - _inputs: impl Stream + Send + 'static, - ) -> Result> + Send + 'static> { - panic!("UDF is not supported in simulation yet"); - Ok(stream::empty()) - } -} - /// Check if two list of data types match, ignoring field names. fn data_types_match(a: &[&arrow_schema::DataType], b: &[&arrow_schema::DataType]) -> bool { if a.len() != b.len() { From c1738d630594deb82fae9a459db9158e19433c8f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 19 Sep 2023 15:03:06 +0800 Subject: [PATCH 2/3] unify version Signed-off-by: Bugen Zhao --- src/udf/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/udf/Cargo.toml b/src/udf/Cargo.toml index cf75500437768..2bdf12d44cfda 100644 --- a/src/udf/Cargo.toml +++ b/src/udf/Cargo.toml @@ -15,7 +15,7 @@ arrow-array = { workspace = true } arrow-flight = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } -cfg-or-panic = "0.1.1" +cfg-or-panic = "0.1" futures-util = "0.3.28" static_assertions = "1" thiserror = "1" From 7d3876dbe56448d973017ceb17aa3a62380662e7 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 19 Sep 2023 16:26:23 +0800 Subject: [PATCH 3/3] add panic return Signed-off-by: Bugen Zhao --- Cargo.lock | 4 ++-- src/expr/Cargo.toml | 2 +- src/tests/simulation/Cargo.toml | 2 +- src/udf/Cargo.toml | 2 +- src/udf/src/external.rs | 1 + 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4cb27cf9cd3ca..36a3068229fe5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1453,9 +1453,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "cfg-or-panic" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf85d5384815558275789d91d1895d1d9919a6e2534d6144650f036ac65691a6" +checksum = "bc7cb2538d4ecc42b6c3b57a83094d8c69894e74468d18cd045a09fdea807358" dependencies = [ "proc-macro2", "quote", diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index 91c68999fd6ac..eab539364b32a 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -22,7 +22,7 @@ arrow-schema = { workspace = true } async-trait = "0.1" auto_enums = "0.8" await-tree = { workspace = true } -cfg-or-panic = "0.1" +cfg-or-panic = "0.2" chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } chrono-tz = { version = "0.8", features = ["case-insensitive"] } ctor = "0.2" diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index ab9201e6ddd3f..127d40855652d 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -14,7 +14,7 @@ normal = ["serde"] anyhow = "1.0" async-trait = "0.1" aws-sdk-s3 = { version = "0.2", package = "madsim-aws-sdk-s3" } -cfg-or-panic = "0.1" +cfg-or-panic = "0.2" clap = { version = "4", features = ["derive"] } console = "0.15" etcd-client = { workspace = true } diff --git a/src/udf/Cargo.toml b/src/udf/Cargo.toml index 2bdf12d44cfda..2d13f39bdddc4 100644 --- a/src/udf/Cargo.toml +++ b/src/udf/Cargo.toml @@ -15,7 +15,7 @@ arrow-array = { workspace = true } arrow-flight = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } -cfg-or-panic = "0.1" +cfg-or-panic = "0.2" futures-util = "0.3.28" static_assertions = "1" thiserror = "1" diff --git a/src/udf/src/external.rs b/src/udf/src/external.rs index e9ff8652f31ec..08f5c35c21c1f 100644 --- a/src/udf/src/external.rs +++ b/src/udf/src/external.rs @@ -100,6 +100,7 @@ impl ArrowFlightUdfClient { } /// Call a function with streaming input and output. + #[panic_return = "Result>"] pub async fn call_stream( &self, id: &str,