feat(sink): support delta sink with gcs #16182

Merged
merged 5 commits into from
Apr 8, 2024
Changes from 3 commits
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -148,6 +148,7 @@ arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
# compile time and binary size.
deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = [
"s3-no-concurrent-write",
"gcs",
] }
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
parquet = "50"
17 changes: 16 additions & 1 deletion src/connector/src/sink/deltalake.rs
@@ -48,6 +48,7 @@ use crate::sink::writer::SinkWriterExt;

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
@@ -61,6 +62,8 @@ pub struct DeltaLakeCommon {
pub s3_region: Option<String>,
#[serde(rename = "s3.endpoint")]
pub s3_endpoint: Option<String>,
#[serde(rename = "gcs.service.account")]
pub gcs_service_account: Option<String>,
}
impl DeltaLakeCommon {
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
@@ -98,18 +101,29 @@ impl DeltaLakeCommon {
deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await?
}
DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
DeltaTableUrl::Gcs(gcs_path) => {
let mut storage_options = HashMap::new();
storage_options.insert(
GCS_SERVICE_ACCOUNT.to_string(),
self.gcs_service_account.clone().unwrap(),
@fuyufjh (Member) commented on Apr 8, 2024:

If gcs_service_account is not specified, will the CN panic?

If so, that sounds unacceptable to me. Please validate it when the sink is created.

Contributor Author replied:

Fixed.

Member replied:

The fix is inside create_deltalake_client(). You are sure that it will be called during create sink, right?

@xxhZs (Contributor Author) replied on Apr 8, 2024:

Yes. It is called both on create sink and on sink write, and create sink is called first. So if the user simply wrote the options incorrectly, the error is reported to the psql client instead of triggering recovery.
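
For readers following the thread, the concern about `unwrap()` can be illustrated with a minimal, standard-library-only sketch. It is not the code that landed in this PR: `ConfigError` and `gcs_storage_options` are hypothetical names standing in for the connector's `SinkError` and the body of `create_deltalake_client()`. Only the `service_account_key` storage option key and the `gcs.service.account` option name come from the diff.

```rust
use std::collections::HashMap;

/// Storage option key used for GCS in the diff above.
const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

/// Hypothetical stand-in for the connector's `SinkError`.
#[derive(Debug, PartialEq)]
enum ConfigError {
    MissingOption(&'static str),
}

/// Build the GCS storage options, returning an error instead of panicking
/// when `gcs.service.account` was not provided by the user.
fn gcs_storage_options(
    gcs_service_account: Option<&str>,
) -> Result<HashMap<String, String>, ConfigError> {
    // `ok_or` turns the missing option into a recoverable error.
    let key = gcs_service_account.ok_or(ConfigError::MissingOption("gcs.service.account"))?;
    let mut storage_options = HashMap::new();
    storage_options.insert(GCS_SERVICE_ACCOUNT.to_string(), key.to_string());
    Ok(storage_options)
}
```

Because the missing option surfaces as an `Err`, a caller that runs this during sink creation can report it to the client rather than panicking on the compute node.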

);
deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
.await?
}
};
Ok(table)
}

fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
if path.starts_with("s3://") || path.starts_with("s3a://") {
Ok(DeltaTableUrl::S3(path.to_string()))
} else if path.starts_with("gs://") {
Ok(DeltaTableUrl::Gcs(path.to_string()))
} else if let Some(path) = path.strip_prefix("file://") {
Ok(DeltaTableUrl::Local(path.to_string()))
} else {
Err(SinkError::DeltaLake(anyhow!(
- "path need to start with 's3://','s3a://'(s3) or file://(local)"
+ "path need to start with 's3://','s3a://'(s3) ,gs://(gcs) or file://(local)"
)))
}
}
@@ -118,6 +132,7 @@ impl DeltaLakeCommon {
enum DeltaTableUrl {
S3(String),
Local(String),
Gcs(String),
}
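
The prefix dispatch in `get_table_url` can be tried in isolation with the sketch below. It mirrors the branches shown in the diff but is simplified for illustration: the error type is a plain `String` rather than `SinkError`, the error text is made up, and the derives on `DeltaTableUrl` are added here only so the values can be compared.

```rust
/// Same variants as in the diff; the derives are added for this sketch only.
#[derive(Debug, PartialEq)]
enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

/// Classify a table path by its URL scheme.
/// Assumption: a `String` error replaces the connector's `SinkError`.
fn get_table_url(path: &str) -> Result<DeltaTableUrl, String> {
    if path.starts_with("s3://") || path.starts_with("s3a://") {
        // S3 paths keep their scheme.
        Ok(DeltaTableUrl::S3(path.to_string()))
    } else if path.starts_with("gs://") {
        // GCS paths keep their scheme as well.
        Ok(DeltaTableUrl::Gcs(path.to_string()))
    } else if let Some(local) = path.strip_prefix("file://") {
        // Local paths have the `file://` prefix removed.
        Ok(DeltaTableUrl::Local(local.to_string()))
    } else {
        Err(format!("unsupported table path: {path}"))
    }
}
```

Note the asymmetry: the S3 and GCS variants carry the full URL, while the local variant carries the path with `file://` stripped.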

#[serde_as]
3 changes: 3 additions & 0 deletions src/connector/with_options_sink.yaml
@@ -87,6 +87,9 @@ DeltaLakeConfig:
- name: s3.endpoint
field_type: String
required: false
- name: gcs.service.account
field_type: String
required: false
- name: r#type
field_type: String
required: true