diff --git a/Cargo.lock b/Cargo.lock index d8b661d61f814..44e6b93a054c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9151,6 +9151,7 @@ dependencies = [ name = "risingwave_error" version = "1.7.0-alpha" dependencies = [ + "anyhow", "bincode 1.3.3", "bytes", "easy-ext", diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index d25658dfaf5e5..08b538568712f 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -12,50 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum ConnectorError { - #[error("Parse error: {0}")] - Parse(&'static str), - - #[error("Invalid parameter {name}: {reason}")] - InvalidParam { name: &'static str, reason: String }, - - #[error("Kafka error: {0}")] - Kafka(#[from] rdkafka::error::KafkaError), - - #[error("Config error: {0}")] - Config( - #[source] - #[backtrace] - anyhow::Error, - ), - - #[error("Connection error: {0}")] - Connection( - #[source] - #[backtrace] - anyhow::Error, - ), - - #[error("MySQL error: {0}")] - MySql(#[from] mysql_async::Error), - - #[error("Postgres error: {0}")] - Postgres(#[from] tokio_postgres::Error), - - #[error("Pulsar error: {0}")] - Pulsar( - #[source] - #[backtrace] - anyhow::Error, - ), - - #[error(transparent)] - Internal( - #[from] - #[backtrace] - anyhow::Error, - ), +use risingwave_common::error::v2::def_anyhow_newtype; + +def_anyhow_newtype! { + /// The error type for the `connector` crate. + /// + /// We use [`anyhow::Error`] under the hood as the connector has to deal with + /// various kinds of errors from different external crates. It acts more like an + /// application and callers may not expect to handle it in a fine-grained way. + pub ConnectorError; } diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 78c6c714e2bc6..c9fe938da4b9f 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -17,7 +17,7 @@ mod postgres; use std::collections::HashMap; -use anyhow::{anyhow, Context}; +use anyhow::Context; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -77,10 +77,7 @@ impl CdcTableType { Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( PostgresExternalTableReader::new(with_properties, schema).await?, )), - _ => bail!(ConnectorError::Config(anyhow!( - "invalid external table type: {:?}", - *self - ))), + _ => bail!("invalid external table type: {:?}", *self), } } } @@ -405,19 +402,11 @@ impl MySqlExternalTableReader { DataType::Date => Value::from(value.into_date().0), DataType::Time => Value::from(value.into_time().0), DataType::Timestamp => Value::from(value.into_timestamp().0), - _ => { - return Err(ConnectorError::Internal(anyhow!( - "unsupported primary key data type: {}", - ty - ))) - } + _ => bail!("unsupported primary key data type: {}", ty), }; - Ok((pk.clone(), val)) + ConnectorResult::Ok((pk.clone(), val)) } else { - Err(ConnectorError::Internal(anyhow!( - "primary key {} cannot be null", - pk - ))) + bail!("primary key {} cannot be null", pk); } }) .try_collect()?; diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 7181710b70868..139af839bd16d 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::{SystemTime, UNIX_EPOCH}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use arrow_array::{Int32Array, Int64Array, RecordBatch}; use async_trait::async_trait; use futures::StreamExt; @@ -32,7 +32,6 @@ use risingwave_common::array::{DataChunk, StreamChunk}; use risingwave_common::catalog::ROWID_PREFIX; use risingwave_common::{bail, ensure}; -use crate::error::ConnectorError; use crate::parser::ParserConfig; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties}; @@ -398,10 +397,11 @@ impl PulsarIcebergReader { fn build_iceberg_configs(&self) -> anyhow::Result> { let mut iceberg_configs = HashMap::new(); - let bucket = - self.props.iceberg_bucket.as_ref().ok_or_else(|| { - ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured")) - })?; + let bucket = self + .props + .iceberg_bucket + .as_ref() + .context("Iceberg bucket is not configured")?; iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string()); iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string()); diff --git a/src/error/Cargo.toml b/src/error/Cargo.toml index 13bb50a371853..4a99711db6c41 100644 --- a/src/error/Cargo.toml +++ b/src/error/Cargo.toml @@ -8,6 +8,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] +anyhow = "1" bincode = "1" bytes = "1" easy-ext = "1" diff --git a/src/error/src/anyhow.rs b/src/error/src/anyhow.rs new file mode 100644 index 0000000000000..6f3dbc9c4765c --- /dev/null +++ b/src/error/src/anyhow.rs @@ -0,0 +1,89 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Define a newtype wrapper around [`anyhow::Error`]. +/// +/// # Motivation +/// +/// [`anyhow::Error`] is good enough if one just wants to make the error type +/// informative but not necessarily actionable. However, given a value of type +/// [`anyhow::Error`], it is hard to tell which module or crate it comes from, +/// which may blur the boundary between modules when passing it around, leading +/// to abuse. +/// +/// # Usage +/// +/// ```ignore +/// def_anyhow_newtype! { +/// /// Documentation for the newtype. +/// #[derive(..)] +/// pub MyError; +/// } +/// ``` +/// +/// This will define a newtype `MyError` around [`anyhow::Error`]. +/// +/// The newtype can be converted from any type that implements `Into`, +/// so the developing experience is kept almost the same. To construct a new error, +/// one can still use macros like `anyhow::anyhow!` or `risingwave_common::bail!`. +/// +/// Since `bail!` and `?` already imply an `into()` call, developers do not need to +/// care about the conversion from [`anyhow::Error`] to the newtype most of the time. +/// +/// # Limitation +/// +/// Note that the newtype does not implement [`std::error::Error`] just like +/// [`anyhow::Error`]. However, it can be dereferenced to `dyn std::error::Error` +/// to be used in places like `thiserror`'s `#[source]` attribute. +#[macro_export] +macro_rules! def_anyhow_newtype { + ($(#[$attr:meta])* $vis:vis $name:ident $(;)?) => { + $(#[$attr])* $vis struct $name(::anyhow::Error); + + impl std::ops::Deref for $name { + type Target = ::anyhow::Error; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl std::fmt::Debug for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } + } + + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } + } + + impl From for $name + where + E: Into<::anyhow::Error>, + { + fn from(value: E) -> Self { + Self(value.into()) + } + } + + impl From<$name> for Box { + fn from(value: $name) -> Self { + value.0.into() + } + } + }; +} diff --git a/src/error/src/lib.rs b/src/error/src/lib.rs index ccfec0cfbcc19..d3364485e8f2f 100644 --- a/src/error/src/lib.rs +++ b/src/error/src/lib.rs @@ -21,4 +21,5 @@ #![feature(register_tool)] #![register_tool(rw)] +pub mod anyhow; pub mod tonic;