From c7eada9e21f78e9c3f16220f57ce8a99f5cbeeaa Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 1 Nov 2023 21:34:40 +0800 Subject: [PATCH 1/4] refactor: introduce `thiserror-ext` for painless error handling Signed-off-by: Bugen Zhao --- Cargo.lock | 20 ++++++++++++++++ Cargo.toml | 1 + src/expr/core/src/expr/expr_udf.rs | 2 +- .../core/src/table_function/user_defined.rs | 2 +- src/udf/Cargo.toml | 1 + src/udf/src/error.rs | 23 +++++-------------- src/udf/src/external.rs | 11 +++++---- src/udf/src/lib.rs | 2 ++ 8 files changed, 39 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8fef1708dc5fa..a9bee231d3791 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8414,6 +8414,7 @@ dependencies = [ "madsim-tonic", "static_assertions", "thiserror", + "thiserror-ext", ] [[package]] @@ -9956,6 +9957,25 @@ dependencies = [ "thiserror-impl", ] +[[package]] +name = "thiserror-ext" +version = "0.1.0" +source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=ba5d40#ba5d4012025da323e893d6d14af9502f0107272a" +dependencies = [ + "thiserror", + "thiserror-ext-derive", +] + +[[package]] +name = "thiserror-ext-derive" +version = "0.1.0" +source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=ba5d40#ba5d4012025da323e893d6d14af9502f0107272a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "thiserror-impl" version = "1.0.48" diff --git a/Cargo.toml b/Cargo.toml index dc38b19e237f4..fb2e513f1ad1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ arrow-buffer = "48" arrow-flight = "48" arrow-select = "48" arrow-ord = "48" +thiserror-ext = { git = "https://github.com/risingwavelabs/thiserror-ext.git", rev = "ba5d40" } tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling", diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index a11af2434b4f9..8aae5beae60a4 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -150,7 +150,7 @@ impl Build for UdfExpression { "", DataType::from(t) .try_into() - .map_err(risingwave_udf::Error::Unsupported)?, + .map_err(risingwave_udf::Error::unsupported)?, true, )) }) diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index 60fde34f9df1f..cbb8682ba48a7 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -140,7 +140,7 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result = std::result::Result; /// The error type for UDF operations. -#[derive(thiserror::Error, Debug)] -pub enum Error { +#[derive(thiserror::Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] +#[thiserror_ext(type = Error)] +pub enum ErrorInner { #[error("failed to connect to UDF service: {0}")] Connect(#[from] tonic::transport::Error), #[error("failed to send requests to UDF service: {0}")] - Tonic(#[from] Box), + Tonic(#[from] tonic::Status), #[error("failed to call UDF: {0}")] - Flight(#[from] Box), + Flight(#[from] FlightError), #[error("type mismatch: {0}")] TypeMismatch(String), @@ -45,16 +46,4 @@ pub enum Error { ServiceError(String), } -static_assertions::const_assert_eq!(std::mem::size_of::(), 40); - -impl From for Error { - fn from(status: tonic::Status) -> Self { - Error::from(Box::new(status)) - } -} - -impl From for Error { - fn from(error: FlightError) -> Self { - Error::from(Box::new(error)) - } -} +static_assertions::const_assert_eq!(std::mem::size_of::(), 8); diff --git a/src/udf/src/external.rs b/src/udf/src/external.rs index e77b96f2bdab4..8215e366b9816 100644 --- a/src/udf/src/external.rs +++ b/src/udf/src/external.rs @@ -59,7 +59,7 @@ impl ArrowFlightUdfClient { let full_schema = Schema::try_from(info) .map_err(|e| FlightError::DecodeError(format!("Error decoding schema: {e}")))?; if input_num > full_schema.fields.len() { - return Err(Error::ServiceError(format!( + return Err(Error::service_error(format!( "function {:?} schema info not consistency: input_num: {}, total_fields: {}", id, input_num, @@ -73,13 +73,13 @@ impl ArrowFlightUdfClient { let expect_input_types: Vec<_> = args.fields.iter().map(|f| f.data_type()).collect(); let expect_result_types: Vec<_> = returns.fields.iter().map(|f| f.data_type()).collect(); if !data_types_match(&expect_input_types, &actual_input_types) { - return Err(Error::TypeMismatch(format!( + return Err(Error::type_mismatch(format!( "function: {:?}, expect arguments: {:?}, actual: {:?}", id, expect_input_types, actual_input_types ))); } if !data_types_match(&expect_result_types, &actual_result_types) { - return Err(Error::TypeMismatch(format!( + return Err(Error::type_mismatch(format!( "function: {:?}, expect return: {:?}, actual: {:?}", id, expect_result_types, actual_result_types ))); @@ -91,7 +91,10 @@ impl ArrowFlightUdfClient { pub async fn call(&self, id: &str, input: RecordBatch) -> Result { let mut output_stream = self.call_stream(id, stream::once(async { input })).await?; // TODO: support no output - let head = output_stream.next().await.ok_or(Error::NoReturned)??; + let head = output_stream + .next() + .await + .ok_or_else(|| Error::no_returned())??; let mut remaining = vec![]; while let Some(batch) = output_stream.next().await { remaining.push(batch?); diff --git a/src/udf/src/lib.rs b/src/udf/src/lib.rs index 513551a9108af..25207c2c19edf 100644 --- a/src/udf/src/lib.rs +++ b/src/udf/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(error_generic_member_access)] + mod error; mod external; From fda56a3049aa09521855e2e18453b3c7089574e5 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 2 Nov 2023 17:34:19 +0800 Subject: [PATCH 2/4] bump and use Signed-off-by: Bugen Zhao --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- src/udf/src/error.rs | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9bee231d3791..c463720cf6bb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9960,7 +9960,7 @@ dependencies = [ [[package]] name = "thiserror-ext" version = "0.1.0" -source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=ba5d40#ba5d4012025da323e893d6d14af9502f0107272a" +source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=5ed5fe#5ed5fe9320acd5e7dac338919400f2b21341e360" dependencies = [ "thiserror", "thiserror-ext-derive", @@ -9969,7 +9969,7 @@ dependencies = [ [[package]] name = "thiserror-ext-derive" version = "0.1.0" -source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=ba5d40#ba5d4012025da323e893d6d14af9502f0107272a" +source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=5ed5fe#5ed5fe9320acd5e7dac338919400f2b21341e360" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index fb2e513f1ad1e..1c9b15b09b0cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ arrow-buffer = "48" arrow-flight = "48" arrow-select = "48" arrow-ord = "48" -thiserror-ext = { git = "https://github.com/risingwavelabs/thiserror-ext.git", rev = "ba5d40" } +thiserror-ext = { git = "https://github.com/risingwavelabs/thiserror-ext.git", rev = "5ed5fe" } tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling", diff --git a/src/udf/src/error.rs b/src/udf/src/error.rs index d6cae438f0fbd..15fd2b7fbb8ce 100644 --- a/src/udf/src/error.rs +++ b/src/udf/src/error.rs @@ -13,12 +13,14 @@ // limitations under the License. use arrow_flight::error::FlightError; +use thiserror::Error; +use thiserror_ext::{Box, Construct}; /// A specialized `Result` type for UDF operations. pub type Result = std::result::Result; /// The error type for UDF operations. -#[derive(thiserror::Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] +#[derive(Error, Debug, Box, Construct)] #[thiserror_ext(type = Error)] pub enum ErrorInner { #[error("failed to connect to UDF service: {0}")] From 169ce75fe4a2a5b27ed75f57e5d944b48b767769 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 2 Nov 2023 17:35:24 +0800 Subject: [PATCH 3/4] minor Signed-off-by: Bugen Zhao --- src/udf/src/external.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/udf/src/external.rs b/src/udf/src/external.rs index 8215e366b9816..c5ed44850f1bf 100644 --- a/src/udf/src/external.rs +++ b/src/udf/src/external.rs @@ -94,7 +94,7 @@ impl ArrowFlightUdfClient { let head = output_stream .next() .await - .ok_or_else(|| Error::no_returned())??; + .ok_or_else(Error::no_returned)??; let mut remaining = vec![]; while let Some(batch) = output_stream.next().await { remaining.push(batch?); From 6b03485ee903c11a42b0930a927f0a0fe865d3a7 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 2 Nov 2023 19:32:57 +0800 Subject: [PATCH 4/4] use crates.io version Signed-off-by: Bugen Zhao --- Cargo.lock | 10 ++++++---- Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c463720cf6bb4..eedd4054cb4f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9959,8 +9959,9 @@ dependencies = [ [[package]] name = "thiserror-ext" -version = "0.1.0" -source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=5ed5fe#5ed5fe9320acd5e7dac338919400f2b21341e360" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f28a4a7351f496662affc257826b85dd2a613406ce3cc2f07b849685e166d8c" dependencies = [ "thiserror", "thiserror-ext-derive", @@ -9968,8 +9969,9 @@ dependencies = [ [[package]] name = "thiserror-ext-derive" -version = "0.1.0" -source = "git+https://github.com/risingwavelabs/thiserror-ext.git?rev=5ed5fe#5ed5fe9320acd5e7dac338919400f2b21341e360" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67621f6d39449754da63668ddd2423ad0c81c27434c16090f8805ad1db59b621" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 1c9b15b09b0cb..919c990deb7d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ arrow-buffer = "48" arrow-flight = "48" arrow-select = "48" arrow-ord = "48" -thiserror-ext = { git = "https://github.com/risingwavelabs/thiserror-ext.git", rev = "5ed5fe" } +thiserror-ext = "0.0.1" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling",