diff --git a/Cargo.lock b/Cargo.lock index 0377af8aec5d..206772f2d0ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5096,7 +5096,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=32c0bbf242f5c47b1e743f10577012fe7436c770#32c0bbf242f5c47b1e743f10577012fe7436c770" +source = "git+https://github.com/icelake-io/icelake?rev=3f61f900d6914d4a28c00c2af6374a4dda6d95d4#3f61f900d6914d4a28c00c2af6374a4dda6d95d4" dependencies = [ "anyhow", "apache-avro 0.17.0", @@ -8979,7 +8979,9 @@ dependencies = [ "anyhow", "apache-avro 0.16.0", "arrow-array 50.0.0", + "arrow-row 50.0.0", "arrow-schema 50.0.0", + "arrow-select 50.0.0", "assert_matches", "async-nats", "async-trait", @@ -9285,6 +9287,7 @@ dependencies = [ "futures", "futures-async-stream", "iana-time-zone", + "icelake", "itertools 0.12.0", "madsim-tokio", "madsim-tonic", diff --git a/Cargo.toml b/Cargo.toml index d6fc352af369..37b8a5c1e6ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [ +icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f61f900d6914d4a28c00c2af6374a4dda6d95d4", features = [ "prometheus", ] } arrow-array = "50" diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index 0e8054a4946a..c3bac96c9654 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -42,6 +42,8 @@ bash ./start_spark_connect_server.sh "$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml "$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_append_only.toml "$HOME"/.local/bin/poetry run python main.py -t ./test_case/partition_upsert.toml +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/range_partition_append_only.toml +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/range_partition_upsert.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/start_spark_connect_server.sh b/e2e_test/iceberg/start_spark_connect_server.sh index cf3bff1e3991..9e263e70c8e3 100755 --- a/e2e_test/iceberg/start_spark_connect_server.sh +++ b/e2e_test/iceberg/start_spark_connect_server.sh @@ -8,9 +8,10 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION" SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz" - -wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE -tar -xzf $SPARK_FILE --no-same-owner +if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop3" ];then + wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE + tar -xzf $SPARK_FILE --no-same-owner +fi ./spark-${SPARK_VERSION}-bin-hadoop3/sbin/start-connect-server.sh --packages $PACKAGES \ --master local[3] \ @@ -31,4 +32,4 @@ while ! 
nc -z localhost 15002; do sleep 1 # wait for 1/10 of the second before check again done -echo "Spark connect server launched" \ No newline at end of file +echo "Spark connect server launched" diff --git a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt index f867f0d74645..8d70c9b2e19a 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt @@ -2,13 +2,13 @@ statement ok set streaming_parallelism=4; statement ok -CREATE TABLE t6 (id int, v1 int primary key, v2 bigint, v3 varchar); +CREATE TABLE t6 (id int, v1 int primary key, v2 bigint, v3 varchar, v4 date); statement ok CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6; statement ok -CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH ( +CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3, mv6.v4 as v4 from mv6 WITH ( connector = 'iceberg', type = 'upsert', force_append_only = 'false', @@ -24,7 +24,7 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 ); statement ok -INSERT INTO t6 VALUES (1, 1, 2, '1-2'), (1, 2, 2, '2-2'), (1, 3, 2, '3-2'), (1, 5, 2, '5-2'), (1, 8, 2, '8-2'), (1, 13, 2, '13-2'), (1, 21, 2, '21-2'); +INSERT INTO t6 VALUES (1, 1, 2, '1-2', '2022-03-11'), (1, 2, 2, '2-2', '2022-03-12'), (1, 3, 2, '3-2', '2022-03-13'), (1, 5, 2, '5-2', '2022-03-15'), (1, 8, 2, '8-2', '2022-03-18'), (1, 13, 2, '13-2', '2022-03-13'), (1, 21, 2, '21-2', '2022-03-21'); statement ok FLUSH; @@ -32,7 +32,7 @@ FLUSH; sleep 5s statement ok -INSERT INTO t6 VALUES (1, 1, 50, '1-50'); +INSERT INTO t6 VALUES (1, 1, 50, '1-50', '2022-03-11'); statement ok FLUSH; diff --git a/e2e_test/iceberg/test_case/no_partition_upsert.toml b/e2e_test/iceberg/test_case/no_partition_upsert.toml index 0e0215d37465..cd380c53da88 100644 --- a/e2e_test/iceberg/test_case/no_partition_upsert.toml +++ b/e2e_test/iceberg/test_case/no_partition_upsert.toml @@ -6,7 +6,8 @@ init_sqls = [ id int, v1 int, v2 long, - v3 string + v3 string, + v4 date ) USING iceberg TBLPROPERTIES ('format-version'='2'); ''' @@ -19,13 +20,13 @@ verify_schema = ['int','int','long','string'] verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC' verify_data = """ -1,1,50,1-50 -1,2,2,2-2 -1,3,2,3-2 -1,5,2,5-2 -1,8,2,8-2 -1,13,2,13-2 -1,21,2,21-2 +1,1,50,1-50,2022-03-11 +1,2,2,2-2,2022-03-12 +1,3,2,3-2,2022-03-13 +1,5,2,5-2,2022-03-15 +1,8,2,8-2,2022-03-18 +1,13,2,13-2,2022-03-13 +1,21,2,21-2,2022-03-21 """ drop_sqls = [ diff --git a/e2e_test/iceberg/test_case/partition_append_only.toml b/e2e_test/iceberg/test_case/partition_append_only.toml index 4721ef11c5ba..e1949319211c 100644 --- a/e2e_test/iceberg/test_case/partition_append_only.toml +++ b/e2e_test/iceberg/test_case/partition_append_only.toml @@ -14,7 +14,7 @@ init_sqls = [ v_timestamp timestamp, v_ts_ntz timestamp_ntz ) - PARTITIONED BY (v_int,v_long,v_float,v_double,v_varchar,v_bool,v_date,v_timestamp,v_ts_ntz) + PARTITIONED BY (v_int,bucket(10,v_long),truncate(30,v_long),years(v_date),months(v_timestamp),days(v_ts_ntz)) TBLPROPERTIES ('format-version'='2'); ''' ] diff --git a/e2e_test/iceberg/test_case/partition_upsert.toml b/e2e_test/iceberg/test_case/partition_upsert.toml index d95178ed893f..ea027f18ecdb 100644 --- a/e2e_test/iceberg/test_case/partition_upsert.toml +++ b/e2e_test/iceberg/test_case/partition_upsert.toml @@ -6,9 +6,10 @@ init_sqls = [ id int, v1 int, v2 long, - v3 string + v3 string, + v4 date ) USING 
iceberg - PARTITIONED BY (v1,v2) + PARTITIONED BY (v1,v2,truncate(2,v3)) TBLPROPERTIES ('format-version'='2'); ''' ] @@ -20,13 +21,13 @@ verify_schema = ['int','int','long','string'] verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC' verify_data = """ -1,1,50,1-50 -1,2,2,2-2 -1,3,2,3-2 -1,5,2,5-2 -1,8,2,8-2 -1,13,2,13-2 -1,21,2,21-2 +1,1,50,1-50,2022-03-11 +1,2,2,2-2,2022-03-12 +1,3,2,3-2,2022-03-13 +1,5,2,5-2,2022-03-15 +1,8,2,8-2,2022-03-18 +1,13,2,13-2,2022-03-13 +1,21,2,21-2,2022-03-21 """ drop_sqls = [ diff --git a/e2e_test/iceberg/test_case/range_partition_append_only.toml b/e2e_test/iceberg/test_case/range_partition_append_only.toml new file mode 100644 index 000000000000..d0e270210dff --- /dev/null +++ b/e2e_test/iceberg/test_case/range_partition_append_only.toml @@ -0,0 +1,40 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.demo_table', + ''' + CREATE TABLE demo_db.demo_table ( + id long, + v_int int, + v_long long, + v_float float, + v_double double, + v_varchar string, + v_bool boolean, + v_date date, + v_timestamp timestamp, + v_ts_ntz timestamp_ntz + ) + PARTITIONED BY (years(v_date),months(v_timestamp),days(v_ts_ntz)) + TBLPROPERTIES ('format-version'='2'); + ''' +] + +slt = 'test_case/iceberg_sink_append_only.slt' + +verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz'] + +verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC' + + +verify_data = """ +1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00 +2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00 +3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00 +4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00 +5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00 +""" + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.demo_table', + 'DROP SCHEMA IF EXISTS demo_db' +] diff --git a/e2e_test/iceberg/test_case/range_partition_upsert.toml b/e2e_test/iceberg/test_case/range_partition_upsert.toml new file mode 100644 index 000000000000..ac081d6edeab --- /dev/null +++ b/e2e_test/iceberg/test_case/range_partition_upsert.toml @@ -0,0 +1,36 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.demo_table', + ''' + CREATE TABLE demo_db.demo_table ( + id int, + v1 int, + v2 long, + v3 string, + v4 date + ) USING iceberg + PARTITIONED BY (days(v4)) + TBLPROPERTIES ('format-version'='2'); + ''' +] + +slt = 'test_case/iceberg_sink_upsert.slt' + +verify_schema = ['int','int','long','string'] + +verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC' + +verify_data = """ +1,1,50,1-50,2022-03-11 +1,2,2,2-2,2022-03-12 +1,3,2,3-2,2022-03-13 +1,5,2,5-2,2022-03-15 +1,8,2,8-2,2022-03-18 +1,13,2,13-2,2022-03-13 +1,21,2,21-2,2022-03-21 +""" + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.demo_table', + 'DROP SCHEMA IF EXISTS demo_db' +] diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 395fc0496850..acc4f3175428 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -223,6 +223,7 @@ message SinkDesc { string sink_from_name = 12; catalog.SinkFormatDesc format_desc = 13; optional uint32 target_table = 14; + optional uint64 extra_partition_col_idx = 15; } enum SinkLogStoreType { diff --git a/src/common/src/array/arrow/arrow_iceberg.rs b/src/common/src/array/arrow/arrow_iceberg.rs new file 
mode 100644 index 000000000000..73476a8d291a --- /dev/null +++ b/src/common/src/array/arrow/arrow_iceberg.rs @@ -0,0 +1,73 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, StructArray}; +use arrow_schema::DataType; +use itertools::Itertools; + +use crate::array::{ArrayError, DataChunk}; +use crate::util::iter_util::ZipEqFast; + +/// Converts RisingWave array to Arrow array with the schema. +/// The behavior is specified for iceberg: +/// For different struct type, try to use fields in schema to cast. +pub fn to_iceberg_record_batch_with_schema( + schema: arrow_schema::SchemaRef, + chunk: &DataChunk, +) -> Result { + if !chunk.is_compacted() { + let c = chunk.clone(); + return to_iceberg_record_batch_with_schema(schema, &c.compact()); + } + let columns: Vec<_> = chunk + .columns() + .iter() + .zip_eq_fast(schema.fields().iter()) + .map(|(column, field)| { + let column: arrow_array::ArrayRef = column.as_ref().try_into()?; + if column.data_type() == field.data_type() { + Ok(column) + } else if let DataType::Struct(actual) = column.data_type() + && let DataType::Struct(expect) = field.data_type() + { + // Special case for iceberg + if actual.len() != expect.len() { + return Err(ArrayError::to_arrow(format!( + "Struct field count mismatch, expect {}, actual {}", + expect.len(), + actual.len() + ))); + } + let column = column + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + let (_, struct_columns, nullable) = column.into_parts(); + Ok(Arc::new( + StructArray::try_new(expect.clone(), struct_columns, nullable) + .map_err(ArrayError::from_arrow)?, + ) as ArrayRef) + } else { + arrow_cast::cast(&column, field.data_type()).map_err(ArrayError::from_arrow) + } + }) + .try_collect::<_, _, ArrayError>()?; + + let opts = arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity())); + arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts) + .map_err(ArrayError::to_arrow) +} diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 87f82377f967..7274bb3f34e4 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -133,6 +133,7 @@ macro_rules! converts_generic { fn try_from(array: &ArrayImpl) -> Result { match array { $($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)* + ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?. with_timezone_utc())), _ => todo!("unsupported array"), } } @@ -152,6 +153,13 @@ macro_rules! converts_generic { .unwrap() .try_into()?, )),)* + Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz( + array + .as_any() + .downcast_ref::() + .unwrap() + .try_into()?, + )), t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))), } } @@ -173,7 +181,6 @@ converts_generic! 
{ { arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 }, { arrow_array::Date32Array, Date32, ArrayImpl::Date }, { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp }, - { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz }, { arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time }, { arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval }, { arrow_array::StructArray, Struct(_), ArrayImpl::Struct }, diff --git a/src/common/src/array/arrow/mod.rs b/src/common/src/array/arrow/mod.rs index 2e0db203a5ee..b3fed6bafaa2 100644 --- a/src/common/src/array/arrow/mod.rs +++ b/src/common/src/array/arrow/mod.rs @@ -14,6 +14,8 @@ mod arrow_default; mod arrow_deltalake; +mod arrow_iceberg; pub use arrow_default::to_record_batch_with_schema; pub use arrow_deltalake::to_deltalake_record_batch_with_schema; +pub use arrow_iceberg::to_iceberg_record_batch_with_schema; diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index ef2caa8daa26..f1012782bf9a 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -15,7 +15,10 @@ //! `Array` defines all in-memory representations of vectorized execution framework. mod arrow; -pub use arrow::{to_deltalake_record_batch_with_schema, to_record_batch_with_schema}; +pub use arrow::{ + to_deltalake_record_batch_with_schema, to_iceberg_record_batch_with_schema, + to_record_batch_with_schema, +}; mod bool_array; pub mod bytes_array; mod chrono_array; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 5231a55ea058..3f78d987f9cc 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c "xz", ] } arrow-array = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } async-nats = "0.33" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 513a39ae90e3..37d35e1d7edd 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -67,6 +67,9 @@ pub struct SinkDesc { /// Id of the target table for sink into table. pub target_table: Option, + + /// See the same name field in `SinkWriterParam`. + pub extra_partition_col_idx: Option, } impl SinkDesc { @@ -123,6 +126,7 @@ impl SinkDesc { db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), target_table: self.target_table.map(|table_id| table_id.table_id()), + extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64), } } } diff --git a/src/connector/src/sink/iceberg/mock_catalog.rs b/src/connector/src/sink/iceberg/mock_catalog.rs new file mode 100644 index 000000000000..f9d60965b1d3 --- /dev/null +++ b/src/connector/src/sink/iceberg/mock_catalog.rs @@ -0,0 +1,237 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
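The struct branch in `to_iceberg_record_batch_with_schema` above is the interesting part: when the RisingWave-side struct type and the Iceberg-side struct type differ only in field names or metadata, the helper keeps the child arrays and swaps in the expected `Fields` instead of routing through `arrow_cast::cast`. A minimal, self-contained arrow-rs (v50) sketch of that rebuild step — the field names `f0`/`v1` here are illustrative, not taken from the patch:

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, StructArray};
use arrow_schema::{DataType, Field, Fields};

/// Rebuild a struct array against the fields the sink schema expects,
/// reusing the existing child columns instead of casting them.
fn rebuild_struct(column: &ArrayRef, expect: &Fields) -> ArrayRef {
    let actual = column
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("caller checked this is a struct column");
    // Keep the child data and null buffer, swap in the expected field metadata.
    let (_, children, nulls) = actual.clone().into_parts();
    Arc::new(StructArray::try_new(expect.clone(), children, nulls).unwrap())
}

fn main() {
    // "Actual" struct uses an anonymous, RisingWave-style field name...
    let actual_fields = Fields::from(vec![Field::new("f0", DataType::Int32, true)]);
    let child: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let array: ArrayRef =
        Arc::new(StructArray::try_new(actual_fields, vec![child], None).unwrap());

    // ...while the Iceberg table schema names the field differently.
    let expect = Fields::from(vec![Field::new("v1", DataType::Int32, true)]);
    let rebuilt = rebuild_struct(&array, &expect);
    assert_eq!(rebuilt.len(), 3);
}
```

If the field counts differ, the real helper bails out with an `ArrayError` instead of attempting the swap.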
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use icelake::catalog::{Catalog, UpdateTable}; +use icelake::types::{Field, PartitionField, Schema, Struct, TableMetadata}; +use icelake::{Table, TableIdentifier}; +use opendal::services::Memory; +use opendal::Operator; + +/// A mock catalog for iceberg used for plan test. +pub struct MockCatalog; + +impl MockCatalog { + const RANGE_TABLE: &'static str = "range_table"; + const SPARSE_TABLE: &'static str = "sparse_table"; + + fn sparse_table(self: &Arc) -> Table { + Table::builder_from_catalog( + { + let mut builder = Memory::default(); + builder.root("/tmp"); + Operator::new(builder).unwrap().finish() + }, + self.clone(), + TableMetadata { + format_version: icelake::types::TableFormatVersion::V2, + table_uuid: "1".to_string(), + location: "1".to_string(), + last_sequence_number: 1, + last_updated_ms: 1, + last_column_id: 1, + schemas: vec![Schema::new( + 1, + None, + Struct::new(vec![ + Field::required( + 1, + "v1", + icelake::types::Any::Primitive(icelake::types::Primitive::Int), + ) + .into(), + Field::required( + 2, + "v2", + icelake::types::Any::Primitive(icelake::types::Primitive::Long), + ) + .into(), + Field::required( + 3, + "v3", + icelake::types::Any::Primitive(icelake::types::Primitive::String), + ) + .into(), + Field::required( + 4, + "v4", + icelake::types::Any::Primitive(icelake::types::Primitive::Time), + ) + .into(), + ]), + )], + current_schema_id: 1, + partition_specs: vec![icelake::types::PartitionSpec { + spec_id: 1, + fields: vec![ + PartitionField { + source_column_id: 1, + partition_field_id: 5, + transform: icelake::types::Transform::Identity, + name: "f1".to_string(), + }, + PartitionField { + source_column_id: 2, + partition_field_id: 6, + transform: icelake::types::Transform::Bucket(1), + name: "f2".to_string(), + }, + PartitionField { + source_column_id: 3, + partition_field_id: 7, + transform: icelake::types::Transform::Truncate(1), + name: "f3".to_string(), + }, + PartitionField { + source_column_id: 4, + partition_field_id: 8, + transform: icelake::types::Transform::Void, + name: "f4".to_string(), + }, + ], + }], + default_spec_id: 1, + last_partition_id: 1, + properties: None, + current_snapshot_id: None, + snapshots: None, + snapshot_log: None, + metadata_log: None, + sort_orders: vec![], + default_sort_order_id: 0, + refs: HashMap::new(), + }, + TableIdentifier::new(vec![Self::SPARSE_TABLE]).unwrap(), + ) + .build() + .unwrap() + } + + fn range_table(self: &Arc) -> Table { + Table::builder_from_catalog( + { + let mut builder = Memory::default(); + builder.root("/tmp"); + Operator::new(builder).unwrap().finish() + }, + self.clone(), + TableMetadata { + format_version: icelake::types::TableFormatVersion::V2, + table_uuid: "1".to_string(), + location: "1".to_string(), + last_sequence_number: 1, + last_updated_ms: 1, + last_column_id: 1, + schemas: vec![Schema::new( + 1, + None, + Struct::new(vec![ + Field::required( + 1, + "v1", + icelake::types::Any::Primitive(icelake::types::Primitive::Date), + ) + .into(), + Field::required( + 2, + "v2", + icelake::types::Any::Primitive(icelake::types::Primitive::Timestamp), + ) + .into(), + Field::required( + 3, + "v3", + icelake::types::Any::Primitive(icelake::types::Primitive::Timestampz), + ) + .into(), + Field::required( + 4, + "v4", + icelake::types::Any::Primitive(icelake::types::Primitive::Timestampz), + ) + .into(), + ]), 
+ )], + current_schema_id: 1, + partition_specs: vec![icelake::types::PartitionSpec { + spec_id: 1, + fields: vec![ + PartitionField { + source_column_id: 1, + partition_field_id: 7, + transform: icelake::types::Transform::Year, + name: "f4".to_string(), + }, + PartitionField { + source_column_id: 2, + partition_field_id: 8, + transform: icelake::types::Transform::Month, + name: "f5".to_string(), + }, + PartitionField { + source_column_id: 3, + partition_field_id: 9, + transform: icelake::types::Transform::Day, + name: "f6".to_string(), + }, + PartitionField { + source_column_id: 4, + partition_field_id: 10, + transform: icelake::types::Transform::Hour, + name: "f7".to_string(), + }, + ], + }], + default_spec_id: 1, + last_partition_id: 1, + properties: None, + current_snapshot_id: None, + snapshots: None, + snapshot_log: None, + metadata_log: None, + sort_orders: vec![], + default_sort_order_id: 0, + refs: HashMap::new(), + }, + TableIdentifier::new(vec![Self::RANGE_TABLE]).unwrap(), + ) + .build() + .unwrap() + } +} + +#[async_trait] +impl Catalog for MockCatalog { + fn name(&self) -> &str { + "mock" + } + + // Mock catalog load mock table according to table_name, there is 2 kinds of table for test: + // 1. sparse partition table + // 2. range partition table + async fn load_table(self: Arc, table_name: &TableIdentifier) -> icelake::Result { + match table_name.name.as_ref() { + Self::SPARSE_TABLE => Ok(self.sparse_table()), + Self::RANGE_TABLE => Ok(self.range_table()), + _ => unimplemented!("table {} not found", table_name), + } + } + + async fn update_table(self: Arc, _update_table: &UpdateTable) -> icelake::Result
{ + unimplemented!() + } +} diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 856c8deea0f4..9e51557ef477 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -13,14 +13,18 @@ // limitations under the License. mod jni_catalog; +mod mock_catalog; mod prometheus; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; +use std::sync::Arc; use anyhow::anyhow; -use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef}; +use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, +}; use async_trait::async_trait; use icelake::catalog::{ load_catalog, load_iceberg_base_catalog_config, BaseCatalogConfig, CatalogRef, CATALOG_NAME, @@ -35,7 +39,7 @@ use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile}; use icelake::{Table, TableIdentifier}; use itertools::Itertools; -use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk}; +use risingwave_common::array::{to_iceberg_record_batch_with_schema, Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::error::anyhow_error; @@ -47,8 +51,8 @@ use serde_derive::Deserialize; use url::Url; use with_options::WithOptions; +use self::mock_catalog::MockCatalog; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; -use self::prometheus::monitored_partition_writer::MonitoredFanoutPartitionedWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -346,6 +350,56 @@ impl IcebergConfig { } } +async fn create_catalog(config: &IcebergConfig) -> Result { + match config.catalog_type() { + "storage" | "rest" => { + let iceberg_configs = config.build_iceberg_configs()?; + let catalog = load_catalog(&iceberg_configs) + .await + .map_err(|e| SinkError::Iceberg(anyhow!(e)))?; + Ok(catalog) + } + catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => { + // Create java catalog + let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?; + let catalog_impl = match catalog_type { + "hive" => "org.apache.iceberg.hive.HiveCatalog", + "sql" => "org.apache.iceberg.jdbc.JdbcCatalog", + "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", + "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog", + _ => unreachable!(), + }; + + jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props) + } + "mock" => Ok(Arc::new(MockCatalog{})), + _ => { + Err(SinkError::Iceberg(anyhow!( + "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`", + config.catalog_type() + ))) + } + } +} + +pub async fn create_table(config: &IcebergConfig) -> Result
{ + let catalog = create_catalog(config) + .await + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; + + let table_id = TableIdentifier::new( + vec![config.database_name.as_str()] + .into_iter() + .chain(config.table_name.split('.')), + ) + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?; + + catalog + .load_table(&table_id) + .await + .map_err(|err| SinkError::Iceberg(anyhow!(err))) +} + pub struct IcebergSink { config: IcebergConfig, param: SinkParam, @@ -371,54 +425,8 @@ impl Debug for IcebergSink { } impl IcebergSink { - async fn create_catalog(&self) -> Result { - match self.config.catalog_type() { - "storage" | "rest" => { - let iceberg_configs = self.config.build_iceberg_configs()?; - let catalog = load_catalog(&iceberg_configs) - .await - .map_err(|e| SinkError::Iceberg(anyhow!(e)))?; - Ok(catalog) - } - catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => { - // Create java catalog - let (base_catalog_config, java_catalog_props) = self.config.build_jni_catalog_configs()?; - let catalog_impl = match catalog_type { - "hive" => "org.apache.iceberg.hive.HiveCatalog", - "sql" => "org.apache.iceberg.jdbc.JdbcCatalog", - "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", - "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog", - _ => unreachable!(), - }; - - jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props) - } - _ => { - Err(SinkError::Iceberg(anyhow!( - "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`", - self.config.catalog_type() - ))) - } - } - } - - async fn create_table(&self) -> Result
{ - let catalog = self - .create_catalog() - .await - .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; - - let table_id = TableIdentifier::new( - vec![self.config.database_name.as_str()] - .into_iter() - .chain(self.config.table_name.split('.')), - ) - .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?; - - let table = catalog - .load_table(&table_id) - .await - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; + async fn create_and_validate_table(&self) -> Result
{ + let table = create_table(&self.config).await?; let sink_schema = self.param.schema(); let iceberg_schema = table @@ -475,12 +483,12 @@ impl Sink for IcebergSink { const SINK_NAME: &'static str = ICEBERG_SINK; async fn validate(&self) -> Result<()> { - let _ = self.create_table().await?; + let _ = self.create_and_validate_table().await?; Ok(()) } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - let table = self.create_table().await?; + let table = self.create_and_validate_table().await?; let inner = if let Some(unique_column_ids) = &self.unique_column_ids { IcebergWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await? } else { @@ -505,7 +513,7 @@ impl Sink for IcebergSink { } async fn new_coordinator(&self) -> Result { - let table = self.create_table().await?; + let table = self.create_and_validate_table().await?; let partition_type = table.current_partition_type()?; Ok(IcebergSinkCommitter { @@ -526,6 +534,31 @@ enum IcebergWriterEnum { } impl IcebergWriter { + fn schema_with_extra_partition_col(table: &Table, idx: usize) -> Result { + let schema = table.current_arrow_schema()?; + + let mut fields = schema.fields().to_vec(); + let partition_type = + if let ArrowDataType::Struct(s) = table.current_partition_type()?.try_into()? { + let fields = Fields::from( + s.into_iter() + .enumerate() + .map(|(id, field)| { + ArrowField::new(format!("f{id}"), field.data_type().clone(), true) + }) + .collect::>(), + ); + ArrowDataType::Struct(fields) + } else { + unimplemented!() + }; + fields.insert( + idx, + ArrowField::new("_rw_partition", partition_type, false).into(), + ); + Ok(ArrowSchema::new(fields).into()) + } + pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; @@ -537,30 +570,54 @@ impl IcebergWriter { .iceberg_rolling_unflushed_data_file .clone(), )); - let partition_data_file_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); - let dispatch_builder = builder_helper - .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; - // wrap a layer with collect write metrics - let prometheus_builder = PrometheusWriterBuilder::new( - dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), - ); - let schema = table.current_arrow_schema()?; - let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); - Ok(Self { - inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), - schema, - }) + if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + let partition_data_file_builder = builder_helper.precompute_partition_writer_builder( + data_file_builder.clone(), + extra_partition_col_idx, + )?; + let dispatch_builder = builder_helper + .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); + 
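`schema_with_extra_partition_col` above is the writer-side counterpart of the frontend change: the sink's Arrow schema gains a non-nullable `_rw_partition` struct column at `extra_partition_col_idx`, with positional field names `f0`, `f1`, ..., so the precompute partition writer can read the partition value that `StreamProject` already appended instead of re-evaluating the transforms per row. A rough arrow-schema sketch of that insertion, assuming arrow 50 and made-up column names:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};

/// Insert a `_rw_partition` struct column at `idx`, mirroring the extra
/// column that the frontend appends via `StreamProject`.
fn with_partition_col(schema: &Schema, idx: usize, partition_fields: Fields) -> SchemaRef {
    let mut fields: Vec<FieldRef> = schema.fields().to_vec();
    let partition_type = DataType::Struct(partition_fields);
    fields.insert(idx, Arc::new(Field::new("_rw_partition", partition_type, false)));
    Arc::new(Schema::new(fields))
}

fn main() {
    let base = Schema::new(vec![
        Field::new("v1", DataType::Int32, true),
        Field::new("v2", DataType::Int64, true),
    ]);
    // Positional partition field names, as in `schema_with_extra_partition_col`.
    let partition_fields = Fields::from(vec![
        Field::new("f0", DataType::Int32, true),
        Field::new("f1", DataType::Int64, true),
    ]);
    // Append the partition column after the data columns.
    let schema = with_partition_col(&base, 2, partition_fields);
    assert_eq!(schema.field(2).name(), "_rw_partition");
}
```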
Ok(Self { + inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), + schema, + }) + } else { + let partition_data_file_builder = + builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?; + let dispatch_builder = builder_helper + .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = table.current_arrow_schema()?; + let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), + schema, + }) + } } pub async fn new_upsert( @@ -592,30 +649,55 @@ impl IcebergWriter { equality_delete_builder, unique_column_ids, ); - let partition_delta_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(delta_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); - let dispatch_builder = - builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; - // wrap a layer with collect write metrics - let prometheus_builder = PrometheusWriterBuilder::new( - dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), - ); - let schema = table.current_arrow_schema()?; - let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); - Ok(Self { - inner_writer: IcebergWriterEnum::Upsert(inner_writer), - schema, - }) + if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + let partition_delta_builder = builder_helper.precompute_partition_writer_builder( + delta_builder.clone(), + extra_partition_col_idx, + )?; + let dispatch_builder = + builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::Upsert(inner_writer), + schema, + }) + } else { + let partition_delta_builder = + builder_helper.fanout_partition_writer_builder(delta_builder.clone())?; + let dispatch_builder = + builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = table.current_arrow_schema()?; + let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::Upsert(inner_writer), + schema, + }) + } } } @@ -642,13 +724,14 @@ impl SinkWriter for IcebergWriter { let filters = chunk.visibility() & ops.iter().map(|op| *op == 
Op::Insert).collect::(); chunk.set_visibility(filters); - let chunk = to_record_batch_with_schema(self.schema.clone(), &chunk.compact()) - .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; + let chunk = + to_iceberg_record_batch_with_schema(self.schema.clone(), &chunk.compact()) + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; writer.write(chunk).await?; } IcebergWriterEnum::Upsert(writer) => { - let chunk = to_record_batch_with_schema(self.schema.clone(), &chunk) + let chunk = to_iceberg_record_batch_with_schema(self.schema.clone(), &chunk) .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; writer diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index be8d252ded5e..45f185196040 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -270,6 +270,11 @@ pub struct SinkWriterParam { pub vnode_bitmap: Option, pub meta_client: Option, pub sink_metrics: SinkMetrics, + // The val has two effect: + // 1. Indicates that the sink will accpect the data chunk with extra partition value column. + // 2. The index of the extra partition value column. + // More detail of partition value column, see `PartitionComputeInfo` + pub extra_partition_col_idx: Option, } #[derive(Clone)] @@ -303,6 +308,7 @@ impl SinkWriterParam { vnode_bitmap: Default::default(), meta_client: Default::default(), sink_metrics: SinkMetrics::for_test(), + extra_partition_col_idx: Default::default(), } } } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 7e6ff16ac8cf..2a10187995d2 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -38,6 +38,7 @@ fixedbitset = "0.4.2" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } iana-time-zone = "0.1" +icelake = { workspace = true } itertools = "0.12" maplit = "1" md5 = "0.7.0" diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index f3c7d9250ec4..f7ce8cc04a95 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -833,6 +833,7 @@ impl TestCase { format_desc, false, None, + None, ) { Ok(sink_plan) => { ret.sink_plan = Some(explain_plan(&sink_plan.into())); diff --git a/src/frontend/planner_test/tests/testdata/input/sink.yaml b/src/frontend/planner_test/tests/testdata/input/sink.yaml index 70a09e211a4a..f9241cdc7c9a 100644 --- a/src/frontend/planner_test/tests/testdata/input/sink.yaml +++ b/src/frontend/planner_test/tests/testdata/input/sink.yaml @@ -81,3 +81,76 @@ explain create sink sk1 from t2 emit on window close with (connector='blackhole'); expected_outputs: - explain_output +- id: create_mock_iceberg_sink_append_only_with_sparse_partition + sql: | + create table t1 (v1 int, v2 bigint, v3 varchar, v4 time); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'sparse_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin' + ); + expected_outputs: + - explain_output +- id: create_mock_iceberg_sink_append_only_with_range_partition + sql: | + create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 
'append-only', + force_append_only = 'true', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'range_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin' + ); + expected_outputs: + - explain_output +- id: create_mock_iceberg_sink_upsert_with_sparse_partition + sql: | + create table t1 (v1 int, v2 bigint, v3 varchar, v4 time); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 'upsert', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'sparse_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + primary_key = 'v1' + ); + expected_outputs: + - explain_output +- id: create_mock_iceberg_sink_upsert_with_range_partition + sql: | + create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 'upsert', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'range_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + primary_key = 'v1' + ); + expected_outputs: + - explain_output + diff --git a/src/frontend/planner_test/tests/testdata/output/sink.yaml b/src/frontend/planner_test/tests/testdata/output/sink.yaml index e2aadf7ee85f..8f759b107243 100644 --- a/src/frontend/planner_test/tests/testdata/output/sink.yaml +++ b/src/frontend/planner_test/tests/testdata/output/sink.yaml @@ -172,3 +172,83 @@ StreamSink { type: upsert, columns: [a, b, t2._row_id(hidden)], pk: [t2._row_id] } └─StreamEowcSort { sort_column: t2.b } └─StreamTableScan { table: t2, columns: [a, b, _row_id] } +- id: create_mock_iceberg_sink_append_only_with_sparse_partition + sql: | + create table t1 (v1 int, v2 bigint, v3 varchar, v4 time); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'sparse_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin' + ); + explain_output: | + StreamSink { type: append-only, columns: [v1, v2, v3, v4, t1._row_id(hidden)] } + └─StreamExchange { dist: HashShard($expr1) } + └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] } + └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] } +- id: create_mock_iceberg_sink_append_only_with_range_partition + sql: | + create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 'append-only', + force_append_only = 'true', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'range_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + 
s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin' + ); + explain_output: | + StreamSink { type: append-only, columns: [v1, v2, v3, v4, t1._row_id(hidden)] } + └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] } +- id: create_mock_iceberg_sink_upsert_with_sparse_partition + sql: | + create table t1 (v1 int, v2 bigint, v3 varchar, v4 time); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 'upsert', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'sparse_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + primary_key = 'v1' + ); + explain_output: | + StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] } + └─StreamExchange { dist: HashShard($expr1) } + └─StreamProject { exprs: [t1.v1, t1.v2, t1.v3, t1.v4, t1._row_id, Row(t1.v1, IcebergTransform('bucket[1]':Varchar, t1.v2), IcebergTransform('truncate[1]':Varchar, t1.v3), null:Int32) as $expr1] } + └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] } +- id: create_mock_iceberg_sink_upsert_with_range_partition + sql: | + create table t1 (v1 date, v2 timestamp, v3 timestamp with time zone, v4 timestamp); + explain create sink s1 as select v1 as v1, v2 as v2, v3 as v3, v4 as v4 from t1 WITH ( + connector = 'iceberg', + type = 'upsert', + catalog.type = 'mock', + database.name = 'demo_db', + table.name = 'range_table', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + primary_key = 'v1' + ); + explain_output: | + StreamSink { type: upsert, columns: [v1, v2, v3, v4, t1._row_id(hidden)], pk: [t1._row_id] } + └─StreamTableScan { table: t1, columns: [v1, v2, v3, v4, _row_id] } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f3d905357155..2ccd32a4c006 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -17,16 +17,18 @@ use std::rc::Rc; use std::sync::{Arc, LazyLock}; use anyhow::Context; +use arrow_schema::DataType as ArrowDataType; use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId}; use risingwave_common::error::{ErrorCode, Result, RwError}; -use risingwave_common::types::Datum; +use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::iceberg::{create_table, IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; @@ -43,6 +45,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_mv::get_column_names; +use super::create_source::UPSTREAM_SOURCE_KEY; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; @@ -52,13 +55,15 @@ use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGene use 
crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; -use crate::optimizer::plan_node::{generic, Explain, LogicalSource, StreamProject}; +use crate::optimizer::plan_node::{ + generic, IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, +}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor}; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::utils::resolve_privatelink_in_with_option; -use crate::{Planner, TableCatalog, WithOptions}; +use crate::{Explain, Planner, TableCatalog, WithOptions}; pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { let table_factor = TableFactor::Table { @@ -98,6 +103,7 @@ pub fn gen_sink_plan( session: &SessionImpl, context: OptimizerContextRef, stmt: CreateSinkStatement, + partition_info: Option, ) -> Result { let db_name = session.database(); let (sink_schema_name, sink_table_name) = @@ -227,6 +233,7 @@ pub fn gen_sink_plan( format_desc, without_backfill, target_table, + partition_info, )?; let sink_desc = sink_plan.sink_desc().clone(); @@ -291,6 +298,96 @@ pub fn gen_sink_plan( }) } +// This function is used to return partition compute info for a sink. More details refer in `PartitionComputeInfo`. +// Return: +// `Some(PartitionComputeInfo)` if the sink need to compute partition. +// `None` if the sink does not need to compute partition. +pub async fn get_partition_compute_info( + with_options: &WithOptions, +) -> Result> { + let properties = HashMap::from_iter(with_options.clone().into_inner().into_iter()); + let Some(connector) = properties.get(UPSTREAM_SOURCE_KEY) else { + return Ok(None); + }; + match connector.as_str() { + ICEBERG_SINK => { + let iceberg_config = IcebergConfig::from_hashmap(properties)?; + get_partition_compute_info_for_iceberg(&iceberg_config).await + } + _ => Ok(None), + } +} + +async fn get_partition_compute_info_for_iceberg( + iceberg_config: &IcebergConfig, +) -> Result> { + let table = create_table(iceberg_config).await?; + let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else { + return Ok(None); + }; + + if partition_spec.is_unpartitioned() { + return Ok(None); + } + + // Separate the partition spec into two parts: sparse partition and range partition. + // Sparse partition means that the data distribution is more sparse at a given time. + // Range partition means that the data distribution is likely same at a given time. + // Only compute the partition and shuffle by them for the sparse partition. + let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform { + // Sparse partition + icelake::types::Transform::Identity + | icelake::types::Transform::Truncate(_) + | icelake::types::Transform::Bucket(_) => true, + // Range partition + icelake::types::Transform::Year + | icelake::types::Transform::Month + | icelake::types::Transform::Day + | icelake::types::Transform::Hour + | icelake::types::Transform::Void => false, + }); + + if !has_sparse_partition { + return Ok(None); + } + + let arrow_type: ArrowDataType = table + .current_partition_type() + .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))? 
+ .try_into() + .map_err(|_| { + RwError::from(ErrorCode::SinkError( + "Fail to convert iceberg partition type to arrow type".into(), + )) + })?; + let Some(schema) = table.current_table_metadata().current_schema().ok() else { + return Ok(None); + }; + let partition_fields = partition_spec + .fields + .iter() + .map(|f| { + let source_f = schema + .look_up_field_by_id(f.source_column_id) + .ok_or(RwError::from(ErrorCode::SinkError( + "Fail to look up iceberg partition field".into(), + )))?; + Ok((source_f.name.clone(), f.transform)) + }) + .collect::>>()?; + + let DataType::Struct(partition_type) = arrow_type.into() else { + return Err(RwError::from(ErrorCode::SinkError( + "Partition type of iceberg should be a struct type".into(), + ))); + }; + + Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo { + partition_type, + partition_fields, + }))) +} + pub async fn handle_create_sink( handle_args: HandlerArgs, stmt: CreateSinkStatement, @@ -305,6 +402,8 @@ pub async fn handle_create_sink( return Ok(resp); } + let partition_info = get_partition_compute_info(&handle_args.with_options).await?; + let (sink, graph, target_table_catalog) = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); @@ -313,7 +412,7 @@ pub async fn handle_create_sink( sink_plan: plan, sink_catalog: sink, target_table_catalog, - } = gen_sink_plan(&session, context.clone(), stmt)?; + } = gen_sink_plan(&session, context.clone(), stmt, partition_info)?; let has_order_by = !query.order_by.is_empty(); if has_order_by { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index b7981cf7aec6..e5489bf258f8 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -24,7 +24,7 @@ use thiserror_ext::AsReport; use super::create_index::gen_create_index_plan; use super::create_mv::gen_create_mv_plan; -use super::create_sink::gen_sink_plan; +use super::create_sink::{gen_sink_plan, get_partition_compute_info}; use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; @@ -88,6 +88,13 @@ async fn do_handle_explain( let context = plan.ctx(); (Ok(plan), context) } + Statement::CreateSink { stmt } => { + let partition_info = get_partition_compute_info(context.with_options()).await?; + let plan = gen_sink_plan(&session, context.into(), stmt, partition_info) + .map(|plan| plan.sink_plan)?; + let context = plan.ctx(); + (Ok(plan), context) + } // For other queries without `await` point, we can keep a copy of reference to the // `OptimizerContext` even if the planning fails. This enables us to log the partial @@ -122,10 +129,6 @@ async fn do_handle_explain( "A created VIEW is just an alias. 
Instead, use EXPLAIN on the queries which reference the view.".into() ).into()); } - Statement::CreateSink { stmt } => { - gen_sink_plan(&session, context.clone(), stmt).map(|plan| plan.sink_plan) - } - Statement::CreateIndex { name, table_name, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 8eca9de358fe..e27220feb279 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -60,8 +60,8 @@ use self::heuristic_optimizer::ApplyOrder; use self::plan_node::generic::{self, PhysicalPlanRef}; use self::plan_node::{ stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, LogicalSource, - StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, - ToStreamContext, + PartitionComputeInfo, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, + StreamWatermarkFilter, ToStreamContext, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -774,6 +774,7 @@ impl PlanRoot { } /// Optimize and generate a create sink plan. + #[allow(clippy::too_many_arguments)] pub fn gen_sink_plan( &mut self, sink_name: String, @@ -785,6 +786,7 @@ impl PlanRoot { format_desc: Option, without_backfill: bool, target_table: Option, + partition_info: Option, ) -> Result { let stream_scan_type = if without_backfill { StreamScanType::UpstreamOnly @@ -815,6 +817,7 @@ impl PlanRoot { definition, properties, format_desc, + partition_info, ) } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index a0475c4ae092..248bb59f4edd 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -943,7 +943,7 @@ pub use stream_project_set::StreamProjectSet; pub use stream_row_id_gen::StreamRowIdGen; pub use stream_share::StreamShare; pub use stream_simple_agg::StreamSimpleAgg; -pub use stream_sink::StreamSink; +pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink}; pub use stream_sort::StreamEowcSort; pub use stream_source::StreamSource; pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 8f7abc66fb45..325c373956f0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -17,27 +17,33 @@ use std::io::{Error, ErrorKind}; use anyhow::anyhow; use fixedbitset::FixedBitSet; +use icelake::types::Transform; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, Field, TableId}; use risingwave_common::constants::log_store::v1::{KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::session_config::sink_decouple::SinkDecouple; +use risingwave_common::types::{DataType, StructType}; +use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::iceberg::ICEBERG_SINK; use risingwave_connector::sink::trivial::TABLE_SINK; use risingwave_connector::sink::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; +use 
risingwave_pb::expr::expr_node::Type; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::{derive_columns, derive_pk}; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef}; use super::stream::prelude::*; use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder}; -use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject}; +use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -46,6 +52,115 @@ use crate::{TableCatalog, WithOptions}; const DOWNSTREAM_PK_KEY: &str = "primary_key"; +/// ## Why we need `PartitionComputeInfo`? +/// +/// For some sink, it will write the data into different file based on the partition value. E.g. iceberg sink() +/// For this kind of sink, the file num can be reduced if we can shuffle the data based on the partition value. More details can be found in . +/// So if the `PartitionComputeInfo` provided, we will create a `StreamProject` node to compute the partition value and shuffle the data based on the partition value before the sink. +/// +/// ## What is `PartitionComputeInfo`? +/// The `PartitionComputeInfo` contains the information about partition compute. The stream sink will use +/// these information to create the corresponding expression in `StreamProject` node. +/// +/// #TODO +/// Maybe we should move this in sink? +pub enum PartitionComputeInfo { + Iceberg(IcebergPartitionInfo), +} + +impl PartitionComputeInfo { + pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result { + match self { + PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns), + } + } +} + +pub struct IcebergPartitionInfo { + pub partition_type: StructType, + // (partition_field_name, partition_field_transform) + pub partition_fields: Vec<(String, Transform)>, +} + +impl IcebergPartitionInfo { + #[inline] + fn transform_to_expression( + transform: &Transform, + col_id: usize, + columns: &[ColumnCatalog], + result_type: DataType, + ) -> Result { + match transform { + Transform::Identity => { + if columns[col_id].column_desc.data_type != result_type { + return Err(ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!( + "The partition field {} has type {}, but the partition field is {}", + columns[col_id].column_desc.name, + columns[col_id].column_desc.data_type, + result_type + ), + ))) + .into()); + } + Ok(ExprImpl::InputRef( + InputRef::new(col_id, result_type).into(), + )) + } + Transform::Void => Ok(ExprImpl::literal_null(result_type)), + _ => Ok(ExprImpl::FunctionCall( + FunctionCall::new_unchecked( + Type::IcebergTransform, + vec![ + ExprImpl::literal_varchar(transform.to_string()), + ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()) + .into(), + ), + ], + result_type, + ) + .into(), + )), + } + } + + pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result { + let child_exprs = self + .partition_fields + .into_iter() + .zip_eq_debug(self.partition_type.iter()) + .map(|((field_name, transform), (_, result_type))| { + let col_id = find_column_idx_by_name(columns, &field_name)?; + Self::transform_to_expression(&transform, col_id, columns, result_type.clone()) + }) + .collect::>>()?; + + Ok(ExprImpl::FunctionCall( + 
FunctionCall::new_unchecked( + Type::Row, + child_exprs, + DataType::Struct(self.partition_type), + ) + .into(), + )) + } +} +#[inline] +fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result<usize> { + columns + .iter() + .position(|col| col.column_desc.name == col_name) + .ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name) + ))) + .into() + }) +} + /// [`StreamSink`] represents a table/connector sink at the very end of the graph. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamSink { @@ -87,6 +202,7 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option<SinkFormatDesc>, + partition_info: Option<PartitionComputeInfo>, ) -> Result<Self> { let columns = derive_columns(input.schema(), out_names, &user_cols)?; let (input, sink) = Self::derive_sink_desc( @@ -101,6 +217,7 @@ impl StreamSink { definition, properties, format_desc, + partition_info, )?; let unsupported_sink = @@ -133,9 +250,42 @@ impl StreamSink { Ok(Self::new(input, sink)) } + fn derive_iceberg_sink_distribution( + input: PlanRef, + partition_info: Option<PartitionComputeInfo>, + columns: &[ColumnCatalog], + ) -> Result<(RequiredDist, PlanRef, Option<usize>)> { + // Here we need to add a plan node to compute the partition value and append it as an extra column. + if let Some(partition_info) = partition_info { + let input_fields = input.schema().fields(); + + let mut exprs: Vec<_> = input_fields + .iter() + .enumerate() + .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into()) + .collect(); + + // Add the partition compute expression to the end of the exprs + exprs.push(partition_info.convert_to_expression(columns)?); + let partition_col_idx = exprs.len() - 1; + let project = StreamProject::new(generic::Project::new(exprs.clone(), input)); + Ok(( + RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]), + project.into(), + Some(partition_col_idx), + )) + } else { + Ok(( + RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()), + input, + None, + )) + } + } + #[allow(clippy::too_many_arguments)] fn derive_sink_desc( - input: PlanRef, + mut input: PlanRef, user_distributed_by: RequiredDist, name: String, db_name: String, @@ -146,12 +296,14 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option<SinkFormatDesc>, + partition_info: Option<PartitionComputeInfo>, ) -> Result<(PlanRef, SinkDesc)> { let sink_type = Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?; let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); - let downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; - + let mut downstream_pk = + Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + let mut extra_partition_col_idx = None; let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), _ => { @@ -171,6 +323,21 @@ impl StreamSink { // lock contentions RequiredDist::hash_shard(downstream_pk.as_slice()) } + Some(s) if s == ICEBERG_SINK => { + // If the user doesn't specify the downstream primary key, we use the stream key as the pk. 
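// --- Illustrative sketch (not part of the patch) ---------------------------------------------
// Why the plan shards on the appended partition column: if rows are hash-distributed by the
// partition value, each partition is handled by a single parallel writer, so a checkpoint
// produces roughly one file per partition instead of one per (partition, writer) pair.
// `shard_of` and the row layout below are assumptions for the sketch, not RisingWave internals.

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

fn shard_of(partition_value: &str, parallelism: u64) -> u64 {
    let mut h = DefaultHasher::new();
    partition_value.hash(&mut h);
    h.finish() % parallelism
}

fn main() {
    let parallelism = 4_u64;
    let rows = [
        ("2022-03-11", 1),
        ("2022-03-11", 2),
        ("2022-03-12", 3),
        ("2022-03-12", 4),
    ];
    // writer id -> partitions whose rows it receives (and therefore whose files it writes)
    let mut writer_partitions: HashMap<u64, Vec<&str>> = HashMap::new();
    for (partition, _payload) in rows {
        writer_partitions
            .entry(shard_of(partition, parallelism))
            .or_default()
            .push(partition);
    }
    // With partition-aware shuffling, each partition appears under exactly one writer.
    println!("{writer_partitions:?}");
}
// --- end of sketch ----------------------------------------------------------------------------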
+ if sink_type.is_upsert() && downstream_pk.is_empty() { + downstream_pk = pk.iter().map(|k| k.column_index).collect_vec(); + } + let (required_dist, new_input, partition_col_idx) = + Self::derive_iceberg_sink_distribution( + input, + partition_info, + &columns, + )?; + input = new_input; + extra_partition_col_idx = partition_col_idx; + required_dist + } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); if downstream_pk.is_empty() { @@ -206,6 +373,7 @@ impl StreamSink { sink_type, format_desc, target_table, + extra_partition_col_idx, }; Ok((input, sink_desc)) } @@ -312,19 +480,7 @@ impl StreamSink { if trimmed_key.is_empty() { continue; } - match columns - .iter() - .position(|col| col.column_desc.name == trimmed_key) - { - Some(index) => downstream_pk_indices.push(index), - None => { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", trimmed_key), - ))) - .into()); - } - } + downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?); } Ok(downstream_pk_indices) } @@ -463,3 +619,98 @@ impl StreamNode for StreamSink { impl ExprRewritable for StreamSink {} impl ExprVisitable for StreamSink {} + +#[cfg(test)] +mod test { + use icelake::types::Transform; + use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + use risingwave_common::types::{DataType, StructType}; + use risingwave_common::util::iter_util::ZipEqDebug; + use risingwave_pb::expr::expr_node::Type; + + use super::IcebergPartitionInfo; + use crate::expr::{Expr, ExprImpl}; + + fn create_column_catalog() -> Vec<ColumnCatalog> { + vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32), + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc::named("v2", ColumnId::new(2), DataType::Timestamptz), + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc::named("v3", ColumnId::new(3), DataType::Timestamp), + is_hidden: false, + }, + ] + } + #[test] + fn test_iceberg_convert_to_expression() { + let partition_type = StructType::new(vec![ + ("f1", DataType::Int32), + ("f2", DataType::Int32), + ("f3", DataType::Int32), + ("f4", DataType::Int32), + ("f5", DataType::Int32), + ("f6", DataType::Int32), + ("f7", DataType::Int32), + ("f8", DataType::Int32), + ("f9", DataType::Int32), + ]); + let partition_fields = vec![ + ("v1".into(), Transform::Identity), + ("v1".into(), Transform::Bucket(10)), + ("v1".into(), Transform::Truncate(3)), + ("v2".into(), Transform::Year), + ("v2".into(), Transform::Month), + ("v3".into(), Transform::Day), + ("v3".into(), Transform::Hour), + ("v1".into(), Transform::Void), + ("v3".into(), Transform::Void), + ]; + let partition_info = IcebergPartitionInfo { + partition_type: partition_type.clone(), + partition_fields: partition_fields.clone(), + }; + let catalog = create_column_catalog(); + let actual_expr = partition_info.convert_to_expression(&catalog).unwrap(); + let actual_expr = actual_expr.as_function_call().unwrap(); + + assert_eq!( + actual_expr.return_type(), + DataType::Struct(partition_type.clone()) + ); + assert_eq!(actual_expr.inputs().len(), partition_fields.len()); + assert_eq!(actual_expr.func_type(), Type::Row); + + for ((expr, (_, transform)), (_, expect_type)) in actual_expr + .inputs() + .iter() + .zip_eq_debug(partition_fields.iter()) + .zip_eq_debug(partition_type.iter()) + { + match transform { + Transform::Identity => { + assert!(expr.is_input_ref()); 
+ assert_eq!(expr.return_type(), *expect_type); + } + Transform::Void => { + assert!(expr.is_literal()); + assert_eq!(expr.return_type(), *expect_type); + } + _ => { + let expr = expr.as_function_call().unwrap(); + assert_eq!(expr.func_type(), Type::IcebergTransform); + assert_eq!(expr.inputs().len(), 2); + assert_eq!( + expr.inputs()[0], + ExprImpl::literal_varchar(transform.to_string()) + ); + } + } + } + } +} diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 1f8de0bb4dd9..4e64c3456bc8 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -90,11 +90,21 @@ impl SinkExecutor { log_store_factory: F, ) -> StreamExecutorResult<Self> { let sink = build_sink(sink_param.clone())?; - let input_schema: Schema = columns + let sink_input_schema: Schema = columns .iter() .map(|column| Field::from(&column.column_desc)) .collect(); - assert_eq!(input_schema.data_types(), info.schema.data_types()); + + if let Some(col_idx) = sink_writer_param.extra_partition_col_idx { + // Remove the extra partition column from the executor's input schema before comparing. + assert_eq!(sink_input_schema.data_types(), { + let mut data_type = info.schema.data_types(); + data_type.remove(col_idx); + data_type + }); + } else { + assert_eq!(sink_input_schema.data_types(), info.schema.data_types()); + } Ok(Self { actor_context, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index fb05faf90dd8..2837564eefad 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -117,6 +117,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { vnode_bitmap: params.vnode_bitmap.clone(), meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient), sink_metrics, + extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize), }; let log_store_identity = format!(
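// --- Illustrative sketch (not part of the patch) ---------------------------------------------
// What the schema check in `SinkExecutor::new` above amounts to: the upstream `StreamProject`
// appends the partition column, so the executor's input schema has one extra column compared
// with the sink's declared columns; removing it at `extra_partition_col_idx` should make the
// two schemas identical again. The string type names are placeholders, not RisingWave's
// `DataType`.

fn strip_partition_col(mut input_types: Vec<&'static str>, idx: Option<usize>) -> Vec<&'static str> {
    if let Some(i) = idx {
        input_types.remove(i);
    }
    input_types
}

fn main() {
    let sink_declared = vec!["int32", "varchar", "date"];
    // Executor input: the declared columns plus the appended partition value at the end.
    let executor_input = vec!["int32", "varchar", "date", "struct<date>"];
    assert_eq!(strip_partition_col(executor_input, Some(3)), sink_declared);
    // Without partition info, the two schemas must already match.
    assert_eq!(strip_partition_col(sink_declared.clone(), None), sink_declared);
}
// --- end of sketch ----------------------------------------------------------------------------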