Skip to content

Commit

Permalink
feat(sink): support deafult aws credentials for deltalake (#19557)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Dec 20, 2024
1 parent adb8651 commit c022a36
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 49 deletions.
17 changes: 13 additions & 4 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,27 @@ use risingwave_common::util::env_var::env_var_is_true;
/// A flatten config map for aws auth.
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
pub struct AwsAuthProps {
#[serde(rename = "aws.region", alias = "region")]
#[serde(rename = "aws.region", alias = "region", alias = "s3.region")]
pub region: Option<String>,

#[serde(
rename = "aws.endpoint_url",
alias = "endpoint_url",
alias = "endpoint"
alias = "endpoint",
alias = "s3.endpoint"
)]
pub endpoint: Option<String>,
#[serde(rename = "aws.credentials.access_key_id", alias = "access_key")]
#[serde(
rename = "aws.credentials.access_key_id",
alias = "access_key",
alias = "s3.access.key"
)]
pub access_key: Option<String>,
#[serde(rename = "aws.credentials.secret_access_key", alias = "secret_key")]
#[serde(
rename = "aws.credentials.secret_access_key",
alias = "secret_key",
alias = "s3.secret.key"
)]
pub secret_key: Option<String>,
#[serde(rename = "aws.credentials.session_token", alias = "session_token")]
pub session_token: Option<String>,
Expand Down
90 changes: 51 additions & 39 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,20 @@ use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};
use crate::connector_common::AwsAuthProps;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

#[serde_as]
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
#[serde(rename = "s3.access.key")]
pub s3_access_key: Option<String>,
#[serde(rename = "s3.secret.key")]
pub s3_secret_key: Option<String>,
#[serde(rename = "location")]
pub location: String,
#[serde(rename = "s3.region")]
pub s3_region: Option<String>,
#[serde(rename = "s3.endpoint")]
pub s3_endpoint: Option<String>,
#[serde(flatten)]
pub aws_auth_props: AwsAuthProps,

#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
/// Commit every n(>0) checkpoints, default is 10.
Expand All @@ -80,35 +76,7 @@ impl DeltaLakeCommon {
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
let table = match Self::get_table_url(&self.location)? {
DeltaTableUrl::S3(s3_path) => {
let mut storage_options = HashMap::new();
storage_options.insert(
AWS_ACCESS_KEY_ID.to_owned(),
self.s3_access_key.clone().ok_or_else(|| {
SinkError::Config(anyhow!("s3.access.key is required with aws s3"))
})?,
);
storage_options.insert(
AWS_SECRET_ACCESS_KEY.to_owned(),
self.s3_secret_key.clone().ok_or_else(|| {
SinkError::Config(anyhow!("s3.secret.key is required with aws s3"))
})?,
);
if self.s3_endpoint.is_none() && self.s3_region.is_none() {
return Err(SinkError::Config(anyhow!(
"s3.endpoint and s3.region need to be filled with at least one"
)));
}
storage_options.insert(
AWS_REGION.to_owned(),
self.s3_region
.clone()
.unwrap_or_else(|| DEFAULT_REGION.to_owned()),
);
if let Some(s3_endpoint) = &self.s3_endpoint {
storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.clone());
}
storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
let storage_options = self.build_delta_lake_config_for_aws().await?;
deltalake::aws::register_handlers(None);
deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await?
}
Expand Down Expand Up @@ -144,6 +112,50 @@ impl DeltaLakeCommon {
)))
}
}

async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
let mut storage_options = HashMap::new();
storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
let sdk_config = self.aws_auth_props.build_config().await?;
let credentials = sdk_config
.credentials_provider()
.ok_or_else(|| {
SinkError::Config(anyhow!(
"s3.access.key and s3.secret.key is required with aws s3"
))
})?
.as_ref()
.provide_credentials()
.await
.map_err(|e| SinkError::Config(e.into()))?;
let region = sdk_config.region();
let endpoint = sdk_config.endpoint_url();
storage_options.insert(
AWS_ACCESS_KEY_ID.to_owned(),
credentials.access_key_id().to_owned(),
);
storage_options.insert(
AWS_SECRET_ACCESS_KEY.to_owned(),
credentials.secret_access_key().to_owned(),
);
if endpoint.is_none() && region.is_none() {
return Err(SinkError::Config(anyhow!(
"s3.endpoint and s3.region need to be filled with at least one"
)));
}
storage_options.insert(
AWS_REGION.to_owned(),
region
.map(|r| r.as_ref().to_owned())
.clone()
.unwrap_or_else(|| DEFAULT_REGION.to_owned()),
);
if let Some(s3_endpoint) = endpoint {
storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
}
Ok(storage_options)
}
}

enum DeltaTableUrl {
Expand Down Expand Up @@ -272,7 +284,7 @@ fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -
_ => {
return Err(SinkError::DeltaLake(anyhow!(
"deltalake cannot support type {:?}",
rw_data_type.to_string()
rw_data_type.to_owned()
)))
}
};
Expand Down
63 changes: 57 additions & 6 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,26 @@ BigQueryConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -133,21 +137,56 @@ ClickHouseConfig:
required: true
DeltaLakeConfig:
fields:
- name: s3.access.key
- name: location
field_type: String
required: true
- name: aws.region
field_type: String
required: false
- name: s3.secret.key
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
- name: location
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: true
- name: s3.region
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
- name: s3.endpoint
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
alias:
- session_token
- name: aws.credentials.role.arn
field_type: String
comments: IAM role
required: false
alias:
- arn
- name: aws.credentials.role.external_id
field_type: String
comments: external ID in IAM role trust policy
required: false
alias:
- external_id
- name: aws.profile
field_type: String
required: false
alias:
- profile
- name: gcs.service.account
field_type: String
required: false
Expand Down Expand Up @@ -198,22 +237,26 @@ DynamoDbConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -673,22 +716,26 @@ KafkaConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -950,22 +997,26 @@ PulsarConfig:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down
8 changes: 8 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -378,22 +378,26 @@ KafkaProperties:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down Expand Up @@ -998,22 +1002,26 @@ PulsarProperties:
required: false
alias:
- region
- s3.region
- name: aws.endpoint_url
field_type: String
required: false
alias:
- endpoint_url
- endpoint
- s3.endpoint
- name: aws.credentials.access_key_id
field_type: String
required: false
alias:
- access_key
- s3.access.key
- name: aws.credentials.secret_access_key
field_type: String
required: false
alias:
- secret_key
- s3.secret.key
- name: aws.credentials.session_token
field_type: String
required: false
Expand Down

0 comments on commit c022a36

Please sign in to comment.