diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 20100c06e20a2..721cfa2f241c7 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -48,7 +48,7 @@ use self::unified::AccessImpl; use self::upsert_parser::UpsertParser; use self::util::get_kafka_topic; use crate::common::AwsAuthProps; -use crate::error::ConnectorResult; +use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::maxwell::MaxwellParser; use crate::parser::util::{ extract_header_inner_from_meta, extract_headers_from_meta, extreact_timestamp_from_meta, @@ -559,7 +559,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { } } -#[try_stream(ok = Vec, error = anyhow::Error)] +#[try_stream(ok = Vec, error = ConnectorError)] async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) { #[for_await] for batch in stream { diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index e274f639f15b2..899828322c810 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -18,6 +18,7 @@ use async_trait::async_trait; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; +use crate::error::ConnectorResult; use crate::parser::ParserConfig; use crate::source::{ BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, @@ -71,7 +72,7 @@ impl SplitMetaData for IcebergSplit { unimplemented!() } - fn restore_from_json(_value: JsonbVal) -> anyhow::Result { + fn restore_from_json(_value: JsonbVal) -> ConnectorResult { unimplemented!() } @@ -79,7 +80,7 @@ impl SplitMetaData for IcebergSplit { unimplemented!() } - fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> { + fn update_with_offset(&mut self, _start_offset: String) -> ConnectorResult<()> { unimplemented!() } } @@ -95,11 +96,11 @@ impl SplitEnumerator for IcebergSplitEnumerator { async fn new( _properties: Self::Properties, _context: SourceEnumeratorContextRef, - ) -> anyhow::Result { + ) -> ConnectorResult { Ok(Self {}) } - async fn list_splits(&mut self) -> anyhow::Result> { + async fn list_splits(&mut self) -> ConnectorResult> { Ok(vec![]) } } @@ -118,7 +119,7 @@ impl SplitReader for IcebergFileReader { _parser_config: ParserConfig, _source_ctx: SourceContextRef, _columns: Option>, - ) -> anyhow::Result { + ) -> ConnectorResult { unimplemented!() }