Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): simplify and clean-up unused variants of ConnectorError #15031

Merged
merged 5 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 7 additions & 31 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,22 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum ConnectorError {
#[error("Parse error: {0}")]
Parse(&'static str),

#[error("Invalid parameter {name}: {reason}")]
InvalidParam { name: &'static str, reason: String },

#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),

#[error("Config error: {0}")]
Config(
#[source]
#[error("MySQL error: {0}")]
MySql(
#[from]
#[backtrace]
anyhow::Error,
mysql_async::Error,
),

#[error("Connection error: {0}")]
Connection(
#[source]
#[backtrace]
anyhow::Error,
),

#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anyhow::Error is good enough if one just wants to make the error type informative but not necessarily actionable. However, given a value of type anyhow::Error, it is hard to tell which module or crate it comes from, which may blur the boundary between modules when passing it around, leading to abuse.

I think the new ConnectorError on the right side will lose the context of specific connector, e.g. MySQL error. So it is just a new type of the anyhow::Error, and doesn't embed information of specific crate/module.

Without the context information, we don't know which module throws the error and must look into the stacktrace to find out. For example, previously we can tell the IO error comes from mysql #14847.

So I -1 for current implementation, could you embed the crate/module info into the macro?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically for #14847, I believe the retrying should be done before throwing the mysql_async::Error to upper layer, which is, before turning it into a ConnectorError. This is because performing ad-hoc matching for MySQL in the general code path of handling type-erased ConnectorError appears to be an abstraction leak. In this case, the internal structure of ConnectorError is not relevant.

However, if we're intend to do that on ConnectorError, anyhow still allows developer to downcast to a concrete type, just like trait objects.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a principle that errors should be either informative or actionable. In contrast of implementing the retry logic (actionable), I suppose the more reasonable part is that the refactor will lose some context in the error message (informative) and make it vaguer, as discussed in https://github.com/risingwavelabs/risingwave/pull/15031/files#r1479489349.

Specifically, the message will become:

- Connector error: MySQL error: Input/output error: Input/output error: can't parse: buf doesn't have enough data
+ Connector error: Input/output error: Input/output error: can't parse: buf doesn't have enough data

I admit that this undoubtedly make it less informative and harder to identify the root cause of external services at a glance. This could be mainly because the error messages from 3rd-party crates are not managed and reviewed by us. As a result, we are less familiar with them and could confuse them with others.


I'm okay to keep the original enum implementation. As pointed out by @xxchan, a prefix of MySQL error: is not as good as a manually-specified context message like Failed to read the offset from MySQL: , but could be still better than no context. We could improve that in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I am most concerning the losing context problem which would increase the burden of troubleshooting. Ideally, if we can extract 3rd party crate name into the def_anyhow_newtype macro, then we can abandon the original enum implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the discussion with @xxchan, I've come up with another style of anyhow wrapper which will force the developers to provide contexts or detailed messages when upcasting the error. Will explore it in the next PRs.


#[error("Postgres error: {0}")]
Postgres(#[from] tokio_postgres::Error),

#[error("Pulsar error: {0}")]
Pulsar(
#[source]
#[backtrace]
anyhow::Error,
),

#[error(transparent)]
Internal(
Uncategorized(
#[from]
#[backtrace]
anyhow::Error,
),
}

pub type ConnectorResult<T> = Result<T, ConnectorError>;
5 changes: 2 additions & 3 deletions src/connector/src/source/cdc/external/mock_external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use futures_async_stream::try_stream;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::ScalarImpl;

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::cdc::external::{
CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset,
SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ExternalTableReader, MySqlOffset, SchemaTableName,
};

#[derive(Debug)]
Expand Down
25 changes: 6 additions & 19 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod postgres;

use std::collections::HashMap;

use anyhow::{anyhow, Context};
use anyhow::Context;
use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
Expand All @@ -32,13 +32,11 @@ use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use serde_derive::{Deserialize, Serialize};

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::mysql_row_to_owned_row;
use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};

pub type ConnectorResult<T> = std::result::Result<T, ConnectorError>;

#[derive(Debug)]
pub enum CdcTableType {
Undefined,
Expand Down Expand Up @@ -77,10 +75,7 @@ impl CdcTableType {
Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
PostgresExternalTableReader::new(with_properties, schema).await?,
)),
_ => bail!(ConnectorError::Config(anyhow!(
"invalid external table type: {:?}",
*self
))),
_ => bail!("invalid external table type: {:?}", *self),
}
}
}
Expand Down Expand Up @@ -405,19 +400,11 @@ impl MySqlExternalTableReader {
DataType::Date => Value::from(value.into_date().0),
DataType::Time => Value::from(value.into_time().0),
DataType::Timestamp => Value::from(value.into_timestamp().0),
_ => {
return Err(ConnectorError::Internal(anyhow!(
"unsupported primary key data type: {}",
ty
)))
}
_ => bail!("unsupported primary key data type: {}", ty),
};
Ok((pk.clone(), val))
ConnectorResult::Ok((pk.clone(), val))
} else {
Err(ConnectorError::Internal(anyhow!(
"primary key {} cannot be null",
pk
)))
bail!("primary key {} cannot be null", pk);
}
})
.try_collect()?;
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ use thiserror_ext::AsReport;
use tokio_postgres::types::PgLsn;
use tokio_postgres::NoTls;

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::postgres_row_to_owned_row;
use crate::source::cdc::external::{
CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig,
ExternalTableReader, SchemaTableName,
CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
Expand Down
12 changes: 6 additions & 6 deletions src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context};
use anyhow::Context;
use arrow_array::{Int32Array, Int64Array, RecordBatch};
use async_trait::async_trait;
use futures::StreamExt;
Expand All @@ -32,7 +32,6 @@ use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::catalog::ROWID_PREFIX;
use risingwave_common::{bail, ensure};

use crate::error::ConnectorError;
use crate::parser::ParserConfig;
use crate::source::pulsar::split::PulsarSplit;
use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
Expand Down Expand Up @@ -398,10 +397,11 @@ impl PulsarIcebergReader {
fn build_iceberg_configs(&self) -> anyhow::Result<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();

let bucket =
self.props.iceberg_bucket.as_ref().ok_or_else(|| {
ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured"))
})?;
let bucket = self
.props
.iceberg_bucket
.as_ref()
.context("Iceberg bucket is not configured")?;

iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string());
iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string());
Expand Down
Loading