support create iceberg source
chenzl25 committed Feb 2, 2024
1 parent d3a6609 commit b5423ea
Showing 10 changed files with 251 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/connector/src/macros.rs
@@ -36,7 +36,8 @@ macro_rules! for_all_classified_sources {
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
{ Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
}
$(
,$extra_args
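Note: `for_all_classified_sources!` is the central connector registry. Every `{ Variant, PropertiesType, SplitType }` triple listed here has dispatch code generated for it, which is why registering `Iceberg` (plus the trailing comma added to the `Test` entry) is all this file needs. A minimal, self-contained sketch of the pattern, with illustrative macro and type names rather than RisingWave's actual ones:

    // Registry macro: holds the list of sources and hands each triple
    // to a caller-supplied callback macro.
    macro_rules! for_all_sources {
        ($callback:ident) => {
            $callback! {
                { Kafka, KafkaProperties, KafkaSplit },
                { Iceberg, IcebergProperties, IcebergSplit }
            }
        };
    }

    // Example callback: expands once per registered source.
    macro_rules! impl_connector_names {
        ($({ $variant:ident, $props:ty, $split:ty }),* $(,)?) => {
            fn connector_names() -> Vec<&'static str> {
                vec![$(stringify!($variant)),*]
            }
        };
    }

    struct KafkaProperties;
    struct KafkaSplit;
    struct IcebergProperties;
    struct IcebergSplit;

    for_all_sources! { impl_connector_names }

    fn main() {
        assert_eq!(connector_names(), ["Kafka", "Iceberg"]);
    }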
2 changes: 1 addition & 1 deletion src/connector/src/sink/iceberg/mod.rs
@@ -927,7 +927,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
}

/// Try to match our schema with iceberg schema.
fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
pub fn try_matches_arrow_schema(rw_schema: &Schema, arrow_schema: &ArrowSchema) -> Result<()> {
if rw_schema.fields.len() != arrow_schema.fields().len() {
return Err(SinkError::Iceberg(anyhow!(
"Schema length not match, ours is {}, and iceberg is {}",
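Note: the only change in this file is visibility. `try_matches_arrow_schema` becomes `pub` so the new iceberg source module can reuse the sink's schema-compatibility check (see its call in `src/connector/src/source/iceberg/mod.rs` below). In spirit, the check compares field counts and then matches fields pairwise; here is a simplified, name-only sketch under illustrative names (the real function also compares data types and returns a `SinkError`):

    fn check_fields_by_name(ours: &[String], theirs: &[String]) -> Result<(), String> {
        // Length check, as in the hunk above.
        if ours.len() != theirs.len() {
            return Err(format!(
                "Schema length not match, ours is {}, and iceberg is {}",
                ours.len(),
                theirs.len()
            ));
        }
        // Name check: every one of our fields must exist on the iceberg side.
        for name in ours {
            if !theirs.contains(name) {
                return Err(format!("field {name} not found in iceberg schema"));
            }
        }
        Ok(())
    }

    fn main() {
        let ours = vec!["id".to_owned(), "v".to_owned()];
        assert!(check_fields_by_name(&ours, &ours).is_ok());
        assert!(check_fields_by_name(&ours, &ours[..1].to_vec()).is_err());
    }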
5 changes: 3 additions & 2 deletions src/connector/src/source/base.rs
@@ -29,7 +29,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorSuppressor, RwError};
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo, Source};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use risingwave_rpc_client::ConnectorClient;
@@ -149,9 +149,10 @@ pub struct SourceEnumeratorContext {
pub connector_client: Option<ConnectorClient>,
}

#[derive(Clone, Copy, Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct SourceEnumeratorInfo {
pub source_id: u32,
pub source: Option<Source>,
}

#[derive(Debug, Default)]
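Note: `SourceEnumeratorInfo` drops `Copy` because its new `source` field carries a protobuf `Source` message; a struct can only be `Copy` if every field is, and `Option<Source>` is `Clone` but not `Copy`. A minimal illustration, where `Source` stands in for `risingwave_pb::catalog::Source`:

    #[derive(Clone, Debug, Default)]
    struct Source {
        name: String, // String is not Copy, so neither is Source
    }

    #[derive(Clone, Debug, Default)]
    struct SourceEnumeratorInfo {
        source_id: u32,
        source: Option<Source>, // forces the derive to lose Copy
    }

    fn main() {
        let info = SourceEnumeratorInfo {
            source_id: 1,
            source: Some(Source { name: "iceberg_src".into() }),
        };
        let replica = info.clone(); // callers that relied on implicit copies must now clone
        println!("{info:?} {replica:?}");
    }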
189 changes: 189 additions & 0 deletions src/connector/src/source/iceberg/mod.rs
@@ -0,0 +1,189 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use async_trait::async_trait;
use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::types::JsonbVal;
use serde::{Deserialize, Serialize};
use simd_json::prelude::ArrayTrait;

use crate::parser::ParserConfig;
use crate::sink::iceberg::IcebergConfig;
use crate::source::{
BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties,
SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields,
};

pub const ICEBERG_CONNECTOR: &str = "iceberg";

#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct IcebergProperties {
#[serde(rename = "catalog.type")]
pub catalog_type: String,
#[serde(rename = "s3.region")]
pub region_name: String,
#[serde(rename = "s3.endpoint", default)]
pub endpoint: String,
#[serde(rename = "s3.access.key", default)]
pub access: String,
#[serde(rename = "s3.secret.key", default)]
pub secret: String,
#[serde(rename = "warehouse.path")]
pub warehouse_path: String,
#[serde(rename = "database.name")]
pub database_name: String,
#[serde(rename = "table.name")]
pub table_name: String,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}

impl SourceProperties for IcebergProperties {
type Split = IcebergSplit;
type SplitEnumerator = IcebergSplitEnumerator;
type SplitReader = IcebergFileReader;

const SOURCE_NAME: &'static str = ICEBERG_CONNECTOR;
}

impl UnknownFields for IcebergProperties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct IcebergSplit {}

impl SplitMetaData for IcebergSplit {
fn id(&self) -> SplitId {
unimplemented!()
}

fn restore_from_json(_value: JsonbVal) -> anyhow::Result<Self> {
unimplemented!()
}

fn encode_to_json(&self) -> JsonbVal {
unimplemented!()
}

fn update_with_offset(&mut self, _start_offset: String) -> anyhow::Result<()> {
unimplemented!()
}
}

#[derive(Debug, Clone)]
pub struct IcebergSplitEnumerator {}

#[async_trait]
impl SplitEnumerator for IcebergSplitEnumerator {
type Properties = IcebergProperties;
type Split = IcebergSplit;

async fn new(
properties: Self::Properties,
context: SourceEnumeratorContextRef,
) -> anyhow::Result<Self> {
match &context.info.source {
Some(source) => {
let iceberg_config = IcebergConfig {
database_name: properties.database_name,
table_name: properties.table_name,
catalog_type: Some(properties.catalog_type),
path: properties.warehouse_path,
endpoint: Some(properties.endpoint),
access_key: properties.access,
secret_key: properties.secret,
region: Some(properties.region_name),
..Default::default()
};

let columns: Vec<ColumnCatalog> = source
.columns
.iter()
.cloned()
.map(ColumnCatalog::from)
.collect_vec();

let schema = Schema {
fields: columns
.iter()
.map(|c| Field::from(&c.column_desc))
.collect(),
};

let table = iceberg_config.load_table().await?;

let iceberg_schema: arrow_schema::Schema = table
.current_table_metadata()
.current_schema()?
.clone()
.try_into()?;

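// Every column declared on the RisingWave source must exist in the iceberg table (matched by name).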
for f1 in schema.fields() {
if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) {
return Err(anyhow::anyhow!(format!(
"Column {} not found in iceberg table",
f1.name
)));
}
}

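// Project the iceberg schema down to the declared columns, so the
// compatibility check below compares only the fields this source will read.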
let new_iceberg_field = iceberg_schema
.fields
.iter()
.filter(|f1| schema.fields.iter().any(|f2| f1.name() == &f2.name))
.cloned()
.collect::<Vec<_>>();
let new_iceberg_schema = arrow_schema::Schema::new(new_iceberg_field);

crate::sink::iceberg::try_matches_arrow_schema(&schema, &new_iceberg_schema)?;
Ok(Self {})
}
None => unreachable!(),
}
}

async fn list_splits(&mut self) -> anyhow::Result<Vec<Self::Split>> {
Ok(vec![])
}
}

#[derive(Debug)]
pub struct IcebergFileReader {}

#[async_trait]
impl SplitReader for IcebergFileReader {
type Properties = IcebergProperties;
type Split = IcebergSplit;

async fn new(
_props: IcebergProperties,
_splits: Vec<IcebergSplit>,
_parser_config: ParserConfig,
_source_ctx: SourceContextRef,
_columns: Option<Vec<Column>>,
) -> anyhow::Result<Self> {
unimplemented!()
}

fn into_stream(self) -> BoxChunkSourceStream {
unimplemented!()
}
}
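Note: the `#[serde(rename = "...")]` attributes map the `WITH` options of a `CREATE SOURCE ... WITH (connector = 'iceberg', ...)` statement onto `IcebergProperties`, and `#[serde(flatten)]` collects any unrecognized option into `unknown_fields` so it can be surfaced rather than silently dropped. A standalone sketch of that round trip, assuming `serde` (with the derive feature) and `serde_json` as dependencies; the struct is trimmed to four options and the values are made up:

    use std::collections::HashMap;

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct IcebergProps {
        #[serde(rename = "catalog.type")]
        catalog_type: String,
        #[serde(rename = "warehouse.path")]
        warehouse_path: String,
        #[serde(rename = "database.name")]
        database_name: String,
        #[serde(rename = "table.name")]
        table_name: String,
        #[serde(flatten)]
        unknown_fields: HashMap<String, String>,
    }

    fn main() {
        let with_options: HashMap<String, String> = [
            ("catalog.type", "storage"),
            ("warehouse.path", "s3://my-bucket/warehouse"),
            ("database.name", "demo_db"),
            ("table.name", "demo_table"),
            ("typo.option", "oops"), // ends up in unknown_fields
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        let json = serde_json::to_value(&with_options).unwrap();
        let props: IcebergProps = serde_json::from_value(json).unwrap();
        assert_eq!(props.catalog_type, "storage");
        assert_eq!(props.table_name, "demo_table");
        assert!(props.unknown_fields.contains_key("typo.option"));
        println!("{props:#?}");
    }

Also worth noting: `IcebergSplit` and `IcebergFileReader` are `unimplemented!()` stubs and `list_splits` returns an empty vector, so this commit wires up source creation and schema validation only; actual reads presumably land in a follow-up.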
1 change: 1 addition & 0 deletions src/connector/src/source/mod.rs
@@ -31,6 +31,7 @@ pub use kafka::KAFKA_CONNECTOR;
pub use kinesis::KINESIS_CONNECTOR;
pub use nats::NATS_CONNECTOR;
mod common;
pub mod iceberg;
mod manager;
pub mod reader;
pub mod test_source;
10 changes: 8 additions & 2 deletions src/frontend/src/handler/create_source.rs
@@ -44,6 +44,7 @@ use risingwave_connector::source::cdc::{
MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_connector::source::{
@@ -977,6 +978,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
),
TEST_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json],
),
ICEBERG_CONNECTOR => hashmap!(
Format::Native => vec![Encode::Native],
)
))
});
@@ -1213,8 +1217,10 @@ pub async fn handle_create_source(
)
.into());
}

let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?;
let connector = get_connector(&with_properties)
.ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;
let (mut columns, pk_column_ids, row_id_index) =
bind_pk_on_relation(columns, pk_names, ICEBERG_CONNECTOR != connector)?;

debug_assert!(is_column_ids_dedup(&columns));

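Note: two frontend changes here. First, `CONNECTORS_COMPATIBLE_FORMATS` admits only `FORMAT NATIVE ENCODE NATIVE` for iceberg, since the connector decodes table data itself rather than through a user-chosen format. Second, `bind_pk_on_relation` now receives `ICEBERG_CONNECTOR != connector` for its new `must_need_pk` argument, making an iceberg source the one relation that may end up with neither a declared pk nor a hidden `_row_id` column. A sketch of the resulting rule (the helper name is illustrative):

    fn needs_hidden_row_id(has_declared_pk: bool, connector: &str) -> bool {
        // bind_pk_on_relation receives must_need_pk = (connector != "iceberg");
        // `_row_id` is appended only when no pk was declared AND a pk is required.
        let must_need_pk = connector != "iceberg";
        !has_declared_pk && must_need_pk
    }

    fn main() {
        assert!(needs_hidden_row_id(false, "kafka")); // kafka source without pk gets _row_id
        assert!(!needs_hidden_row_id(false, "iceberg")); // iceberg source gets no _row_id
        assert!(!needs_hidden_row_id(true, "kafka")); // a declared pk always wins
    }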
25 changes: 19 additions & 6 deletions src/frontend/src/handler/create_table.rs
@@ -27,6 +27,7 @@ use risingwave_common::catalog::{
CdcTableDesc, ColumnCatalog, ColumnDesc, TableId, TableVersionId, DEFAULT_SCHEMA_NAME,
INITIAL_SOURCE_VERSION_ID, INITIAL_TABLE_VERSION_ID, USER_COLUMN_ID_OFFSET,
};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_common::util::value_encoding::DatumToProtoExt;
@@ -35,6 +36,7 @@ use risingwave_connector::source::cdc::external::{
DATABASE_NAME_KEY, SCHEMA_NAME_KEY, TABLE_NAME_KEY,
};
use risingwave_connector::source::cdc::CDC_BACKFILL_ENABLE_KEY;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
use risingwave_pb::ddl_service::TableJobType;
@@ -61,6 +63,7 @@ use crate::handler::create_source::{
bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark,
check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY,
};
use crate::handler::util::get_connector;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource};
@@ -411,6 +414,7 @@ fn multiple_pk_definition_err() -> RwError {
pub fn bind_pk_on_relation(
mut columns: Vec<ColumnCatalog>,
pk_names: Vec<String>,
must_need_pk: bool,
) -> Result<(Vec<ColumnCatalog>, Vec<ColumnId>, Option<usize>)> {
for c in &columns {
assert!(c.column_id() != ColumnId::placeholder());
@@ -431,8 +435,10 @@ pub fn bind_pk_on_relation(
})
.try_collect()?;

// Add `_row_id` column if `pk_column_ids` is empty.
let row_id_index = pk_column_ids.is_empty().then(|| {
// Add `_row_id` column if `pk_column_ids` is empty and must_need_pk
let need_row_id = pk_column_ids.is_empty() && must_need_pk;

let row_id_index = need_row_id.then(|| {
let column = ColumnCatalog::row_id_column();
let index = columns.len();
pk_column_ids = vec![column.column_id()];
@@ -510,7 +516,14 @@ pub(crate) async fn gen_create_table_plan_with_source(
c.column_desc.column_id = col_id_gen.generate(c.name())
}

let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?;
let connector = get_connector(&with_properties)
.ok_or_else(|| RwError::from(ProtocolError("missing field 'connector'".to_string())))?;
if connector == ICEBERG_CONNECTOR {
return Err(
ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
);
}
let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;

let watermark_descs = bind_source_watermark(
session,
@@ -594,7 +607,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
ensure_table_constraints_supported(&constraints)?;
let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?;
let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names, true)?;

let watermark_descs = bind_source_watermark(
context.session_ctx(),
@@ -774,7 +787,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
}

let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?;
let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?;

let definition = context.normalized_sql().to_owned();

@@ -1275,7 +1288,7 @@ mod tests {
}
ensure_table_constraints_supported(&constraints)?;
let pk_names = bind_sql_pk_names(&column_defs, &constraints)?;
let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?;
let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?;
Ok(pk_column_ids)
})();
match (expected, actual) {
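Note: every other `bind_pk_on_relation` call site passes `true`, so the table, CDC-table, and test paths keep the old always-add-`_row_id` behavior; only `CREATE SOURCE` can opt out. `gen_create_table_plan_with_source` also gains an explicit guard, so `CREATE TABLE ... WITH (connector = 'iceberg')` fails with "can't create table with iceberg connector" while the corresponding `CREATE SOURCE` succeeds, consistent with iceberg data living in external storage rather than being ingested into RisingWave's own state.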
15 changes: 14 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -24,7 +24,8 @@ use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, Field, Schema, KAFKA_TIMESTAMP_COLUMN_NAME,
};
use risingwave_common::error::Result;
use risingwave_connector::source::DataType;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
use risingwave_connector::source::{DataType, UPSTREAM_SOURCE_KEY};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::GeneratedColumnDesc;

@@ -546,6 +547,18 @@ impl ToStream for LogicalSource {
}
}
}
if let Some(source) = &self.core.catalog {
let connector = &source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.map(|s| s.to_lowercase())
.unwrap();
if ICEBERG_CONNECTOR == connector {
return Err(
anyhow::anyhow!("Iceberg source is not supported in stream queries").into(),
);
}
}
Ok(plan)
}

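Note: this planner guard makes the iceberg source batch-only. Converting a `LogicalSource` backed by the iceberg connector into a stream plan is rejected, so the source works in batch `SELECT` queries but not in materialized views. (The lookup `unwrap()`s the `UPSTREAM_SOURCE_KEY` entry, relying on every source catalog entry carrying a connector property, and lowercases it before comparing.) A condensed, illustrative version of the check:

    fn reject_iceberg_in_stream(connector: Option<&str>) -> Result<(), String> {
        // Mirrors the ToStream guard: lowercase the connector name, then refuse iceberg.
        match connector.map(|c| c.to_lowercase()) {
            Some(c) if c == "iceberg" => {
                Err("Iceberg source is not supported in stream queries".to_owned())
            }
            _ => Ok(()),
        }
    }

    fn main() {
        assert!(reject_iceberg_in_stream(Some("kafka")).is_ok());
        assert!(reject_iceberg_in_stream(Some("ICEBERG")).is_err()); // case-insensitive
    }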
[The remaining 2 of the 10 changed files are not shown in this view.]
