From 69b9078944af2249e13c411dc26a3d6f7e1a8516 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 13 Feb 2024 15:52:50 +0800 Subject: [PATCH] fix clippy and add docs Signed-off-by: Bugen Zhao --- src/connector/src/common.rs | 15 +++++++++++---- src/connector/src/error.rs | 6 +++++- src/connector/src/source/cdc/enumerator/mod.rs | 4 +--- src/meta/src/stream/source_manager.rs | 2 +- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index b2f400c344cfa..0247a413b5190 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -378,6 +378,13 @@ pub struct PulsarOauthCommon { pub scope: Option, } +fn create_credential_temp_file(credentials: &[u8]) -> std::io::Result { + let mut f = NamedTempFile::new()?; + f.write_all(credentials)?; + f.as_file().sync_all()?; + Ok(f) +} + impl PulsarCommon { pub(crate) async fn build_client( &self, @@ -391,10 +398,10 @@ impl PulsarCommon { match url.scheme() { "s3" => { let credentials = load_file_descriptor_from_s3(&url, aws_auth_props).await?; - let mut f = NamedTempFile::new()?; - f.write_all(&credentials)?; - f.as_file().sync_all()?; - temp_file = Some(f); + temp_file = Some( + create_credential_temp_file(&credentials) + .context("failed to create temp file for pulsar credentials")?, + ); } "file" => {} _ => { diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index bec1ca3344121..870c06e1b8df4 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -24,8 +24,10 @@ use crate::sink::SinkError; def_anyhow_newtype! { pub ConnectorError, + // Common errors std::io::Error => "", + // Fine-grained connector errors AccessError => "", WireFormatError => "", ConcurrentRequestError => "", @@ -35,11 +37,13 @@ def_anyhow_newtype! { // TODO(error-handling): Remove implicit contexts below and specify ad-hoc context for each conversion. + // Parsing errors url::ParseError => "failed to parse url", serde_json::Error => "failed to parse json", csv::Error => "failed to parse csv", - opendal::Error => "", + // Connector errors + opendal::Error => "", // believed to be self-explanatory mysql_async::Error => "MySQL error", tokio_postgres::Error => "Postgres error", diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index fec4ea734b558..b5ac4826921bc 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -102,9 +102,7 @@ where )?; if let Some(error) = validate_source_response.error { - return Err(anyhow!(error.error_message) - .context("source cannot pass validation") - .into()); + return Err(anyhow!(error.error_message).context("source cannot pass validation")); } Ok(()) diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 1866a9d247ca6..4e81ee38f7ca3 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -24,7 +24,7 @@ use anyhow::Context; use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_connector::dispatch_source_prop; -use risingwave_connector::error::{ConnectorErrorContext, ConnectorResult}; +use risingwave_connector::error::ConnectorResult; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, SplitEnumerator, SplitId, SplitImpl, SplitMetaData,