diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index 76c25db784595..bec1ca3344121 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -13,14 +13,47 @@ // limitations under the License. use risingwave_common::error::v2::def_anyhow_newtype; +use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::RpcError; +use crate::parser::AccessError; +use crate::schema::schema_registry::{ConcurrentRequestError, WireFormatError}; +use crate::schema::InvalidOptionError; +use crate::sink::SinkError; + def_anyhow_newtype! { pub ConnectorError, + std::io::Error => "", + + AccessError => "", + WireFormatError => "", + ConcurrentRequestError => "", + InvalidOptionError => "", + SinkError => "", + PbFieldNotFound => "", + // TODO(error-handling): Remove implicit contexts below and specify ad-hoc context for each conversion. + + url::ParseError => "failed to parse url", + serde_json::Error => "failed to parse json", + csv::Error => "failed to parse csv", + + opendal::Error => "", + mysql_async::Error => "MySQL error", tokio_postgres::Error => "Postgres error", + apache_avro::Error => "Avro error", + rdkafka::error::KafkaError => "Kafka error", + pulsar::Error => "Pulsar error", + async_nats::jetstream::consumer::StreamError => "Nats error", + async_nats::jetstream::consumer::pull::MessagesError => "Nats error", + async_nats::jetstream::context::CreateStreamError => "Nats error", + async_nats::jetstream::stream::ConsumerError => "Nats error", + icelake::Error => "Iceberg error", + redis::RedisError => "Redis error", + arrow_schema::ArrowError => "Arrow error", + google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error", } pub type ConnectorResult = std::result::Result;