refactor: Use icelake catalog api to simplify things. (#12303)

liurenjie1024 authored Sep 19, 2023
1 parent 441583a, commit fa6c391
Showing 5 changed files with 133 additions and 102 deletions.
46 changes: 44 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -106,7 +106,7 @@ hashbrown = { version = "0.14.0", features = [
 criterion = { version = "0.5", features = ["async_futures"] }
 tonic = { package = "madsim-tonic", version = "0.3.1" }
 tonic-build = { package = "madsim-tonic-build", version = "0.3.1" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "a6790d17094754959e351fac1e11147e37643e97" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" }
 arrow-array = "46"
 arrow-schema = "46"
 arrow-buffer = "46"
10 changes: 6 additions & 4 deletions e2e_test/iceberg/iceberg_sink_v2.slt
@@ -23,12 +23,14 @@ CREATE SINK s6 AS select * from mv6 WITH (
     connector = 'iceberg',
     type = 'append-only',
     force_append_only = 'true',
+    database.name = 'demo',
+    table.name = 'demo_db.demo_table',
+
+    catalog.type = 'storage',
     warehouse.path = 's3://icebergdata/demo',
     s3.endpoint = 'http://127.0.0.1:9301',
-    s3.access.key = 'hummockadmin',
-    s3.secret.key = 'hummockadmin',
-    database.name='demo_db',
-    table.name='demo_table'
+    s3.access.key = 'admin',
+    s3.secret.key = 'password'
 );
 
 statement ok
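For contrast, the same sink pointed at a REST catalog would set catalog.type = 'rest' and supply catalog.uri in place of warehouse.path (see the new IcebergConfig fields in the next file). This variant is illustrative only; it is not part of the commit, and the URI is a made-up placeholder:

CREATE SINK s6_rest AS select * from mv6 WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    database.name = 'demo',
    table.name = 'demo_db.demo_table',
    catalog.type = 'rest',
    catalog.uri = 'http://127.0.0.1:8181',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = 'admin',
    s3.secret.key = 'password'
);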
175 changes: 81 additions & 94 deletions src/connector/src/sink/iceberg.rs
@@ -14,17 +14,15 @@

 use std::collections::HashMap;
 use std::fmt::Debug;
-use std::sync::Arc;
 
 use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema};
 use async_trait::async_trait;
-use icelake::config::{TableConfig, TableConfigRef};
+use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
 use icelake::transaction::Transaction;
 use icelake::types::{data_file_from_json, data_file_to_json, DataFile};
-use icelake::Table;
-use opendal::services::S3;
+use icelake::{Table, TableIdentifier};
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
@@ -35,7 +33,6 @@ use risingwave_pb::connector_service::SinkMetadata;
 use risingwave_rpc_client::ConnectorClient;
 use serde_derive::Deserialize;
 use serde_json::Value;
-use url::Url;
 
 use super::{
     Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
@@ -57,15 +54,29 @@ pub type RemoteIcebergConfig = RemoteConfig;
 #[serde(deny_unknown_fields)]
 pub struct IcebergConfig {
     #[serde(skip_serializing)]
-    pub connector: String, // Must be "kafka" here.
+    pub connector: String, // Must be "iceberg" here.
 
     pub r#type: String, // accept "append-only" or "upsert"
 
     #[serde(default, deserialize_with = "deserialize_bool_from_string")]
     pub force_append_only: bool,
 
+    #[serde(rename = "table.name")]
+    pub table_name: String, // Full name of table, must include schema name
+
+    #[serde(rename = "database.name")]
+    pub database_name: String, // Use as catalog name.
+
+    // Catalog type supported by iceberg, such as "storage", "rest".
+    // If not set, we use "storage" as default.
+    #[serde(rename = "catalog.type")]
+    pub catalog_type: Option<String>,
+
     #[serde(rename = "warehouse.path")]
-    pub path: String,
+    pub path: Option<String>, // Path of iceberg warehouse, only applicable in storage catalog.
+
+    #[serde(rename = "catalog.uri")]
+    pub uri: Option<String>, // URI of iceberg catalog, only applicable in rest catalog.
 
     #[serde(rename = "s3.region")]
     pub region: Option<String>,
@@ -78,26 +89,12 @@ pub struct IcebergConfig {

#[serde(rename = "s3.secret.key")]
pub secret_key: String,

#[serde(rename = "database.name")]
pub database_name: String,

#[serde(rename = "table.name")]
pub table_name: String,

#[serde(skip)]
pub iceberg_table_config: TableConfigRef,
}

impl IcebergConfig {
pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
let iceberg_table_config =
Arc::new(TableConfig::try_from(&values).map_err(|e| SinkError::Iceberg(anyhow!(e)))?);
let mut config =
serde_json::from_value::<IcebergConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;

config.iceberg_table_config = iceberg_table_config;
let config = serde_json::from_value::<IcebergConfig>(serde_json::to_value(values).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;

if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
return Err(SinkError::Config(anyhow!(
@@ -108,21 +105,66 @@ impl IcebergConfig {
             )));
         }
 
-        if config.endpoint.is_none() && config.region.is_none() {
-            return Err(SinkError::Config(anyhow!(
-                "You must fill either s3 region or s3 endpoint",
-            )));
+        Ok(config)
+    }
+
+    fn build_iceberg_configs(&self) -> HashMap<String, String> {
+        let mut iceberg_configs = HashMap::new();
+        iceberg_configs.insert(
+            CATALOG_TYPE.to_string(),
+            self.catalog_type
+                .as_deref()
+                .unwrap_or("storage")
+                .to_string(),
+        );
+        iceberg_configs.insert(
+            CATALOG_NAME.to_string(),
+            self.database_name.clone().to_string(),
+        );
+        if let Some(path) = &self.path {
+            iceberg_configs.insert(
+                format!("iceberg.catalog.{}.warehouse", self.database_name),
+                path.clone().to_string(),
+            );
         }
 
-        Ok(config)
+        if let Some(uri) = &self.uri {
+            iceberg_configs.insert(
+                format!("iceberg.catalog.{}.uri", self.database_name),
+                uri.clone().to_string(),
+            );
+        }
+
+        if let Some(region) = &self.region {
+            iceberg_configs.insert(
+                "iceberg.catalog.table.io.region".to_string(),
+                region.clone().to_string(),
+            );
+        }
+
+        if let Some(endpoint) = &self.endpoint {
+            iceberg_configs.insert(
+                "iceberg.catalog.table.io.endpoint".to_string(),
+                endpoint.clone().to_string(),
+            );
+        }
+
+        iceberg_configs.insert(
+            "iceberg.catalog.table.io.access_key_id".to_string(),
+            self.access_key.clone().to_string(),
+        );
+        iceberg_configs.insert(
+            "iceberg.catalog.table.io.secret_access_key".to_string(),
+            self.secret_key.clone().to_string(),
+        );
+
+        iceberg_configs
     }
 }
 
 pub struct IcebergSink {
     config: IcebergConfig,
     param: SinkParam,
-    table_root: String,
-    bucket_name: String,
 }
 
 impl Debug for IcebergSink {
@@ -135,32 +177,17 @@ impl Debug for IcebergSink {

 impl IcebergSink {
     async fn create_table(&self) -> Result<Table> {
-        let mut builder = S3::default();
-
-        // Sink will not load config from file.
-        builder.disable_config_load();
-
-        builder
-            .root(&self.table_root)
-            .bucket(&self.bucket_name)
-            .access_key_id(&self.config.access_key)
-            .secret_access_key(&self.config.secret_key);
-
-        if let Some(region) = &self.config.region {
-            builder.region(region);
-        }
-
-        if let Some(endpoint) = &self.config.endpoint {
-            builder.endpoint(endpoint);
-        }
+        let catalog = load_catalog(&self.config.build_iceberg_configs())
+            .await
+            .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
 
-        let op = opendal::Operator::new(builder)
-            .map_err(|err| SinkError::Config(anyhow!("{err}")))?
-            .finish();
+        let table_id = TableIdentifier::new(self.config.table_name.split('.'))
+            .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?;
 
-        let table = Table::open_with_config(op, self.config.iceberg_table_config.clone())
+        let table = catalog
+            .load_table(&table_id)
             .await
-            .map_err(|err| SinkError::Iceberg(anyhow!("Create table fail: {}", err)))?;
+            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
 
         let sink_schema = self.param.schema();
         let iceberg_schema = table
@@ -176,55 +203,15 @@ impl IcebergSink {
         Ok(table)
     }
 
-    /// Parse bucket name and table root path.
-    ///
-    /// return (bucket name, table root path)
-    fn parse_bucket_and_root_from_path(config: &IcebergConfig) -> Result<(String, String)> {
-        let url = Url::parse(&config.path).map_err(|err| {
-            SinkError::Config(anyhow!(
-                "Fail to parse Invalid path: {}, caused by: {}",
-                &config.path,
-                err
-            ))
-        })?;
-
-        let scheme = url.scheme();
-        if scheme != "s3a" && scheme != "s3" && scheme != "s3n" {
-            return Err(SinkError::Config(anyhow!(
-                "Invalid path: {}, only support s3a,s3,s3n prefix",
-                &config.path
-            )));
-        }
-
-        let bucket = url
-            .host_str()
-            .ok_or_else(|| SinkError::Config(anyhow!("Invalid path: {}", &config.path)))?;
-        let root = url.path();
-
-        let table_root_path = if root.is_empty() {
-            format!("/{}/{}", config.database_name, config.table_name)
-        } else {
-            format!("{}/{}/{}", root, config.database_name, config.table_name)
-        };
-
-        Ok((bucket.to_string(), table_root_path))
-    }
-
     pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
-        let (bucket_name, table_root) = Self::parse_bucket_and_root_from_path(&config)?;
         // TODO(ZENOTME): Only support append-only mode now.
         if !config.force_append_only {
            return Err(SinkError::Iceberg(anyhow!(
                "Iceberg sink only support append-only mode now."
            )));
        }
 
-        Ok(Self {
-            config,
-            param,
-            table_root,
-            bucket_name,
-        })
+        Ok(Self { config, param })
     }
 }

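End to end, the refactor turns the user-facing WITH options into an icelake catalog config map. Below is a minimal sketch of that mapping, written as if it were a unit test next to IcebergConfig (build_iceberg_configs is private to the module); the option values mirror the e2e test above, and the snippet itself is not part of the commit:

use std::collections::HashMap;

#[test]
fn sketch_config_mapping() {
    let options: HashMap<String, String> = [
        ("connector", "iceberg"),
        ("type", "append-only"),
        ("force_append_only", "true"),
        ("database.name", "demo"),
        ("table.name", "demo_db.demo_table"),
        ("catalog.type", "storage"),
        ("warehouse.path", "s3://icebergdata/demo"),
        ("s3.endpoint", "http://127.0.0.1:9301"),
        ("s3.access.key", "admin"),
        ("s3.secret.key", "password"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    // from_hashmap deserializes the options and validates the sink type.
    let config = IcebergConfig::from_hashmap(options).expect("options should deserialize");

    // `database.name` doubles as the icelake catalog name, so the warehouse
    // path lands under a key scoped to that name.
    let catalog_configs = config.build_iceberg_configs();
    assert_eq!(
        catalog_configs["iceberg.catalog.demo.warehouse"],
        "s3://icebergdata/demo"
    );
}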
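Stripped of the sink's error wrapping, the new create_table above reduces to a short catalog round-trip. The following is a sketch against the icelake API, assuming only the calls that appear in the diff (load_catalog, TableIdentifier::new, and the catalog's load_table); the table name is the placeholder from the e2e test:

use std::collections::HashMap;

use anyhow::Result;
use icelake::catalog::load_catalog;
use icelake::{Table, TableIdentifier};

async fn open_table(configs: &HashMap<String, String>) -> Result<Table> {
    // One call replaces the hand-built opendal S3 operator: the catalog
    // resolves the warehouse location and object-store credentials from
    // the "iceberg.catalog.*" keys built above.
    let catalog = load_catalog(configs).await?;

    // "demo_db.demo_table" splits into namespace "demo_db" and table
    // "demo_table", which is why table.name must now include the schema.
    let table_id = TableIdentifier::new("demo_db.demo_table".split('.'))?;

    Ok(catalog.load_table(&table_id).await?)
}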
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
@@ -438,7 +438,7 @@ impl SinkImpl {
SinkImpl::ClickHouse(_) => "clickhouse",
SinkImpl::Iceberg(_) => "iceberg",
SinkImpl::Nats(_) => "nats",
SinkImpl::RemoteIceberg(_) => "iceberg",
SinkImpl::RemoteIceberg(_) => "iceberg_java",
SinkImpl::TestSink(_) => "test",
}
}
