Skip to content

Commit

Permalink
fix clippy and add docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Feb 13, 2024
1 parent 5ddd0df commit 8a722eb
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
15 changes: 11 additions & 4 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ pub struct PulsarOauthCommon {
pub scope: Option<String>,
}

fn create_credential_temp_file(credentials: &[u8]) -> std::io::Result<NamedTempFile> {
let mut f = NamedTempFile::new()?;
f.write_all(credentials)?;
f.as_file().sync_all()?;
Ok(f)
}

impl PulsarCommon {
pub(crate) async fn build_client(
&self,
Expand All @@ -391,10 +398,10 @@ impl PulsarCommon {
match url.scheme() {
"s3" => {
let credentials = load_file_descriptor_from_s3(&url, aws_auth_props).await?;
let mut f = NamedTempFile::new()?;
f.write_all(&credentials)?;
f.as_file().sync_all()?;
temp_file = Some(f);
temp_file = Some(
create_credential_temp_file(&credentials)
.context("failed to create temp file for pulsar credentials")?,
);
}
"file" => {}
_ => {
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ use crate::sink::SinkError;
def_anyhow_newtype! {
pub ConnectorError,

// Common errors
std::io::Error => "",

// Fine-grained connector errors
AccessError => "",
WireFormatError => "",
ConcurrentRequestError => "",
Expand All @@ -35,11 +37,13 @@ def_anyhow_newtype! {

// TODO(error-handling): Remove implicit contexts below and specify ad-hoc context for each conversion.

// Parsing errors
url::ParseError => "failed to parse url",
serde_json::Error => "failed to parse json",
csv::Error => "failed to parse csv",

opendal::Error => "",
// Connector errors
opendal::Error => "", // believed to be self-explanatory

mysql_async::Error => "MySQL error",
tokio_postgres::Error => "Postgres error",
Expand Down
4 changes: 1 addition & 3 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ where
)?;

if let Some(error) = validate_source_response.error {
return Err(anyhow!(error.error_message)
.context("source cannot pass validation")
.into());
return Err(anyhow!(error.error_message).context("source cannot pass validation"));
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use anyhow::Context;
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_connector::dispatch_source_prop;
use risingwave_connector::error::{ConnectorErrorContext, ConnectorResult};
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::{
ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties,
SplitEnumerator, SplitId, SplitImpl, SplitMetaData,
Expand Down

0 comments on commit 8a722eb

Please sign in to comment.