Skip to content

Commit

Permalink
resolve conficts with iceberg properties refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 20, 2024
1 parent 5ebd4c5 commit 814fc50
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 110 deletions.
141 changes: 141 additions & 0 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use clap::ValueEnum;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use icelake::catalog::{
load_iceberg_base_catalog_config, BaseCatalogConfig, CATALOG_NAME, CATALOG_TYPE,
};
use risingwave_common::bail;
use risingwave_common::config::MetaBackend;
use serde_derive::Deserialize;
use serde_with::serde_as;
use url::Url;
use with_options::WithOptions;

use crate::deserialize_optional_bool_from_string;
use crate::error::ConnectorResult;

#[serde_as]
Expand Down Expand Up @@ -62,6 +65,10 @@ pub struct IcebergCommon {
/// Full name of table, must include schema name.
#[serde(rename = "table.name")]
pub table_name: String,

/// enable config load currently is used by nimtable, so it only support jdbc catalog.
#[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
pub enable_config_load: Option<bool>,
}

impl IcebergCommon {
Expand All @@ -76,12 +83,146 @@ impl IcebergCommon {
.unwrap_or_else(|| "risingwave".to_string())
}

fn build_jni_catalog_configs_for_config_load(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<(BaseCatalogConfig, HashMap<String, String>)> {
if self.catalog_type.as_deref() != Some("jdbc") {
bail!("enable_config_load only support jdbc catalog right now");
}

let mut iceberg_configs = HashMap::new();

let base_catalog_config = {
let catalog_type = self.catalog_type().to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name());

let Ok(s3_region) = std::env::var("AWS_REGION") else {
bail!("To create an iceberg engine table, AWS_REGION needed to be set");
};

// icelake
iceberg_configs.insert(
"iceberg.table.io.region".to_string(),
s3_region.clone().to_string(),
);
// iceberg-rust
iceberg_configs.insert(
("iceberg.table.io.".to_string() + S3_REGION).to_string(),
s3_region.clone().to_string(),
);

let Ok(s3_bucket) = std::env::var("AWS_S3_BUCKET") else {
bail!("To create an iceberg engine table, AWS_S3_BUCKET needed to be set");
};

let Ok(data_directory) = std::env::var("RW_DATA_DIRECTORY") else {
bail!("To create an iceberg engine table, RW_DATA_DIRECTORY needed to be set");
};
let warehouse_path = format!("s3://{}/{}/nimtable", s3_bucket, data_directory);

let (bucket, _) = {
let url = Url::parse(&warehouse_path)
.with_context(|| format!("Invalid warehouse path: {}", warehouse_path))?;
let bucket = url
.host_str()
.with_context(|| {
format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
};

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);

load_iceberg_base_catalog_config(&iceberg_configs)?
};

// Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/
let mut java_catalog_configs = HashMap::new();
{
let Ok(meta_store_endpoint) = std::env::var("RW_SQL_ENDPOINT") else {
bail!("To create an iceberg engine table, RW_SQL_ENDPOINT needed to be set");
};

let Ok(meta_store_database) = std::env::var("RW_SQL_DATABASE") else {
bail!("To create an iceberg engine table, RW_SQL_DATABASE needed to be set");
};

let Ok(meta_store_backend) = std::env::var("RW_BACKEND") else {
bail!("To create an iceberg engine table, RW_BACKEND needed to be set");
};
let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
bail!("failed to parse meta backend: {}", meta_store_backend);
};

let catalog_uri = match meta_backend {
MetaBackend::Postgres => format!(
"jdbc:postgresql://{}/{}",
meta_store_endpoint.clone(),
meta_store_database.clone()
),
MetaBackend::Mysql => format!(
"jdbc:mysql://{}/{}",
meta_store_endpoint.clone(),
meta_store_database.clone()
),
MetaBackend::Sqlite | MetaBackend::Etcd | MetaBackend::Sql | MetaBackend::Mem => {
bail!(
"Unsupported meta backend for iceberg engine table: {}",
meta_store_backend
);
}
};

java_catalog_configs.insert("uri".to_string(), catalog_uri.to_string());

java_catalog_configs.insert("warehouse".to_string(), self.warehouse_path.clone());
java_catalog_configs.extend(java_catalog_props.clone());

// Currently we only support s3, so let's set it to s3
java_catalog_configs.insert(
"io-impl".to_string(),
"org.apache.iceberg.aws.s3.S3FileIO".to_string(),
);

if let Some(path_style_access) = path_style_access {
java_catalog_configs.insert(
"s3.path-style-access".to_string(),
path_style_access.to_string(),
);
}

let Ok(meta_store_user) = std::env::var("RW_SQL_USERNAME") else {
bail!("To create an iceberg engine table, RW_SQL_USERNAME needed to be set");
};

let Ok(meta_store_password) = std::env::var("RW_SQL_PASSWORD") else {
bail!("To create an iceberg engine table, RW_SQL_PASSWORD needed to be set");
};
java_catalog_configs.insert("jdbc.user".to_string(), meta_store_user);
java_catalog_configs.insert("jdbc.password".to_string(), meta_store_password);
}

Ok((base_catalog_config, java_catalog_configs))
}

/// For both V1 and V2.
fn build_jni_catalog_configs(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<(BaseCatalogConfig, HashMap<String, String>)> {
if let Some(enable_config_load) = self.enable_config_load
&& enable_config_load
{
return self
.build_jni_catalog_configs_for_config_load(path_style_access, java_catalog_props);
}
let mut iceberg_configs = HashMap::new();

let base_catalog_config = {
Expand Down
83 changes: 1 addition & 82 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ use std::sync::Arc;

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use clap::ValueEnum;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent};
use icelake::catalog::CatalogRef;
use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter};
Expand All @@ -45,7 +41,6 @@ use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::config::MetaBackend;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use risingwave_pb::connector_service::SinkMetadata;
Expand Down Expand Up @@ -109,10 +104,6 @@ pub struct IcebergConfig {

#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub create_table_if_not_exists: bool,

/// enable config load currently is used by nimtable, so it only support jdbc catalog.
#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub enable_config_load: bool,
}

impl IcebergConfig {
Expand Down Expand Up @@ -171,82 +162,10 @@ impl IcebergConfig {
"`commit_checkpoint_interval` must be greater than 0"
)));
}
config = config.fill_for_config_load()?;

Ok(config)
}

pub fn fill_for_config_load(mut self) -> Result<Self> {
if self.enable_config_load {
if self.catalog_type.as_deref() != Some("jdbc") {
return Err(SinkError::Config(anyhow!(
"enable_config_load only support jdbc catalog right now"
)));
}

let Ok(s3_region) = std::env::var("AWS_REGION") else {
bail!("To create an iceberg engine table, AWS_REGION needed to be set");
};
self.region = Some(s3_region);

let Ok(s3_bucket) = std::env::var("AWS_S3_BUCKET") else {
bail!("To create an iceberg engine table, AWS_S3_BUCKET needed to be set");
};

let Ok(data_directory) = std::env::var("RW_DATA_DIRECTORY") else {
bail!("To create an iceberg engine table, RW_DATA_DIRECTORY needed to be set");
};
self.path = format!("s3://{}/{}/nimtable", s3_bucket, data_directory);

let Ok(meta_store_endpoint) = std::env::var("RW_SQL_ENDPOINT") else {
bail!("To create an iceberg engine table, RW_SQL_ENDPOINT needed to be set");
};

let Ok(meta_store_database) = std::env::var("RW_SQL_DATABASE") else {
bail!("To create an iceberg engine table, RW_SQL_DATABASE needed to be set");
};

let Ok(meta_store_backend) = std::env::var("RW_BACKEND") else {
bail!("To create an iceberg engine table, RW_BACKEND needed to be set");
};
let Ok(meta_backend) = MetaBackend::from_str(&meta_store_backend, true) else {
bail!("failed to parse meta backend: {}", meta_store_backend);
};

self.uri = match meta_backend {
MetaBackend::Postgres => Some(format!(
"jdbc:postgresql://{}/{}",
meta_store_endpoint.clone(),
meta_store_database.clone()
)),
MetaBackend::Mysql => Some(format!(
"jdbc:mysql://{}/{}",
meta_store_endpoint.clone(),
meta_store_database.clone()
)),
MetaBackend::Sqlite | MetaBackend::Etcd | MetaBackend::Sql | MetaBackend::Mem => {
bail!(
"Unsupported meta backend for iceberg engine table: {}",
meta_store_backend
);
}
};

let Ok(meta_store_user) = std::env::var("RW_SQL_USERNAME") else {
bail!("To create an iceberg engine table, RW_SQL_USERNAME needed to be set");
};

let Ok(meta_store_password) = std::env::var("RW_SQL_PASSWORD") else {
bail!("To create an iceberg engine table, RW_SQL_PASSWORD needed to be set");
};

let java_catalog_props = &mut self.java_catalog_props;
java_catalog_props.insert("jdbc.user".to_string(), meta_store_user);
java_catalog_props.insert("jdbc.password".to_string(), meta_store_password);
}
Ok(self)
}

pub fn catalog_type(&self) -> &str {
self.common.catalog_type()
}
Expand Down Expand Up @@ -1038,6 +957,7 @@ mod test {
catalog_name: Some("demo".to_string()),
database_name: Some("demo_db".to_string()),
table_name: "demo_table".to_string(),
enable_config_load: None,
},
r#type: "upsert".to_string(),
force_append_only: false,
Expand All @@ -1049,7 +969,6 @@ mod test {
.collect(),
commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
create_table_if_not_exists: false,
enable_config_load: false,
};

assert_eq!(iceberg_config, expected_iceberg_config);
Expand Down
24 changes: 16 additions & 8 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod parquet_file_reader;

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
Expand All @@ -30,7 +31,6 @@ use risingwave_common::types::JsonbVal;
use serde::{Deserialize, Serialize};

use crate::connector_common::IcebergCommon;
use crate::deserialize_optional_bool_from_string;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{
Expand All @@ -50,20 +50,28 @@ pub struct IcebergProperties {
#[serde(rename = "catalog.jdbc.password")]
pub jdbc_password: Option<String>,

#[serde(
rename = "enable_config_load",
default,
deserialize_with = "deserialize_optional_bool_from_string"
)]
pub enable_config_load: Option<bool>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

use iceberg::table::Table as TableV2;
use iceberg::Catalog as CatalogV2;

impl IcebergProperties {
pub async fn create_catalog_v2(&self) -> ConnectorResult<Arc<dyn CatalogV2>> {
let mut java_catalog_props = HashMap::new();
if let Some(jdbc_user) = self.jdbc_user.clone() {
java_catalog_props.insert("jdbc.user".to_string(), jdbc_user);
}
if let Some(jdbc_password) = self.jdbc_password.clone() {
java_catalog_props.insert("jdbc.password".to_string(), jdbc_password);
}
// TODO: support path_style_access and java_catalog_props for iceberg source
self.common
.create_catalog_v2(&None, &java_catalog_props)
.await
}

pub async fn load_table_v2(&self) -> ConnectorResult<TableV2> {
let mut java_catalog_props = HashMap::new();
if let Some(jdbc_user) = self.jdbc_user.clone() {
Expand Down
10 changes: 5 additions & 5 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ IcebergConfig:
field_type: String
comments: Full name of table, must include schema name.
required: true
- name: enable_config_load
field_type: bool
comments: enable config load currently is used by nimtable, so it only support jdbc catalog.
required: false
default: Default::default
- name: s3.path.style.access
field_type: bool
required: false
Expand All @@ -347,11 +352,6 @@ IcebergConfig:
field_type: bool
required: false
default: Default::default
- name: enable_config_load
field_type: bool
comments: enable config load currently is used by nimtable, so it only support jdbc catalog.
required: false
default: Default::default
KafkaConfig:
fields:
- name: properties.bootstrap.server
Expand Down
Loading

0 comments on commit 814fc50

Please sign in to comment.