diff --git a/Cargo.lock b/Cargo.lock index d130d57d6ae90..f7534e1397003 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4649,7 +4649,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=3f7b53ba5b563524212c25810345d1314678e7fc#3f7b53ba5b563524212c25810345d1314678e7fc" +source = "git+https://github.com/ZENOTME/icelake?branch=transform#b68fb3f0b80b28158e0715ee7ec28cbddbf8feca" dependencies = [ "anyhow", "apache-avro 0.15.0", @@ -8390,7 +8390,9 @@ dependencies = [ "anyhow", "apache-avro 0.16.0", "arrow-array 49.0.0", + "arrow-row 49.0.0", "arrow-schema 49.0.0", + "arrow-select 49.0.0", "async-nats", "async-trait", "auto_enums", @@ -8661,6 +8663,7 @@ dependencies = [ "futures", "futures-async-stream", "iana-time-zone", + "icelake", "itertools 0.12.0", "madsim-tokio", "madsim-tonic", diff --git a/Cargo.toml b/Cargo.toml index 80d6d6092fb49..6241d32c2e002 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,7 +118,10 @@ criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [ +#icelake = { git = "https://github.com/ZENOTME/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [ +# "prometheus", +#] } +icelake = { git = "https://github.com/ZENOTME/icelake", branch = "transform" , features = [ "prometheus", ] } arrow-array = "49" diff --git a/proto/expr.proto b/proto/expr.proto index 653cbeaa339ad..7ee09a95f73a1 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -271,6 +271,19 @@ message ExprNode { // Adminitration functions COL_DESCRIPTION = 2100; CAST_REGCLASS = 2101; + + CONSTRUCT_STRUCT = 2111; + + // EXTERNAL + ICEBERG_PARTITION = 2102; + ICEBERG_BUCKET_I32 = 2103; + ICEBERG_BUCKET_I64 = 2104; + ICEBERG_BUCKET_DECIMAL = 2105; + ICEBERG_TRUNCATE_I32 = 2106; + ICEBERG_YEAR = 2107; + ICEBERG_MONTH = 2108; + ICEBERG_DAY = 2109; + ICEBERG_HOUR = 2110; } Type function_type = 1; data.DataType return_type = 3; @@ -280,7 +293,7 @@ message ExprNode { FunctionCall func_call = 6; UserDefinedFunction udf = 7; NowRexNode now = 8; - } + } } message TableFunction { diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 78dc79c530b9f..0c61547d09449 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -133,6 +133,7 @@ macro_rules! converts_generic { fn try_from(array: &ArrayImpl) -> Result { match array { $($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)* + ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?. with_timezone_utc())), _ => todo!("unsupported array"), } } @@ -152,6 +153,13 @@ macro_rules! converts_generic { .unwrap() .try_into()?, )),)* + Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz( + array + .as_any() + .downcast_ref::() + .unwrap() + .try_into()?, + )), t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))), } } @@ -173,7 +181,6 @@ converts_generic! { { arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 }, { arrow_array::Date32Array, Date32, ArrayImpl::Date }, { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp }, - { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz }, { arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time }, { arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval }, { arrow_array::StructArray, Struct(_), ArrayImpl::Struct }, diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 1d7ad5ef58f77..5d33b499f3d71 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c "xz", ] } arrow-array = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } async-nats = "0.32" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 660fdc3f9c6b3..01f7f22d058ff 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod precomputed_partition_writer; mod prometheus; use std::collections::HashMap; @@ -19,7 +20,9 @@ use std::fmt::Debug; use std::ops::Deref; use anyhow::anyhow; -use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef}; +use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, +}; use async_trait::async_trait; use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE}; use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter}; @@ -44,7 +47,6 @@ use url::Url; use with_options::WithOptions; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; -use self::prometheus::monitored_partition_writer::MonitoredFanoutPartitionedWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -163,7 +165,7 @@ impl IcebergConfig { Ok(config) } - fn build_iceberg_configs(&self) -> Result> { + pub fn build_iceberg_configs(&self) -> Result> { let mut iceberg_configs = HashMap::new(); let catalog_type = self @@ -391,6 +393,40 @@ enum IcebergWriterEnum { } impl IcebergWriter { + fn schema(table: &Table) -> Result { + let schema = table.current_arrow_schema()?; + // #TODO + // When iceberg table is partitioned, we will add a partition column to the schema. + // Maybe we should accept a flag from `SinkWriterParam`? + if !table + .current_table_metadata() + .current_partition_spec()? + .is_unpartitioned() + { + let mut fields = schema.fields().to_vec(); + // # TODO + // When convert rw struct to arrow struct, we will compare the type of each field. + // To avoid the impact of metadata, we should reconstruct struct type without metadata. + // This approach may hack, do we have a better way? + let partition_type = if let ArrowDataType::Struct(s) = + table.current_partition_type()?.try_into()? + { + let fields = Fields::from( + s.into_iter() + .map(|field| ArrowField::new(field.name(), field.data_type().clone(), true)) + .collect::>(), + ); + ArrowDataType::Struct(fields) + } else { + unimplemented!() + }; + fields.push(ArrowField::new("_rw_partition", partition_type, false).into()); + Ok(ArrowSchema::new(fields).into()) + } else { + Ok(schema) + } + } + pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; @@ -402,10 +438,11 @@ impl IcebergWriter { .iceberg_rolling_unflushed_data_file .clone(), )); - let partition_data_file_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); + let partition_data_file_builder = + precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new( + data_file_builder.clone(), + table.current_partition_type()?, + ); let dispatch_builder = builder_helper .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; // wrap a layer with collect write metrics @@ -420,7 +457,7 @@ impl IcebergWriter { .clone(), ), ); - let schema = table.current_arrow_schema()?; + let schema = Self::schema(&table)?; let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), @@ -457,10 +494,11 @@ impl IcebergWriter { equality_delete_builder, unique_column_ids, ); - let partition_delta_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(delta_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); + let partition_delta_builder = + precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new( + delta_builder.clone(), + table.current_partition_type()?, + ); let dispatch_builder = builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; // wrap a layer with collect write metrics @@ -475,7 +513,7 @@ impl IcebergWriter { .clone(), ), ); - let schema = table.current_arrow_schema()?; + let schema = Self::schema(&table)?; let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), diff --git a/src/connector/src/sink/iceberg/precomputed_partition_writer.rs b/src/connector/src/sink/iceberg/precomputed_partition_writer.rs new file mode 100644 index 0000000000000..e49316d542a43 --- /dev/null +++ b/src/connector/src/sink/iceberg/precomputed_partition_writer.rs @@ -0,0 +1,282 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +use arrow_array::{BooleanArray, RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::{DataType, SchemaRef}; +use arrow_select::filter::filter_record_batch; +use icelake::io_v2::{IcebergWriteResult, IcebergWriter, IcebergWriterBuilder}; +use icelake::types::{struct_to_anyvalue_array_with_type, Any, AnyValue}; +use risingwave_common::util::iter_util::ZipEqFast; + +#[derive(Clone)] +pub struct PrecomputedPartitionedWriterBuilder { + inner: B, + partition_type: Any, +} + +impl PrecomputedPartitionedWriterBuilder { + pub fn new(inner: B, partition_type: Any) -> Self { + Self { + inner, + partition_type, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for PrecomputedPartitionedWriterBuilder { + type R = PrecomputedPartitionedWriter; + + async fn build(self, schema: &SchemaRef) -> icelake::Result { + // Find the partition value column + let expect_partition_type = + self.partition_type.clone().try_into().map_err(|e| { + icelake::Error::new(icelake::ErrorKind::ArrowError, format!("{}", e)) + })?; + let partition_col_idx = schema + .fields() + .iter() + .position(|field|{ + if let DataType::Struct(s1) = field.data_type() && let DataType::Struct(s2) = &expect_partition_type { + s1.iter().zip_eq_fast(s2.iter()).all(|(f1,f2)|f1.data_type() == f2.data_type()) + } else { + false + } + }) + .ok_or_else(|| { + icelake::Error::new( + icelake::ErrorKind::ArrowError, + format!( + "Schema should contains partition type {:?}", + expect_partition_type + ), + ) + })?; + + // remove the partition type from schema. + let mut new_schema = schema.as_ref().clone(); + new_schema.remove(partition_col_idx); + + let arrow_partition_type_fields = + if let DataType::Struct(fields) = self.partition_type.clone().try_into()? { + fields + } else { + unreachable!() + }; + let row_converter = RowConverter::new(vec![SortField::new(DataType::Struct( + arrow_partition_type_fields.clone(), + ))]) + .map_err(|e| icelake::Error::new(icelake::ErrorKind::ArrowError, e.to_string()))?; + + Ok(PrecomputedPartitionedWriter { + inner_writers: HashMap::new(), + inner_buidler: self.inner, + schema: new_schema.into(), + partition_type: self.partition_type, + row_converter, + partition_col_idx, + }) + } +} + +/// Partition append only writer +pub struct PrecomputedPartitionedWriter { + inner_writers: HashMap, + partition_type: Any, + row_converter: RowConverter, + inner_buidler: B, + schema: SchemaRef, + partition_col_idx: usize, +} + +#[async_trait::async_trait] +impl IcebergWriter for PrecomputedPartitionedWriter { + type R = <::R as IcebergWriter>::R; + + /// Write a record batch. The `DataFileWriter` will create a new file when the current row num is greater than `target_file_row_num`. + async fn write(&mut self, mut batch: RecordBatch) -> icelake::Result<()> { + // Remove the partition value from batch. + let partition_value = batch.remove_column(self.partition_col_idx); + let value_array = struct_to_anyvalue_array_with_type( + partition_value + .as_any() + .downcast_ref::() + .unwrap(), + self.partition_type.clone(), + )?; + + // Group the batch by row value. + // # TODO + // Maybe optimize the alloc and clone + let rows = self + .row_converter + .convert_columns(&[partition_value.clone()]) + .map_err(|e| icelake::Error::new(icelake::ErrorKind::ArrowError, e.to_string()))?; + let mut filters = HashMap::new(); + rows.into_iter().enumerate().for_each(|(row_id, row)| { + filters + .entry(row.owned()) + .or_insert_with(|| (vec![false; batch.num_rows()], row_id)) + .0[row_id] = true; + }); + + for (row, filter) in filters { + let filter_array: BooleanArray = filter.0.into(); + let partial_batch = filter_record_batch(&batch, &filter_array) + .expect("We should guarantee the filter array is valid"); + match self.inner_writers.entry(row) { + Entry::Occupied(mut writer) => { + writer.get_mut().0.write(partial_batch).await?; + } + Entry::Vacant(vacant) => { + let value = value_array.get(filter.1).unwrap().as_ref().unwrap().clone(); + if let AnyValue::Struct(value) = value { + let new_writer = self.inner_buidler.clone().build(&self.schema).await?; + vacant + .insert((new_writer, value)) + .0 + .write(partial_batch) + .await?; + } else { + unreachable!() + } + } + } + } + Ok(()) + } + + /// Complte the write and return the list of `DataFile` as result. + async fn flush(&mut self) -> icelake::Result> { + let mut res_vec = vec![]; + let inner_writers = std::mem::take(&mut self.inner_writers); + for (_key, (mut writer, value)) in inner_writers { + let mut res = writer.flush().await?; + res.iter_mut().for_each(|res| { + res.set_partition(Some(value.clone())); + }); + res_vec.extend(res); + } + Ok(res_vec) + } +} + +// #[cfg(test)] +// mod test { +// use icelake::types::{PartitionSpec, PartitionField}; +// use itertools::Itertools; + +// pub fn create_partition() -> PartitionSpec { +// PartitionSpec { +// spec_id: 1, +// fields: vec![PartitionField { +// source_column_id: 1, +// partition_field_id: 1, +// transform: crate::types::Transform::Identity, +// name: "col1".to_string(), +// }], +// } +// } + +// #[tokio::test] +// async fn test_partition_writer() { +// let schema = create_schema(2); +// let arrow_schema = create_arrow_schema(2); +// let partition_spec = create_partition(); +// let partition_type = Any::Struct(partition_spec.partition_type(&schema).unwrap().into()); + +// let to_write = create_batch( +// &arrow_schema, +// vec![ +// vec![1, 2, 3, 1, 2, 3, 1, 2, 3], +// vec![1, 1, 1, 2, 2, 2, 3, 3, 3], +// ], +// ); + +// let builder = FanoutPartitionedWriterBuilder::new( +// TestWriterBuilder {}, +// partition_type, +// partition_spec, +// ); +// let mut writer = builder.build(&arrow_schema).await.unwrap(); +// writer.write(to_write).await.unwrap(); + +// assert_eq!(writer.inner_writers.len(), 3); + +// let expect1 = create_batch(&arrow_schema, vec![vec![1, 1, 1], vec![1, 2, 3]]); +// let expect2 = create_batch(&arrow_schema, vec![vec![2, 2, 2], vec![1, 2, 3]]); +// let expect3 = create_batch(&arrow_schema, vec![vec![3, 3, 3], vec![1, 2, 3]]); +// let actual_res = writer +// .inner_writers +// .values() +// .map(|writer| writer.res()) +// .collect_vec(); +// assert!(actual_res.contains(&expect1)); +// assert!(actual_res.contains(&expect2)); +// assert!(actual_res.contains(&expect3)); +// } + +// // # NOTE +// // The delta writer will put the op vec in the last column, this test case test that the partition will +// // ignore the last column. +// #[tokio::test] +// async fn test_partition_delta_writer() { +// let schema = create_schema(2); +// let arrow_schema = create_arrow_schema(3); +// let partition_spec = create_partition(); +// let partition_type = Any::Struct(partition_spec.partition_type(&schema).unwrap().into()); + +// let builder = FanoutPartitionedWriterBuilder::new( +// TestWriterBuilder {}, +// partition_type, +// partition_spec, +// ); +// let mut writer = builder.build(&arrow_schema).await.unwrap(); + +// let to_write = create_batch( +// &arrow_schema, +// vec![ +// vec![1, 2, 3, 1, 2, 3, 1, 2, 3], +// vec![1, 1, 1, 2, 2, 2, 3, 3, 3], +// vec![3, 2, 1, 1, 3, 2, 2, 1, 3], +// ], +// ); +// writer.write(to_write).await.unwrap(); +// assert_eq!(writer.inner_writers.len(), 3); +// let expect1 = create_batch( +// &arrow_schema, +// vec![vec![1, 1, 1], vec![1, 2, 3], vec![3, 1, 2]], +// ); +// let expect2 = create_batch( +// &arrow_schema, +// vec![vec![2, 2, 2], vec![1, 2, 3], vec![2, 3, 1]], +// ); +// let expect3 = create_batch( +// &arrow_schema, +// vec![vec![3, 3, 3], vec![1, 2, 3], vec![1, 2, 3]], +// ); +// let actual_res = writer +// .inner_writers +// .values() +// .map(|writer| writer.res()) +// .collect_vec(); +// assert!(actual_res.contains(&expect1)); +// assert!(actual_res.contains(&expect2)); +// assert!(actual_res.contains(&expect3)); +// } +// } diff --git a/src/expr/impl/src/external/iceberg.rs b/src/expr/impl/src/external/iceberg.rs new file mode 100644 index 0000000000000..de7aec1504392 --- /dev/null +++ b/src/expr/impl/src/external/iceberg.rs @@ -0,0 +1,164 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::{Decimal, Date, Timestamp, Timestamptz}; +use risingwave_expr::function; + +// Bucket +#[function( + "iceberg_bucket_i32(int4, int4) -> int4", + prebuild = "i32::clone(&$1)" +)] +fn iceberg_bucket_i32(v1: i32, n: &i32) -> i32 { + todo!() +} + +#[function( + "iceberg_bucket_i64(int8, int4) -> int4", + prebuild = "i32::clone(&$1)" +)] +fn iceberg_bucket_i64(v1: i64, n: &i32) -> i32 { + todo!() +} + +#[function( + "iceberg_bucket_decimal(decimal, int4) -> int4", + prebuild = "i32::clone(&$1)" +)] +fn iceberg_bucket_decimal(v1: Decimal, n: &i32) -> i32 { + todo!() +} + +// Truncate +#[function( + "iceberg_truncate_i32(int4, int4) -> int4", + prebuild = "i32::clone(&$1)" +)] +fn iceberg_truncate_i32(v1: i32, n: &i32) -> i32 { + todo!() +} + +trait Time { + fn year(&self) -> i32; + fn month(&self) -> i32; + fn day(&self) -> i32; +} + +impl Time for Date { + fn year(&self) -> i32 { + todo!() + } + + fn month(&self) -> i32 { + todo!() + } + + fn day(&self) -> i32 { + todo!() + } +} + +impl Time for Timestamp { + fn year(&self) -> i32 { + todo!() + } + + fn month(&self) -> i32 { + todo!() + } + + fn day(&self) -> i32 { + todo!() + } +} + +impl Time for Timestamptz { + fn year(&self) -> i32 { + todo!() + } + + fn month(&self) -> i32 { + todo!() + } + + fn day(&self) -> i32 { + todo!() + } +} + +#[function( + "iceberg_year(date) -> int4", +)] +#[function( + "iceberg_year(timestamp) -> int4", +)] +#[function( + "iceberg_year(timestamptz) -> int4", +)] +fn iceberg_year(v1: T) -> i32 { + v1.year() +} + +#[function( + "iceberg_month(date) -> int4", +)] +#[function( + "iceberg_month(timestamp) -> int4", +)] +#[function( + "iceberg_month(timestamptz) -> int4", +)] +fn iceberg_month(v1: T) -> i32 { + v1.month() +} + +#[function( + "iceberg_day(date) -> int4", +)] +#[function( + "iceberg_day(timestamp) -> int4", +)] +#[function( + "iceberg_day(timestamptz) -> int4", +)] +fn iceberg_day(v1: T) -> i32 { + v1.day() +} + +trait Hour { + fn hour(&self) -> i32; +} + +impl Hour for Timestamp { + fn hour(&self) -> i32 { + todo!() + } +} + +impl Hour for Timestamptz { + fn hour(&self) -> i32 { + todo!() + } +} + +#[function( + "iceberg_hour(timestamp) -> int4", +)] +#[function( + "iceberg_hour(timestamptz) -> int4", +)] +fn iceberg_hour(v1: T) -> i32 { + v1.hour() +} + diff --git a/src/expr/impl/src/external/mod.rs b/src/expr/impl/src/external/mod.rs new file mode 100644 index 0000000000000..aa44de6753390 --- /dev/null +++ b/src/expr/impl/src/external/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod iceberg; diff --git a/src/expr/impl/src/lib.rs b/src/expr/impl/src/lib.rs index f6a8a5667e46e..8cae03b1a9b83 100644 --- a/src/expr/impl/src/lib.rs +++ b/src/expr/impl/src/lib.rs @@ -37,6 +37,7 @@ mod aggregate; mod scalar; mod table_function; +mod external; /// Enable functions in this crate. #[macro_export] diff --git a/src/expr/impl/src/scalar/construct_struct.rs b/src/expr/impl/src/scalar/construct_struct.rs new file mode 100644 index 0000000000000..eadb384336eb6 --- /dev/null +++ b/src/expr/impl/src/scalar/construct_struct.rs @@ -0,0 +1,103 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use itertools::Itertools; +use risingwave_common::array::{ArrayImpl, ArrayRef, DataChunk, StructArray}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, Datum, ScalarImpl, StructValue}; +use risingwave_expr::expr::{BoxedExpression, Expression}; +use risingwave_expr::{build_function, Result}; + +#[derive(Debug)] +pub struct ConstructStructExpression { + return_type: DataType, + children: Vec, +} + +#[async_trait::async_trait] +impl Expression for ConstructStructExpression { + fn return_type(&self) -> DataType { + self.return_type.clone() + } + + async fn eval(&self, input: &DataChunk) -> Result { + let mut struct_cols = Vec::with_capacity(self.children.len()); + for child in &self.children { + let res = child.eval(&input).await?; + struct_cols.push(res); + } + Ok(Arc::new(ArrayImpl::Struct(StructArray::new( + self.return_type.as_struct().clone(), + struct_cols, + input.visibility().clone(), + )))) + } + + async fn eval_row(&self, input: &OwnedRow) -> Result { + let mut datums = Vec::with_capacity(self.children.len()); + for child in &self.children { + let res = child.eval_row(input).await?; + datums.push(res); + } + Ok(Some(ScalarImpl::Struct(StructValue::new(datums)))) + } +} + +#[build_function("construct_struct(...) -> struct", type_infer = "panic")] +fn build(return_type: DataType, children: Vec) -> Result { + assert!(return_type.is_struct()); + return_type.as_struct().types().zip_eq(children.iter()).for_each(|(ty, child)| { + assert_eq!(*ty, child.return_type()); + }); + + Ok(Box::new(ConstructStructExpression { + return_type, + children, + })) +} + +#[cfg(test)] +mod tests { + use risingwave_common::array::DataChunk; + use risingwave_common::row::Row; + use risingwave_common::test_prelude::DataChunkTestExt; + use risingwave_common::types::ToOwnedDatum; + use risingwave_common::util::iter_util::ZipEqDebug; + use risingwave_expr::expr::build_from_pretty; + + #[tokio::test] + async fn test_construct_struct_expr() { + let expr = build_from_pretty("(construct_struct:struct $0:int4 $1:int4 $2:int4)"); + let (input, expected) = DataChunk::from_pretty( + "i i i + 1 2 3 (1,2,3) + 4 2 1 (4,2,1) + 9 1 3 (9,1,3) + 1 1 1 (1,1,1)", + ) + .split_column_at(3); + + // test eval + let output = expr.eval(&input).await.unwrap(); + assert_eq!(&output, expected.column_at(0)); + + // test eval_row + for (row, expected) in input.rows().zip_eq_debug(expected.rows()) { + let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); + assert_eq!(result, expected.datum_at(0).to_owned_datum()); + } + } +} diff --git a/src/expr/impl/src/scalar/mod.rs b/src/expr/impl/src/scalar/mod.rs index 1d1f3e4bc1500..6b85cde188a85 100644 --- a/src/expr/impl/src/scalar/mod.rs +++ b/src/expr/impl/src/scalar/mod.rs @@ -84,3 +84,4 @@ mod trim; mod trim_array; mod tumble; mod upper; +mod construct_struct; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index bb94d17b2f47b..ac7fff9677821 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -36,6 +36,7 @@ fixedbitset = "0.4.2" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } iana-time-zone = "0.1" +icelake = { workspace = true } itertools = "0.12" maplit = "1" md5 = "0.7.0" diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 341d6c697c725..6c00a8fee015b 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -792,6 +792,9 @@ impl TestCase { "test_table".into(), format_desc, None, + // # TODO + // Fixed me. It may should not use None here. + None, ) { Ok(sink_plan) => { ret.sink_plan = Some(explain_plan(&sink_plan.into())); diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index 79ece88731608..cf955ad9dcd8b 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -224,7 +224,17 @@ impl ExprVisitor for ImpureAnalyzer { | expr_node::Type::Greatest | expr_node::Type::Least | expr_node::Type::ConvertFrom - | expr_node::Type::ConvertTo => + | expr_node::Type::ConvertTo + | expr_node::Type::ConstructStruct + | expr_node::Type::IcebergPartition + | expr_node::Type::IcebergBucketI32 + | expr_node::Type::IcebergBucketI64 + | expr_node::Type::IcebergBucketDecimal + | expr_node::Type::IcebergTruncateI32 + | expr_node::Type::IcebergYear + | expr_node::Type::IcebergMonth + | expr_node::Type::IcebergDay + | expr_node::Type::IcebergHour => // expression output is deterministic(same result for the same input) { func_call diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 73060b7a9f965..8f5d463d44c60 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -17,7 +17,10 @@ use std::rc::Rc; use std::sync::{Arc, LazyLock}; use anyhow::Context; +use arrow_schema::DataType as ArrowDataType; use either::Either; +use icelake::catalog::load_catalog; +use icelake::TableIdentifier; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; @@ -27,6 +30,7 @@ use risingwave_common::types::Datum; use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_common::{bail, bail_not_implemented, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; @@ -43,6 +47,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_mv::get_column_names; +use super::create_source::UPSTREAM_SOURCE_KEY; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; @@ -52,13 +57,15 @@ use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGene use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; -use crate::optimizer::plan_node::{generic, Explain, LogicalSource, StreamProject}; +use crate::optimizer::plan_node::{ + generic, IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, +}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor}; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::utils::resolve_privatelink_in_with_option; -use crate::{Planner, TableCatalog, WithOptions}; +use crate::{Explain, Planner, TableCatalog, WithOptions}; pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { let table_factor = TableFactor::Table { @@ -98,6 +105,7 @@ pub fn gen_sink_plan( session: &SessionImpl, context: OptimizerContextRef, stmt: CreateSinkStatement, + partition_info: Option, ) -> Result { let db_name = session.database(); let (sink_schema_name, sink_table_name) = @@ -207,6 +215,7 @@ pub fn gen_sink_plan( sink_from_table_name, format_desc, target_table, + partition_info, )?; let sink_desc = sink_plan.sink_desc().clone(); @@ -271,6 +280,49 @@ pub fn gen_sink_plan( }) } +// # TODO +// We need return None for range partition +pub async fn get_partition_compute_info( + with_options: &WithOptions, +) -> Option { + let properties = HashMap::from_iter(with_options.clone().into_inner().into_iter()); + let connector = properties.get(UPSTREAM_SOURCE_KEY)?; + match connector.as_str() { + ICEBERG_SINK => { + let iceberg_config = IcebergConfig::from_hashmap(properties).ok()?; + let catalog = load_catalog(&iceberg_config.build_iceberg_configs().ok()?) + .await + .ok()?; + let table_id = TableIdentifier::new(iceberg_config.table_name.split('.')).ok()?; + let table = catalog.load_table(&table_id).await.ok()?; + let partition_spec = table + .current_table_metadata() + .current_partition_spec() + .ok()?; + + if partition_spec.is_unpartitioned() { + return None; + } + + let arrow_type: ArrowDataType = table.current_partition_type().ok()?.try_into().ok()?; + let partition_fields = table + .current_table_metadata() + .current_partition_spec() + .ok()? + .fields + .iter() + .map(|f| (f.name.clone(), f.transform.clone())) + .collect_vec(); + + Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo { + partition_type: arrow_type.try_into().ok()?, + partition_fields, + })) + } + _ => None, + } +} + pub async fn handle_create_sink( handle_args: HandlerArgs, stmt: CreateSinkStatement, @@ -285,6 +337,8 @@ pub async fn handle_create_sink( return Ok(resp); } + let partition_info = get_partition_compute_info(&handle_args.with_options).await; + let (sink, graph, target_table_catalog) = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); @@ -293,7 +347,7 @@ pub async fn handle_create_sink( sink_plan: plan, sink_catalog: sink, target_table_catalog, - } = gen_sink_plan(&session, context.clone(), stmt)?; + } = gen_sink_plan(&session, context.clone(), stmt, partition_info)?; let has_order_by = !query.order_by.is_empty(); if has_order_by { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 8e2dc051379f9..8f4d4b6de3b2d 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -24,7 +24,7 @@ use thiserror_ext::AsReport; use super::create_index::gen_create_index_plan; use super::create_mv::gen_create_mv_plan; -use super::create_sink::gen_sink_plan; +use super::create_sink::{gen_sink_plan, get_partition_compute_info}; use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; @@ -84,6 +84,13 @@ async fn do_handle_explain( let context = plan.ctx(); (Ok(plan), context) } + Statement::CreateSink { stmt } => { + let partition_info = get_partition_compute_info(context.with_options()).await; + let plan = gen_sink_plan(&session, context.into(), stmt, partition_info) + .map(|plan| plan.sink_plan)?; + let context = plan.ctx(); + (Ok(plan), context) + } // For other queries without `await` point, we can keep a copy of reference to the // `OptimizerContext` even if the planning fails. This enables us to log the partial @@ -110,10 +117,6 @@ async fn do_handle_explain( ) .map(|x| x.0), - Statement::CreateSink { stmt } => { - gen_sink_plan(&session, context.clone(), stmt).map(|plan| plan.sink_plan) - } - Statement::CreateIndex { name, table_name, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index f6de31e4b6990..64a4dbd59eaf8 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -59,7 +59,7 @@ use self::plan_node::generic::{self, PhysicalPlanRef}; use self::plan_node::{ stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, LogicalSource, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, - ToStreamContext, + ToStreamContext, PartitionComputeInfo, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -718,6 +718,7 @@ impl PlanRoot { sink_from_table_name: String, format_desc: Option, target_table: Option, + partition_info: Option, ) -> Result { let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?; @@ -734,6 +735,7 @@ impl PlanRoot { definition, properties, format_desc, + partition_info, ) } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 65a87af0c371a..2922ef1b88022 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -936,7 +936,7 @@ pub use stream_project_set::StreamProjectSet; pub use stream_row_id_gen::StreamRowIdGen; pub use stream_share::StreamShare; pub use stream_simple_agg::StreamSimpleAgg; -pub use stream_sink::StreamSink; +pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink}; pub use stream_sort::StreamEowcSort; pub use stream_source::StreamSource; pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 68814531d9293..2ed870034f979 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -17,6 +17,7 @@ use std::io::{Error, ErrorKind}; use anyhow::anyhow; use fixedbitset::FixedBitSet; +use icelake::types::Transform; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, Field, TableId}; @@ -25,22 +26,26 @@ use risingwave_common::constants::log_store::{ }; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::session_config::sink_decouple::SinkDecouple; +use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::iceberg::ICEBERG_SINK; use risingwave_connector::sink::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; +use risingwave_pb::expr::expr_node::Type; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use tracing::info; use super::derive::{derive_columns, derive_pk}; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef}; use super::stream::prelude::*; use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder}; -use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject}; +use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -49,6 +54,169 @@ use crate::{TableCatalog, WithOptions}; const DOWNSTREAM_PK_KEY: &str = "primary_key"; +/// ## Why we need `PartitionComputeInfo`? +/// +/// For some sink, it will write the data into different file based on the partition value. E.g. iceberg sink(https://iceberg.apache.org/spec/#partitioning) +/// For this kind of sink, the file num can be reduced if we can shuffle the data based on the partition value. More details can be found in https://github.com/risingwavelabs/rfcs/pull/77. +/// So if the `PartitionComputeInfo` provided, we will create a `StreamProject` node to compute the partition value and shuffle the data based on the partition value before the sink. +/// +/// ## What is `PartitionComputeInfo`? +/// The `PartitionComputeInfo` contains the information about partition compute. The stream sink will use +/// these information to create the corresponding expression in `StreamProject` node. +/// +/// #TODO +/// Maybe we should move this in sink? +pub enum PartitionComputeInfo { + Iceberg(IcebergPartitionInfo), +} + +impl PartitionComputeInfo { + pub fn to_expression(self, columns: &[ColumnCatalog]) -> Result { + match self { + PartitionComputeInfo::Iceberg(info) => info.to_expression(columns), + } + } +} + +pub struct IcebergPartitionInfo { + pub partition_type: DataType, + // (partition_field_name, partition_field_transform) + pub partition_fields: Vec<(String, Transform)>, +} + +impl IcebergPartitionInfo { + #[inline] + fn transform_to_expression( + transform: &Transform, + col_id: usize, + columns: &[ColumnCatalog], + ) -> Result { + match transform { + Transform::Identity => Ok(ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )), + Transform::Void => Ok(ExprImpl::literal_null( + columns[col_id].column_desc.data_type.clone(), + )), + Transform::Bucket(n) => { + let func_type = match &columns[col_id].column_desc.data_type { + DataType::Int32 => Type::IcebergBucketI32, + DataType::Int64 => Type::IcebergBucketI64, + DataType::Decimal => Type::IcebergBucketDecimal, + _ => unreachable!(), + }; + Ok(ExprImpl::FunctionCall( + FunctionCall::new( + func_type, + vec![ + ExprImpl::InputRef( + InputRef::new( + col_id, + columns[col_id].column_desc.data_type.clone(), + ) + .into(), + ), + ExprImpl::literal_int(*n), + ], + ) + .unwrap() + .into(), + )) + } + Transform::Truncate(w) => { + let func_type = match &columns[col_id].column_desc.data_type { + DataType::Int32 => Type::IcebergTruncateI32, + _ => unreachable!(), + }; + Ok(ExprImpl::FunctionCall( + FunctionCall::new( + func_type, + vec![ + ExprImpl::InputRef( + InputRef::new( + col_id, + columns[col_id].column_desc.data_type.clone(), + ) + .into(), + ), + ExprImpl::literal_int(*w), + ], + ) + .unwrap() + .into(), + )) + } + Transform::Year => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergYear, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + Transform::Month => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergMonth, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + Transform::Day => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergDay, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + Transform::Hour => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergHour, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + } + } + + pub fn to_expression(self, columns: &[ColumnCatalog]) -> Result { + let child_exprs = self + .partition_fields + .into_iter() + .map(|(field_name, transform)| { + let col_id = find_column_idx_by_name(columns, &field_name)?; + Self::transform_to_expression(&transform, col_id, columns) + }) + .collect::>>()?; + Ok(ExprImpl::FunctionCall( + FunctionCall::new(Type::ConstructStruct, child_exprs)?.into(), + )) + } +} +#[inline] +fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result { + columns + .iter() + .position(|col| col.column_desc.name == col_name) + .ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name) + ))) + .into() + }) +} + /// [`StreamSink`] represents a table/connector sink at the very end of the graph. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamSink { @@ -90,6 +258,7 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result { let columns = derive_columns(input.schema(), out_names, &user_cols)?; let (input, sink) = Self::derive_sink_desc( @@ -104,6 +273,7 @@ impl StreamSink { definition, properties, format_desc, + partition_info, )?; // check and ensure that the sink connector is specified and supported @@ -127,9 +297,40 @@ impl StreamSink { Ok(Self::new(input, sink)) } + fn derive_iceberg_sink_distribution( + input: PlanRef, + partition_info: Option, + columns: &[ColumnCatalog], + ) -> Result<(RequiredDist, PlanRef)> { + // For here, we need to add the plan node to compute the partition value, and add it as a extra column. + if let Some(partition_info) = partition_info { + let input_fields = input.schema().fields(); + + let mut exprs: Vec<_> = input_fields + .iter() + .enumerate() + .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into()) + .collect(); + + // Add the partition compute expression to the end of the exprs + exprs.push(partition_info.to_expression(columns)?.into()); + let partition_col_idx = exprs.len() - 1; + let project = StreamProject::new(generic::Project::new(exprs.clone(), input)); + Ok(( + RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]), + project.into(), + )) + } else { + Ok(( + RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()), + input, + )) + } + } + #[allow(clippy::too_many_arguments)] fn derive_sink_desc( - input: PlanRef, + mut input: PlanRef, user_distributed_by: RequiredDist, name: String, db_name: String, @@ -140,24 +341,33 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result<(PlanRef, SinkDesc)> { + let connector_name = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + "connector not specified when create sink", + ))) + })?; + let sink_type = Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?; let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); - let downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + let mut downstream_pk = + Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), _ => { - match properties.get("connector") { - Some(s) if s == "deltalake" => { + match connector_name.to_lowercase().as_str() { + "deltalake" => { // iceberg with multiple parallelism will fail easily with concurrent commit // on metadata // TODO: reset iceberg sink to have multiple parallelism info!("setting iceberg sink parallelism to singleton"); RequiredDist::single() } - Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => { + s if s == "jdbc" && sink_type == SinkType::Upsert => { if sink_type == SinkType::Upsert && downstream_pk.is_empty() { return Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, @@ -172,6 +382,19 @@ impl StreamSink { // lock contentions RequiredDist::hash_shard(downstream_pk.as_slice()) } + ICEBERG_SINK => { + // If user doesn't specify the downstream primary key, we use the stream key as the pk. + if sink_type.is_upsert() && downstream_pk.is_empty() { + downstream_pk = pk.iter().map(|k| k.column_index).collect_vec(); + } + let (required_dist, new_input) = Self::derive_iceberg_sink_distribution( + input, + partition_info, + &columns, + )?; + input = new_input; + required_dist + } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); if downstream_pk.is_empty() { @@ -313,19 +536,7 @@ impl StreamSink { if trimmed_key.is_empty() { continue; } - match columns - .iter() - .position(|col| col.column_desc.name == trimmed_key) - { - Some(index) => downstream_pk_indices.push(index), - None => { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", trimmed_key), - ))) - .into()); - } - } + downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?); } Ok(downstream_pk_indices) } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 86e8dd5ec47c3..32f398c6c5349 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -93,7 +93,13 @@ impl SinkExecutor { .iter() .map(|column| Field::from(&column.column_desc)) .collect(); - assert_eq!(input_schema.data_types(), info.schema.data_types()); + // # TODO + // Do we have more general way to check this? + if let Some(last_field) = info.schema.fields.last() && last_field.name == "_rw_partition"{ + assert_eq!(input_schema.data_types(), info.schema.data_types()[..info.schema.fields.len()-1]); + } else { + assert_eq!(input_schema.data_types(), info.schema.data_types()); + } Ok(Self { actor_context,