Skip to content

Commit

Permalink
refactor(connector): make ConnectorError a wrapper of anyhow::Error
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Feb 6, 2024
1 parent e7fd3dd commit bc0fa6e
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 9 additions & 46 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use thiserror::Error;

#[derive(Error, Debug)]
pub enum ConnectorError {
#[error("Parse error: {0}")]
Parse(&'static str),

#[error("Invalid parameter {name}: {reason}")]
InvalidParam { name: &'static str, reason: String },

#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),

#[error("Config error: {0}")]
Config(
#[source]
#[backtrace]
anyhow::Error,
),

#[error("Connection error: {0}")]
Connection(
#[source]
#[backtrace]
anyhow::Error,
),

#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),

#[error("Postgres error: {0}")]
Postgres(#[from] tokio_postgres::Error),

#[error("Pulsar error: {0}")]
Pulsar(
#[source]
#[backtrace]
anyhow::Error,
),

#[error(transparent)]
Internal(
#[from]
#[backtrace]
anyhow::Error,
),
use risingwave_common::error::v2::def_anyhow_newtype;

def_anyhow_newtype! {
/// The error type for the `connector` crate.
///
/// We use [`anyhow::Error`] under the hood as the connector has to deal with
/// various kinds of errors from different external crates. It acts more like an
/// application and callers may not expect to handle it in a fine-grained way.
pub ConnectorError;
}
21 changes: 5 additions & 16 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod postgres;

use std::collections::HashMap;

use anyhow::{anyhow, Context};
use anyhow::Context;
use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
Expand Down Expand Up @@ -77,10 +77,7 @@ impl CdcTableType {
Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
PostgresExternalTableReader::new(with_properties, schema).await?,
)),
_ => bail!(ConnectorError::Config(anyhow!(
"invalid external table type: {:?}",
*self
))),
_ => bail!("invalid external table type: {:?}", *self),
}
}
}
Expand Down Expand Up @@ -405,19 +402,11 @@ impl MySqlExternalTableReader {
DataType::Date => Value::from(value.into_date().0),
DataType::Time => Value::from(value.into_time().0),
DataType::Timestamp => Value::from(value.into_timestamp().0),
_ => {
return Err(ConnectorError::Internal(anyhow!(
"unsupported primary key data type: {}",
ty
)))
}
_ => bail!("unsupported primary key data type: {}", ty),
};
Ok((pk.clone(), val))
ConnectorResult::Ok((pk.clone(), val))
} else {
Err(ConnectorError::Internal(anyhow!(
"primary key {} cannot be null",
pk
)))
bail!("primary key {} cannot be null", pk);
}
})
.try_collect()?;
Expand Down
12 changes: 6 additions & 6 deletions src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context};
use anyhow::Context;
use arrow_array::{Int32Array, Int64Array, RecordBatch};
use async_trait::async_trait;
use futures::StreamExt;
Expand All @@ -32,7 +32,6 @@ use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::catalog::ROWID_PREFIX;
use risingwave_common::{bail, ensure};

use crate::error::ConnectorError;
use crate::parser::ParserConfig;
use crate::source::pulsar::split::PulsarSplit;
use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
Expand Down Expand Up @@ -398,10 +397,11 @@ impl PulsarIcebergReader {
fn build_iceberg_configs(&self) -> anyhow::Result<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();

let bucket =
self.props.iceberg_bucket.as_ref().ok_or_else(|| {
ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured"))
})?;
let bucket = self
.props
.iceberg_bucket
.as_ref()
.context("Iceberg bucket is not configured")?;

iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string());
iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string());
Expand Down
1 change: 1 addition & 0 deletions src/error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
anyhow = "1"
bincode = "1"
bytes = "1"
easy-ext = "1"
Expand Down
89 changes: 89 additions & 0 deletions src/error/src/anyhow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Define a newtype wrapper around [`anyhow::Error`].
///
/// # Motivation
///
/// [`anyhow::Error`] is good enough if one just wants to make the error type
/// informative but not necessarily actionable. However, given a value of type
/// [`anyhow::Error`], it is hard to tell which module or crate it comes from,
/// which may blur the boundary between modules when passing it around, leading
/// to abuse.
///
/// # Usage
///
/// ```ignore
/// def_anyhow_newtype! {
/// /// Documentation for the newtype.
/// #[derive(..)]
/// pub MyError;
/// }
/// ```
///
/// This will define a newtype `MyError` around [`anyhow::Error`].
///
/// The newtype can be converted from any type that implements `Into<anyhow::Error>`,
/// so the developing experience is kept almost the same. To construct a new error,
/// one can still use macros like `anyhow::anyhow!` or `risingwave_common::bail!`.
///
/// Since `bail!` and `?` already imply an `into()` call, developers do not need to
/// care about the conversion from [`anyhow::Error`] to the newtype most of the time.
///
/// # Limitation
///
/// Note that the newtype does not implement [`std::error::Error`] just like
/// [`anyhow::Error`]. However, it can be dereferenced to `dyn std::error::Error`
/// to be used in places like `thiserror`'s `#[source]` attribute.
#[macro_export]
macro_rules! def_anyhow_newtype {
($(#[$attr:meta])* $vis:vis $name:ident $(;)?) => {
$(#[$attr])* $vis struct $name(::anyhow::Error);

impl std::ops::Deref for $name {
type Target = ::anyhow::Error;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::fmt::Debug for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl<E> From<E> for $name
where
E: Into<::anyhow::Error>,
{
fn from(value: E) -> Self {
Self(value.into())
}
}

impl From<$name> for Box<dyn std::error::Error + Send + Sync + 'static> {
fn from(value: $name) -> Self {
value.0.into()
}
}
};
}
1 change: 1 addition & 0 deletions src/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
#![feature(register_tool)]
#![register_tool(rw)]

pub mod anyhow;
pub mod tonic;

0 comments on commit bc0fa6e

Please sign in to comment.