Skip to content

Commit

Permalink
feat(iceberg): support rest authentication (#19406)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Nov 21, 2024
1 parent a771dab commit e544337
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 33 deletions.
125 changes: 93 additions & 32 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ pub struct IcebergCommon {
/// Full name of table, must include schema name.
#[serde(rename = "table.name")]
pub table_name: String,
/// Credential for accessing iceberg catalog, only applicable in rest catalog.
/// A credential to exchange for a token in the OAuth2 client credentials flow.
#[serde(rename = "catalog.credential")]
pub credential: Option<String>,
/// Token for accessing iceberg catalog, only applicable in rest catalog.
/// A Bearer token which will be used for interaction with the server.
#[serde(rename = "catalog.token")]
pub token: Option<String>,
/// `oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog.
/// Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
#[serde(rename = "catalog.oauth2-server-uri")]
pub oauth2_server_uri: Option<String>,
/// Scope for accessing iceberg catalog, only applicable in rest catalog.
/// Additional scope for OAuth2.
#[serde(rename = "catalog.scope")]
pub scope: Option<String>,

#[serde(
rename = "s3.path.style.access",
Expand Down Expand Up @@ -145,20 +161,32 @@ impl IcebergCommon {
match &self.warehouse_path {
Some(warehouse_path) => {
let (bucket, _) = {
let url = Url::parse(warehouse_path).with_context(|| {
format!("Invalid warehouse path: {}", warehouse_path)
})?;
let bucket = url
.host_str()
.with_context(|| {
format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
let url = Url::parse(warehouse_path);
if url.is_err() && catalog_type == "rest" {
// If the warehouse path is not a valid URL, it could be a warehouse name in a rest catalog,
// so we allow it to pass here.
(None, None)
} else {
let url = url.with_context(|| {
format!("Invalid warehouse path: {}", warehouse_path)
})?;
let bucket = url
.host_str()
.with_context(|| {
format!(
"Invalid s3 path: {}, bucket is missing",
warehouse_path
)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(Some(bucket), Some(root))
}
};

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
if let Some(bucket) = bucket {
iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
}
}
None => {
if catalog_type != "rest" {
Expand Down Expand Up @@ -219,29 +247,48 @@ impl IcebergCommon {
path_style_access.to_string(),
);
}
if matches!(self.catalog_type.as_deref(), Some("glue")) {
java_catalog_configs.insert(
"client.credentials-provider".to_string(),
"com.risingwave.connector.catalog.GlueCredentialProvider".to_string(),
);
// Use S3 ak/sk and region as glue ak/sk and region by default.
// TODO: use different ak/sk and region for s3 and glue.
java_catalog_configs.insert(
"client.credentials-provider.glue.access-key-id".to_string(),
self.access_key.clone().to_string(),
);
java_catalog_configs.insert(
"client.credentials-provider.glue.secret-access-key".to_string(),
self.secret_key.clone().to_string(),
);
if let Some(region) = &self.region {
java_catalog_configs
.insert("client.region".to_string(), region.clone().to_string());

match self.catalog_type.as_deref() {
Some("rest") => {
if let Some(credential) = &self.credential {
java_catalog_configs.insert("credential".to_string(), credential.clone());
}
if let Some(token) = &self.token {
java_catalog_configs.insert("token".to_string(), token.clone());
}
if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
java_catalog_configs
.insert("oauth2-server-uri".to_string(), oauth2_server_uri.clone());
}
if let Some(scope) = &self.scope {
java_catalog_configs.insert("scope".to_string(), scope.clone());
}
}
Some("glue") => {
java_catalog_configs.insert(
"client.credentials-provider".to_string(),
"com.risingwave.connector.catalog.GlueCredentialProvider".to_string(),
);
// Use S3 ak/sk and region as glue ak/sk and region by default.
// TODO: use different ak/sk and region for s3 and glue.
java_catalog_configs.insert(
"client.credentials-provider.glue.access-key-id".to_string(),
self.access_key.clone().to_string(),
);
java_catalog_configs.insert(
"glue.endpoint".to_string(),
format!("https://glue.{}.amazonaws.com", region),
"client.credentials-provider.glue.secret-access-key".to_string(),
self.secret_key.clone().to_string(),
);
if let Some(region) = &self.region {
java_catalog_configs
.insert("client.region".to_string(), region.clone().to_string());
java_catalog_configs.insert(
"glue.endpoint".to_string(),
format!("https://glue.{}.amazonaws.com", region),
);
}
}
_ => {}
}
}

Expand Down Expand Up @@ -492,6 +539,20 @@ mod v2 {
S3_SECRET_ACCESS_KEY.to_string(),
self.secret_key.clone().to_string(),
);
if let Some(credential) = &self.credential {
iceberg_configs.insert("credential".to_string(), credential.clone());
}
if let Some(token) = &self.token {
iceberg_configs.insert("token".to_string(), token.clone());
}
if let Some(oauth2_server_uri) = &self.oauth2_server_uri {
iceberg_configs
.insert("oauth2-server-uri".to_string(), oauth2_server_uri.clone());
}
if let Some(scope) = &self.scope {
iceberg_configs.insert("scope".to_string(), scope.clone());
}

let config_builder = iceberg_catalog_rest::RestCatalogConfig::builder()
.uri(self.catalog_uri.clone().with_context(|| {
"`catalog.uri` must be set in rest catalog".to_string()
Expand Down
18 changes: 17 additions & 1 deletion src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use risingwave_pb::connector_service::SinkMetadata;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use thiserror_ext::AsReport;
use url::Url;
use with_options::WithOptions;

use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder;
Expand Down Expand Up @@ -283,7 +284,18 @@ impl IcebergSink {
names.push(self.config.common.table_name.to_string());
match &self.config.common.warehouse_path {
Some(warehouse_path) => {
if warehouse_path.ends_with('/') {
let url = Url::parse(warehouse_path);
if url.is_err() {
// For rest catalog, the warehouse_path could be a warehouse name.
// In this case, we should specify the location when creating a table.
if self.config.common.catalog_type() == "rest"
|| self.config.common.catalog_type() == "rest_rust"
{
None
} else {
bail!(format!("Invalid warehouse path: {}", warehouse_path))
}
} else if warehouse_path.ends_with('/') {
Some(format!("{}{}", warehouse_path, names.join("/")))
} else {
Some(format!("{}/{}", warehouse_path, names.join("/")))
Expand Down Expand Up @@ -1017,6 +1029,10 @@ mod test {
database_name: Some("demo_db".to_string()),
table_name: "demo_table".to_string(),
path_style_access: Some(true),
credential: None,
oauth2_server_uri: None,
scope: None,
token: None,
},
r#type: "upsert".to_string(),
force_append_only: false,
Expand Down
24 changes: 24 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,30 @@ IcebergConfig:
field_type: String
comments: Full name of table, must include schema name.
required: true
- name: catalog.credential
field_type: String
comments: |-
Credential for accessing iceberg catalog, only applicable in rest catalog.
A credential to exchange for a token in the OAuth2 client credentials flow.
required: false
- name: catalog.token
field_type: String
comments: |-
Token for accessing iceberg catalog, only applicable in rest catalog.
A Bearer token which will be used for interaction with the server.
required: false
- name: catalog.oauth2-server-uri
field_type: String
comments: |-
`oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog.
Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
required: false
- name: catalog.scope
field_type: String
comments: |-
Scope for accessing iceberg catalog, only applicable in rest catalog.
Additional scope for OAuth2.
required: false
- name: s3.path.style.access
field_type: bool
required: false
Expand Down
24 changes: 24 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,30 @@ IcebergProperties:
field_type: String
comments: Full name of table, must include schema name.
required: true
- name: catalog.credential
field_type: String
comments: |-
Credential for accessing iceberg catalog, only applicable in rest catalog.
A credential to exchange for a token in the OAuth2 client credentials flow.
required: false
- name: catalog.token
field_type: String
comments: |-
Token for accessing iceberg catalog, only applicable in rest catalog.
A Bearer token which will be used for interaction with the server.
required: false
- name: catalog.oauth2-server-uri
field_type: String
comments: |-
`oauth2-server-uri` for accessing iceberg catalog, only applicable in rest catalog.
Token endpoint URI to fetch token from if the Rest Catalog is not the authorization server.
required: false
- name: catalog.scope
field_type: String
comments: |-
Scope for accessing iceberg catalog, only applicable in rest catalog.
Additional scope for OAuth2.
required: false
- name: s3.path.style.access
field_type: bool
required: false
Expand Down

0 comments on commit e544337

Please sign in to comment.