From 7888b8f461a75953c63a1b210cf716bcbb8a4a53 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 1 Feb 2024 14:48:01 +0800 Subject: [PATCH 01/12] support iceberg scan executor --- Cargo.lock | 3 + src/batch/Cargo.toml | 3 + src/batch/src/error.rs | 7 + src/batch/src/executor/iceberg_scan.rs | 185 ++++++++++++++++++ src/batch/src/executor/mod.rs | 1 + src/connector/src/sink/iceberg/jni_catalog.rs | 5 +- src/connector/src/sink/iceberg/mod.rs | 59 +++--- 7 files changed, 230 insertions(+), 33 deletions(-) create mode 100644 src/batch/src/executor/iceberg_scan.rs diff --git a/Cargo.lock b/Cargo.lock index 7e6f91306c3a..2f84d4a83e40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8574,6 +8574,8 @@ name = "risingwave_batch" version = "1.7.0-alpha" dependencies = [ "anyhow", + "arrow-array 50.0.0", + "arrow-schema 50.0.0", "assert_matches", "async-recursion", "async-trait", @@ -8584,6 +8586,7 @@ dependencies = [ "futures-util", "hashbrown 0.14.0", "hytra", + "icelake", "itertools 0.12.0", "madsim-tokio", "madsim-tonic", diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 053a133bc233..c1fcc23ca348 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -15,6 +15,8 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" +arrow-array = { workspace = true } +arrow-schema = { workspace = true } assert_matches = "1" async-recursion = "1" async-trait = "0.1" @@ -24,6 +26,7 @@ futures-async-stream = { workspace = true } futures-util = "0.3" hashbrown = { workspace = true } hytra = "0.1.2" +icelake = { workspace = true } itertools = "0.12" memcomparable = "0.2" parking_lot = { version = "0.12", features = ["arc_lock"] } diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index 11614386cd0b..1cbfc7247827 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -109,6 +109,13 @@ pub enum BatchError { DmlError, ), + #[error(transparent)] + Iceberg( + #[from] + #[backtrace] + icelake::Error, + ), + // Make the ref-counted type to be a variant for easier code structuring. // TODO(error-handling): replace with `thiserror_ext::Arc` #[error(transparent)] diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs new file mode 100644 index 000000000000..20dc9df97c70 --- /dev/null +++ b/src/batch/src/executor/iceberg_scan.rs @@ -0,0 +1,185 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::hash::{DefaultHasher, Hash, Hasher}; +use std::sync::Arc; + +use anyhow::anyhow; +use arrow_array::RecordBatch; +use futures_async_stream::try_stream; +use futures_util::stream::StreamExt; +use icelake::io::{FileScan, TableScan}; +use icelake::TableIdentifier; +use risingwave_common::catalog::Schema; +use risingwave_connector::sink::iceberg::IcebergConfig; + +use crate::error::BatchError; +use crate::executor::{DataChunk, Executor}; + +/// Create a iceberg scan executor. 
+/// +/// # Examples +/// +/// ``` +/// use futures_async_stream::for_await; +/// use risingwave_common::catalog::{Field, Schema}; +/// use risingwave_common::types::DataType; +/// use risingwave_connector::sink::iceberg::IcebergConfig; +/// +/// use crate::executor::iceberg_scan::{FileSelector, IcebergScanExecutor}; +/// use crate::executor::Executor; +/// +/// #[tokio::test] +/// async fn test_iceberg_scan() { +/// let iceberg_scan_executor = IcebergScanExecutor { +/// database_name: "demo_db".into(), +/// table_name: "demo_table".into(), +/// file_selector: FileSelector::select_all(), +/// iceberg_config: IcebergConfig { +/// database_name: "demo_db".into(), +/// table_name: "demo_table".into(), +/// catalog_type: Some("storage".into()), +/// path: "s3a://hummock001/".into(), +/// endpoint: Some("http://127.0.0.1:9301".into()), +/// access_key: "hummockadmin".into(), +/// secret_key: "hummockadmin".into(), +/// region: Some("us-east-1".into()), +/// ..Default::default() +/// }, +/// snapshot_id: None, +/// batch_size: 1024, +/// schema: Schema::new(vec![ +/// Field::with_name(DataType::Int64, "seq_id"), +/// Field::with_name(DataType::Int64, "user_id"), +/// Field::with_name(DataType::Varchar, "user_name"), +/// ]), +/// identity: "iceberg_scan".into(), +/// }; +/// +/// let stream = Box::new(iceberg_scan_executor).execute(); +/// #[for_await] +/// for chunk in stream { +/// let chunk = chunk.unwrap(); +/// println!("{:?}", chunk); +/// } +/// } +/// ``` + +pub struct IcebergScanExecutor { + database_name: String, + table_name: String, + iceberg_config: IcebergConfig, + snapshot_id: Option, + file_selector: FileSelector, + batch_size: usize, + + schema: Schema, + identity: String, +} + +impl Executor for IcebergScanExecutor { + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn identity(&self) -> &str { + &self.identity + } + + fn execute(self: Box) -> super::BoxedDataChunkStream { + self.do_execute().boxed() + } +} + +impl IcebergScanExecutor { + #[try_stream(ok = DataChunk, error = BatchError)] + async fn do_execute(self: Box) { + let catalog = self.iceberg_config.create_catalog().await?; + + let table_ident = TableIdentifier::new(vec![self.database_name, self.table_name]).unwrap(); + let table = catalog + .load_table(&table_ident) + .await + .map_err(BatchError::Iceberg)?; + + let table_scan: TableScan = table + .new_scan_builder() + .with_snapshot_id( + self.snapshot_id + .unwrap_or_else(|| table.current_table_metadata().current_snapshot_id.unwrap()), + ) + .with_batch_size(self.batch_size) + .with_column_names(self.schema.names()) + .build() + .map_err(|e| BatchError::Internal(anyhow!(e)))?; + let file_scan_stream: icelake::io::FileScanStream = + table_scan.scan(&table).await.map_err(BatchError::Iceberg)?; + + #[for_await] + for file_scan in file_scan_stream { + let file_scan: FileScan = file_scan.map_err(BatchError::Iceberg)?; + if !self.file_selector.select(file_scan.path()) { + continue; + } + let record_batch_stream = file_scan.scan().await.map_err(BatchError::Iceberg)?; + + #[for_await] + for record_batch in record_batch_stream { + let record_batch: RecordBatch = record_batch.map_err(BatchError::Iceberg)?; + let chunk = Self::record_batch_to_chunk(record_batch)?; + debug_assert_eq!(chunk.data_types(), self.schema.data_types()); + yield chunk; + } + } + } + + fn record_batch_to_chunk(record_batch: RecordBatch) -> Result { + let mut columns = Vec::with_capacity(record_batch.num_columns()); + for array in record_batch.columns() { + let column = 
Arc::new(array.try_into()?); + columns.push(column); + } + Ok(DataChunk::new(columns, record_batch.num_rows())) + } +} + +pub enum FileSelector { + // File paths to be scanned by this executor are specified. + FileList(Vec), + // Data files to be scanned by this executor could be calculated by Hash(file_path) % num_tasks == task_id. + // task_id, num_tasks + Hash(usize, usize), +} + +impl FileSelector { + fn select_all() -> Self { + FileSelector::Hash(0, 1) + } + + fn select(&self, path: &str) -> bool { + match self { + FileSelector::FileList(paths) => paths.contains(&path.to_string()), + FileSelector::Hash(task_id, num_tasks) => { + let hash = Self::hash_str_to_usize(path); + hash % num_tasks == *task_id + } + } + } + + fn hash_str_to_usize(s: &str) -> usize { + let mut hasher = DefaultHasher::new(); + s.hash(&mut hasher); + hasher.finish() as usize + } +} diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index 3b0298643484..b6344ab9600c 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -20,6 +20,7 @@ mod generic_exchange; mod group_top_n; mod hash_agg; mod hop_window; +mod iceberg_scan; mod insert; mod join; mod limit; diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/sink/iceberg/jni_catalog.rs index 0057257df996..11de50e69936 100644 --- a/src/connector/src/sink/iceberg/jni_catalog.rs +++ b/src/connector/src/sink/iceberg/jni_catalog.rs @@ -29,8 +29,6 @@ use jni::JavaVM; use risingwave_jni_core::call_method; use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, jobj_to_str, JVM}; -use crate::sink::{Result, SinkError}; - pub struct JniCatalog { java_catalog: GlobalRef, jvm: &'static JavaVM, @@ -144,7 +142,7 @@ impl JniCatalog { name: impl ToString, catalog_impl: impl ToString, java_catalog_props: HashMap, - ) -> Result { + ) -> anyhow::Result { let jvm = JVM.get_or_init()?; execute_with_jni_env(jvm, |env| { @@ -184,6 +182,5 @@ impl JniCatalog { config: base_config, }) as CatalogRef) }) - .map_err(SinkError::Iceberg) } } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 9e51557ef477..25b9ecf5d2dc 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -67,7 +67,7 @@ pub const ICEBERG_SINK: &str = "iceberg"; static RW_CATALOG_NAME: &str = "risingwave"; -#[derive(Debug, Clone, Deserialize, WithOptions)] +#[derive(Debug, Clone, Deserialize, WithOptions, Default)] #[serde(deny_unknown_fields)] pub struct IcebergConfig { pub connector: String, // Avoid deny unknown field. 
Must be "iceberg" @@ -348,42 +348,43 @@ impl IcebergConfig { Ok((base_catalog_config, java_catalog_configs)) } -} -async fn create_catalog(config: &IcebergConfig) -> Result { - match config.catalog_type() { - "storage" | "rest" => { - let iceberg_configs = config.build_iceberg_configs()?; - let catalog = load_catalog(&iceberg_configs) - .await - .map_err(|e| SinkError::Iceberg(anyhow!(e)))?; - Ok(catalog) - } - catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => { - // Create java catalog - let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?; - let catalog_impl = match catalog_type { - "hive" => "org.apache.iceberg.hive.HiveCatalog", - "sql" => "org.apache.iceberg.jdbc.JdbcCatalog", - "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", - "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog", - _ => unreachable!(), - }; + pub async fn create_catalog(&self) -> anyhow::Result { + match self.catalog_type() { + "storage" | "rest" => { + let iceberg_configs = self.build_iceberg_configs()?; + let catalog = load_catalog(&iceberg_configs) + .await + .map_err(|e| anyhow!(e))?; + Ok(catalog) + } + catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => { + // Create java catalog + let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?; + let catalog_impl = match catalog_type { + "hive" => "org.apache.iceberg.hive.HiveCatalog", + "sql" => "org.apache.iceberg.jdbc.JdbcCatalog", + "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", + "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog", + _ => unreachable!(), + }; - jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props) - } - "mock" => Ok(Arc::new(MockCatalog{})), - _ => { - Err(SinkError::Iceberg(anyhow!( + jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props) + } + "mock" => Ok(Arc::new(MockCatalog{})), + _ => { + Err(anyhow!( "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`", - config.catalog_type() - ))) + self.catalog_type() + )) + } } } } pub async fn create_table(config: &IcebergConfig) -> Result { - let catalog = create_catalog(config) + let catalog = config + .create_catalog() .await .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; From 5e4a47aa469da454c9760b5664b017f25fb1cee8 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 1 Feb 2024 17:28:12 +0800 Subject: [PATCH 02/12] refactor --- src/batch/src/executor/iceberg_scan.rs | 10 ++----- src/connector/src/sink/iceberg/mod.rs | 38 ++++++++++++------------- src/frontend/src/handler/create_sink.rs | 4 +-- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 20dc9df97c70..3c6957004932 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -20,7 +20,6 @@ use arrow_array::RecordBatch; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; use icelake::io::{FileScan, TableScan}; -use icelake::TableIdentifier; use risingwave_common::catalog::Schema; use risingwave_connector::sink::iceberg::IcebergConfig; @@ -105,13 +104,10 @@ impl Executor for IcebergScanExecutor { impl IcebergScanExecutor { #[try_stream(ok = DataChunk, error = BatchError)] async fn 
do_execute(self: Box<Self>) {
-        let catalog = self.iceberg_config.create_catalog().await?;
-
-        let table_ident = TableIdentifier::new(vec![self.database_name, self.table_name]).unwrap();
-        let table = catalog
-            .load_table(&table_ident)
+        let table = self.iceberg_config
+            .load_table()
             .await
-            .map_err(BatchError::Iceberg)?;
+            .map_err(BatchError::Internal)?;
         let table_scan: TableScan = table
             .new_scan_builder()
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 25b9ecf5d2dc..2783b15b4ddb 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -349,7 +349,7 @@ impl IcebergConfig {
         Ok((base_catalog_config, java_catalog_configs))
     }
-    pub async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
+    async fn create_catalog(&self) -> anyhow::Result<CatalogRef> {
         match self.catalog_type() {
             "storage" | "rest" => {
                 let iceberg_configs = self.build_iceberg_configs()?;
@@ -380,25 +380,25 @@ impl IcebergConfig {
             }
         }
     }
-}
-pub async fn create_table(config: &IcebergConfig) -> Result<Table> {
-    let catalog = config
-        .create_catalog()
-        .await
-        .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
+    pub async fn load_table(&self) -> anyhow::Result<Table> {
+        let catalog = self
+            .create_catalog()
+            .await
+            .map_err(|e| anyhow!("Unable to load iceberg catalog: {e}"))?;
-    let table_id = TableIdentifier::new(
-        vec![config.database_name.as_str()]
-            .into_iter()
-            .chain(config.table_name.split('.')),
-    )
-    .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?;
-
-    catalog
-        .load_table(&table_id)
-        .await
-        .map_err(|err| SinkError::Iceberg(anyhow!(err)))
+        let table_id = TableIdentifier::new(
+            vec![self.database_name.as_str()]
+                .into_iter()
+                .chain(self.table_name.split('.')),
+        )
+        .map_err(|e| anyhow!("Unable to parse table name: {e}"))?;
+
+        catalog
+            .load_table(&table_id)
+            .await
+            .map_err(|err| anyhow!(err))
+    }
 }
 pub struct IcebergSink {
@@ -427,7 +427,7 @@ impl Debug for IcebergSink {
 impl IcebergSink {
     async fn create_and_validate_table(&self) -> Result<Table>
{ - let table = create_table(&self.config).await?; + let table = self.config.load_table().await.map_err(SinkError::Iceberg)?; let sink_schema = self.param.schema(); let iceberg_schema = table diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 2ccd32a4c006..e2e9388e8186 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -28,7 +28,7 @@ use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; -use risingwave_connector::sink::iceberg::{create_table, IcebergConfig, ICEBERG_SINK}; +use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; @@ -321,7 +321,7 @@ pub async fn get_partition_compute_info( async fn get_partition_compute_info_for_iceberg( iceberg_config: &IcebergConfig, ) -> Result> { - let table = create_table(iceberg_config).await?; + let table = iceberg_config.load_table().await?; let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else { return Ok(None); }; From 8f0681bea265ec3f18ea0ee6f92561cebe85726b Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 1 Feb 2024 17:43:23 +0800 Subject: [PATCH 03/12] fmt --- src/batch/src/executor/iceberg_scan.rs | 3 ++- src/connector/src/sink/iceberg/mod.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 3c6957004932..a417192d8124 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -104,7 +104,8 @@ impl Executor for IcebergScanExecutor { impl IcebergScanExecutor { #[try_stream(ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { - let table = self.iceberg_config + let table = self + .iceberg_config .load_table() .await .map_err(BatchError::Internal)?; diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 2783b15b4ddb..0e4f28f64639 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -392,7 +392,7 @@ impl IcebergConfig { .into_iter() .chain(self.table_name.split('.')), ) - .map_err(|e| anyhow!("Unable to parse table name: {e}"))?; + .map_err(|e| anyhow!("Unable to parse table name: {e}"))?; catalog .load_table(&table_id) From 52de44c014f97819e5c8d48c520a805df6417f7a Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 1 Feb 2024 18:06:49 +0800 Subject: [PATCH 04/12] fix doc --- src/batch/src/executor/iceberg_scan.rs | 48 ++++++++++++++++---------- src/batch/src/executor/mod.rs | 1 + 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index a417192d8124..caf1289220d4 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -32,20 +32,15 @@ use crate::executor::{DataChunk, Executor}; /// /// ``` /// use futures_async_stream::for_await; +/// use risingwave_batch::executor::{Executor, FileSelector, IcebergScanExecutor}; /// use risingwave_common::catalog::{Field, Schema}; /// use risingwave_common::types::DataType; /// use risingwave_connector::sink::iceberg::IcebergConfig; /// -/// use 
crate::executor::iceberg_scan::{FileSelector, IcebergScanExecutor}; -/// use crate::executor::Executor; -/// /// #[tokio::test] /// async fn test_iceberg_scan() { -/// let iceberg_scan_executor = IcebergScanExecutor { -/// database_name: "demo_db".into(), -/// table_name: "demo_table".into(), -/// file_selector: FileSelector::select_all(), -/// iceberg_config: IcebergConfig { +/// let iceberg_scan_executor = IcebergScanExecutor::new( +/// IcebergConfig { /// database_name: "demo_db".into(), /// table_name: "demo_table".into(), /// catalog_type: Some("storage".into()), @@ -56,15 +51,16 @@ use crate::executor::{DataChunk, Executor}; /// region: Some("us-east-1".into()), /// ..Default::default() /// }, -/// snapshot_id: None, -/// batch_size: 1024, -/// schema: Schema::new(vec![ +/// None, +/// FileSelector::select_all(), +/// 1024, +/// Schema::new(vec![ /// Field::with_name(DataType::Int64, "seq_id"), /// Field::with_name(DataType::Int64, "user_id"), /// Field::with_name(DataType::Varchar, "user_name"), /// ]), -/// identity: "iceberg_scan".into(), -/// }; +/// "iceberg_scan".into(), +/// ); /// /// let stream = Box::new(iceberg_scan_executor).execute(); /// #[for_await] @@ -76,8 +72,6 @@ use crate::executor::{DataChunk, Executor}; /// ``` pub struct IcebergScanExecutor { - database_name: String, - table_name: String, iceberg_config: IcebergConfig, snapshot_id: Option, file_selector: FileSelector, @@ -102,6 +96,24 @@ impl Executor for IcebergScanExecutor { } impl IcebergScanExecutor { + pub fn new( + iceberg_config: IcebergConfig, + snapshot_id: Option, + file_selector: FileSelector, + batch_size: usize, + schema: Schema, + identity: String, + ) -> Self { + Self { + iceberg_config, + snapshot_id, + file_selector, + batch_size, + schema, + identity, + } + } + #[try_stream(ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { let table = self @@ -160,11 +172,11 @@ pub enum FileSelector { } impl FileSelector { - fn select_all() -> Self { + pub fn select_all() -> Self { FileSelector::Hash(0, 1) } - fn select(&self, path: &str) -> bool { + pub fn select(&self, path: &str) -> bool { match self { FileSelector::FileList(paths) => paths.contains(&path.to_string()), FileSelector::Hash(task_id, num_tasks) => { @@ -174,7 +186,7 @@ impl FileSelector { } } - fn hash_str_to_usize(s: &str) -> usize { + pub fn hash_str_to_usize(s: &str) -> usize { let mut hasher = DefaultHasher::new(); s.hash(&mut hasher); hasher.finish() as usize diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index b6344ab9600c..ded2af708982 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -53,6 +53,7 @@ pub use generic_exchange::*; pub use group_top_n::*; pub use hash_agg::*; pub use hop_window::*; +pub use iceberg_scan::*; pub use insert::*; pub use join::*; pub use limit::*; From b5423ea22044fe7ed6386bb8965137613113f9c7 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 2 Feb 2024 18:13:51 +0800 Subject: [PATCH 05/12] support create iceberg source --- src/connector/src/macros.rs | 3 +- src/connector/src/sink/iceberg/mod.rs | 2 +- src/connector/src/source/base.rs | 5 +- src/connector/src/source/iceberg/mod.rs | 189 ++++++++++++++++++ src/connector/src/source/mod.rs | 1 + src/frontend/src/handler/create_source.rs | 10 +- src/frontend/src/handler/create_table.rs | 25 ++- .../src/optimizer/plan_node/logical_source.rs | 15 +- src/meta/src/stream/source_manager.rs | 2 + src/sqlparser/src/ast/statement.rs | 12 ++ 10 files changed, 251 insertions(+), 13 deletions(-) 
create mode 100644 src/connector/src/source/iceberg/mod.rs diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index 9a2383dbb4a9..e34171717ae6 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -36,7 +36,8 @@ macro_rules! for_all_classified_sources { { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> }, { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> }, { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> }, - { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit} + { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}, + { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit} } $( ,$extra_args diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 0e4f28f64639..0f7e93968834 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -927,7 +927,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { } /// Try to match our schema with iceberg schema. -fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> { +pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> { if rw_schema.fields.len() != arrow_schema.fields().len() { return Err(SinkError::Iceberg(anyhow!( "Schema length not match, ours is {}, and iceberg is {}", diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index faa857aae0dd..8c170017d34b 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -29,7 +29,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::error::{ErrorSuppressor, RwError}; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::types::{JsonbVal, Scalar}; -use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; +use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo, Source}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::source::ConnectorSplit; use risingwave_rpc_client::ConnectorClient; @@ -149,9 +149,10 @@ pub struct SourceEnumeratorContext { pub connector_client: Option, } -#[derive(Clone, Copy, Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct SourceEnumeratorInfo { pub source_id: u32, + pub source: Option, } #[derive(Debug, Default)] diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs new file mode 100644 index 000000000000..b29f007a4b44 --- /dev/null +++ b/src/connector/src/source/iceberg/mod.rs @@ -0,0 +1,189 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use async_trait::async_trait; +use itertools::Itertools; +use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; +use risingwave_common::types::JsonbVal; +use serde::{Deserialize, Serialize}; +use simd_json::prelude::ArrayTrait; + +use crate::parser::ParserConfig; +use crate::sink::iceberg::IcebergConfig; +use crate::source::{ + BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, + SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields, +}; + +pub const ICEBERG_CONNECTOR: &str = "iceberg"; + +#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)] +pub struct IcebergProperties { + #[serde(rename = "catalog.type")] + pub catalog_type: String, + #[serde(rename = "s3.region")] + pub region_name: String, + #[serde(rename = "s3.endpoint", default)] + pub endpoint: String, + #[serde(rename = "s3.access.key", default)] + pub access: String, + #[serde(rename = "s3.secret.key", default)] + pub secret: String, + #[serde(rename = "warehouse.path")] + pub warehouse_path: String, + #[serde(rename = "database.name")] + pub database_name: String, + #[serde(rename = "table.name")] + pub table_name: String, + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +impl SourceProperties for IcebergProperties { + type Split = IcebergSplit; + type SplitEnumerator = IcebergSplitEnumerator; + type SplitReader = IcebergFileReader; + + const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR; +} + +impl UnknownFields for IcebergProperties { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct IcebergSplit {} + +impl SplitMetaData for IcebergSplit { + fn id(&self) -> SplitId { + unimplemented!() + } + + fn restore_from_json(_value: JsonbVal) -> anyhow::Result { + unimplemented!() + } + + fn encode_to_json(&self) -> JsonbVal { + unimplemented!() + } + + fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> { + unimplemented!() + } +} + +#[derive(Debug, Clone)] +pub struct IcebergSplitEnumerator {} + +#[async_trait] +impl SplitEnumerator for IcebergSplitEnumerator { + type Properties = IcebergProperties; + type Split = IcebergSplit; + + async fn new( + properties: Self::Properties, + context: SourceEnumeratorContextRef, + ) -> anyhow::Result { + match &context.info.source { + Some(source) => { + let iceberg_config = IcebergConfig { + database_name: properties.database_name, + table_name: properties.table_name, + catalog_type: Some(properties.catalog_type), + path: properties.warehouse_path, + endpoint: Some(properties.endpoint), + access_key: properties.access, + secret_key: properties.secret, + region: Some(properties.region_name), + ..Default::default() + }; + + let columns: Vec = source + .columns + .iter() + .cloned() + .map(ColumnCatalog::from) + .collect_vec(); + + let schema = Schema { + fields: columns + .iter() + .map(|c| Field::from(&c.column_desc)) + .collect(), + }; + + let table = iceberg_config.load_table().await?; + + let iceberg_schema: arrow_schema::Schema = table + .current_table_metadata() + .current_schema()? 
+ .clone() + .try_into()?; + + for f1 in schema.fields() { + if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) { + return Err(anyhow::anyhow!(format!( + "Column {} not found in iceberg table", + f1.name + ))); + } + } + + let new_iceberg_field = iceberg_schema + .fields + .iter() + .filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name)) + .cloned() + .collect::>(); + let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field); + + crate::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?; + Ok(Self {}) + } + None => unreachable!(), + } + } + + async fn list_splits(&mut self) -> anyhow::Result> { + Ok(vec![]) + } +} + +#[derive(Debug)] +pub struct IcebergFileReader {} + +#[async_trait] +impl SplitReader for IcebergFileReader { + type Properties = IcebergProperties; + type Split = IcebergSplit; + + async fn new( + _props: IcebergProperties, + _splits: Vec, + _parser_config: ParserConfig, + _source_ctx: SourceContextRef, + _columns: Option>, + ) -> anyhow::Result { + unimplemented!() + } + + fn into_stream(self) -> BoxChunkSourceStream { + unimplemented!() + } +} diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index cba63b3005c1..3656820ed95b 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -31,6 +31,7 @@ pub use kafka::KAFKA_CONNECTOR; pub use kinesis::KINESIS_CONNECTOR; pub use nats::NATS_CONNECTOR; mod common; +pub mod iceberg; mod manager; pub mod reader; pub mod test_source; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 5f25d12650f0..a736f863be8a 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -44,6 +44,7 @@ use risingwave_connector::source::cdc::{ MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, }; use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; +use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ @@ -977,6 +978,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( Format::Plain => vec![Encode::Json], + ), + ICEBERG_CONNECTOR => hashmap!( + Format::Native => vec![Encode::Native], ) )) }); @@ -1213,8 +1217,10 @@ pub async fn handle_create_source( ) .into()); } - - let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?; + let connector = get_connector(&with_properties) + .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?; + let (mut columns, pk_column_ids, row_id_index) = + bind_pk_on_relation(columns, pk_names, ICEBERG_CONNECTOR != connector)?; debug_assert!(is_column_ids_dedup(&columns)); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index ee871bc68702..6f884cab4f08 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_SOURCE_VERSION_ID, INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET, }; +use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use 
risingwave_common::util::value_encoding::DatumToProtoExt; @@ -35,6 +36,7 @@ use risingwave_connector::source::cdc::external::{ DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, }; use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY; +use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; @@ -61,6 +63,7 @@ use crate::handler::create_source::{ bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark, check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY, }; +use crate::handler::util::get_connector; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; @@ -411,6 +414,7 @@ fn multiple_pk_definition_err() -> RwError { pub fn bind_pk_on_relation( mut columns: Vec, pk_names: Vec, + must_need_pk: bool, ) -> Result<(Vec, Vec, Option)> { for c in &columns { assert!(c.column_id() != ColumnId::placeholder()); @@ -431,8 +435,10 @@ pub fn bind_pk_on_relation( }) .try_collect()?; - // Add `_row_id` column if `pk_column_ids` is empty. - let row_id_index = pk_column_ids.is_empty().then(|| { + // Add `_row_id` column if `pk_column_ids` is empty and must_need_pk + let need_row_id = pk_column_ids.is_empty() && must_need_pk; + + let row_id_index = need_row_id.then(|| { let column = ColumnCatalog::row_id_column(); let index = columns.len(); pk_column_ids = vec![column.column_id()]; @@ -510,7 +516,14 @@ pub(crate) async fn gen_create_table_plan_with_source( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?; + let connector = get_connector(&with_properties) + .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?; + if connector == ICEBERG_CONNECTOR { + return Err( + ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(), + ); + } + let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; let watermark_descs = bind_source_watermark( session, @@ -594,7 +607,7 @@ pub(crate) fn gen_create_table_plan_without_bind( ) -> Result<(PlanRef, Option, PbTable)> { ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?; + let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; let watermark_descs = bind_source_watermark( context.session_ctx(), @@ -774,7 +787,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source( } let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?; + let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?; let definition = context.normalized_sql().to_owned(); @@ -1275,7 +1288,7 @@ mod tests { } ensure_table_constraints_supported(&constraints)?; let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?; + let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?; Ok(pk_column_ids) })(); match (expected, actual) { diff --git 
a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index fbc9fe7a40c8..27250371fa4e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -24,7 +24,8 @@ use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::error::Result; -use risingwave_connector::source::DataType; +use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; +use risingwave_connector::source::{DataType, UPSTREAM_SOURCE_KEY}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; @@ -546,6 +547,18 @@ impl ToStream for LogicalSource { } } } + if let Some(source) = &self.core.catalog { + let connector = &source + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .map(|s| s.to_lowercase()) + .unwrap(); + if ICEBERG_CONNECTOR == connector { + return Err( + anyhow::anyhow!("Iceberg source is not supported in stream queries").into(), + ); + } + } Ok(plan) } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 8af470ce7df6..d9a82acb22c5 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -103,6 +103,7 @@ impl ConnectorSourceWorker

{
             metrics: self.metrics.source_enumerator_metrics.clone(),
             info: SourceEnumeratorInfo {
                 source_id: self.source_id,
+                source: None,
             },
             connector_client: self.connector_client.clone(),
         }),
@@ -131,6 +132,7 @@ impl ConnectorSourceWorker

{ metrics: metrics.source_enumerator_metrics.clone(), info: SourceEnumeratorInfo { source_id: source.id, + source: Some(source.clone()), }, connector_client: connector_client.clone(), }), diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 3dd923b61054..13ca68b5bf25 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -249,6 +249,18 @@ impl Parser { } else { ConnectorSchema::native().into() }) + } else if connector.contains("iceberg") { + let expected = ConnectorSchema::native(); + if self.peek_source_schema_format() { + let schema = parse_source_schema(self)?.into_v2(); + if schema != expected { + return Err(ParserError::ParserError(format!( + "Row format for iceberg connectors should be \ + either omitted or set to `{expected}`", + ))); + } + } + Ok(expected.into()) } else { Ok(parse_source_schema(self)?) } From b60fe1aae3ceba2e6a27fbefc76c58f0bedd1abc Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 5 Feb 2024 16:26:54 +0800 Subject: [PATCH 06/12] introduce none format --- proto/plan_common.proto | 2 ++ src/connector/src/sink/catalog/mod.rs | 4 ++-- src/connector/src/source/iceberg/mod.rs | 8 ++++---- src/frontend/src/handler/alter_source_with_sr.rs | 2 ++ src/frontend/src/handler/create_sink.rs | 4 ++-- src/frontend/src/handler/create_source.rs | 7 +++++-- src/sqlparser/src/ast/statement.rs | 16 +++++++++++++++- 7 files changed, 32 insertions(+), 11 deletions(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index c85ea9ef8a60..0ab62c3aa6c3 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -135,6 +135,7 @@ enum FormatType { FORMAT_TYPE_CANAL = 5; FORMAT_TYPE_UPSERT = 6; FORMAT_TYPE_PLAIN = 7; + FORMAT_TYPE_NONE = 8; } enum EncodeType { @@ -146,6 +147,7 @@ enum EncodeType { ENCODE_TYPE_JSON = 5; ENCODE_TYPE_BYTES = 6; ENCODE_TYPE_TEMPLATE = 7; + ENCODE_TYPE_NONE = 8; } enum RowFormatType { diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index d4e38cac4d1c..7c3f80680fb5 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -205,7 +205,7 @@ impl TryFrom for SinkFormatDesc { F::Plain => SinkFormat::AppendOnly, F::Upsert => SinkFormat::Upsert, F::Debezium => SinkFormat::Debezium, - f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => { + f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => { return Err(SinkError::Config(anyhow!( "sink format unsupported: {}", f.as_str_name() @@ -217,7 +217,7 @@ impl TryFrom for SinkFormatDesc { E::Protobuf => SinkEncode::Protobuf, E::Template => SinkEncode::Template, E::Avro => SinkEncode::Avro, - e @ (E::Unspecified | E::Native | E::Csv | E::Bytes) => { + e @ (E::Unspecified | E::Native | E::Csv | E::Bytes | E::None) => { return Err(SinkError::Config(anyhow!( "sink encode unsupported: {}", e.as_str_name() diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index b29f007a4b44..87171d69c16e 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -39,9 +39,9 @@ pub struct IcebergProperties { #[serde(rename = "s3.endpoint", default)] pub endpoint: String, #[serde(rename = "s3.access.key", default)] - pub access: String, + pub s3_access: String, #[serde(rename = "s3.secret.key", default)] - pub secret: String, + pub s3_secret: String, #[serde(rename = "warehouse.path")] pub warehouse_path: String, #[serde(rename 
= "database.name")] @@ -108,8 +108,8 @@ impl SplitEnumerator for IcebergSplitEnumerator { catalog_type: Some(properties.catalog_type), path: properties.warehouse_path, endpoint: Some(properties.endpoint), - access_key: properties.access, - secret_key: properties.secret, + access_key: properties.s3_access, + secret_key: properties.s3_secret, region: Some(properties.region_name), ..Default::default() }; diff --git a/src/frontend/src/handler/alter_source_with_sr.rs b/src/frontend/src/handler/alter_source_with_sr.rs index ee46d63ead40..b87c164a7214 100644 --- a/src/frontend/src/handler/alter_source_with_sr.rs +++ b/src/frontend/src/handler/alter_source_with_sr.rs @@ -42,6 +42,7 @@ fn format_type_to_format(from: FormatType) -> Option { FormatType::Canal => Format::Canal, FormatType::Upsert => Format::Upsert, FormatType::Plain => Format::Plain, + FormatType::None => Format::None, }) } @@ -55,6 +56,7 @@ fn encode_type_to_encode(from: EncodeType) -> Option { EncodeType::Json => Encode::Json, EncodeType::Bytes => Encode::Bytes, EncodeType::Template => Encode::Template, + EncodeType::None => Encode::None, }) } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index e2e9388e8186..e314f1066562 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -693,7 +693,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { F::Plain => SinkFormat::AppendOnly, F::Upsert => SinkFormat::Upsert, F::Debezium => SinkFormat::Debezium, - f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal) => { + f @ (F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => { return Err(ErrorCode::BindError(format!("sink format unsupported: {f}")).into()); } }; @@ -702,7 +702,7 @@ fn bind_sink_format_desc(value: ConnectorSchema) -> Result { E::Protobuf => SinkEncode::Protobuf, E::Avro => SinkEncode::Avro, E::Template => SinkEncode::Template, - e @ (E::Native | E::Csv | E::Bytes) => { + e @ (E::Native | E::Csv | E::Bytes | E::None) => { return Err(ErrorCode::BindError(format!("sink encode unsupported: {e}")).into()); } }; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a736f863be8a..4d19e41db633 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -317,6 +317,7 @@ pub(crate) async fn bind_columns_from_source( let columns = match (&source_schema.format, &source_schema.row_encode) { (Format::Native, Encode::Native) + | (Format::None, Encode::None) | (Format::Plain, Encode::Bytes) | (Format::DebeziumMongo, Encode::Json) => None, (Format::Plain, Encode::Protobuf) => { @@ -707,7 +708,7 @@ pub(crate) async fn bind_source_pk( .collect_vec(); let res = match (&source_schema.format, &source_schema.row_encode) { - (Format::Native, Encode::Native) | (Format::Plain, _) => sql_defined_pk_names, + (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => sql_defined_pk_names, // For all Upsert formats, we only accept one and only key column as primary key. // Additional KEY columns must be set in this case and must be primary key. 
@@ -980,7 +981,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), ICEBERG_CONNECTOR => hashmap!( - Format::Native => vec![Encode::Native], + Format::None => vec![Encode::None], ) )) }); @@ -1314,6 +1315,7 @@ fn format_to_prost(format: &Format) -> FormatType { Format::DebeziumMongo => FormatType::DebeziumMongo, Format::Maxwell => FormatType::Maxwell, Format::Canal => FormatType::Canal, + Format::None => FormatType::None, } } fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { @@ -1325,6 +1327,7 @@ fn row_encode_to_prost(row_encode: &Encode) -> EncodeType { Encode::Csv => EncodeType::Csv, Encode::Bytes => EncodeType::Bytes, Encode::Template => EncodeType::Template, + Encode::None => EncodeType::None, } } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 13ca68b5bf25..e876a197c265 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -94,6 +94,7 @@ pub struct CreateSourceStatement { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Format { Native, + None, // Keyword::NONE Debezium, // Keyword::DEBEZIUM DebeziumMongo, // Keyword::DEBEZIUM_MONGO Maxwell, // Keyword::MAXWELL @@ -116,6 +117,7 @@ impl fmt::Display for Format { Format::Canal => "CANAL", Format::Upsert => "UPSERT", Format::Plain => "PLAIN", + Format::None => "NONE", } ) } @@ -149,6 +151,7 @@ pub enum Encode { Protobuf, // Keyword::PROTOBUF Json, // Keyword::JSON Bytes, // Keyword::BYTES + None, // Keyword::None Native, Template, } @@ -167,6 +170,7 @@ impl fmt::Display for Encode { Encode::Bytes => "BYTES", Encode::Native => "NATIVE", Encode::Template => "TEMPLATE", + Encode::None => "NONE", } ) } @@ -250,7 +254,7 @@ impl Parser { ConnectorSchema::native().into() }) } else if connector.contains("iceberg") { - let expected = ConnectorSchema::native(); + let expected = ConnectorSchema::none(); if self.peek_source_schema_format() { let schema = parse_source_schema(self)?.into_v2(); if schema != expected { @@ -316,6 +320,16 @@ impl ConnectorSchema { } } + /// Create a new source schema with `None` format and encoding. + /// Used for self-explanatory source like iceberg. 
+ pub const fn none() -> Self { + ConnectorSchema { + format: Format::None, + row_encode: Encode::None, + row_options: Vec::new(), + } + } + pub fn row_options(&self) -> &[SqlOption] { self.row_options.as_ref() } From 6a6124503036e7b544b4028aecc726965efb0d41 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 5 Feb 2024 22:24:17 +0800 Subject: [PATCH 07/12] fix --- src/connector/src/sink/catalog/mod.rs | 7 ++++++- src/frontend/src/handler/create_source.rs | 4 +++- src/frontend/src/handler/create_table.rs | 3 +-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 7c3f80680fb5..e6a654f75a5f 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -205,7 +205,12 @@ impl TryFrom for SinkFormatDesc { F::Plain => SinkFormat::AppendOnly, F::Upsert => SinkFormat::Upsert, F::Debezium => SinkFormat::Debezium, - f @ (F::Unspecified | F::Native | F::DebeziumMongo | F::Maxwell | F::Canal | F::None) => { + f @ (F::Unspecified + | F::Native + | F::DebeziumMongo + | F::Maxwell + | F::Canal + | F::None) => { return Err(SinkError::Config(anyhow!( "sink format unsupported: {}", f.as_str_name() diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index cefe61480d57..4390ac521b42 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -708,7 +708,9 @@ pub(crate) async fn bind_source_pk( .collect_vec(); let res = match (&source_schema.format, &source_schema.row_encode) { - (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => sql_defined_pk_names, + (Format::Native, Encode::Native) | (Format::None, Encode::None) | (Format::Plain, _) => { + sql_defined_pk_names + } // For all Upsert formats, we only accept one and only key column as primary key. // Additional KEY columns must be set in this case and must be primary key. 
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 450f19ebd561..a83b91469757 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -27,8 +27,6 @@ use risingwave_common::catalog::{ CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME, INITIAL_SOURCE_VERSION_ID, INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET, }; -use risingwave_common::error::ErrorCode::ProtocolError; -use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_common::util::value_encoding::DatumToProtoExt; use risingwave_connector::source; @@ -58,6 +56,7 @@ use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::table_catalog::TableVersion; use crate::catalog::{check_valid_column_name, ColumnId}; +use crate::error::ErrorCode::ProtocolError; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ From 5182d508eaf2b6fcf5ece8d8d32003f4d00e57b8 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 6 Feb 2024 15:53:29 +0800 Subject: [PATCH 08/12] fix test --- src/connector/with_options_source.yaml | 29 ++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 3ef323aa8857..a41ea5d7da95 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -33,6 +33,35 @@ GcsProperties: field_type: String required: false default: Default::default +IcebergProperties: + fields: + - name: catalog.type + field_type: String + required: true + - name: s3.region + field_type: String + required: true + - name: s3.endpoint + field_type: String + required: false + default: Default::default + - name: s3.access.key + field_type: String + required: false + default: Default::default + - name: s3.secret.key + field_type: String + required: false + default: Default::default + - name: warehouse.path + field_type: String + required: true + - name: database.name + field_type: String + required: true + - name: table.name + field_type: String + required: true KafkaProperties: fields: - name: bytes.per.second From db5dcd443fa99a755853146a104535a9e8bfcd8a Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 6 Feb 2024 16:04:33 +0800 Subject: [PATCH 09/12] fix test --- src/connector/with_options_source.yaml | 54 +++++++++++++------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index a41ea5d7da95..6e3a88694b52 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -35,33 +35,33 @@ GcsProperties: default: Default::default IcebergProperties: fields: - - name: catalog.type - field_type: String - required: true - - name: s3.region - field_type: String - required: true - - name: s3.endpoint - field_type: String - required: false - default: Default::default - - name: s3.access.key - field_type: String - required: false - default: Default::default - - name: s3.secret.key - field_type: String - required: false - default: Default::default - - name: warehouse.path - field_type: String - required: true - - name: database.name - field_type: String - required: true - - name: table.name - field_type: String - required: true + 
- name: catalog.type + field_type: String + required: true + - name: s3.region + field_type: String + required: true + - name: s3.endpoint + field_type: String + required: false + default: Default::default + - name: s3.access.key + field_type: String + required: false + default: Default::default + - name: s3.secret.key + field_type: String + required: false + default: Default::default + - name: warehouse.path + field_type: String + required: true + - name: database.name + field_type: String + required: true + - name: table.name + field_type: String + required: true KafkaProperties: fields: - name: bytes.per.second From aaada214f30ff3a7dd9e5770c4adc4ef776299b6 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 6 Feb 2024 18:27:59 +0800 Subject: [PATCH 10/12] improve --- src/frontend/src/handler/create_source.rs | 6 ++---- src/frontend/src/handler/create_table.rs | 6 ++---- src/frontend/src/handler/util.rs | 14 ++++++++++++++ 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 4390ac521b42..327efa632120 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -73,7 +73,7 @@ use crate::handler::create_table::{ ensure_table_constraints_supported, ColumnIdGenerator, }; use crate::handler::util::{ - get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt, + connector_need_pk, get_connector, is_cdc_connector, is_kafka_connector, SourceSchemaCompatExt, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; @@ -1220,10 +1220,8 @@ pub async fn handle_create_source( ) .into()); } - let connector = get_connector(&with_properties) - .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?; let (mut columns, pk_column_ids, row_id_index) = - bind_pk_on_relation(columns, pk_names, ICEBERG_CONNECTOR != connector)?; + bind_pk_on_relation(columns, pk_names, connector_need_pk(&with_properties))?; debug_assert!(is_column_ids_dedup(&columns)); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index a83b91469757..2a89b15e89fb 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -63,7 +63,7 @@ use crate::handler::create_source::{ bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark, check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY, }; -use crate::handler::util::get_connector; +use crate::handler::util::{get_connector, is_iceberg_connector}; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; @@ -516,9 +516,7 @@ pub(crate) async fn gen_create_table_plan_with_source( c.column_desc.column_id = col_id_gen.generate(c.name()) } - let connector = get_connector(&with_properties) - .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?; - if connector == ICEBERG_CONNECTOR { + if is_iceberg_connector(&with_properties) { return Err( ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(), ); diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 6e91cf53f0b3..a4b405c8c24b 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -31,6 +31,7 @@ use risingwave_common::catalog::{ColumnCatalog, Field}; 
use risingwave_common::row::Row as _; use risingwave_common::types::{DataType, ScalarRefImpl, Timestamptz}; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_sqlparser::ast::{display_comma_separated, CompatibleSourceSchema, ConnectorSchema}; @@ -241,6 +242,11 @@ pub fn to_pg_field(f: &Field) -> PgFieldDescriptor { ) } +pub fn connector_need_pk(with_properties: &HashMap) -> bool { + // Currently only iceberg connector doesn't need primary key + !is_iceberg_connector(with_properties) +} + #[inline(always)] pub fn get_connector(with_properties: &HashMap) -> Option { with_properties @@ -265,6 +271,14 @@ pub fn is_cdc_connector(with_properties: &HashMap) -> bool { connector.contains("-cdc") } +#[inline(always)] +pub fn is_iceberg_connector(with_properties: &HashMap) -> bool { + let Some(connector) = get_connector(with_properties) else { + return false; + }; + connector == ICEBERG_CONNECTOR +} + #[easy_ext::ext(SourceSchemaCompatExt)] impl CompatibleSourceSchema { /// Convert `self` to [`ConnectorSchema`] and warn the user if the syntax is deprecated. From ea78079bfbf0d261c8eb1c6f16faa141f6063e42 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 7 Feb 2024 15:06:43 +0800 Subject: [PATCH 11/12] refactor --- src/connector/src/source/base.rs | 3 +- src/connector/src/source/iceberg/mod.rs | 67 +--------------- src/frontend/src/handler/create_source.rs | 95 ++++++++++++++++++++--- src/frontend/src/handler/create_table.rs | 6 +- src/meta/src/stream/source_manager.rs | 2 - 5 files changed, 91 insertions(+), 82 deletions(-) diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index c078bcb41a63..30682f022954 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -29,7 +29,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::error::ErrorSuppressor; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::types::{JsonbVal, Scalar}; -use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo, Source}; +use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::source::ConnectorSplit; use risingwave_rpc_client::ConnectorClient; @@ -153,7 +153,6 @@ pub struct SourceEnumeratorContext { #[derive(Clone, Debug, Default)] pub struct SourceEnumeratorInfo { pub source_id: u32, - pub source: Option, } #[derive(Debug, Default)] diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 87171d69c16e..e274f639f15b 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -15,14 +15,10 @@ use std::collections::HashMap; use async_trait::async_trait; -use itertools::Itertools; -use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; -use simd_json::prelude::ArrayTrait; use crate::parser::ParserConfig; -use crate::sink::iceberg::IcebergConfig; use crate::source::{ BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields, @@ -97,67 +93,10 @@ impl SplitEnumerator for IcebergSplitEnumerator { type Split = IcebergSplit; async fn new( - properties: Self::Properties, - context: SourceEnumeratorContextRef, + _properties: Self::Properties, + _context: 
SourceEnumeratorContextRef, ) -> anyhow::Result { - match &context.info.source { - Some(source) => { - let iceberg_config = IcebergConfig { - database_name: properties.database_name, - table_name: properties.table_name, - catalog_type: Some(properties.catalog_type), - path: properties.warehouse_path, - endpoint: Some(properties.endpoint), - access_key: properties.s3_access, - secret_key: properties.s3_secret, - region: Some(properties.region_name), - ..Default::default() - }; - - let columns: Vec = source - .columns - .iter() - .cloned() - .map(ColumnCatalog::from) - .collect_vec(); - - let schema = Schema { - fields: columns - .iter() - .map(|c| Field::from(&c.column_desc)) - .collect(), - }; - - let table = iceberg_config.load_table().await?; - - let iceberg_schema: arrow_schema::Schema = table - .current_table_metadata() - .current_schema()? - .clone() - .try_into()?; - - for f1 in schema.fields() { - if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) { - return Err(anyhow::anyhow!(format!( - "Column {} not found in iceberg table", - f1.name - ))); - } - } - - let new_iceberg_field = iceberg_schema - .fields - .iter() - .filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name)) - .cloned() - .collect::>(); - let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field); - - crate::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?; - Ok(Self {}) - } - None => unreachable!(), - } + Ok(Self {}) } async fn list_splits(&mut self) -> anyhow::Result> { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 327efa632120..c34cc3b75c5d 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -16,13 +16,13 @@ use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; use std::sync::LazyLock; -use anyhow::Context; +use anyhow::{anyhow, Context}; use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ - is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId, INITIAL_SOURCE_VERSION_ID, + is_column_ids_dedup, ColumnCatalog, ColumnDesc, Schema, TableId, INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::types::DataType; @@ -36,6 +36,7 @@ use risingwave_connector::parser::{ use risingwave_connector::schema::schema_registry::{ name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, }; +use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::cdc::external::CdcTableType; use risingwave_connector::source::cdc::{ CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CITUS_CDC_CONNECTOR, @@ -46,8 +47,9 @@ use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ - GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, - NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, + ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, + KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, + PULSAR_CONNECTOR, S3_CONNECTOR, }; use risingwave_pb::catalog::{ PbSchemaRegistryNameStrategy, PbSource, 
StreamSourceInfo, WatermarkDesc, @@ -1061,12 +1063,11 @@ pub fn validate_compatibility( } /// Performs early stage checking in frontend to see if the schema of the given `columns` is -/// compatible with the connector extracted from the properties. Currently this only works for -/// `nexmark` connector since it's in chunk format. +/// compatible with the connector extracted from the properties. /// /// One should only call this function after all properties of all columns are resolved, like /// generated column descriptors. -pub(super) fn check_source_schema( +pub(super) async fn check_source_schema( props: &HashMap, row_id_index: Option, columns: &[ColumnCatalog], @@ -1075,10 +1076,22 @@ pub(super) fn check_source_schema( return Ok(()); }; - if connector != NEXMARK_CONNECTOR { - return Ok(()); + if connector == NEXMARK_CONNECTOR { + check_nexmark_schema(props, row_id_index, columns) + } else if connector == ICEBERG_CONNECTOR { + Ok(check_iceberg_source(props, columns) + .await + .map_err(|err| ProtocolError(err.to_string().into()))?) + } else { + Ok(()) } +} +pub(super) fn check_nexmark_schema( + props: &HashMap, + row_id_index: Option, + columns: &[ColumnCatalog], +) -> Result<()> { let table_type = props .get("nexmark.table.type") .map(|t| t.to_ascii_lowercase()); @@ -1128,6 +1141,68 @@ pub(super) fn check_source_schema( Ok(()) } +pub async fn check_iceberg_source( + props: &HashMap, + columns: &[ColumnCatalog], +) -> anyhow::Result<()> { + let props = ConnectorProperties::extract(props.clone(), true)?; + let ConnectorProperties::Iceberg(properties) = props else { + return Err(anyhow!(format!( + "Invalid properties for iceberg source: {:?}", + props + ))); + }; + + let iceberg_config = IcebergConfig { + database_name: properties.database_name, + table_name: properties.table_name, + catalog_type: Some(properties.catalog_type), + path: properties.warehouse_path, + endpoint: Some(properties.endpoint), + access_key: properties.s3_access, + secret_key: properties.s3_secret, + region: Some(properties.region_name), + ..Default::default() + }; + + let schema = Schema { + fields: columns + .iter() + .cloned() + .map(|c| c.column_desc.into()) + .collect(), + }; + + let table = iceberg_config.load_table().await?; + + let iceberg_schema: arrow_schema::Schema = table + .current_table_metadata() + .current_schema()? 
+ .clone() + .try_into()?; + + for f1 in schema.fields() { + if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) { + return Err(anyhow::anyhow!(format!( + "Column {} not found in iceberg table", + f1.name + ))); + } + } + + let new_iceberg_field = iceberg_schema + .fields + .iter() + .filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name)) + .cloned() + .collect::>(); + let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field); + + risingwave_connector::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?; + + Ok(()) +} + pub async fn handle_create_source( handler_args: HandlerArgs, stmt: CreateSourceStatement, @@ -1238,7 +1313,7 @@ pub async fn handle_create_source( &pk_column_ids, )?; - check_source_schema(&with_properties, row_id_index, &columns)?; + check_source_schema(&with_properties, row_id_index, &columns).await?; let pk_column_ids = pk_column_ids.into_iter().map(Into::into).collect(); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 2a89b15e89fb..7fc757b71b6b 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -34,7 +34,6 @@ use risingwave_connector::source::cdc::external::{ DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY, }; use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY; -use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; @@ -56,14 +55,13 @@ use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; use crate::catalog::table_catalog::TableVersion; use crate::catalog::{check_valid_column_name, ColumnId}; -use crate::error::ErrorCode::ProtocolError; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark, check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY, }; -use crate::handler::util::{get_connector, is_iceberg_connector}; +use crate::handler::util::is_iceberg_connector; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::SourceNodeKind; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; @@ -542,7 +540,7 @@ pub(crate) async fn gen_create_table_plan_with_source( &pk_column_ids, )?; - check_source_schema(&with_properties, row_id_index, &columns)?; + check_source_schema(&with_properties, row_id_index, &columns).await?; gen_table_plan_inner( context.into(), diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index d9a82acb22c5..8af470ce7df6 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -103,7 +103,6 @@ impl ConnectorSourceWorker
<P>
{ metrics: self.metrics.source_enumerator_metrics.clone(), info: SourceEnumeratorInfo { source_id: self.source_id, - source: None, }, connector_client: self.connector_client.clone(), }), @@ -132,7 +131,6 @@ impl ConnectorSourceWorker
<P>
{ metrics: metrics.source_enumerator_metrics.clone(), info: SourceEnumeratorInfo { source_id: source.id, - source: Some(source.clone()), }, connector_client: connector_client.clone(), }), From 5556ad2757728b95657c2ab49e659c56aec8b63c Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 21 Feb 2024 10:16:46 +0800 Subject: [PATCH 12/12] fmt --- src/frontend/src/handler/create_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index c34cc3b75c5d..44c854a7c4ee 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1081,7 +1081,7 @@ pub(super) async fn check_source_schema( } else if connector == ICEBERG_CONNECTOR { Ok(check_iceberg_source(props, columns) .await - .map_err(|err| ProtocolError(err.to_string().into()))?) + .map_err(|err| ProtocolError(err.to_string()))?) } else { Ok(()) }
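
The net effect of PATCH 11/12 is that the iceberg column validation now runs in the frontend's async `check_source_schema` during CREATE SOURCE, rather than inside `IcebergSplitEnumerator::new`. The check boils down to: every column declared on the source must exist in the iceberg table, and the table's schema is projected down to those columns before the field-by-field comparison in `try_matches_arrow_schema`. Below is a minimal, self-contained sketch of that subset-and-project step; `Field` and `project_iceberg_schema` are hypothetical stand-ins for illustration only, not RisingWave's `ColumnCatalog` or `arrow_schema::Schema` API.

// Hedged sketch: simplified stand-in types, not the actual RisingWave frontend code.
use std::collections::HashSet;

#[derive(Debug, Clone)]
struct Field {
    name: String,
    data_type: String, // stand-in for the real DataType / arrow type
}

// Every source column must exist in the iceberg table; the iceberg schema is
// then projected down to just those columns for the later type comparison.
fn project_iceberg_schema(
    source_fields: &[Field],
    iceberg_fields: &[Field],
) -> Result<Vec<Field>, String> {
    let iceberg_names: HashSet<&str> =
        iceberg_fields.iter().map(|f| f.name.as_str()).collect();
    for f in source_fields {
        if !iceberg_names.contains(f.name.as_str()) {
            return Err(format!("Column {} not found in iceberg table", f.name));
        }
    }
    let source_names: HashSet<&str> =
        source_fields.iter().map(|f| f.name.as_str()).collect();
    Ok(iceberg_fields
        .iter()
        .filter(|f| source_names.contains(f.name.as_str()))
        .cloned()
        .collect())
}

fn main() {
    let source = vec![Field { name: "seq_id".into(), data_type: "Int64".into() }];
    let table = vec![
        Field { name: "seq_id".into(), data_type: "Int64".into() },
        Field { name: "user_name".into(), data_type: "Utf8".into() },
    ];
    // Prints Ok([...]) containing only "seq_id", or Err(...) naming a missing column.
    println!("{:?}", project_iceberg_schema(&source, &table));
}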