From b5423ea22044fe7ed6386bb8965137613113f9c7 Mon Sep 17 00:00:00 2001
From: Dylan Chen
Date: Fri, 2 Feb 2024 18:13:51 +0800
Subject: [PATCH] support create iceberg source

---
 src/connector/src/macros.rs                   |   3 +-
 src/connector/src/sink/iceberg/mod.rs         |   2 +-
 src/connector/src/source/base.rs              |   5 +-
 src/connector/src/source/iceberg/mod.rs       | 189 ++++++++++++++++++
 src/connector/src/source/mod.rs               |   1 +
 src/frontend/src/handler/create_source.rs     |  10 +-
 src/frontend/src/handler/create_table.rs      |  25 ++-
 .../src/optimizer/plan_node/logical_source.rs |  15 +-
 src/meta/src/stream/source_manager.rs         |   2 +
 src/sqlparser/src/ast/statement.rs            |  12 ++
 10 files changed, 251 insertions(+), 13 deletions(-)
 create mode 100644 src/connector/src/source/iceberg/mod.rs

diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs
index 9a2383dbb4a96..e34171717ae6c 100644
--- a/src/connector/src/macros.rs
+++ b/src/connector/src/macros.rs
@@ -36,7 +36,8 @@ macro_rules! for_all_classified_sources {
                 { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
                 { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
                 { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
-                { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
+                { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
+                { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
             }
             $(
                 ,$extra_args
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 0e4f28f64639c..0f7e939688341 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -927,7 +927,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
 }
 
 /// Try to match our schema with iceberg schema.
-fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
+pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
     if rw_schema.fields.len() != arrow_schema.fields().len() {
         return Err(SinkError::Iceberg(anyhow!(
             "Schema length not match, ours is {}, and iceberg is {}",
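The sink's schema check is made `pub` so the new source enumerator, added later in this patch, can reuse it after pruning the Iceberg schema down to the user-declared columns. A minimal standalone sketch of that prune-then-compare idea, with plain strings standing in for the RisingWave and Arrow schema types (field names are illustrative, not from the patch):

    // Standalone sketch of the prune-then-compare check performed by the
    // enumerator below; strings stand in for RisingWave/Arrow schema types.
    fn check_columns_covered(ours: &[&str], iceberg: &[&str]) -> Result<(), String> {
        // Every user-declared column must exist in the Iceberg table.
        for name in ours {
            if !iceberg.contains(name) {
                return Err(format!("Column {name} not found in iceberg table"));
            }
        }
        // Prune the Iceberg schema down to the declared columns; the real
        // `try_matches_arrow_schema` then also compares the field types.
        let pruned: Vec<&str> = iceberg.iter().copied().filter(|f| ours.contains(f)).collect();
        if pruned.len() != ours.len() {
            return Err(format!(
                "Schema length not match, ours is {}, and iceberg is {}",
                ours.len(),
                pruned.len()
            ));
        }
        Ok(())
    }

    fn main() {
        assert!(check_columns_covered(&["id", "name"], &["id", "name", "price"]).is_ok());
        assert!(check_columns_covered(&["id", "missing"], &["id", "name"]).is_err());
    }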
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index faa857aae0dd6..8c170017d34b1 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -29,7 +29,7 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::error::{ErrorSuppressor, RwError};
 use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
 use risingwave_common::types::{JsonbVal, Scalar};
-use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
+use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo, Source};
 use risingwave_pb::plan_common::ExternalTableDesc;
 use risingwave_pb::source::ConnectorSplit;
 use risingwave_rpc_client::ConnectorClient;
@@ -149,9 +149,10 @@ pub struct SourceEnumeratorContext {
     pub connector_client: Option<ConnectorClient>,
 }
 
-#[derive(Clone, Copy, Debug, Default)]
+#[derive(Clone, Debug, Default)]
 pub struct SourceEnumeratorInfo {
     pub source_id: u32,
+    pub source: Option<Source>,
 }
 
 #[derive(Debug, Default)]
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
new file mode 100644
index 0000000000000..b29f007a4b44d
--- /dev/null
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -0,0 +1,189 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use async_trait::async_trait;
+use itertools::Itertools;
+use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
+use risingwave_common::types::JsonbVal;
+use serde::{Deserialize, Serialize};
+use simd_json::prelude::ArrayTrait;
+
+use crate::parser::ParserConfig;
+use crate::sink::iceberg::IcebergConfig;
+use crate::source::{
+    BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
+    SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
+};
+
+pub const ICEBERG_CONNECTOR: &str = "iceberg";
+
+#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
+pub struct IcebergProperties {
+    #[serde(rename = "catalog.type")]
+    pub catalog_type: String,
+    #[serde(rename = "s3.region")]
+    pub region_name: String,
+    #[serde(rename = "s3.endpoint", default)]
+    pub endpoint: String,
+    #[serde(rename = "s3.access.key", default)]
+    pub access: String,
+    #[serde(rename = "s3.secret.key", default)]
+    pub secret: String,
+    #[serde(rename = "warehouse.path")]
+    pub warehouse_path: String,
+    #[serde(rename = "database.name")]
+    pub database_name: String,
+    #[serde(rename = "table.name")]
+    pub table_name: String,
+
+    #[serde(flatten)]
+    pub unknown_fields: HashMap<String, String>,
+}
+
+impl SourceProperties for IcebergProperties {
+    type Split = IcebergSplit;
+    type SplitEnumerator = IcebergSplitEnumerator;
+    type SplitReader = IcebergFileReader;
+
+    const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
+}
+
+impl UnknownFields for IcebergProperties {
+    fn unknown_fields(&self) -> HashMap<String, String> {
+        self.unknown_fields.clone()
+    }
+}
+
+#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
+pub struct IcebergSplit {}
+
+impl SplitMetaData for IcebergSplit {
+    fn id(&self) -> SplitId {
+        unimplemented!()
+    }
+
+    fn restore_from_json(_value: JsonbVal) -> anyhow::Result<Self> {
+        unimplemented!()
+    }
+
+    fn encode_to_json(&self) -> JsonbVal {
+        unimplemented!()
+    }
+
+    fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> {
+        unimplemented!()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct IcebergSplitEnumerator {}
+
+#[async_trait]
+impl SplitEnumerator for IcebergSplitEnumerator {
+    type Properties = IcebergProperties;
+    type Split = IcebergSplit;
+
+    async fn new(
+        properties: Self::Properties,
+        context: SourceEnumeratorContextRef,
+    ) -> anyhow::Result<Self> {
+        match &context.info.source {
+            Some(source) => {
+                let iceberg_config = IcebergConfig {
+                    database_name: properties.database_name,
+                    table_name: properties.table_name,
+                    catalog_type: Some(properties.catalog_type),
+                    path: properties.warehouse_path,
+                    endpoint: Some(properties.endpoint),
+                    access_key: properties.access,
+                    secret_key: properties.secret,
+                    region: Some(properties.region_name),
+                    ..Default::default()
+                };
+
+                let columns: Vec<ColumnCatalog> = source
+                    .columns
+                    .iter()
+                    .cloned()
+                    .map(ColumnCatalog::from)
+                    .collect_vec();
+
+                let schema = Schema {
+                    fields: columns
+                        .iter()
+                        .map(|c| Field::from(&c.column_desc))
+                        .collect(),
+                };
+
+                let table = iceberg_config.load_table().await?;
+
+                let iceberg_schema: arrow_schema::Schema = table
+                    .current_table_metadata()
+                    .current_schema()?
+                    .clone()
+                    .try_into()?;
+
+                for f1 in schema.fields() {
+                    if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) {
+                        return Err(anyhow::anyhow!(format!(
+                            "Column {} not found in iceberg table",
+                            f1.name
+                        )));
+                    }
+                }
+
+                let new_iceberg_field = iceberg_schema
+                    .fields
+                    .iter()
+                    .filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name))
+                    .cloned()
+                    .collect::<Vec<_>>();
+                let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field);
+
+                crate::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;
+                Ok(Self {})
+            }
+            None => unreachable!(),
+        }
+    }
+
+    async fn list_splits(&mut self) -> anyhow::Result<Vec<Self::Split>> {
+        Ok(vec![])
+    }
+}
+
+#[derive(Debug)]
+pub struct IcebergFileReader {}
+
+#[async_trait]
+impl SplitReader for IcebergFileReader {
+    type Properties = IcebergProperties;
+    type Split = IcebergSplit;
+
+    async fn new(
+        _props: IcebergProperties,
+        _splits: Vec<IcebergSplit>,
+        _parser_config: ParserConfig,
+        _source_ctx: SourceContextRef,
+        _columns: Option<Vec<Column>>,
+    ) -> anyhow::Result<Self> {
+        unimplemented!()
+    }
+
+    fn into_stream(self) -> BoxChunkSourceStream {
+        unimplemented!()
+    }
+}
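The `IcebergProperties` struct above maps dotted WITH options onto fields via `#[serde(rename = ...)]` and collects anything unrecognized into `unknown_fields` via `#[serde(flatten)]`. A minimal sketch of that round trip, assuming `serde` (with derive) and `serde_json` as dependencies and a hypothetical `mystery.knob` option:

    // Minimal mirror of the struct above, showing how the serde attributes
    // map dotted WITH options; `Props` and `mystery.knob` are illustrative.
    use std::collections::HashMap;

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct Props {
        #[serde(rename = "catalog.type")]
        catalog_type: String,
        #[serde(rename = "s3.endpoint", default)]
        endpoint: String, // `default` fills "" when the key is absent
        #[serde(flatten)]
        unknown_fields: HashMap<String, String>,
    }

    fn main() {
        let with_options: HashMap<String, String> = [
            ("catalog.type".into(), "storage".into()),
            ("mystery.knob".into(), "42".into()), // undeclared -> unknown_fields
        ]
        .into();
        let props: Props =
            serde_json::from_value(serde_json::to_value(&with_options).unwrap()).unwrap();
        assert_eq!(props.catalog_type, "storage");
        assert_eq!(props.endpoint, "");
        assert_eq!(props.unknown_fields["mystery.knob"], "42");
    }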
diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs
index cba63b3005c1a..3656820ed95b0 100644
--- a/src/connector/src/source/mod.rs
+++ b/src/connector/src/source/mod.rs
@@ -31,6 +31,7 @@ pub use kafka::KAFKA_CONNECTOR;
 pub use kinesis::KINESIS_CONNECTOR;
 pub use nats::NATS_CONNECTOR;
 mod common;
+pub mod iceberg;
 mod manager;
 pub mod reader;
 pub mod test_source;
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 5f25d12650f0c..a736f863be8a8 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -44,6 +44,7 @@ use risingwave_connector::source::cdc::{
     MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
 };
 use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
+use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
 use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
 use risingwave_connector::source::test_source::TEST_CONNECTOR;
 use risingwave_connector::source::{
@@ -977,6 +978,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
         ),
         TEST_CONNECTOR => hashmap!(
                 Format::Plain => vec![Encode::Json],
+        ),
+        ICEBERG_CONNECTOR => hashmap!(
+            Format::Native => vec![Encode::Native],
         )
     ))
 });
@@ -1213,8 +1217,10 @@ pub async fn handle_create_source(
         )
         .into());
     }
-
-    let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?;
+    let connector = get_connector(&with_properties)
+        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;
+    let (mut columns, pk_column_ids, row_id_index) =
+        bind_pk_on_relation(columns, pk_names, ICEBERG_CONNECTOR != connector)?;
 
     debug_assert!(is_column_ids_dedup(&columns));
 
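With the entry above, `iceberg` only admits `FORMAT NATIVE ENCODE NATIVE`, and `handle_create_source` now passes `ICEBERG_CONNECTOR != connector` to `bind_pk_on_relation`, so an iceberg source is not given the hidden `_row_id` column. A sketch of the compatibility lookup, with the `Format`/`Encode` enums reduced to plain strings and the kafka row abbreviated:

    // Sketch of the compatibility lookup; strings stand in for the real enums.
    use std::collections::HashMap;

    fn compatible_formats() -> HashMap<&'static str, Vec<(&'static str, &'static str)>> {
        HashMap::from([
            ("kafka", vec![("plain", "json"), ("plain", "avro")]), // abbreviated
            ("iceberg", vec![("native", "native")]),
        ])
    }

    fn validate(connector: &str, format: &str, encode: &str) -> Result<(), String> {
        let ok = compatible_formats()
            .get(connector)
            .is_some_and(|formats| formats.iter().any(|&(f, e)| f == format && e == encode));
        if ok {
            Ok(())
        } else {
            Err(format!("{connector} does not accept FORMAT {format} ENCODE {encode}"))
        }
    }

    fn main() {
        assert!(validate("iceberg", "native", "native").is_ok());
        assert!(validate("iceberg", "plain", "json").is_err());
    }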
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index ee871bc68702f..6f884cab4f086 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -27,6 +27,7 @@ use risingwave_common::catalog::{
     CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME,
     INITIAL_SOURCE_VERSION_ID, INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET,
 };
+use risingwave_common::error::ErrorCode::ProtocolError;
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 use risingwave_common::util::value_encoding::DatumToProtoExt;
@@ -35,6 +36,7 @@ use risingwave_connector::source::cdc::external::{
     DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
 };
 use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
+use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
 use risingwave_pb::catalog::source::OptionalAssociatedTableId;
 use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
 use risingwave_pb::ddl_service::TableJobType;
@@ -61,6 +63,7 @@ use crate::handler::create_source::{
     bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark,
     check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
 };
+use crate::handler::util::get_connector;
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::SourceNodeKind;
 use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
@@ -411,6 +414,7 @@ fn multiple_pk_definition_err() -> RwError {
 pub fn bind_pk_on_relation(
     mut columns: Vec<ColumnCatalog>,
     pk_names: Vec<String>,
+    must_need_pk: bool,
 ) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
     for c in &columns {
         assert!(c.column_id() != ColumnId::placeholder());
@@ -431,8 +435,10 @@ pub fn bind_pk_on_relation(
         })
         .try_collect()?;
 
-    // Add `_row_id` column if `pk_column_ids` is empty.
-    let row_id_index = pk_column_ids.is_empty().then(|| {
+    // Add `_row_id` column if `pk_column_ids` is empty and `must_need_pk` is set.
+    let need_row_id = pk_column_ids.is_empty() && must_need_pk;
+
+    let row_id_index = need_row_id.then(|| {
         let column = ColumnCatalog::row_id_column();
         let index = columns.len();
         pk_column_ids = vec![column.column_id()];
@@ -510,7 +516,14 @@ pub(crate) async fn gen_create_table_plan_with_source(
         c.column_desc.column_id = col_id_gen.generate(c.name())
     }
 
-    let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?;
+    let connector = get_connector(&with_properties)
+        .ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;
+    if connector == ICEBERG_CONNECTOR {
+        return Err(
+            ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
+        );
+    }
+    let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;
 
     let watermark_descs = bind_source_watermark(
         session,
@@ -594,7 +607,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
 ) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
     ensure_table_constraints_supported(&constraints)?;
     let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
-    let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?;
+    let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;
 
     let watermark_descs = bind_source_watermark(
         context.session_ctx(),
@@ -774,7 +787,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
     }
 
     let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
-    let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?;
+    let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?;
 
     let definition = context.normalized_sql().to_owned();
 
@@ -1275,7 +1288,7 @@ mod tests {
             }
             ensure_table_constraints_supported(&constraints)?;
             let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
-            let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?;
+            let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?;
             Ok(pk_column_ids)
         })();
         match (expected, actual) {
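The new `must_need_pk` flag gates the `_row_id` fallback in `bind_pk_on_relation`: every caller except the iceberg source path passes `true`, preserving the old behavior. A standalone sketch of the gating (hypothetical helper; the real function also appends the column and rewrites `pk_column_ids`):

    // `_row_id` would be appended at the end of the column list, so its index
    // is the current column count.
    fn row_id_index(pk_names: &[&str], column_count: usize, must_need_pk: bool) -> Option<usize> {
        (pk_names.is_empty() && must_need_pk).then_some(column_count)
    }

    fn main() {
        assert_eq!(row_id_index(&[], 3, true), Some(3)); // kafka & co.: append `_row_id`
        assert_eq!(row_id_index(&[], 3, false), None);   // iceberg: no pk, no `_row_id`
        assert_eq!(row_id_index(&["id"], 3, true), None); // explicit pk: nothing to add
    }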
a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index fbc9fe7a40c8c..27250371fa4ee 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -24,7 +24,8 @@ use risingwave_common::catalog::{
     ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME,
 };
 use risingwave_common::error::Result;
-use risingwave_connector::source::DataType;
+use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
+use risingwave_connector::source::{DataType, UPSTREAM_SOURCE_KEY};
 use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
 use risingwave_pb::plan_common::GeneratedColumnDesc;
 
@@ -546,6 +547,18 @@ impl ToStream for LogicalSource {
                 }
             }
         }
+        if let Some(source) = &self.core.catalog {
+            let connector = &source
+                .with_properties
+                .get(UPSTREAM_SOURCE_KEY)
+                .map(|s| s.to_lowercase())
+                .unwrap();
+            if ICEBERG_CONNECTOR == connector {
+                return Err(
+                    anyhow::anyhow!("Iceberg source is not supported in stream queries").into(),
+                );
+            }
+        }
         Ok(plan)
     }
 
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 8af470ce7df65..d9a82acb22c58 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -103,6 +103,7 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
                 metrics: self.metrics.source_enumerator_metrics.clone(),
                 info: SourceEnumeratorInfo {
                     source_id: self.source_id,
+                    source: None,
                 },
                 connector_client: self.connector_client.clone(),
             }),
@@ -131,6 +132,7 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
                 metrics: metrics.source_enumerator_metrics.clone(),
                 info: SourceEnumeratorInfo {
                     source_id: source.id,
+                    source: Some(source.clone()),
                 },
                 connector_client: connector_client.clone(),
             }),
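Only the worker-creation path attaches the catalog `Source` to the enumerator context; the refresh path leaves `source: None`, and the iceberg enumerator above relies on `Some` being present for its column check. A standalone sketch of the two wiring paths, with `CatalogSource` as a stand-in for `risingwave_pb::catalog::Source`:

    // Sketch of the two wiring paths; `CatalogSource` stands in for
    // `risingwave_pb::catalog::Source`.
    #[derive(Clone, Debug)]
    struct CatalogSource {
        id: u32,
        columns: Vec<String>,
    }

    #[derive(Debug)]
    struct EnumeratorInfo {
        source_id: u32,
        source: Option<CatalogSource>,
    }

    // Worker creation: the full catalog entry rides along, so enumerators
    // (like iceberg's) can read the column definitions.
    fn info_on_create(source: &CatalogSource) -> EnumeratorInfo {
        EnumeratorInfo {
            source_id: source.id,
            source: Some(source.clone()),
        }
    }

    // Periodic refresh: only the id is kept.
    fn info_on_refresh(source_id: u32) -> EnumeratorInfo {
        EnumeratorInfo { source_id, source: None }
    }

    fn main() {
        let s = CatalogSource { id: 1, columns: vec!["id".into(), "name".into()] };
        assert_eq!(info_on_create(&s).source.unwrap().columns.len(), 2);
        assert!(info_on_refresh(1).source.is_none());
        assert_eq!(info_on_refresh(1).source_id, 1);
    }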
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index 3dd923b610542..13ca68b5bf255 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -249,6 +249,18 @@ impl Parser {
             } else {
                 ConnectorSchema::native().into()
             })
+        } else if connector.contains("iceberg") {
+            let expected = ConnectorSchema::native();
+            if self.peek_source_schema_format() {
+                let schema = parse_source_schema(self)?.into_v2();
+                if schema != expected {
+                    return Err(ParserError::ParserError(format!(
+                        "Row format for iceberg connectors should be \
+                         either omitted or set to `{expected}`",
+                    )));
+                }
+            }
+            Ok(expected.into())
         } else {
             Ok(parse_source_schema(self)?)
         }
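Net effect at the SQL surface: a `CREATE SOURCE ... WITH (connector = 'iceberg', ...)` statement may omit the row-format clause or spell out the native default; anything else is rejected at parse time. A standalone sketch of that rule, with `SchemaV2` standing in for `ConnectorSchema` and illustrative format strings:

    // Sketch of the parser rule above; `SchemaV2` and the strings are
    // illustrative stand-ins, not the real sqlparser types.
    #[derive(Debug, PartialEq)]
    struct SchemaV2 {
        format: &'static str,
        encode: &'static str,
    }

    const NATIVE: SchemaV2 = SchemaV2 { format: "native", encode: "native" };

    fn resolve_schema(connector: &str, declared: Option<SchemaV2>) -> Result<SchemaV2, String> {
        if connector.contains("iceberg") {
            match declared {
                // No FORMAT ... ENCODE ... clause: default to native.
                None => Ok(NATIVE),
                // Explicitly native: accepted.
                Some(s) if s == NATIVE => Ok(NATIVE),
                // Anything else is a parse error.
                Some(_) => Err(format!(
                    "Row format for iceberg connectors should be either omitted or set to `{:?}`",
                    NATIVE
                )),
            }
        } else {
            declared.ok_or_else(|| "row format required".into())
        }
    }

    fn main() {
        assert!(resolve_schema("iceberg", None).is_ok());
        assert!(resolve_schema("iceberg", Some(SchemaV2 { format: "plain", encode: "json" })).is_err());
    }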