
refactor: Use icelake catalog api to simplify things. #12303

Merged · 4 commits · Sep 19, 2023
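This PR replaces the iceberg sink's hand-rolled table loading, which parsed warehouse.path itself (parse_bucket_and_root_from_path), assembled an opendal S3 operator, and opened the table via Table::open_with_config, with icelake's catalog API: the sink now builds a config map and calls load_catalog / load_table. A minimal sketch of the new flow, condensed from the create_table change below (the free-standing helper and the plain anyhow errors are illustrative, not code from the PR):

    use std::collections::HashMap;

    use anyhow::anyhow;
    use icelake::catalog::load_catalog;
    use icelake::{Table, TableIdentifier};

    // `configs` is the map produced by IcebergConfig::build_iceberg_configs
    // below: catalog type and name, warehouse path, and S3 credentials.
    async fn load_iceberg_table(
        configs: &HashMap<String, String>,
        table_name: &str, // full name, e.g. "demo_db.demo_table"
    ) -> anyhow::Result<Table> {
        let catalog = load_catalog(configs)
            .await
            .map_err(|e| anyhow!("Unable to load iceberg catalog: {e}"))?;
        let table_id = TableIdentifier::new(table_name.split('.'))
            .map_err(|e| anyhow!("Unable to parse table name: {e}"))?;
        catalog
            .load_table(&table_id)
            .await
            .map_err(|e| anyhow!(e))
    }

Besides deleting code, this removes the sink's direct url and opendal usage and leaves room for other catalog types: catalog.type defaults to 'storage', and a rest catalog can be configured via catalog.uri.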
46 changes: 44 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -106,7 +106,7 @@ hashbrown = { version = "0.14.0", features = [
 criterion = { version = "0.5", features = ["async_futures"] }
 tonic = { package = "madsim-tonic", version = "0.3.1" }
 tonic-build = { package = "madsim-tonic-build", version = "0.3.1" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "a6790d17094754959e351fac1e11147e37643e97" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" }
 arrow-array = "46"
 arrow-schema = "46"
 arrow-buffer = "46"
10 changes: 6 additions & 4 deletions e2e_test/iceberg/iceberg_sink_v2.slt
@@ -23,12 +23,14 @@ CREATE SINK s6 AS select * from mv6 WITH (
     connector = 'iceberg',
     type = 'append-only',
     force_append_only = 'true',
+    database.name = 'demo',
+    table.name = 'demo_db.demo_table',
+
+    catalog.type = 'storage',
     warehouse.path = 's3://icebergdata/demo',
     s3.endpoint = 'http://127.0.0.1:9301',
-    s3.access.key = 'hummockadmin',
-    s3.secret.key = 'hummockadmin',
-    database.name='demo_db',
-    table.name='demo_table'
+    s3.access.key = 'admin',
+    s3.secret.key = 'password'
 );
 
 statement ok
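Two semantic notes on the reworked options, matching the connector change below: database.name is now also used as the icelake catalog name, and table.name must be the full name including the schema (e.g. 'demo_db.demo_table'). For the WITH clause above, build_iceberg_configs (defined in the next file) produces roughly the following map; the first two keys assume icelake's CATALOG_TYPE and CATALOG_NAME constants expand to iceberg.catalog.type and iceberg.catalog.name:

    iceberg.catalog.type = storage
    iceberg.catalog.name = demo
    iceberg.catalog.demo.warehouse = s3://icebergdata/demo
    iceberg.catalog.table.io.endpoint = http://127.0.0.1:9301
    iceberg.catalog.table.io.access_key_id = admin
    iceberg.catalog.table.io.secret_access_key = password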
175 changes: 81 additions & 94 deletions src/connector/src/sink/iceberg.rs
@@ -14,17 +14,15 @@
 
 use std::collections::HashMap;
 use std::fmt::Debug;
-use std::sync::Arc;
 
 use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema};
 use async_trait::async_trait;
-use icelake::config::{TableConfig, TableConfigRef};
+use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
 use icelake::transaction::Transaction;
 use icelake::types::{data_file_from_json, data_file_to_json, DataFile};
-use icelake::Table;
-use opendal::services::S3;
+use icelake::{Table, TableIdentifier};
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
@@ -35,7 +33,6 @@ use risingwave_pb::connector_service::SinkMetadata;
 use risingwave_rpc_client::ConnectorClient;
 use serde_derive::Deserialize;
 use serde_json::Value;
-use url::Url;
 
 use super::{
     Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
@@ -57,15 +54,29 @@ pub type RemoteIcebergConfig = RemoteConfig;
 #[serde(deny_unknown_fields)]
 pub struct IcebergConfig {
     #[serde(skip_serializing)]
-    pub connector: String, // Must be "kafka" here.
+    pub connector: String, // Must be "iceberg" here.
 
     pub r#type: String, // accept "append-only" or "upsert"
 
     #[serde(default, deserialize_with = "deserialize_bool_from_string")]
     pub force_append_only: bool,
 
+    #[serde(rename = "table.name")]
+    pub table_name: String, // Full name of table, must include schema name
+
+    #[serde(rename = "database.name")]
+    pub database_name: String, // Use as catalog name.
+
+    // Catalog type supported by iceberg, such as "storage", "rest".
+    // If not set, we use "storage" as default.
+    #[serde(rename = "catalog.type")]
+    pub catalog_type: Option<String>,
+
     #[serde(rename = "warehouse.path")]
-    pub path: String,
+    pub path: Option<String>, // Path of iceberg warehouse, only applicable in storage catalog.
+
+    #[serde(rename = "catalog.uri")]
+    pub uri: Option<String>, // URI of iceberg catalog, only applicable in rest catalog.
 
     #[serde(rename = "s3.region")]
     pub region: Option<String>,
@@ -78,26 +89,12 @@ pub struct IcebergConfig {
 
     #[serde(rename = "s3.secret.key")]
     pub secret_key: String,
-
-    #[serde(rename = "database.name")]
-    pub database_name: String,
-
-    #[serde(rename = "table.name")]
-    pub table_name: String,
-
-    #[serde(skip)]
-    pub iceberg_table_config: TableConfigRef,
 }
 
 impl IcebergConfig {
     pub fn from_hashmap(values: HashMap<String, String>) -> Result<Self> {
-        let iceberg_table_config =
-            Arc::new(TableConfig::try_from(&values).map_err(|e| SinkError::Iceberg(anyhow!(e)))?);
-        let mut config =
-            serde_json::from_value::<IcebergConfig>(serde_json::to_value(values).unwrap())
-                .map_err(|e| SinkError::Config(anyhow!(e)))?;
-
-        config.iceberg_table_config = iceberg_table_config;
+        let config = serde_json::from_value::<IcebergConfig>(serde_json::to_value(values).unwrap())
+            .map_err(|e| SinkError::Config(anyhow!(e)))?;
 
         if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
             return Err(SinkError::Config(anyhow!(
@@ -108,21 +105,66 @@ impl IcebergConfig {
             )));
         }
 
-        if config.endpoint.is_none() && config.region.is_none() {
-            return Err(SinkError::Config(anyhow!(
-                "You must fill either s3 region or s3 endpoint",
-            )));
+        Ok(config)
+    }
+
+    fn build_iceberg_configs(&self) -> HashMap<String, String> {
+        let mut iceberg_configs = HashMap::new();
+        iceberg_configs.insert(
+            CATALOG_TYPE.to_string(),
+            self.catalog_type
+                .as_deref()
+                .unwrap_or("storage")
+                .to_string(),
+        );
+        iceberg_configs.insert(
+            CATALOG_NAME.to_string(),
+            self.database_name.clone().to_string(),
+        );
+        if let Some(path) = &self.path {
+            iceberg_configs.insert(
+                format!("iceberg.catalog.{}.warehouse", self.database_name),
+                path.clone().to_string(),
+            );
         }
 
-        Ok(config)
+        if let Some(uri) = &self.uri {
+            iceberg_configs.insert(
+                format!("iceberg.catalog.{}.uri", self.database_name),
+                uri.clone().to_string(),
+            );
+        }
+
+        if let Some(region) = &self.region {
+            iceberg_configs.insert(
+                "iceberg.catalog.table.io.region".to_string(),
+                region.clone().to_string(),
+            );
+        }
+
+        if let Some(endpoint) = &self.endpoint {
+            iceberg_configs.insert(
+                "iceberg.catalog.table.io.endpoint".to_string(),
+                endpoint.clone().to_string(),
+            );
+        }
+
+        iceberg_configs.insert(
+            "iceberg.catalog.table.io.access_key_id".to_string(),
+            self.access_key.clone().to_string(),
+        );
+        iceberg_configs.insert(
+            "iceberg.catalog.table.io.secret_access_key".to_string(),
+            self.secret_key.clone().to_string(),
+        );
+
+        iceberg_configs
     }
 }
 
 pub struct IcebergSink {
     config: IcebergConfig,
     param: SinkParam,
-    table_root: String,
-    bucket_name: String,
 }
 
 impl Debug for IcebergSink {
@@ -135,32 +177,17 @@
 
 impl IcebergSink {
     async fn create_table(&self) -> Result<Table> {
-        let mut builder = S3::default();
-
-        // Sink will not load config from file.
-        builder.disable_config_load();
-
-        builder
-            .root(&self.table_root)
-            .bucket(&self.bucket_name)
-            .access_key_id(&self.config.access_key)
-            .secret_access_key(&self.config.secret_key);
-
-        if let Some(region) = &self.config.region {
-            builder.region(region);
-        }
-
-        if let Some(endpoint) = &self.config.endpoint {
-            builder.endpoint(endpoint);
-        }
+        let catalog = load_catalog(&self.config.build_iceberg_configs())
+            .await
+            .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
 
-        let op = opendal::Operator::new(builder)
-            .map_err(|err| SinkError::Config(anyhow!("{err}")))?
-            .finish();
+        let table_id = TableIdentifier::new(self.config.table_name.split('.'))
+            .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?;
 
-        let table = Table::open_with_config(op, self.config.iceberg_table_config.clone())
+        let table = catalog
+            .load_table(&table_id)
             .await
-            .map_err(|err| SinkError::Iceberg(anyhow!("Create table fail: {}", err)))?;
+            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
 
         let sink_schema = self.param.schema();
         let iceberg_schema = table
@@ -176,55 +203,15 @@ impl IcebergSink {
         Ok(table)
     }
 
-    /// Parse bucket name and table root path.
-    ///
-    /// return (bucket name, table root path)
-    fn parse_bucket_and_root_from_path(config: &IcebergConfig) -> Result<(String, String)> {
-        let url = Url::parse(&config.path).map_err(|err| {
-            SinkError::Config(anyhow!(
-                "Fail to parse Invalid path: {}, caused by: {}",
-                &config.path,
-                err
-            ))
-        })?;
-
-        let scheme = url.scheme();
-        if scheme != "s3a" && scheme != "s3" && scheme != "s3n" {
-            return Err(SinkError::Config(anyhow!(
-                "Invalid path: {}, only support s3a,s3,s3n prefix",
-                &config.path
-            )));
-        }
-
-        let bucket = url
-            .host_str()
-            .ok_or_else(|| SinkError::Config(anyhow!("Invalid path: {}", &config.path)))?;
-        let root = url.path();
-
-        let table_root_path = if root.is_empty() {
-            format!("/{}/{}", config.database_name, config.table_name)
-        } else {
-            format!("{}/{}/{}", root, config.database_name, config.table_name)
-        };
-
-        Ok((bucket.to_string(), table_root_path))
-    }
-
     pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
-        let (bucket_name, table_root) = Self::parse_bucket_and_root_from_path(&config)?;
         // TODO(ZENOTME): Only support append-only mode now.
         if !config.force_append_only {
            return Err(SinkError::Iceberg(anyhow!(
                "Iceberg sink only support append-only mode now."
            )));
         }
 
-        Ok(Self {
-            config,
-            param,
-            table_root,
-            bucket_name,
-        })
+        Ok(Self { config, param })
     }
 }

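For reference, a hypothetical test-style sketch (not part of this PR) showing how the WITH options from iceberg_sink_v2.slt above deserialize through the new IcebergConfig::from_hashmap; it assumes std::collections::HashMap and IcebergConfig are in scope:

    #[test]
    fn parse_iceberg_sink_options() {
        // The same key/value pairs as the e2e test's WITH clause.
        let values: HashMap<String, String> = [
            ("connector", "iceberg"),
            ("type", "append-only"),
            ("force_append_only", "true"),
            ("database.name", "demo"),
            ("table.name", "demo_db.demo_table"),
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://icebergdata/demo"),
            ("s3.endpoint", "http://127.0.0.1:9301"),
            ("s3.access.key", "admin"),
            ("s3.secret.key", "password"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();

        let config = IcebergConfig::from_hashmap(values).unwrap();
        assert_eq!(config.catalog_type.as_deref(), Some("storage"));
        assert_eq!(config.table_name, "demo_db.demo_table");
        assert!(config.force_append_only);
    }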
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
@@ -438,7 +438,7 @@ impl SinkImpl {
             SinkImpl::ClickHouse(_) => "clickhouse",
             SinkImpl::Iceberg(_) => "iceberg",
             SinkImpl::Nats(_) => "nats",
-            SinkImpl::RemoteIceberg(_) => "iceberg",
+            SinkImpl::RemoteIceberg(_) => "iceberg_java",
             SinkImpl::TestSink(_) => "test",
         }
     }