From 71922d3fa59dc6779c56fa4ccb4e3f777fcc5136 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Thu, 11 Apr 2024 17:44:51 -0400 Subject: [PATCH] fix fmt --- src/connector/Cargo.toml | 2 +- src/connector/src/sink/snowflake_connector.rs | 25 +++++++++++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index bb97fe608127c..5f759ff1612cf 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -108,9 +108,9 @@ reqwest = { version = "0.12.2", features = ["json", "stream"] } risingwave_common = { workspace = true } risingwave_common_estimate_size = { workspace = true } risingwave_jni_core = { workspace = true } +risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } -risingwave_object_store = { workspace = true } rumqttc = { version = "0.24.0", features = ["url"] } rust_decimal = "1" rustls-native-certs = "0.7" diff --git a/src/connector/src/sink/snowflake_connector.rs b/src/connector/src/sink/snowflake_connector.rs index 6ea732b846e08..3466633757c81 100644 --- a/src/connector/src/sink/snowflake_connector.rs +++ b/src/connector/src/sink/snowflake_connector.rs @@ -212,7 +212,8 @@ impl SnowflakeS3Client { "rw_sink_to_s3_credentials", ); - let region = RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider(); + let region = + RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider(); let config = aws_config::from_env() .credentials_provider(credentials) @@ -236,7 +237,8 @@ impl SnowflakeS3Client { &aws_access_key_id, &aws_secret_access_key, &aws_region, - ).unwrap(); + ) + .unwrap(); Self { s3_bucket, @@ -248,10 +250,23 @@ impl SnowflakeS3Client { pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> { let path = generate_s3_file_name(self.s3_path.clone(), file_suffix); - let mut uploader = self.opendal_s3_engine.streaming_upload(&path).await. - map_err(|err| SinkError::Snowflake(format!("failed to create the streaming uploader of opendal s3 engine, error: {}", err)))?; + let mut uploader = self + .opendal_s3_engine + .streaming_upload(&path) + .await + .map_err(|err| { + SinkError::Snowflake(format!( + "failed to create the streaming uploader of opendal s3 engine, error: {}", + err + )) + })?; uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?; - uploader.finish().await.map_err(|err| SinkError::Snowflake(format!("failed to finish streaming upload to s3 for snowflake sink, error: {}", err)))?; + uploader.finish().await.map_err(|err| { + SinkError::Snowflake(format!( + "failed to finish streaming upload to s3 for snowflake sink, error: {}", + err + )) + })?; Ok(()) } }