From 96507dea624ac77b7090967c3f40baab4e23fa80 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 6 Feb 2024 16:00:31 +0800 Subject: [PATCH 1/5] refactor(connector): make `ConnectorError` a wrapper of `anyhow::Error` --- Cargo.lock | 1 + src/connector/src/error.rs | 55 ++---------- src/connector/src/source/cdc/external/mod.rs | 21 ++--- .../src/source/pulsar/source/reader.rs | 12 +-- src/error/Cargo.toml | 1 + src/error/src/anyhow.rs | 89 +++++++++++++++++++ src/error/src/lib.rs | 1 + 7 files changed, 112 insertions(+), 68 deletions(-) create mode 100644 src/error/src/anyhow.rs diff --git a/Cargo.lock b/Cargo.lock index 8f91a6aa97020..7c89a6da1f53f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9145,6 +9145,7 @@ dependencies = [ name = "risingwave_error" version = "1.7.0-alpha" dependencies = [ + "anyhow", "bincode 1.3.3", "bytes", "easy-ext", diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index d25658dfaf5e5..08b538568712f 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -12,50 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum ConnectorError { - #[error("Parse error: {0}")] - Parse(&'static str), - - #[error("Invalid parameter {name}: {reason}")] - InvalidParam { name: &'static str, reason: String }, - - #[error("Kafka error: {0}")] - Kafka(#[from] rdkafka::error::KafkaError), - - #[error("Config error: {0}")] - Config( - #[source] - #[backtrace] - anyhow::Error, - ), - - #[error("Connection error: {0}")] - Connection( - #[source] - #[backtrace] - anyhow::Error, - ), - - #[error("MySQL error: {0}")] - MySql(#[from] mysql_async::Error), - - #[error("Postgres error: {0}")] - Postgres(#[from] tokio_postgres::Error), - - #[error("Pulsar error: {0}")] - Pulsar( - #[source] - #[backtrace] - anyhow::Error, - ), - - #[error(transparent)] - Internal( - #[from] - #[backtrace] - anyhow::Error, - ), +use risingwave_common::error::v2::def_anyhow_newtype; + +def_anyhow_newtype! { + /// The error type for the `connector` crate. + /// + /// We use [`anyhow::Error`] under the hood as the connector has to deal with + /// various kinds of errors from different external crates. It acts more like an + /// application and callers may not expect to handle it in a fine-grained way. + pub ConnectorError; } diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 78c6c714e2bc6..c9fe938da4b9f 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -17,7 +17,7 @@ mod postgres; use std::collections::HashMap; -use anyhow::{anyhow, Context}; +use anyhow::Context; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -77,10 +77,7 @@ impl CdcTableType { Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( PostgresExternalTableReader::new(with_properties, schema).await?, )), - _ => bail!(ConnectorError::Config(anyhow!( - "invalid external table type: {:?}", - *self - ))), + _ => bail!("invalid external table type: {:?}", *self), } } } @@ -405,19 +402,11 @@ impl MySqlExternalTableReader { DataType::Date => Value::from(value.into_date().0), DataType::Time => Value::from(value.into_time().0), DataType::Timestamp => Value::from(value.into_timestamp().0), - _ => { - return Err(ConnectorError::Internal(anyhow!( - "unsupported primary key data type: {}", - ty - ))) - } + _ => bail!("unsupported primary key data type: {}", ty), }; - Ok((pk.clone(), val)) + ConnectorResult::Ok((pk.clone(), val)) } else { - Err(ConnectorError::Internal(anyhow!( - "primary key {} cannot be null", - pk - ))) + bail!("primary key {} cannot be null", pk); } }) .try_collect()?; diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 7181710b70868..139af839bd16d 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use arrow_array::{Int32Array, Int64Array, RecordBatch}; use async_trait::async_trait; use futures::StreamExt; @@ -32,7 +32,6 @@ use risingwave_common::array::{DataChunk, StreamChunk}; use risingwave_common::catalog::ROWID_PREFIX; use risingwave_common::{bail, ensure}; -use crate::error::ConnectorError; use crate::parser::ParserConfig; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties}; @@ -398,10 +397,11 @@ impl PulsarIcebergReader { fn build_iceberg_configs(&self) -> anyhow::Result> { let mut iceberg_configs = HashMap::new(); - let bucket = - self.props.iceberg_bucket.as_ref().ok_or_else(|| { - ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured")) - })?; + let bucket = self + .props + .iceberg_bucket + .as_ref() + .context("Iceberg bucket is not configured")?; iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string()); iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string()); diff --git a/src/error/Cargo.toml b/src/error/Cargo.toml index 13bb50a371853..4a99711db6c41 100644 --- a/src/error/Cargo.toml +++ b/src/error/Cargo.toml @@ -8,6 +8,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] +anyhow = "1" bincode = "1" bytes = "1" easy-ext = "1" diff --git a/src/error/src/anyhow.rs b/src/error/src/anyhow.rs new file mode 100644 index 0000000000000..6f3dbc9c4765c --- /dev/null +++ b/src/error/src/anyhow.rs @@ -0,0 +1,89 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Define a newtype wrapper around [`anyhow::Error`]. +/// +/// # Motivation +/// +/// [`anyhow::Error`] is good enough if one just wants to make the error type +/// informative but not necessarily actionable. However, given a value of type +/// [`anyhow::Error`], it is hard to tell which module or crate it comes from, +/// which may blur the boundary between modules when passing it around, leading +/// to abuse. +/// +/// # Usage +/// +/// ```ignore +/// def_anyhow_newtype! { +/// /// Documentation for the newtype. +/// #[derive(..)] +/// pub MyError; +/// } +/// ``` +/// +/// This will define a newtype `MyError` around [`anyhow::Error`]. +/// +/// The newtype can be converted from any type that implements `Into`, +/// so the developing experience is kept almost the same. To construct a new error, +/// one can still use macros like `anyhow::anyhow!` or `risingwave_common::bail!`. +/// +/// Since `bail!` and `?` already imply an `into()` call, developers do not need to +/// care about the conversion from [`anyhow::Error`] to the newtype most of the time. +/// +/// # Limitation +/// +/// Note that the newtype does not implement [`std::error::Error`] just like +/// [`anyhow::Error`]. However, it can be dereferenced to `dyn std::error::Error` +/// to be used in places like `thiserror`'s `#[source]` attribute. +#[macro_export] +macro_rules! def_anyhow_newtype { + ($(#[$attr:meta])* $vis:vis $name:ident $(;)?) => { + $(#[$attr])* $vis struct $name(::anyhow::Error); + + impl std::ops::Deref for $name { + type Target = ::anyhow::Error; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl std::fmt::Debug for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } + } + + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } + } + + impl From for $name + where + E: Into<::anyhow::Error>, + { + fn from(value: E) -> Self { + Self(value.into()) + } + } + + impl From<$name> for Box { + fn from(value: $name) -> Self { + value.0.into() + } + } + }; +} diff --git a/src/error/src/lib.rs b/src/error/src/lib.rs index ccfec0cfbcc19..d3364485e8f2f 100644 --- a/src/error/src/lib.rs +++ b/src/error/src/lib.rs @@ -21,4 +21,5 @@ #![feature(register_tool)] #![register_tool(rw)] +pub mod anyhow; pub mod tonic; From cfcd1b823c389d7653e2afad9723a61fe8760b64 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 6 Feb 2024 16:17:13 +0800 Subject: [PATCH 2/5] move connector result Signed-off-by: Bugen Zhao --- src/connector/src/error.rs | 2 ++ .../src/source/cdc/external/mock_external_table.rs | 5 ++--- src/connector/src/source/cdc/external/mod.rs | 4 +--- src/connector/src/source/cdc/external/postgres.rs | 6 +++--- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index 08b538568712f..99aaa15d871ca 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -22,3 +22,5 @@ def_anyhow_newtype! { /// application and callers may not expect to handle it in a fine-grained way. pub ConnectorError; } + +pub type ConnectorResult = std::result::Result; diff --git a/src/connector/src/source/cdc/external/mock_external_table.rs b/src/connector/src/source/cdc/external/mock_external_table.rs index 2c8ee00f67af9..d7c39e2a9aa8b 100644 --- a/src/connector/src/source/cdc/external/mock_external_table.rs +++ b/src/connector/src/source/cdc/external/mock_external_table.rs @@ -19,10 +19,9 @@ use futures_async_stream::try_stream; use risingwave_common::row::OwnedRow; use risingwave_common::types::ScalarImpl; -use crate::error::ConnectorError; +use crate::error::{ConnectorError, ConnectorResult}; use crate::source::cdc::external::{ - CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset, - SchemaTableName, + CdcOffset, CdcOffsetParseFunc, ExternalTableReader, MySqlOffset, SchemaTableName, }; #[derive(Debug)] diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index c9fe938da4b9f..f281d1ecea58a 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -32,13 +32,11 @@ use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use serde_derive::{Deserialize, Serialize}; -use crate::error::ConnectorError; +use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::mysql_row_to_owned_row; use crate::source::cdc::external::mock_external_table::MockExternalTableReader; use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; -pub type ConnectorResult = std::result::Result; - #[derive(Debug)] pub enum CdcTableType { Undefined, diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index f8f0c9d402347..bd8a0b51c04e7 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -28,11 +28,11 @@ use thiserror_ext::AsReport; use tokio_postgres::types::PgLsn; use tokio_postgres::NoTls; -use crate::error::ConnectorError; +use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::postgres_row_to_owned_row; use crate::source::cdc::external::{ - CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig, - ExternalTableReader, SchemaTableName, + CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader, + SchemaTableName, }; #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] From 6636d8ee2d6b3781ad5b2da30221295eed480181 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 7 Feb 2024 11:19:33 +0800 Subject: [PATCH 3/5] impl `Context` trait and `into_inner` Signed-off-by: Bugen Zhao --- src/error/src/anyhow.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/error/src/anyhow.rs b/src/error/src/anyhow.rs index 6f3dbc9c4765c..e765d4194f237 100644 --- a/src/error/src/anyhow.rs +++ b/src/error/src/anyhow.rs @@ -51,6 +51,13 @@ macro_rules! def_anyhow_newtype { ($(#[$attr:meta])* $vis:vis $name:ident $(;)?) => { $(#[$attr])* $vis struct $name(::anyhow::Error); + impl $name { + /// Unwrap the newtype to get the inner [`anyhow::Error`]. + pub fn into_inner(self) -> ::anyhow::Error { + self.0 + } + } + impl std::ops::Deref for $name { type Target = ::anyhow::Error; @@ -85,5 +92,32 @@ macro_rules! def_anyhow_newtype { value.0.into() } } + + paste::paste! { + /// Provides the `context` method for `Result` of [`ConnectorError`]. + /// + /// This trait is the supplement of the [`anyhow::Context`] trait as it cannot be + /// implemented for types outside of the crate. + #[easy_ext::ext([< $name Context >])] + $vis impl Result { + /// Wrap the error value with additional context. + fn context(self, context: C) -> Result + where + C: std::fmt::Display + Send + Sync + 'static, + { + ::anyhow::Context::context(self.map_err(|error| error.0), context) + } + + /// Wrap the error value with additional context that is evaluated lazily + /// only once an error does occur. + fn with_context(self, context: F) -> Result + where + C: std::fmt::Display + Send + Sync + 'static, + F: FnOnce() -> C, + { + ::anyhow::Context::with_context(self.map_err(|error| error.0), context) + } + } + } }; } From aeab5afb12a550c1fb71717c15cc7af1e676803a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 7 Feb 2024 18:33:23 +0800 Subject: [PATCH 4/5] revert anyhow newtype Signed-off-by: Bugen Zhao --- src/connector/src/error.rs | 29 ++++++--- src/error/src/anyhow.rs | 123 ------------------------------------- src/error/src/lib.rs | 1 - 3 files changed, 20 insertions(+), 133 deletions(-) delete mode 100644 src/error/src/anyhow.rs diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index 99aaa15d871ca..db773add56364 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -12,15 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::error::v2::def_anyhow_newtype; +use thiserror::Error; -def_anyhow_newtype! { - /// The error type for the `connector` crate. - /// - /// We use [`anyhow::Error`] under the hood as the connector has to deal with - /// various kinds of errors from different external crates. It acts more like an - /// application and callers may not expect to handle it in a fine-grained way. - pub ConnectorError; +#[derive(Error, Debug)] +pub enum ConnectorError { + #[error("MySQL error: {0}")] + MySql( + #[from] + #[backtrace] + mysql_async::Error, + ), + + #[error("Postgres error: {0}")] + Postgres(#[from] tokio_postgres::Error), + + #[error(transparent)] + Uncategorized( + #[from] + #[backtrace] + anyhow::Error, + ), } -pub type ConnectorResult = std::result::Result; +pub type ConnectorResult = Result; diff --git a/src/error/src/anyhow.rs b/src/error/src/anyhow.rs deleted file mode 100644 index e765d4194f237..0000000000000 --- a/src/error/src/anyhow.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// Define a newtype wrapper around [`anyhow::Error`]. -/// -/// # Motivation -/// -/// [`anyhow::Error`] is good enough if one just wants to make the error type -/// informative but not necessarily actionable. However, given a value of type -/// [`anyhow::Error`], it is hard to tell which module or crate it comes from, -/// which may blur the boundary between modules when passing it around, leading -/// to abuse. -/// -/// # Usage -/// -/// ```ignore -/// def_anyhow_newtype! { -/// /// Documentation for the newtype. -/// #[derive(..)] -/// pub MyError; -/// } -/// ``` -/// -/// This will define a newtype `MyError` around [`anyhow::Error`]. -/// -/// The newtype can be converted from any type that implements `Into`, -/// so the developing experience is kept almost the same. To construct a new error, -/// one can still use macros like `anyhow::anyhow!` or `risingwave_common::bail!`. -/// -/// Since `bail!` and `?` already imply an `into()` call, developers do not need to -/// care about the conversion from [`anyhow::Error`] to the newtype most of the time. -/// -/// # Limitation -/// -/// Note that the newtype does not implement [`std::error::Error`] just like -/// [`anyhow::Error`]. However, it can be dereferenced to `dyn std::error::Error` -/// to be used in places like `thiserror`'s `#[source]` attribute. -#[macro_export] -macro_rules! def_anyhow_newtype { - ($(#[$attr:meta])* $vis:vis $name:ident $(;)?) => { - $(#[$attr])* $vis struct $name(::anyhow::Error); - - impl $name { - /// Unwrap the newtype to get the inner [`anyhow::Error`]. - pub fn into_inner(self) -> ::anyhow::Error { - self.0 - } - } - - impl std::ops::Deref for $name { - type Target = ::anyhow::Error; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } - - impl std::fmt::Debug for $name { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } - } - - impl std::fmt::Display for $name { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } - } - - impl From for $name - where - E: Into<::anyhow::Error>, - { - fn from(value: E) -> Self { - Self(value.into()) - } - } - - impl From<$name> for Box { - fn from(value: $name) -> Self { - value.0.into() - } - } - - paste::paste! { - /// Provides the `context` method for `Result` of [`ConnectorError`]. - /// - /// This trait is the supplement of the [`anyhow::Context`] trait as it cannot be - /// implemented for types outside of the crate. - #[easy_ext::ext([< $name Context >])] - $vis impl Result { - /// Wrap the error value with additional context. - fn context(self, context: C) -> Result - where - C: std::fmt::Display + Send + Sync + 'static, - { - ::anyhow::Context::context(self.map_err(|error| error.0), context) - } - - /// Wrap the error value with additional context that is evaluated lazily - /// only once an error does occur. - fn with_context(self, context: F) -> Result - where - C: std::fmt::Display + Send + Sync + 'static, - F: FnOnce() -> C, - { - ::anyhow::Context::with_context(self.map_err(|error| error.0), context) - } - } - } - }; -} diff --git a/src/error/src/lib.rs b/src/error/src/lib.rs index d3364485e8f2f..ccfec0cfbcc19 100644 --- a/src/error/src/lib.rs +++ b/src/error/src/lib.rs @@ -21,5 +21,4 @@ #![feature(register_tool)] #![register_tool(rw)] -pub mod anyhow; pub mod tonic; From 2dbb5fcef3b3ce8abb15b7cfb4b697ab3aaf6840 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 13 Feb 2024 13:57:05 +0800 Subject: [PATCH 5/5] revert cargo lock changes Signed-off-by: Bugen Zhao --- Cargo.lock | 1 - src/error/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c89a6da1f53f..8f91a6aa97020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9145,7 +9145,6 @@ dependencies = [ name = "risingwave_error" version = "1.7.0-alpha" dependencies = [ - "anyhow", "bincode 1.3.3", "bytes", "easy-ext", diff --git a/src/error/Cargo.toml b/src/error/Cargo.toml index 4a99711db6c41..13bb50a371853 100644 --- a/src/error/Cargo.toml +++ b/src/error/Cargo.toml @@ -8,7 +8,6 @@ license = { workspace = true } repository = { workspace = true } [dependencies] -anyhow = "1" bincode = "1" bytes = "1" easy-ext = "1"