From e544337a477234656f7c580ddb20f40a5c44ecf8 Mon Sep 17 00:00:00 2001 From: Dylan Date: Thu, 21 Nov 2024 17:26:27 +0800 Subject: [PATCH] feat(iceberg): support rest authentication (#19406) --- .../src/connector_common/iceberg/mod.rs | 125 +++++++++++++----- src/connector/src/sink/iceberg/mod.rs | 18 ++- src/connector/with_options_sink.yaml | 24 ++++ src/connector/with_options_source.yaml | 24 ++++ 4 files changed, 158 insertions(+), 33 deletions(-) diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index d10a9eefb68aa..ac0000128a0a6 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -62,6 +62,22 @@ pub struct IcebergCommon { /// Full name of table, must include schema name. #[serde(rename = "table.name")] pub table_name: String, + /// Credential for accessing iceberg catalog, only applicable in rest catalog. + /// A credential to exchange for a token in the OAuth2 client credentials flow. + #[serde(rename = "catalog.credential")] + pub credential: Option, + /// token for accessing iceberg catalog, only applicable in rest catalog. + /// A Bearer token which will be used for interaction with the server. + #[serde(rename = "catalog.token")] + pub token: Option, + /// `oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog. + /// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server. + #[serde(rename = "catalog.oauth2-server-uri")] + pub oauth2_server_uri: Option, + /// scope for accessing iceberg catalog, only applicable in rest catalog. + /// Additional scope for OAuth2. + #[serde(rename = "catalog.scope")] + pub scope: Option, #[serde( rename = "s3.path.style.access", @@ -145,20 +161,32 @@ impl IcebergCommon { match &self.warehouse_path { Some(warehouse_path) => { let (bucket, _) = { - let url = Url::parse(warehouse_path).with_context(|| { - format!("Invalid warehouse path: {}", warehouse_path) - })?; - let bucket = url - .host_str() - .with_context(|| { - format!("Invalid s3 path: {}, bucket is missing", warehouse_path) - })? - .to_string(); - let root = url.path().trim_start_matches('/').to_string(); - (bucket, root) + let url = Url::parse(warehouse_path); + if url.is_err() && catalog_type == "rest" { + // If the warehouse path is not a valid URL, it could be a warehouse name in rest catalog + // so we allow it to pass here. + (None, None) + } else { + let url = url.with_context(|| { + format!("Invalid warehouse path: {}", warehouse_path) + })?; + let bucket = url + .host_str() + .with_context(|| { + format!( + "Invalid s3 path: {}, bucket is missing", + warehouse_path + ) + })? + .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (Some(bucket), Some(root)) + } }; - iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + if let Some(bucket) = bucket { + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + } } None => { if catalog_type != "rest" { @@ -219,29 +247,48 @@ impl IcebergCommon { path_style_access.to_string(), ); } - if matches!(self.catalog_type.as_deref(), Some("glue")) { - java_catalog_configs.insert( - "client.credentials-provider".to_string(), - "com.risingwave.connector.catalog.GlueCredentialProvider".to_string(), - ); - // Use S3 ak/sk and region as glue ak/sk and region by default. - // TODO: use different ak/sk and region for s3 and glue. - java_catalog_configs.insert( - "client.credentials-provider.glue.access-key-id".to_string(), - self.access_key.clone().to_string(), - ); - java_catalog_configs.insert( - "client.credentials-provider.glue.secret-access-key".to_string(), - self.secret_key.clone().to_string(), - ); - if let Some(region) = &self.region { - java_catalog_configs - .insert("client.region".to_string(), region.clone().to_string()); + + match self.catalog_type.as_deref() { + Some("rest") => { + if let Some(credential) = &self.credential { + java_catalog_configs.insert("credential".to_string(), credential.clone()); + } + if let Some(token) = &self.token { + java_catalog_configs.insert("token".to_string(), token.clone()); + } + if let Some(oauth2_server_uri) = &self.oauth2_server_uri { + java_catalog_configs + .insert("oauth2-server-uri".to_string(), oauth2_server_uri.clone()); + } + if let Some(scope) = &self.scope { + java_catalog_configs.insert("scope".to_string(), scope.clone()); + } + } + Some("glue") => { + java_catalog_configs.insert( + "client.credentials-provider".to_string(), + "com.risingwave.connector.catalog.GlueCredentialProvider".to_string(), + ); + // Use S3 ak/sk and region as glue ak/sk and region by default. + // TODO: use different ak/sk and region for s3 and glue. + java_catalog_configs.insert( + "client.credentials-provider.glue.access-key-id".to_string(), + self.access_key.clone().to_string(), + ); java_catalog_configs.insert( - "glue.endpoint".to_string(), - format!("https://glue.{}.amazonaws.com", region), + "client.credentials-provider.glue.secret-access-key".to_string(), + self.secret_key.clone().to_string(), ); + if let Some(region) = &self.region { + java_catalog_configs + .insert("client.region".to_string(), region.clone().to_string()); + java_catalog_configs.insert( + "glue.endpoint".to_string(), + format!("https://glue.{}.amazonaws.com", region), + ); + } } + _ => {} } } @@ -492,6 +539,20 @@ mod v2 { S3_SECRET_ACCESS_KEY.to_string(), self.secret_key.clone().to_string(), ); + if let Some(credential) = &self.credential { + iceberg_configs.insert("credential".to_string(), credential.clone()); + } + if let Some(token) = &self.token { + iceberg_configs.insert("token".to_string(), token.clone()); + } + if let Some(oauth2_server_uri) = &self.oauth2_server_uri { + iceberg_configs + .insert("oauth2-server-uri".to_string(), oauth2_server_uri.clone()); + } + if let Some(scope) = &self.scope { + iceberg_configs.insert("scope".to_string(), scope.clone()); + } + let config_builder = iceberg_catalog_rest::RestCatalogConfig::builder() .uri(self.catalog_uri.clone().with_context(|| { "`catalog.uri` must be set in rest catalog".to_string() diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 0c878ae1ba6d6..54699b9599fec 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -49,6 +49,7 @@ use risingwave_pb::connector_service::SinkMetadata; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use thiserror_ext::AsReport; +use url::Url; use with_options::WithOptions; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; @@ -283,7 +284,18 @@ impl IcebergSink { names.push(self.config.common.table_name.to_string()); match &self.config.common.warehouse_path { Some(warehouse_path) => { - if warehouse_path.ends_with('/') { + let url = Url::parse(warehouse_path); + if url.is_err() { + // For rest catalog, the warehouse_path could be a warehouse name. + // In this case, we should specify the location when creating a table. + if self.config.common.catalog_type() == "rest" + || self.config.common.catalog_type() == "rest_rust" + { + None + } else { + bail!(format!("Invalid warehouse path: {}", warehouse_path)) + } + } else if warehouse_path.ends_with('/') { Some(format!("{}{}", warehouse_path, names.join("/"))) } else { Some(format!("{}/{}", warehouse_path, names.join("/"))) @@ -1017,6 +1029,10 @@ mod test { database_name: Some("demo_db".to_string()), table_name: "demo_table".to_string(), path_style_access: Some(true), + credential: None, + oauth2_server_uri: None, + scope: None, + token: None, }, r#type: "upsert".to_string(), force_append_only: false, diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 31579dfd70325..88f2e64cce2ee 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -412,6 +412,30 @@ IcebergConfig: field_type: String comments: Full name of table, must include schema name. required: true + - name: catalog.credential + field_type: String + comments: |- + Credential for accessing iceberg catalog, only applicable in rest catalog. + A credential to exchange for a token in the OAuth2 client credentials flow. + required: false + - name: catalog.token + field_type: String + comments: |- + token for accessing iceberg catalog, only applicable in rest catalog. + A Bearer token which will be used for interaction with the server. + required: false + - name: catalog.oauth2-server-uri + field_type: String + comments: |- + `oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog. + Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server. + required: false + - name: catalog.scope + field_type: String + comments: |- + scope for accessing iceberg catalog, only applicable in rest catalog. + Additional scope for OAuth2. + required: false - name: s3.path.style.access field_type: bool required: false diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 75972546b2994..f1db0a4276733 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -117,6 +117,30 @@ IcebergProperties: field_type: String comments: Full name of table, must include schema name. required: true + - name: catalog.credential + field_type: String + comments: |- + Credential for accessing iceberg catalog, only applicable in rest catalog. + A credential to exchange for a token in the OAuth2 client credentials flow. + required: false + - name: catalog.token + field_type: String + comments: |- + token for accessing iceberg catalog, only applicable in rest catalog. + A Bearer token which will be used for interaction with the server. + required: false + - name: catalog.oauth2-server-uri + field_type: String + comments: |- + `oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog. + Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server. + required: false + - name: catalog.scope + field_type: String + comments: |- + scope for accessing iceberg catalog, only applicable in rest catalog. + Additional scope for OAuth2. + required: false - name: s3.path.style.access field_type: bool required: false