Skip to content

Commit

Permalink
Fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Sep 19, 2023
1 parent 99df9ab commit 0e15117
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 23 deletions.
12 changes: 6 additions & 6 deletions e2e_test/iceberg/iceberg_sink_v2.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ CREATE SINK s6 AS select * from mv6 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo',
table.name = 'demo_db.demo_table',

iceberg.catalog.type = 'storage',
iceberg.catalog.name = 'demo',
iceberg.catalog.demo.warehouse = 's3://icebergdata/demo',
iceberg.table.io.endpoint = 'http://127.0.0.1:9301',
iceberg.table.io.access_key_id = 'admin',
iceberg.table.io.secret_access_key = 'password'
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'admin',
s3.secret.key = 'password'
);

statement ok
Expand Down
91 changes: 74 additions & 17 deletions src/connector/src/sink/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ use anyhow::anyhow;
use arrow_array::RecordBatch;
use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema};
use async_trait::async_trait;
use icelake::catalog::load_catalog;
use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
use icelake::transaction::Transaction;
use icelake::types::{data_file_from_json, data_file_to_json, DataFile};
use icelake::{Table, TableIdentifier};
use itertools::Itertools;
use opendal::services::S3;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -66,23 +64,35 @@ pub struct IcebergConfig {
#[serde(rename = "table.name")]
pub table_name: String, // Full name of table, must include schema name

#[serde(skip)]
pub iceberg_configs: HashMap<String, String>,
#[serde(rename = "database.name")]
pub database_name: String, // Use as catalog name.

#[serde(rename = "catalog.type")]
pub catalog_type: String, // Catalog type supported by iceberg, such as "storage", "rest"

#[serde(rename = "warehouse.path")]
pub path: Option<String>, // Path of iceberg warehouse, only applicable in storage catalog.

#[serde(rename = "catalog.uri")]
pub uri: Option<String>, // URI of iceberg catalog, only applicable in rest catalog.

#[serde(rename = "s3.region")]
pub region: Option<String>,

#[serde(rename = "s3.endpoint")]
pub endpoint: Option<String>,

#[serde(rename = "s3.access.key")]
pub access_key: String,

#[serde(rename = "s3.secret.key")]
pub secret_key: String,
}

impl IcebergConfig {
pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
let iceberg_configs = values
.iter()
.filter(|(k, _v)| k.starts_with("iceberg."))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();

let mut config =
serde_json::from_value::<IcebergConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;

config.iceberg_configs = iceberg_configs;
let config = serde_json::from_value::<IcebergConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;

if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
return Err(SinkError::Config(anyhow!(
Expand All @@ -95,6 +105,53 @@ impl IcebergConfig {

Ok(config)
}

fn build_iceberg_configs(&self) -> HashMap<String, String> {
let mut iceberg_configs = HashMap::new();
iceberg_configs.insert(CATALOG_TYPE.to_string(), self.catalog_type.clone());
iceberg_configs.insert(
CATALOG_NAME.to_string(),
self.database_name.clone().to_string(),
);
if let Some(path) = &self.path {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.database_name),
path.clone().to_string(),
);
}

if let Some(uri) = &self.uri {
iceberg_configs.insert(
format!("iceberg.catalog.{}.uri", self.database_name),
uri.clone().to_string(),
);
}

if let Some(region) = &self.region {
iceberg_configs.insert(
"iceberg.catalog.table.io.region".to_string(),
region.clone().to_string(),
);
}

if let Some(endpoint) = &self.endpoint {
iceberg_configs.insert(
"iceberg.catalog.table.io.endpoint".to_string(),
endpoint.clone().to_string(),
);
}

iceberg_configs.insert(
"iceberg.catalog.table.io.access_key_id".to_string(),
self.access_key.clone().to_string(),
);
iceberg_configs.insert(
"iceberg.catalog.table.io.secret_access_key".to_string(),
self.secret_key.clone().to_string(),
);

iceberg_configs
}
}

pub struct IcebergSink {
Expand All @@ -112,7 +169,7 @@ impl Debug for IcebergSink {

impl IcebergSink {
async fn create_table(&self) -> Result<Table> {
let catalog = load_catalog(&self.config.iceberg_configs)
let catalog = load_catalog(&self.config.build_iceberg_configs())
.await
.map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;

Expand Down

0 comments on commit 0e15117

Please sign in to comment.