From 76b69310dc54e31e9c3a1d796980ef7f9eebb9a8 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sun, 7 Jan 2024 22:04:36 +0800 Subject: [PATCH] support partition compute for sink --- Cargo.lock | 15 +- Cargo.toml | 5 +- .../iceberg/start_spark_connect_server.sh | 9 +- proto/expr.proto | 12 +- proto/stream_plan.proto | 1 + src/common/src/array/arrow/arrow_impl.rs | 9 +- src/connector/Cargo.toml | 2 + src/connector/src/sink/catalog/desc.rs | 5 + src/connector/src/sink/iceberg/mod.rs | 179 ++++++--- .../iceberg/precomputed_partition_writer.rs | 284 +++++++++++++ src/connector/src/sink/mod.rs | 2 + src/expr/core/Cargo.toml | 1 + src/expr/core/src/expr/build.rs | 16 +- src/expr/core/src/expr/expr_some_all.rs | 10 +- src/expr/core/src/expr/external/iceberg.rs | 378 ++++++++++++++++++ src/expr/core/src/expr/external/mod.rs | 15 + src/expr/core/src/expr/mod.rs | 1 + src/expr/impl/src/scalar/construct_struct.rs | 109 +++++ src/expr/impl/src/scalar/mod.rs | 1 + src/frontend/Cargo.toml | 1 + src/frontend/planner_test/src/lib.rs | 3 + src/frontend/src/expr/pure.rs | 9 +- src/frontend/src/handler/create_sink.rs | 88 +++- src/frontend/src/handler/explain.rs | 13 +- src/frontend/src/optimizer/mod.rs | 6 +- src/frontend/src/optimizer/plan_node/mod.rs | 2 +- .../src/optimizer/plan_node/stream_sink.rs | 239 ++++++++++- src/stream/src/executor/sink.rs | 13 +- src/stream/src/from_proto/sink.rs | 1 + src/workspace-hack/Cargo.toml | 7 +- 30 files changed, 1333 insertions(+), 103 deletions(-) create mode 100644 src/connector/src/sink/iceberg/precomputed_partition_writer.rs create mode 100644 src/expr/core/src/expr/external/iceberg.rs create mode 100644 src/expr/core/src/expr/external/mod.rs create mode 100644 src/expr/impl/src/scalar/construct_struct.rs diff --git a/Cargo.lock b/Cargo.lock index ed87be2ae251d..11b63fc585a07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4689,7 +4689,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=3f7b53ba5b563524212c25810345d1314678e7fc#3f7b53ba5b563524212c25810345d1314678e7fc" +source = "git+https://github.com/ZENOTME/icelake?branch=transform#5f4e462eb3ad6d92b585949938e8fd1a3b4f55f9" dependencies = [ "anyhow", "apache-avro 0.15.0", @@ -7353,7 +7353,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.11.0", + "itertools 0.10.5", "log", "multimap 0.8.3", "once_cell", @@ -7387,7 +7387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.37", @@ -8425,7 +8425,9 @@ dependencies = [ "anyhow", "apache-avro 0.16.0", "arrow-array 49.0.0", + "arrow-row 49.0.0", "arrow-schema 49.0.0", + "arrow-select 49.0.0", "async-nats", "async-trait", "auto_enums", @@ -8607,6 +8609,7 @@ dependencies = [ "expect-test", "futures-async-stream", "futures-util", + "icelake", "itertools 0.12.0", "madsim-tokio", "num-traits", @@ -8697,6 +8700,7 @@ dependencies = [ "futures", "futures-async-stream", "iana-time-zone", + "icelake", "itertools 0.12.0", "madsim-tokio", "madsim-tonic", @@ -12459,7 +12463,7 @@ dependencies = [ "hmac", "hyper", "indexmap 1.9.3", - "itertools 0.11.0", + "itertools 0.10.5", "jni", "lazy_static", "lexical-core", @@ -12542,6 +12546,9 @@ dependencies = [ "url", "uuid", "whoami", + "zstd 0.13.0", + "zstd-safe 7.0.0", + "zstd-sys", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a46874f9e3da6..917260c74f357 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,7 +119,10 @@ criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [ +#icelake = { git = "https://github.com/ZENOTME/icelake", rev = "3f7b53ba5b563524212c25810345d1314678e7fc", features = [ +# "prometheus", +#] } +icelake = { git = "https://github.com/ZENOTME/icelake", branch = "transform", features = [ "prometheus", ] } arrow-array = "49" diff --git a/e2e_test/iceberg/start_spark_connect_server.sh b/e2e_test/iceberg/start_spark_connect_server.sh index 827b8dde86179..13fb6ad3fa3d7 100755 --- a/e2e_test/iceberg/start_spark_connect_server.sh +++ b/e2e_test/iceberg/start_spark_connect_server.sh @@ -8,9 +8,10 @@ PACKAGES="$PACKAGES,org.apache.spark:spark-connect_2.12:$SPARK_VERSION" SPARK_FILE="spark-${SPARK_VERSION}-bin-hadoop3.tgz" - -wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE -tar -xzf $SPARK_FILE --no-same-owner +if [ ! -d "spark-${SPARK_VERSION}-bin-hadoop3" ];then + wget https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/$SPARK_FILE + tar -xzf $SPARK_FILE --no-same-owner +fi ./spark-${SPARK_VERSION}-bin-hadoop3/sbin/start-connect-server.sh --packages $PACKAGES \ --master local[3] \ @@ -29,4 +30,4 @@ while ! nc -z localhost 15002; do sleep 1 # wait for 1/10 of the second before check again done -echo "Spark connect server launched" \ No newline at end of file +echo "Spark connect server launched" diff --git a/proto/expr.proto b/proto/expr.proto index 4b195183c4df2..e1b1e13be60be 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -259,6 +259,8 @@ message ExprNode { JSONB_PATH_QUERY_ARRAY = 622; JSONB_PATH_QUERY_FIRST = 623; + CONSTRUCT_STRUCT = 624; + // Non-pure functions below (> 1000) // ------------------------ // Internal functions @@ -274,6 +276,14 @@ message ExprNode { // System information functions PG_GET_INDEXDEF = 2400; COL_DESCRIPTION = 2401; + + // EXTERNAL + ICEBERG_BUCKET = 2201; + ICEBERG_TRUNCATE = 2202; + ICEBERG_YEAR = 2203; + ICEBERG_MONTH = 2204; + ICEBERG_DAY = 2205; + ICEBERG_HOUR = 2206; } Type function_type = 1; data.DataType return_type = 3; @@ -283,7 +293,7 @@ message ExprNode { FunctionCall func_call = 6; UserDefinedFunction udf = 7; NowRexNode now = 8; - } + } } message TableFunction { diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 04ba2246bb859..02453448bb4f3 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -223,6 +223,7 @@ message SinkDesc { string sink_from_name = 12; catalog.SinkFormatDesc format_desc = 13; optional uint32 target_table = 14; + bool with_extra_partition_col = 15; } enum SinkLogStoreType { diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 4eae49ac6ac00..1312c69f80d83 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -133,6 +133,7 @@ macro_rules! converts_generic { fn try_from(array: &ArrayImpl) -> Result { match array { $($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)* + ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?. with_timezone_utc())), _ => todo!("unsupported array"), } } @@ -152,6 +153,13 @@ macro_rules! converts_generic { .unwrap() .try_into()?, )),)* + Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz( + array + .as_any() + .downcast_ref::() + .unwrap() + .try_into()?, + )), t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))), } } @@ -173,7 +181,6 @@ converts_generic! { { arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 }, { arrow_array::Date32Array, Date32, ArrayImpl::Date }, { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp }, - { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz }, { arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time }, { arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval }, { arrow_array::StructArray, Struct(_), ArrayImpl::Struct }, diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 82303fd620f29..6eeb13f820de8 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c "xz", ] } arrow-array = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } async-nats = "0.33" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 513a39ae90e31..6d3840acb49e9 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -67,6 +67,10 @@ pub struct SinkDesc { /// Id of the target table for sink into table. pub target_table: Option, + + /// Indicate whether the sink accepts the data chunk with extra partition column. + /// For more detil of partition column, see `PartitionComputeInfo` + pub with_extra_partition_col: bool, } impl SinkDesc { @@ -123,6 +127,7 @@ impl SinkDesc { db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), target_table: self.target_table.map(|table_id| table_id.table_id()), + with_extra_partition_col: self.with_extra_partition_col, } } } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index b2cbcef9f6712..e927a4d569e05 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod precomputed_partition_writer; mod prometheus; use std::collections::HashMap; @@ -19,7 +20,9 @@ use std::fmt::Debug; use std::ops::Deref; use anyhow::anyhow; -use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef}; +use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, +}; use async_trait::async_trait; use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE}; use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter}; @@ -44,7 +47,6 @@ use url::Url; use with_options::WithOptions; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; -use self::prometheus::monitored_partition_writer::MonitoredFanoutPartitionedWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -152,7 +154,7 @@ impl IcebergConfig { Ok(config) } - fn build_iceberg_configs(&self) -> Result> { + pub fn build_iceberg_configs(&self) -> Result> { let mut iceberg_configs = HashMap::new(); let catalog_type = self @@ -380,6 +382,28 @@ enum IcebergWriterEnum { } impl IcebergWriter { + fn schema_with_extra_partition_col(table: &Table) -> Result { + let schema = table.current_arrow_schema()?; + + let mut fields = schema.fields().to_vec(); + let partition_type = + if let ArrowDataType::Struct(s) = table.current_partition_type()?.try_into()? { + let fields = Fields::from( + s.into_iter() + .enumerate() + .map(|(id, field)| { + ArrowField::new(format!("f{id}"), field.data_type().clone(), true) + }) + .collect::>(), + ); + ArrowDataType::Struct(fields) + } else { + unimplemented!() + }; + fields.push(ArrowField::new("_rw_partition", partition_type, false).into()); + Ok(ArrowSchema::new(fields).into()) + } + pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; @@ -391,30 +415,56 @@ impl IcebergWriter { .iceberg_rolling_unflushed_data_file .clone(), )); - let partition_data_file_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); - let dispatch_builder = builder_helper - .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; - // wrap a layer with collect write metrics - let prometheus_builder = PrometheusWriterBuilder::new( - dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), - ); - let schema = table.current_arrow_schema()?; - let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); - Ok(Self { - inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), - schema, - }) + if writer_param.with_extra_partition_col { + let partition_data_file_builder = + precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new( + data_file_builder.clone(), + table.current_partition_type()?, + table.current_arrow_schema()?.fields().len(), + ); + let dispatch_builder = builder_helper + .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = Self::schema_with_extra_partition_col(&table)?; + let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), + schema, + }) + } else { + let partition_data_file_builder = + builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?; + let dispatch_builder = builder_helper + .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = table.current_arrow_schema()?; + let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), + schema, + }) + } } pub async fn new_upsert( @@ -446,30 +496,57 @@ impl IcebergWriter { equality_delete_builder, unique_column_ids, ); - let partition_delta_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(delta_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); - let dispatch_builder = - builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; - // wrap a layer with collect write metrics - let prometheus_builder = PrometheusWriterBuilder::new( - dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), - ); - let schema = table.current_arrow_schema()?; - let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); - Ok(Self { - inner_writer: IcebergWriterEnum::Upsert(inner_writer), - schema, - }) + if writer_param.with_extra_partition_col { + let partition_delta_builder = + precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new( + delta_builder.clone(), + table.current_partition_type()?, + table.current_arrow_schema()?.fields().len(), + ); + let dispatch_builder = + builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = Self::schema_with_extra_partition_col(&table)?; + let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::Upsert(inner_writer), + schema, + }) + } else { + let partition_delta_builder = + builder_helper.fanout_partition_writer_builder(delta_builder.clone())?; + let dispatch_builder = + builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = table.current_arrow_schema()?; + let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::Upsert(inner_writer), + schema, + }) + } } } diff --git a/src/connector/src/sink/iceberg/precomputed_partition_writer.rs b/src/connector/src/sink/iceberg/precomputed_partition_writer.rs new file mode 100644 index 0000000000000..a8098d568e41c --- /dev/null +++ b/src/connector/src/sink/iceberg/precomputed_partition_writer.rs @@ -0,0 +1,284 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +use arrow_array::{BooleanArray, RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::{DataType, SchemaRef}; +use arrow_select::filter::filter_record_batch; +use icelake::io_v2::{IcebergWriteResult, IcebergWriter, IcebergWriterBuilder}; +use icelake::types::{struct_to_anyvalue_array_with_type, Any, AnyValue}; +use risingwave_common::util::iter_util::ZipEqFast; + +#[derive(Clone)] +pub struct PrecomputedPartitionedWriterBuilder { + inner: B, + partition_type: Any, + partition_col_idx: usize, +} + +impl PrecomputedPartitionedWriterBuilder { + pub fn new(inner: B, partition_type: Any, partition_col_idx: usize) -> Self { + Self { + inner, + partition_type, + partition_col_idx, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for PrecomputedPartitionedWriterBuilder { + type R = PrecomputedPartitionedWriter; + + async fn build(self, schema: &SchemaRef) -> icelake::Result { + let mut new_schema = schema.as_ref().clone(); + let remove_field = new_schema.remove(self.partition_col_idx); + + // Check the remove type is the same as the partition type. + let expect_partition_type = + self.partition_type.clone().try_into().map_err(|e| { + icelake::Error::new(icelake::ErrorKind::ArrowError, format!("{}", e)) + })?; + let is_same = if let DataType::Struct(fields) = remove_field.data_type() + && let DataType::Struct(expected_fields) = &expect_partition_type + { + fields + .iter() + .zip_eq_fast(expected_fields.iter()) + .all(|(f1, f2)| f1.data_type() == f2.data_type()) + } else { + false + }; + if !is_same { + return Err(icelake::Error::new( + icelake::ErrorKind::ArrowError, + format!( + "Expect partition type {:?}, but got {:?}", + expect_partition_type, + remove_field.data_type() + ), + )); + } + + let arrow_partition_type_fields = + if let DataType::Struct(fields) = self.partition_type.clone().try_into()? { + fields + } else { + unreachable!() + }; + let row_converter = RowConverter::new(vec![SortField::new(DataType::Struct( + arrow_partition_type_fields.clone(), + ))]) + .map_err(|e| icelake::Error::new(icelake::ErrorKind::ArrowError, e.to_string()))?; + + Ok(PrecomputedPartitionedWriter { + inner_writers: HashMap::new(), + inner_buidler: self.inner, + schema: new_schema.into(), + partition_type: self.partition_type, + row_converter, + partition_col_idx: self.partition_col_idx, + }) + } +} + +/// Partition append only writer +pub struct PrecomputedPartitionedWriter { + inner_writers: HashMap, + partition_type: Any, + row_converter: RowConverter, + inner_buidler: B, + schema: SchemaRef, + partition_col_idx: usize, +} + +#[async_trait::async_trait] +impl IcebergWriter for PrecomputedPartitionedWriter { + type R = <::R as IcebergWriter>::R; + + /// Write a record batch. The `DataFileWriter` will create a new file when the current row num is greater than `target_file_row_num`. + async fn write(&mut self, mut batch: RecordBatch) -> icelake::Result<()> { + // Remove the partition value from batch. + let partition_value = batch.remove_column(self.partition_col_idx); + let value_array = struct_to_anyvalue_array_with_type( + partition_value + .as_any() + .downcast_ref::() + .unwrap(), + self.partition_type.clone(), + )?; + + // Group the batch by row value. + // # TODO + // Maybe optimize the alloc and clone + let rows = self + .row_converter + .convert_columns(&[partition_value.clone()]) + .map_err(|e| icelake::Error::new(icelake::ErrorKind::ArrowError, e.to_string()))?; + let mut filters = HashMap::new(); + rows.into_iter().enumerate().for_each(|(row_id, row)| { + filters + .entry(row.owned()) + .or_insert_with(|| (vec![false; batch.num_rows()], row_id)) + .0[row_id] = true; + }); + + for (row, filter) in filters { + let filter_array: BooleanArray = filter.0.into(); + let partial_batch = filter_record_batch(&batch, &filter_array) + .expect("We should guarantee the filter array is valid"); + match self.inner_writers.entry(row) { + Entry::Occupied(mut writer) => { + writer.get_mut().0.write(partial_batch).await?; + } + Entry::Vacant(vacant) => { + let value = value_array.get(filter.1).unwrap().as_ref().unwrap().clone(); + if let AnyValue::Struct(value) = value { + let new_writer = self.inner_buidler.clone().build(&self.schema).await?; + vacant + .insert((new_writer, value)) + .0 + .write(partial_batch) + .await?; + } else { + unreachable!() + } + } + } + } + Ok(()) + } + + /// Complte the write and return the list of `DataFile` as result. + async fn flush(&mut self) -> icelake::Result> { + let mut res_vec = vec![]; + let inner_writers = std::mem::take(&mut self.inner_writers); + for (_key, (mut writer, value)) in inner_writers { + let mut res = writer.flush().await?; + res.iter_mut().for_each(|res| { + res.set_partition(Some(value.clone())); + }); + res_vec.extend(res); + } + Ok(res_vec) + } +} + +// #[cfg(test)] +// mod test { +// use icelake::types::{PartitionSpec, PartitionField}; +// use itertools::Itertools; + +// pub fn create_partition() -> PartitionSpec { +// PartitionSpec { +// spec_id: 1, +// fields: vec![PartitionField { +// source_column_id: 1, +// partition_field_id: 1, +// transform: crate::types::Transform::Identity, +// name: "col1".to_string(), +// }], +// } +// } + +// #[tokio::test] +// async fn test_partition_writer() { +// let schema = create_schema(2); +// let arrow_schema = create_arrow_schema(2); +// let partition_spec = create_partition(); +// let partition_type = Any::Struct(partition_spec.partition_type(&schema).unwrap().into()); + +// let to_write = create_batch( +// &arrow_schema, +// vec![ +// vec![1, 2, 3, 1, 2, 3, 1, 2, 3], +// vec![1, 1, 1, 2, 2, 2, 3, 3, 3], +// ], +// ); + +// let builder = FanoutPartitionedWriterBuilder::new( +// TestWriterBuilder {}, +// partition_type, +// partition_spec, +// ); +// let mut writer = builder.build(&arrow_schema).await.unwrap(); +// writer.write(to_write).await.unwrap(); + +// assert_eq!(writer.inner_writers.len(), 3); + +// let expect1 = create_batch(&arrow_schema, vec![vec![1, 1, 1], vec![1, 2, 3]]); +// let expect2 = create_batch(&arrow_schema, vec![vec![2, 2, 2], vec![1, 2, 3]]); +// let expect3 = create_batch(&arrow_schema, vec![vec![3, 3, 3], vec![1, 2, 3]]); +// let actual_res = writer +// .inner_writers +// .values() +// .map(|writer| writer.res()) +// .collect_vec(); +// assert!(actual_res.contains(&expect1)); +// assert!(actual_res.contains(&expect2)); +// assert!(actual_res.contains(&expect3)); +// } + +// // # NOTE +// // The delta writer will put the op vec in the last column, this test case test that the partition will +// // ignore the last column. +// #[tokio::test] +// async fn test_partition_delta_writer() { +// let schema = create_schema(2); +// let arrow_schema = create_arrow_schema(3); +// let partition_spec = create_partition(); +// let partition_type = Any::Struct(partition_spec.partition_type(&schema).unwrap().into()); + +// let builder = FanoutPartitionedWriterBuilder::new( +// TestWriterBuilder {}, +// partition_type, +// partition_spec, +// ); +// let mut writer = builder.build(&arrow_schema).await.unwrap(); + +// let to_write = create_batch( +// &arrow_schema, +// vec![ +// vec![1, 2, 3, 1, 2, 3, 1, 2, 3], +// vec![1, 1, 1, 2, 2, 2, 3, 3, 3], +// vec![3, 2, 1, 1, 3, 2, 2, 1, 3], +// ], +// ); +// writer.write(to_write).await.unwrap(); +// assert_eq!(writer.inner_writers.len(), 3); +// let expect1 = create_batch( +// &arrow_schema, +// vec![vec![1, 1, 1], vec![1, 2, 3], vec![3, 1, 2]], +// ); +// let expect2 = create_batch( +// &arrow_schema, +// vec![vec![2, 2, 2], vec![1, 2, 3], vec![2, 3, 1]], +// ); +// let expect3 = create_batch( +// &arrow_schema, +// vec![vec![3, 3, 3], vec![1, 2, 3], vec![1, 2, 3]], +// ); +// let actual_res = writer +// .inner_writers +// .values() +// .map(|writer| writer.res()) +// .collect_vec(); +// assert!(actual_res.contains(&expect1)); +// assert!(actual_res.contains(&expect2)); +// assert!(actual_res.contains(&expect3)); +// } +// } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7bbd42edcd338..35c9db6b5a29a 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -265,6 +265,7 @@ pub struct SinkWriterParam { pub vnode_bitmap: Option, pub meta_client: Option, pub sink_metrics: SinkMetrics, + pub with_extra_partition_col: bool, } impl SinkWriterParam { @@ -275,6 +276,7 @@ impl SinkWriterParam { vnode_bitmap: Default::default(), meta_client: Default::default(), sink_metrics: SinkMetrics::for_test(), + with_extra_partition_col: Default::default(), } } } diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 1b5fcb7287ebf..a260d252b4b97 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -34,6 +34,7 @@ either = "1" enum-as-inner = "0.6" futures-async-stream = { workspace = true } futures-util = "0.3" +icelake = { workspace = true } itertools = "0.12" num-traits = "0.2" parse-display = "0.8" diff --git a/src/expr/core/src/expr/build.rs b/src/expr/core/src/expr/build.rs index 16319a01bbb5f..e3f6c108dfa3b 100644 --- a/src/expr/core/src/expr/build.rs +++ b/src/expr/core/src/expr/build.rs @@ -27,7 +27,7 @@ use super::wrapper::non_strict::NonStrict; use super::wrapper::EvalErrorReport; use super::NonStrictExpression; use crate::expr::{ - BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, + external, BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression, }; use crate::sig::FUNCTION_REGISTRY; use crate::{bail, ExprError, Result}; @@ -110,6 +110,16 @@ where // Dedicated types E::All | E::Some => SomeAllExpression::build_boxed(prost, build_child), + // Iceberg partition transform functions + // # TODO + // Move to general types + E::IcebergBucket => external::iceberg::Bucket::build_boxed(prost, build_child), + E::IcebergTruncate => external::iceberg::Truncate::build_boxed(prost, build_child), + E::IcebergYear => external::iceberg::Year::build_boxed(prost, build_child), + E::IcebergMonth => external::iceberg::Month::build_boxed(prost, build_child), + E::IcebergDay => external::iceberg::Day::build_boxed(prost, build_child), + E::IcebergHour => external::iceberg::Hour::build_boxed(prost, build_child), + // General types, lookup in the function signature map _ => FuncCallBuilder::build_boxed(prost, build_child), }, @@ -216,7 +226,9 @@ pub fn build_func_non_strict( Ok(wrapped) } -pub(super) fn get_children_and_return_type(prost: &ExprNode) -> Result<(&[ExprNode], DataType)> { +pub fn get_children_and_return_type_for_func_call( + prost: &ExprNode, +) -> Result<(&[ExprNode], DataType)> { let ret_type = DataType::from(prost.get_return_type().unwrap()); if let RexNode::FuncCall(func_call) = prost.get_rex_node().unwrap() { Ok((func_call.get_children(), ret_type)) diff --git a/src/expr/core/src/expr/expr_some_all.rs b/src/expr/core/src/expr/expr_some_all.rs index 9250aa6d877cf..cb408211b7a3c 100644 --- a/src/expr/core/src/expr/expr_some_all.rs +++ b/src/expr/core/src/expr/expr_some_all.rs @@ -22,7 +22,7 @@ use risingwave_common::{bail, ensure}; use risingwave_pb::expr::expr_node::{RexNode, Type}; use risingwave_pb::expr::{ExprNode, FunctionCall}; -use super::build::get_children_and_return_type; +use super::build::get_children_and_return_type_for_func_call; use super::{BoxedExpression, Build, Expression}; use crate::Result; @@ -211,17 +211,19 @@ impl Build for SomeAllExpression { build_child: impl Fn(&ExprNode) -> Result, ) -> Result { let outer_expr_type = prost.get_function_type().unwrap(); - let (outer_children, outer_return_type) = get_children_and_return_type(prost)?; + let (outer_children, outer_return_type) = + get_children_and_return_type_for_func_call(prost)?; ensure!(matches!(outer_return_type, DataType::Boolean)); let mut inner_expr_type = outer_children[0].get_function_type().unwrap(); let (mut inner_children, mut inner_return_type) = - get_children_and_return_type(&outer_children[0])?; + get_children_and_return_type_for_func_call(&outer_children[0])?; let mut stack = vec![]; while inner_children.len() != 2 { stack.push((inner_expr_type, inner_return_type)); inner_expr_type = inner_children[0].get_function_type().unwrap(); - (inner_children, inner_return_type) = get_children_and_return_type(&inner_children[0])?; + (inner_children, inner_return_type) = + get_children_and_return_type_for_func_call(&inner_children[0])?; } let left_expr = build_child(&inner_children[0])?; diff --git a/src/expr/core/src/expr/external/iceberg.rs b/src/expr/core/src/expr/external/iceberg.rs new file mode 100644 index 0000000000000..d151223705cfd --- /dev/null +++ b/src/expr/core/src/expr/external/iceberg.rs @@ -0,0 +1,378 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use icelake::types::{ + Bucket as BucketTransform, Day as DayTransform, Hour as HourTransform, Month as MonthTransform, + TransformFunction, Truncate as TruncateTransform, Year as YearTransform, +}; +use risingwave_common::array::{ArrayRef, DataChunk}; +use risingwave_common::ensure; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, Datum}; +use risingwave_expr::expr::{get_children_and_return_type_for_func_call, BoxedExpression, Build}; +use risingwave_expr::Result; +use risingwave_pb::expr::ExprNode; + +// Bucket +pub struct Bucket { + child: BoxedExpression, + n: i32, + transform: BucketTransform, +} + +impl Debug for Bucket { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Bucket({})", self.n) + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Bucket { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Bucket { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + // # TODO + // Check the first child type here. + ensure!(children.len() == 2); + ensure!(res_type == DataType::Int32); + + // Get the second child as const param + ensure!(DataType::Int32 == children[1].get_return_type().unwrap().into()); + let literal = build_child(&children[1])?; + let n = *literal.eval_const()?.unwrap().as_int32(); + + // Build the child + let child = build_child(&children[0])?; + Ok(Bucket { + child, + n, + transform: BucketTransform::new(n), + }) + } +} + +// Truncate +pub struct Truncate { + child: BoxedExpression, + w: i32, + transform: TruncateTransform, +} + +impl Debug for Truncate { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Truncate({})", self.w) + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Truncate { + fn return_type(&self) -> DataType { + self.child.return_type() + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Truncate { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + // # TODO + // Check the first child type here. + ensure!(children.len() == 2); + ensure!(res_type == children[0].get_return_type().unwrap().into()); + + // Get the second child as const param + ensure!(DataType::Int32 == children[1].get_return_type().unwrap().into()); + let literal = build_child(&children[1])?; + let w = *literal.eval_const()?.unwrap().as_int32(); + + // Build the child + let child = build_child(&children[0])?; + Ok(Truncate { + child, + w, + transform: TruncateTransform::new(w), + }) + } +} + +// Year +pub struct Year { + child: BoxedExpression, + transform: YearTransform, +} + +impl Debug for Year { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Year") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Year { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Year { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + // # TODO + // Check the first child type here. + ensure!(children.len() == 1); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Year { + child, + transform: YearTransform {}, + }) + } +} + +// Month +pub struct Month { + child: BoxedExpression, + transform: MonthTransform, +} + +impl Debug for Month { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Month") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Month { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Month { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + // # TODO + // Check the first child type here. + ensure!(children.len() == 1); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Month { + child, + transform: MonthTransform {}, + }) + } +} + +// Day +pub struct Day { + child: BoxedExpression, + transform: DayTransform, +} + +impl Debug for Day { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Day") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Day { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Day { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + // # TODO + // Check the first child type here. + ensure!(children.len() == 1); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Day { + child, + transform: DayTransform {}, + }) + } +} + +// Hour +pub struct Hour { + child: BoxedExpression, + transform: HourTransform, +} + +impl Debug for Hour { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Iceberg_Hour") + } +} + +#[async_trait::async_trait] +impl risingwave_expr::expr::Expression for Hour { + fn return_type(&self) -> DataType { + DataType::Int32 + } + + async fn eval(&self, data_chunk: &DataChunk) -> Result { + // Get the child array + let array = self.child.eval(data_chunk).await?; + // Convert to arrow array + let arrow_array = array.as_ref().try_into().unwrap(); + // Transform + let res_array = self.transform.transform(arrow_array).unwrap(); + // Convert back to array ref and return it + Ok(Arc::new((&res_array).try_into().unwrap())) + } + + async fn eval_row(&self, _row: &OwnedRow) -> Result { + unimplemented!() + } +} + +impl Build for Hour { + fn build( + prost: &ExprNode, + build_child: impl Fn(&ExprNode) -> Result, + ) -> Result { + let (children, res_type) = get_children_and_return_type_for_func_call(prost)?; + + // Check expression + // # TODO + // Check the first child type here. + ensure!(children.len() == 1); + ensure!(res_type == DataType::Int32); + + // Build the child + let child = build_child(&children[0])?; + Ok(Hour { + child, + transform: HourTransform {}, + }) + } +} diff --git a/src/expr/core/src/expr/external/mod.rs b/src/expr/core/src/expr/external/mod.rs new file mode 100644 index 0000000000000..aa44de6753390 --- /dev/null +++ b/src/expr/core/src/expr/external/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod iceberg; diff --git a/src/expr/core/src/expr/mod.rs b/src/expr/core/src/expr/mod.rs index 951ef4bb99765..ad91640f5a0a5 100644 --- a/src/expr/core/src/expr/mod.rs +++ b/src/expr/core/src/expr/mod.rs @@ -40,6 +40,7 @@ pub(crate) mod expr_udf; pub(crate) mod wrapper; mod build; +mod external; pub mod test_utils; mod value; diff --git a/src/expr/impl/src/scalar/construct_struct.rs b/src/expr/impl/src/scalar/construct_struct.rs new file mode 100644 index 0000000000000..cee4a69169c48 --- /dev/null +++ b/src/expr/impl/src/scalar/construct_struct.rs @@ -0,0 +1,109 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use risingwave_common::array::{ArrayImpl, ArrayRef, DataChunk, StructArray}; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, Datum, ScalarImpl, StructValue}; +use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_expr::expr::{BoxedExpression, Expression}; +use risingwave_expr::{build_function, Result}; + +#[derive(Debug)] +pub struct ConstructStructExpression { + return_type: DataType, + children: Vec, +} + +#[async_trait::async_trait] +impl Expression for ConstructStructExpression { + fn return_type(&self) -> DataType { + self.return_type.clone() + } + + async fn eval(&self, input: &DataChunk) -> Result { + let mut struct_cols = Vec::with_capacity(self.children.len()); + for child in &self.children { + let res = child.eval(input).await?; + struct_cols.push(res); + } + Ok(Arc::new(ArrayImpl::Struct(StructArray::new( + self.return_type.as_struct().clone(), + struct_cols, + input.visibility().clone(), + )))) + } + + async fn eval_row(&self, input: &OwnedRow) -> Result { + let mut datums = Vec::with_capacity(self.children.len()); + for child in &self.children { + let res = child.eval_row(input).await?; + datums.push(res); + } + Ok(Some(ScalarImpl::Struct(StructValue::new(datums)))) + } +} + +#[build_function("construct_struct(...) -> struct", type_infer = "panic")] +fn build(return_type: DataType, children: Vec) -> Result { + assert!(return_type.is_struct()); + return_type + .as_struct() + .types() + .zip_eq_fast(children.iter()) + .for_each(|(ty, child)| { + assert_eq!(*ty, child.return_type()); + }); + + Ok(Box::new(ConstructStructExpression { + return_type, + children, + })) +} + +#[cfg(test)] +mod tests { + use risingwave_common::array::DataChunk; + use risingwave_common::row::Row; + use risingwave_common::test_prelude::DataChunkTestExt; + use risingwave_common::types::ToOwnedDatum; + use risingwave_common::util::iter_util::ZipEqDebug; + use risingwave_expr::expr::build_from_pretty; + + #[tokio::test] + async fn test_construct_struct_expr() { + let expr = build_from_pretty( + "(construct_struct:struct $0:int4 $1:int4 $2:int4)", + ); + let (input, expected) = DataChunk::from_pretty( + "i i i + 1 2 3 (1,2,3) + 4 2 1 (4,2,1) + 9 1 3 (9,1,3) + 1 1 1 (1,1,1)", + ) + .split_column_at(3); + + // test eval + let output = expr.eval(&input).await.unwrap(); + assert_eq!(&output, expected.column_at(0)); + + // test eval_row + for (row, expected) in input.rows().zip_eq_debug(expected.rows()) { + let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); + assert_eq!(result, expected.datum_at(0).to_owned_datum()); + } + } +} diff --git a/src/expr/impl/src/scalar/mod.rs b/src/expr/impl/src/scalar/mod.rs index c76a7b48663ee..9bde2180d644c 100644 --- a/src/expr/impl/src/scalar/mod.rs +++ b/src/expr/impl/src/scalar/mod.rs @@ -78,6 +78,7 @@ mod to_char; mod to_jsonb; mod vnode; pub use to_jsonb::*; +mod construct_struct; mod to_timestamp; mod translate; mod trigonometric; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index e279ccb38ba69..5dac08a07ef6a 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -36,6 +36,7 @@ fixedbitset = "0.4.2" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } iana-time-zone = "0.1" +icelake = { workspace = true } itertools = "0.12" maplit = "1" md5 = "0.7.0" diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 9e977a3c23ea7..d58bae19d3126 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -794,6 +794,9 @@ impl TestCase { "test_table".into(), format_desc, None, + // # TODO + // Fixed me. It may should not use None here. + None, ) { Ok(sink_plan) => { ret.sink_plan = Some(explain_plan(&sink_plan.into())); diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index c22f19e3067a5..9f825e46349ee 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -225,7 +225,14 @@ impl ExprVisitor for ImpureAnalyzer { | expr_node::Type::Greatest | expr_node::Type::Least | expr_node::Type::ConvertFrom - | expr_node::Type::ConvertTo => + | expr_node::Type::ConvertTo + | expr_node::Type::ConstructStruct + | expr_node::Type::IcebergBucket + | expr_node::Type::IcebergTruncate + | expr_node::Type::IcebergYear + | expr_node::Type::IcebergMonth + | expr_node::Type::IcebergDay + | expr_node::Type::IcebergHour => // expression output is deterministic(same result for the same input) { func_call diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 9a42f98b6428d..d6145bf557d15 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -17,7 +17,10 @@ use std::rc::Rc; use std::sync::{Arc, LazyLock}; use anyhow::Context; +use arrow_schema::DataType as ArrowDataType; use either::Either; +use icelake::catalog::load_catalog; +use icelake::TableIdentifier; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; @@ -27,6 +30,7 @@ use risingwave_common::types::Datum; use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; @@ -43,6 +47,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_mv::get_column_names; +use super::create_source::UPSTREAM_SOURCE_KEY; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; @@ -52,13 +57,15 @@ use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGene use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; -use crate::optimizer::plan_node::{generic, Explain, LogicalSource, StreamProject}; +use crate::optimizer::plan_node::{ + generic, IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, +}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor}; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::utils::resolve_privatelink_in_with_option; -use crate::{Planner, TableCatalog, WithOptions}; +use crate::{Explain, Planner, TableCatalog, WithOptions}; pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { let table_factor = TableFactor::Table { @@ -98,6 +105,7 @@ pub fn gen_sink_plan( session: &SessionImpl, context: OptimizerContextRef, stmt: CreateSinkStatement, + partition_info: Option, ) -> Result { let db_name = session.database(); let (sink_schema_name, sink_table_name) = @@ -207,6 +215,7 @@ pub fn gen_sink_plan( sink_from_table_name, format_desc, target_table, + partition_info, )?; let sink_desc = sink_plan.sink_desc().clone(); @@ -271,6 +280,77 @@ pub fn gen_sink_plan( }) } +// This function is used to return partition compute info for a sink. +// Return Some(PartitionComputeInfo) if the sink need to compute partition. +// Return None if the sink does not need to compute partition. +// More details refer in `PartitionComputeInfo`. +// +// # TODO +// We need return None for range partition +pub async fn get_partition_compute_info( + with_options: &WithOptions, +) -> Option { + let properties = HashMap::from_iter(with_options.clone().into_inner().into_iter()); + let connector = properties.get(UPSTREAM_SOURCE_KEY)?; + match connector.as_str() { + ICEBERG_SINK => { + let iceberg_config = IcebergConfig::from_hashmap(properties).ok()?; + get_partition_compute_info_for_iceberg(&iceberg_config).await + } + _ => None, + } +} + +async fn get_partition_compute_info_for_iceberg( + iceberg_config: &IcebergConfig, +) -> Option { + let catalog = load_catalog(&iceberg_config.build_iceberg_configs().ok()?) + .await + .ok()?; + let table_id = TableIdentifier::new(iceberg_config.table_name.split('.')).ok()?; + let table = catalog.load_table(&table_id).await.ok()?; + let partition_spec = table + .current_table_metadata() + .current_partition_spec() + .ok()?; + + if partition_spec.is_unpartitioned() { + return None; + } + + // Only compute the partition and shuffle by them for the sparse partition which means that the data distribution is more sparse. + // For other partition types, we just let the sink to compute the partition. + let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform { + icelake::types::Transform::Identity + | icelake::types::Transform::Truncate(_) + | icelake::types::Transform::Bucket(_) => true, + icelake::types::Transform::Year + | icelake::types::Transform::Month + | icelake::types::Transform::Day + | icelake::types::Transform::Hour + | icelake::types::Transform::Void => false, + }); + + if !has_sparse_partition { + return None; + } + + let arrow_type: ArrowDataType = table.current_partition_type().ok()?.try_into().ok()?; + let partition_fields = table + .current_table_metadata() + .current_partition_spec() + .ok()? + .fields + .iter() + .map(|f| (f.name.clone(), f.transform)) + .collect_vec(); + + Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo { + partition_type: arrow_type.into(), + partition_fields, + })) +} + pub async fn handle_create_sink( handle_args: HandlerArgs, stmt: CreateSinkStatement, @@ -285,6 +365,8 @@ pub async fn handle_create_sink( return Ok(resp); } + let partition_info = get_partition_compute_info(&handle_args.with_options).await; + let (sink, graph, target_table_catalog) = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); @@ -293,7 +375,7 @@ pub async fn handle_create_sink( sink_plan: plan, sink_catalog: sink, target_table_catalog, - } = gen_sink_plan(&session, context.clone(), stmt)?; + } = gen_sink_plan(&session, context.clone(), stmt, partition_info)?; let has_order_by = !query.order_by.is_empty(); if has_order_by { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index c93f4996fac23..6c7fcfc8a5eba 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -24,7 +24,7 @@ use thiserror_ext::AsReport; use super::create_index::gen_create_index_plan; use super::create_mv::gen_create_mv_plan; -use super::create_sink::gen_sink_plan; +use super::create_sink::{gen_sink_plan, get_partition_compute_info}; use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; @@ -86,6 +86,13 @@ async fn do_handle_explain( let context = plan.ctx(); (Ok(plan), context) } + Statement::CreateSink { stmt } => { + let partition_info = get_partition_compute_info(context.with_options()).await; + let plan = gen_sink_plan(&session, context.into(), stmt, partition_info) + .map(|plan| plan.sink_plan)?; + let context = plan.ctx(); + (Ok(plan), context) + } // For other queries without `await` point, we can keep a copy of reference to the // `OptimizerContext` even if the planning fails. This enables us to log the partial @@ -112,10 +119,6 @@ async fn do_handle_explain( ) .map(|x| x.0), - Statement::CreateSink { stmt } => { - gen_sink_plan(&session, context.clone(), stmt).map(|plan| plan.sink_plan) - } - Statement::CreateIndex { name, table_name, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index a059b46e6488a..1fe8b292edf08 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -58,8 +58,8 @@ use self::heuristic_optimizer::ApplyOrder; use self::plan_node::generic::{self, PhysicalPlanRef}; use self::plan_node::{ stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, LogicalSource, - StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, - ToStreamContext, + PartitionComputeInfo, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, + StreamWatermarkFilter, ToStreamContext, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -718,6 +718,7 @@ impl PlanRoot { sink_from_table_name: String, format_desc: Option, target_table: Option, + partition_info: Option, ) -> Result { let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?; @@ -734,6 +735,7 @@ impl PlanRoot { definition, properties, format_desc, + partition_info, ) } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index fd05a4c7ecc9a..d9a6b90a126d4 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -940,7 +940,7 @@ pub use stream_project_set::StreamProjectSet; pub use stream_row_id_gen::StreamRowIdGen; pub use stream_share::StreamShare; pub use stream_simple_agg::StreamSimpleAgg; -pub use stream_sink::StreamSink; +pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink}; pub use stream_sort::StreamEowcSort; pub use stream_source::StreamSource; pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 50ee550759c72..8c23cb9b735ae 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -17,6 +17,7 @@ use std::io::{Error, ErrorKind}; use anyhow::anyhow; use fixedbitset::FixedBitSet; +use icelake::types::Transform; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, Field, TableId}; @@ -25,22 +26,26 @@ use risingwave_common::constants::log_store::{ }; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::session_config::sink_decouple::SinkDecouple; +use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::sort_util::OrderType; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::iceberg::ICEBERG_SINK; use risingwave_connector::sink::trivial::TABLE_SINK; use risingwave_connector::sink::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; +use risingwave_pb::expr::expr_node::Type; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::{derive_columns, derive_pk}; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef}; use super::stream::prelude::*; use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder}; -use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject}; +use crate::expr::{Expr, ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -49,6 +54,156 @@ use crate::{TableCatalog, WithOptions}; const DOWNSTREAM_PK_KEY: &str = "primary_key"; +/// ## Why we need `PartitionComputeInfo`? +/// +/// For some sink, it will write the data into different file based on the partition value. E.g. iceberg sink() +/// For this kind of sink, the file num can be reduced if we can shuffle the data based on the partition value. More details can be found in . +/// So if the `PartitionComputeInfo` provided, we will create a `StreamProject` node to compute the partition value and shuffle the data based on the partition value before the sink. +/// +/// ## What is `PartitionComputeInfo`? +/// The `PartitionComputeInfo` contains the information about partition compute. The stream sink will use +/// these information to create the corresponding expression in `StreamProject` node. +/// +/// #TODO +/// Maybe we should move this in sink? +pub enum PartitionComputeInfo { + Iceberg(IcebergPartitionInfo), +} + +impl PartitionComputeInfo { + pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result { + match self { + PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns), + } + } +} + +pub struct IcebergPartitionInfo { + pub partition_type: DataType, + // (partition_field_name, partition_field_transform) + pub partition_fields: Vec<(String, Transform)>, +} + +impl IcebergPartitionInfo { + #[inline] + fn transform_to_expression( + transform: &Transform, + col_id: usize, + columns: &[ColumnCatalog], + ) -> Result { + match transform { + Transform::Identity => Ok(ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )), + Transform::Void => Ok(ExprImpl::literal_null( + columns[col_id].column_desc.data_type.clone(), + )), + Transform::Bucket(n) => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergBucket, + vec![ + ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()) + .into(), + ), + ExprImpl::literal_int(*n), + ], + ) + .unwrap() + .into(), + )), + Transform::Truncate(w) => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergTruncate, + vec![ + ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()) + .into(), + ), + ExprImpl::literal_int(*w), + ], + ) + .unwrap() + .into(), + )), + Transform::Year => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergYear, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + Transform::Month => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergMonth, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + Transform::Day => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergDay, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + Transform::Hour => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergHour, + vec![ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )], + ) + .unwrap() + .into(), + )), + } + } + + pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result { + let child_exprs = self + .partition_fields + .into_iter() + .map(|(field_name, transform)| { + let col_id = find_column_idx_by_name(columns, &field_name)?; + Self::transform_to_expression(&transform, col_id, columns) + }) + .collect::>>()?; + let return_type = DataType::Struct(StructType::new( + child_exprs + .iter() + .enumerate() + .map(|(id, expr)| (format!("f{id}"), expr.return_type())) + .collect_vec(), + )); + Ok(ExprImpl::FunctionCall( + FunctionCall::new_unchecked(Type::ConstructStruct, child_exprs, return_type).into(), + )) + } +} +#[inline] +fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result { + columns + .iter() + .position(|col| col.column_desc.name == col_name) + .ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name) + ))) + .into() + }) +} + /// [`StreamSink`] represents a table/connector sink at the very end of the graph. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamSink { @@ -90,6 +245,7 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result { let columns = derive_columns(input.schema(), out_names, &user_cols)?; let (input, sink) = Self::derive_sink_desc( @@ -104,6 +260,7 @@ impl StreamSink { definition, properties, format_desc, + partition_info, )?; let unsupported_sink = @@ -136,9 +293,40 @@ impl StreamSink { Ok(Self::new(input, sink)) } + fn derive_iceberg_sink_distribution( + input: PlanRef, + partition_info: Option, + columns: &[ColumnCatalog], + ) -> Result<(RequiredDist, PlanRef)> { + // For here, we need to add the plan node to compute the partition value, and add it as a extra column. + if let Some(partition_info) = partition_info { + let input_fields = input.schema().fields(); + + let mut exprs: Vec<_> = input_fields + .iter() + .enumerate() + .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into()) + .collect(); + + // Add the partition compute expression to the end of the exprs + exprs.push(partition_info.convert_to_expression(columns)?); + let partition_col_idx = exprs.len() - 1; + let project = StreamProject::new(generic::Project::new(exprs.clone(), input)); + Ok(( + RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]), + project.into(), + )) + } else { + Ok(( + RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()), + input, + )) + } + } + #[allow(clippy::too_many_arguments)] fn derive_sink_desc( - input: PlanRef, + mut input: PlanRef, user_distributed_by: RequiredDist, name: String, db_name: String, @@ -149,17 +337,28 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result<(PlanRef, SinkDesc)> { + let connector_name = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + "connector not specified when create sink", + ))) + })?; + let sink_type = Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?; let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); - let downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + let mut downstream_pk = + Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + + let with_extra_partition_col = partition_info.is_some(); let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), _ => { - match properties.get("connector") { - Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => { + match connector_name.as_str() { + s if s == "jdbc" && sink_type == SinkType::Upsert => { if sink_type == SinkType::Upsert && downstream_pk.is_empty() { return Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, @@ -174,6 +373,19 @@ impl StreamSink { // lock contentions RequiredDist::hash_shard(downstream_pk.as_slice()) } + ICEBERG_SINK => { + // If user doesn't specify the downstream primary key, we use the stream key as the pk. + if sink_type.is_upsert() && downstream_pk.is_empty() { + downstream_pk = pk.iter().map(|k| k.column_index).collect_vec(); + } + let (required_dist, new_input) = Self::derive_iceberg_sink_distribution( + input, + partition_info, + &columns, + )?; + input = new_input; + required_dist + } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); if downstream_pk.is_empty() { @@ -209,6 +421,7 @@ impl StreamSink { sink_type, format_desc, target_table, + with_extra_partition_col, }; Ok((input, sink_desc)) } @@ -315,19 +528,7 @@ impl StreamSink { if trimmed_key.is_empty() { continue; } - match columns - .iter() - .position(|col| col.column_desc.name == trimmed_key) - { - Some(index) => downstream_pk_indices.push(index), - None => { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", trimmed_key), - ))) - .into()); - } - } + downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?); } Ok(downstream_pk_indices) } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 5aca2842f5ad6..5531ef6e42516 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -90,11 +90,20 @@ impl SinkExecutor { log_store_factory: F, ) -> StreamExecutorResult { let sink = build_sink(sink_param.clone())?; - let input_schema: Schema = columns + let sink_input_schema: Schema = columns .iter() .map(|column| Field::from(&column.column_desc)) .collect(); - assert_eq!(input_schema.data_types(), info.schema.data_types()); + + if sink_writer_param.with_extra_partition_col { + // Remove the partition column from the schema. + assert_eq!( + sink_input_schema.data_types(), + info.schema.data_types()[..info.schema.data_types().len() - 1] + ); + } else { + assert_eq!(sink_input_schema.data_types(), info.schema.data_types()); + } Ok(Self { actor_context, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 659450e83c077..203f3a82ec4dd 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -116,6 +116,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { vnode_bitmap: params.vnode_bitmap.clone(), meta_client: params.env.meta_client(), sink_metrics, + with_extra_partition_col: sink_desc.with_extra_partition_col, }; let log_store_identity = format!( diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 7893c3086d776..8a4780504df30 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -62,7 +62,7 @@ hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features hmac = { version = "0.12", default-features = false, features = ["reset"] } hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["serde", "std"] } -itertools = { version = "0.11" } +itertools = { version = "0.10" } jni = { version = "0.21", features = ["invocation"] } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } lexical-core = { version = "0.8", features = ["format"] } @@ -143,6 +143,9 @@ unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } whoami = { version = "1" } +zstd = { version = "0.13" } +zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] } +zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] } [build-dependencies] ahash = { version = "0.8" } @@ -158,7 +161,7 @@ fixedbitset = { version = "0.4" } frunk_core = { version = "0.4", default-features = false, features = ["std"] } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae", default-features = false, features = ["std"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } -itertools = { version = "0.11" } +itertools = { version = "0.10" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } libc = { version = "0.2", features = ["extra_traits"] } log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }