diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 3836d14c3108c..a297c223113d9 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -167,7 +167,7 @@ macro_rules! impl_split { $( impl TryFrom for $split { - type Error = crate::error::ConnectorError; + type Error = $crate::error::ConnectorError; fn try_from(split: SplitImpl) -> std::result::Result { match split { diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 5b5f3aeee0c1a..3879d817d1a84 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -19,7 +19,6 @@ use std::future::{poll_fn, Future}; use std::sync::Arc; use std::task::Poll; -use anyhow::anyhow; use futures::{TryFuture, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::bail; diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 7fa309853c6c0..3ad9d7972abf2 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -30,7 +30,7 @@ use risingwave_pb::connector_service::{ use thiserror_ext::AsReport; use tokio::sync::mpsc; -use crate::error::{ConnectorError, ConnectorResult as Result}; +use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; use crate::source::base::SourceMessage; use crate::source::cdc::{CdcProperties, CdcSourceType, CdcSourceTypeTrait, DebeziumCdcSplit}; @@ -68,7 +68,7 @@ impl SplitReader for CdcSplitReader { parser_config: ParserConfig, source_ctx: SourceContextRef, _columns: Option>, - ) -> Result { + ) -> ConnectorResult { assert_eq!(splits.len(), 1); let split = splits.into_iter().next().unwrap(); let split_id = split.id(); diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index 450a50fe92cde..30165f1939e0f 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -14,7 +14,7 @@ use std::marker::PhantomData; -use anyhow::{anyhow, Context}; +use anyhow::Context; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/datagen/split.rs b/src/connector/src/source/datagen/split.rs index 43c6dd3ddaa9c..6d51cfa7d47ae 100644 --- a/src/connector/src/source/datagen/split.rs +++ b/src/connector/src/source/datagen/split.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/filesystem/file_common.rs b/src/connector/src/source/filesystem/file_common.rs index 2c34450d19f8b..ccff7315491ba 100644 --- a/src/connector/src/source/filesystem/file_common.rs +++ b/src/connector/src/source/filesystem/file_common.rs @@ -15,7 +15,6 @@ use std::fmt::Debug; use std::hash::Hash; use std::marker::PhantomData; -use anyhow::anyhow; use aws_sdk_s3::types::Object; use risingwave_common::types::{JsonbVal, Timestamptz}; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/filesystem/nd_streaming.rs b/src/connector/src/source/filesystem/nd_streaming.rs index b827ac05006e7..b13077d341fed 100644 --- a/src/connector/src/source/filesystem/nd_streaming.rs +++ b/src/connector/src/source/filesystem/nd_streaming.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use bytes::BytesMut; use futures::io::Cursor; use futures::AsyncBufReadExt; diff --git a/src/connector/src/source/google_pubsub/split.rs b/src/connector/src/source/google_pubsub/split.rs index 8ba3f29bbeafc..f150f7f08038b 100644 --- a/src/connector/src/source/google_pubsub/split.rs +++ b/src/connector/src/source/google_pubsub/split.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/kafka/split.rs b/src/connector/src/source/kafka/split.rs index 5056a9fbdf14b..a98707eb4ab3a 100644 --- a/src/connector/src/source/kafka/split.rs +++ b/src/connector/src/source/kafka/split.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/kinesis/split.rs b/src/connector/src/source/kinesis/split.rs index 5e0d0eb5d91c8..1c7bea61f8744 100644 --- a/src/connector/src/source/kinesis/split.rs +++ b/src/connector/src/source/kinesis/split.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index d101a56aaae62..5dd6569f83b0b 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; diff --git a/src/connector/src/source/nexmark/split.rs b/src/connector/src/source/nexmark/split.rs index 691df1470d767..5150f1b6a1e1d 100644 --- a/src/connector/src/source/nexmark/split.rs +++ b/src/connector/src/source/nexmark/split.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs index a733028224616..dddcf927d0c3f 100644 --- a/src/connector/src/source/pulsar/enumerator/client.rs +++ b/src/connector/src/source/pulsar/enumerator/client.rs @@ -47,7 +47,7 @@ impl SplitEnumerator for PulsarSplitEnumerator { async fn new( properties: PulsarProperties, _context: SourceEnumeratorContextRef, - ) -> Result { + ) -> ConnectorResult { let pulsar = properties .common .build_client(&properties.oauth, &properties.aws_auth_props) diff --git a/src/connector/src/source/pulsar/split.rs b/src/connector/src/source/pulsar/split.rs index a40471570d80d..bf9b63d99d74f 100644 --- a/src/connector/src/source/pulsar/split.rs +++ b/src/connector/src/source/pulsar/split.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs index d4b82abf18f28..6d224593d7a2e 100644 --- a/src/connector/src/source/test_source.rs +++ b/src/connector/src/source/test_source.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::sync::{Arc, OnceLock}; -use anyhow::anyhow; use async_trait::async_trait; use parking_lot::Mutex; use risingwave_common::bail; diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 2ca8e9b98be87..db56ef18a0912 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -662,7 +662,7 @@ impl CatalogController { let ret = src_manager.register_source(&pb_source).await; if let Err(e) = ret { txn.rollback().await?; - return Err(e.into()); + return Err(e); } } txn.commit().await?; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index aaac2ef3ba370..d74ba66731170 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -432,7 +432,7 @@ impl DdlController { mgr.catalog_manager .cancel_create_source_procedure(&source) .await?; - return Err(e.into()); + return Err(e); } mgr.catalog_manager