Skip to content

Commit

Permalink
fix fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Apr 11, 2024
1 parent 08f8ec6 commit 71922d3
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ reqwest = { version = "0.12.2", features = ["json", "stream"] }
risingwave_common = { workspace = true }
risingwave_common_estimate_size = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_object_store = { workspace = true }
rumqttc = { version = "0.24.0", features = ["url"] }
rust_decimal = "1"
rustls-native-certs = "0.7"
Expand Down
25 changes: 20 additions & 5 deletions src/connector/src/sink/snowflake_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ impl SnowflakeS3Client {
"rw_sink_to_s3_credentials",
);

let region = RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider();
let region =
RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider();

let config = aws_config::from_env()
.credentials_provider(credentials)
Expand All @@ -236,7 +237,8 @@ impl SnowflakeS3Client {
&aws_access_key_id,
&aws_secret_access_key,
&aws_region,
).unwrap();
)
.unwrap();

Self {
s3_bucket,
Expand All @@ -248,10 +250,23 @@ impl SnowflakeS3Client {

pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> {
let path = generate_s3_file_name(self.s3_path.clone(), file_suffix);
let mut uploader = self.opendal_s3_engine.streaming_upload(&path).await.
map_err(|err| SinkError::Snowflake(format!("failed to create the streaming uploader of opendal s3 engine, error: {}", err)))?;
let mut uploader = self
.opendal_s3_engine
.streaming_upload(&path)
.await
.map_err(|err| {
SinkError::Snowflake(format!(
"failed to create the streaming uploader of opendal s3 engine, error: {}",
err
))
})?;
uploader.write_bytes(data).await.map_err(|err| SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err)))?;
uploader.finish().await.map_err(|err| SinkError::Snowflake(format!("failed to finish streaming upload to s3 for snowflake sink, error: {}", err)))?;
uploader.finish().await.map_err(|err| {
SinkError::Snowflake(format!(
"failed to finish streaming upload to s3 for snowflake sink, error: {}",
err
))
})?;
Ok(())
}
}

0 comments on commit 71922d3

Please sign in to comment.