use streaming upload instead of bulk load of aws_sdk_s3
xzhseh committed Apr 11, 2024
1 parent a6aa620 commit 08f8ec6
Showing 5 changed files with 84 additions and 15 deletions.
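In essence, sink_to_s3 switches from a single bulk put_object call on the aws_sdk_s3 client to opendal's streaming uploader. A condensed before/after, taken from the snowflake_connector.rs diff below (error mapping and object-key generation elided):

    // before: bulk load via aws_sdk_s3 — the whole payload goes out in one put_object call
    self.s3_client
        .put_object()
        .bucket(self.s3_bucket.clone())
        .key(path)
        .body(ByteStream::from(data))
        .send()
        .await?;

    // after: streaming upload via the opendal s3 engine — open an uploader, write the bytes, finish
    let mut uploader = self.opendal_s3_engine.streaming_upload(&path).await?;
    uploader.write_bytes(data).await?;
    uploader.finish().await?;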
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -110,6 +110,7 @@ risingwave_common_estimate_size = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_object_store = { workspace = true }
rumqttc = { version = "0.24.0", features = ["url"] }
rust_decimal = "1"
rustls-native-certs = "0.7"
42 changes: 27 additions & 15 deletions src/connector/src/sink/snowflake_connector.rs
@@ -18,12 +18,13 @@ use std::time::{SystemTime, UNIX_EPOCH};
use aws_config;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client as S3Client;
use aws_types::region::Region;
use bytes::Bytes;
use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
use reqwest::{header, Client, RequestBuilder, StatusCode};
use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::*;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

@@ -191,6 +192,7 @@ pub struct SnowflakeS3Client {
s3_bucket: String,
s3_path: Option<String>,
s3_client: S3Client,
opendal_s3_engine: OpendalObjectStore,
}

impl SnowflakeS3Client {
@@ -202,15 +204,15 @@ impl SnowflakeS3Client {
aws_region: String,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
aws_secret_access_key,
aws_access_key_id.clone(),
aws_secret_access_key.clone(),
// we don't allow temporary credentials
None,
None,
"rw_sink_to_s3_credentials",
);

let region = RegionProviderChain::first_try(Region::new(aws_region)).or_default_provider();
let region = RegionProviderChain::first_try(Region::new(aws_region.clone())).or_default_provider();

let config = aws_config::from_env()
.credentials_provider(credentials)
@@ -221,25 +223,35 @@ impl SnowflakeS3Client {
// create the brand new s3 client used to sink files to s3
let s3_client = S3Client::new(&config);

// just use the default object store config here
let config = ObjectStoreConfig::default();

// create the s3 engine for streaming upload to the intermediate s3 bucket
// note: this will panic if any step of the credential / engine creation fails,
// which may not be acceptable, but it's hard to handle the error gracefully
// without changing the downstream return type(s).
let opendal_s3_engine = OpendalObjectStore::new_s3_engine_with_credentials(
&s3_bucket,
config,
&aws_access_key_id,
&aws_secret_access_key,
&aws_region,
).unwrap();

Self {
s3_bucket,
s3_path,
s3_client,
opendal_s3_engine,
}
}

pub async fn sink_to_s3(&self, data: Bytes, file_suffix: String) -> Result<()> {
self.s3_client
.put_object()
.bucket(self.s3_bucket.clone())
.key(generate_s3_file_name(self.s3_path.clone(), file_suffix))
.body(ByteStream::from(data))
.send()
.await
.map_err(|err| {
SinkError::Snowflake(format!("failed to sink data to S3, error: {}", err))
})?;

let path = generate_s3_file_name(self.s3_path.clone(), file_suffix);
let mut uploader = self.opendal_s3_engine.streaming_upload(&path).await.map_err(|err| {
    SinkError::Snowflake(format!("failed to create the streaming uploader of opendal s3 engine, error: {}", err))
})?;
uploader.write_bytes(data).await.map_err(|err| {
    SinkError::Snowflake(format!("failed to write bytes when streaming uploading to s3 for snowflake sink, error: {}", err))
})?;
uploader.finish().await.map_err(|err| {
    SinkError::Snowflake(format!("failed to finish streaming upload to s3 for snowflake sink, error: {}", err))
})?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -33,6 +33,7 @@ pub struct OpendalObjectStore {
pub(crate) op: Operator,
pub(crate) engine_type: EngineType,
}

#[derive(Clone)]
pub enum EngineType {
Memory,
@@ -216,6 +217,7 @@ impl ObjectStore for OpendalObjectStore {
pub struct OpendalStreamingUploader {
writer: Writer,
}

impl OpendalStreamingUploader {
pub async fn new(op: Operator, path: String) -> ObjectResult<Self> {
let writer = op
53 changes: 53 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
@@ -59,6 +59,7 @@ impl OpendalObjectStore {
.with_jitter(),
)
.finish();

Ok(Self {
op,
engine_type: EngineType::S3,
@@ -78,4 +79,56 @@

Ok(HttpClient::build(client_builder)?)
}

/// Currently used by the Snowflake sink,
/// especially when sinking to the intermediate S3 bucket.
pub fn new_s3_engine_with_credentials(
bucket: &str,
object_store_config: ObjectStoreConfig,
aws_access_key_id: &str,
aws_secret_access_key: &str,
aws_region: &str,
) -> ObjectResult<Self> {
// Create s3 builder with credentials.
let mut builder = S3::default();

// set credentials for s3 sink
builder.bucket(bucket);
builder.access_key_id(aws_access_key_id);
builder.secret_access_key(aws_secret_access_key);
builder.region(aws_region);

// For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field.
if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
builder.endpoint(&endpoint_url);
}

if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
builder.enable_virtual_host_style();
}

let http_client = Self::new_http_client(&object_store_config)?;
builder.http_client(http_client);

let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.1)
.with_jitter(),
)
.finish();

Ok(Self {
op,
engine_type: EngineType::S3,
})
}
}
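For illustration, a hypothetical caller of the new constructor plus the streaming-upload path might look like the sketch below. The bucket name, credentials, region, and object path are placeholders; in the actual commit these values are wired through SnowflakeS3Client.

    use risingwave_common::config::ObjectStoreConfig;
    use risingwave_object_store::object::*;

    async fn upload_via_streaming(data: bytes::Bytes) -> ObjectResult<()> {
        // build the opendal s3 engine with explicit (non-temporary) credentials
        let engine = OpendalObjectStore::new_s3_engine_with_credentials(
            "my-intermediate-bucket",     // placeholder bucket
            ObjectStoreConfig::default(), // default retry / http settings
            "my-access-key-id",           // placeholder credentials
            "my-secret-access-key",
            "us-east-1",                  // placeholder region
        )?;

        // stream the payload instead of issuing a single bulk put_object
        let mut uploader = engine.streaming_upload("path/to/file.json").await?;
        uploader.write_bytes(data).await?;
        uploader.finish().await?;
        Ok(())
    }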
