From d8a927455e45fc506a0a06ebcefc268ae3f8ac2d Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sun, 17 Dec 2023 00:39:27 +0800 Subject: [PATCH] draft stream partition compute --- Cargo.lock | 4 + proto/stream_plan.proto | 13 + src/common/src/array/arrow/arrow_impl.rs | 9 +- src/connector/Cargo.toml | 2 + src/connector/src/sink/iceberg/mod.rs | 64 +++- .../iceberg/precomputed_partition_writer.rs | 282 ++++++++++++++++++ src/frontend/Cargo.toml | 1 + src/frontend/planner_test/src/lib.rs | 3 + src/frontend/src/handler/create_sink.rs | 58 +++- src/frontend/src/handler/explain.rs | 13 +- src/frontend/src/optimizer/mod.rs | 6 +- src/frontend/src/optimizer/plan_node/mod.rs | 5 + .../plan_node/stream_partition_compute.rs | 135 +++++++++ .../src/optimizer/plan_node/stream_sink.rs | 111 +++++-- .../src/utils/stream_graph_formatter.rs | 3 +- src/stream/Cargo.toml | 1 + src/stream/src/executor/mod.rs | 1 + .../src/executor/partition_compute/iceberg.rs | 78 +++++ .../src/executor/partition_compute/mod.rs | 270 +++++++++++++++++ src/stream/src/executor/sink.rs | 8 +- src/stream/src/from_proto/mod.rs | 3 + .../src/from_proto/partition_compute.rs | 51 ++++ 22 files changed, 1077 insertions(+), 44 deletions(-) create mode 100644 src/connector/src/sink/iceberg/precomputed_partition_writer.rs create mode 100644 src/frontend/src/optimizer/plan_node/stream_partition_compute.rs create mode 100644 src/stream/src/executor/partition_compute/iceberg.rs create mode 100644 src/stream/src/executor/partition_compute/mod.rs create mode 100644 src/stream/src/from_proto/partition_compute.rs diff --git a/Cargo.lock b/Cargo.lock index d130d57d6ae90..de551fdf9aac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8390,7 +8390,9 @@ dependencies = [ "anyhow", "apache-avro 0.16.0", "arrow-array 49.0.0", + "arrow-row 49.0.0", "arrow-schema 49.0.0", + "arrow-select 49.0.0", "async-nats", "async-trait", "auto_enums", @@ -8661,6 +8663,7 @@ dependencies = [ "futures", "futures-async-stream", "iana-time-zone", + "icelake", "itertools 0.12.0", "madsim-tokio", "madsim-tonic", @@ -9357,6 +9360,7 @@ dependencies = [ "futures-async-stream", "governor", "hytra", + "icelake", "itertools 0.12.0", "local_stats_alloc", "lru 0.7.6", diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index fa1936454d860..7a3bcf2276067 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -239,6 +239,18 @@ message SinkNode { SinkLogStoreType log_store_type = 3; } +message PartitionComputeNode { + message PartitionField { + uint64 col_id = 1; + string transform = 2; + } + message PartitionSpec { + string connector_type = 1; + repeated PartitionField fields = 2; + } + PartitionSpec spec = 1; +} + message ProjectNode { repeated expr.ExprNode select_list = 1; // this two field is expressing a list of usize pair, which means when project receives a @@ -753,6 +765,7 @@ message StreamNode { StreamFsFetchNode stream_fs_fetch = 138; StreamCdcScanNode stream_cdc_scan = 139; CdcFilterNode cdc_filter = 140; + PartitionComputeNode partition_compute = 141; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 78dc79c530b9f..0c61547d09449 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -133,6 +133,7 @@ macro_rules! 
converts_generic { fn try_from(array: &ArrayImpl) -> Result { match array { $($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)* + ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?. with_timezone_utc())), _ => todo!("unsupported array"), } } @@ -152,6 +153,13 @@ macro_rules! converts_generic { .unwrap() .try_into()?, )),)* + Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz( + array + .as_any() + .downcast_ref::() + .unwrap() + .try_into()?, + )), t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))), } } @@ -173,7 +181,6 @@ converts_generic! { { arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 }, { arrow_array::Date32Array, Date32, ArrayImpl::Date }, { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp }, - { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz }, { arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time }, { arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval }, { arrow_array::StructArray, Struct(_), ArrayImpl::Struct }, diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 1d7ad5ef58f77..5d33b499f3d71 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c "xz", ] } arrow-array = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } async-nats = "0.32" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 660fdc3f9c6b3..01f7f22d058ff 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod precomputed_partition_writer; mod prometheus; use std::collections::HashMap; @@ -19,7 +20,9 @@ use std::fmt::Debug; use std::ops::Deref; use anyhow::anyhow; -use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef}; +use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, +}; use async_trait::async_trait; use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE}; use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter}; @@ -44,7 +47,6 @@ use url::Url; use with_options::WithOptions; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; -use self::prometheus::monitored_partition_writer::MonitoredFanoutPartitionedWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -163,7 +165,7 @@ impl IcebergConfig { Ok(config) } - fn build_iceberg_configs(&self) -> Result> { + pub fn build_iceberg_configs(&self) -> Result> { let mut iceberg_configs = HashMap::new(); let catalog_type = self @@ -391,6 +393,40 @@ enum IcebergWriterEnum { } impl IcebergWriter { + fn schema(table: &Table) -> Result { + let schema = table.current_arrow_schema()?; + // #TODO + // When iceberg table is partitioned, we will add a partition column to the schema. + // Maybe we should accept a flag from `SinkWriterParam`? 
+ if !table + .current_table_metadata() + .current_partition_spec()? + .is_unpartitioned() + { + let mut fields = schema.fields().to_vec(); + // # TODO + // When convert rw struct to arrow struct, we will compare the type of each field. + // To avoid the impact of metadata, we should reconstruct struct type without metadata. + // This approach may hack, do we have a better way? + let partition_type = if let ArrowDataType::Struct(s) = + table.current_partition_type()?.try_into()? + { + let fields = Fields::from( + s.into_iter() + .map(|field| ArrowField::new(field.name(), field.data_type().clone(), true)) + .collect::>(), + ); + ArrowDataType::Struct(fields) + } else { + unimplemented!() + }; + fields.push(ArrowField::new("_rw_partition", partition_type, false).into()); + Ok(ArrowSchema::new(fields).into()) + } else { + Ok(schema) + } + } + pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; @@ -402,10 +438,11 @@ impl IcebergWriter { .iceberg_rolling_unflushed_data_file .clone(), )); - let partition_data_file_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); + let partition_data_file_builder = + precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new( + data_file_builder.clone(), + table.current_partition_type()?, + ); let dispatch_builder = builder_helper .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; // wrap a layer with collect write metrics @@ -420,7 +457,7 @@ impl IcebergWriter { .clone(), ), ); - let schema = table.current_arrow_schema()?; + let schema = Self::schema(&table)?; let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), @@ -457,10 +494,11 @@ impl IcebergWriter { equality_delete_builder, unique_column_ids, ); - let partition_delta_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(delta_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); + let partition_delta_builder = + precomputed_partition_writer::PrecomputedPartitionedWriterBuilder::new( + delta_builder.clone(), + table.current_partition_type()?, + ); let dispatch_builder = builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; // wrap a layer with collect write metrics @@ -475,7 +513,7 @@ impl IcebergWriter { .clone(), ), ); - let schema = table.current_arrow_schema()?; + let schema = Self::schema(&table)?; let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), diff --git a/src/connector/src/sink/iceberg/precomputed_partition_writer.rs b/src/connector/src/sink/iceberg/precomputed_partition_writer.rs new file mode 100644 index 0000000000000..e49316d542a43 --- /dev/null +++ b/src/connector/src/sink/iceberg/precomputed_partition_writer.rs @@ -0,0 +1,282 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +use arrow_array::{BooleanArray, RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::{DataType, SchemaRef}; +use arrow_select::filter::filter_record_batch; +use icelake::io_v2::{IcebergWriteResult, IcebergWriter, IcebergWriterBuilder}; +use icelake::types::{struct_to_anyvalue_array_with_type, Any, AnyValue}; +use risingwave_common::util::iter_util::ZipEqFast; + +#[derive(Clone)] +pub struct PrecomputedPartitionedWriterBuilder { + inner: B, + partition_type: Any, +} + +impl PrecomputedPartitionedWriterBuilder { + pub fn new(inner: B, partition_type: Any) -> Self { + Self { + inner, + partition_type, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for PrecomputedPartitionedWriterBuilder { + type R = PrecomputedPartitionedWriter; + + async fn build(self, schema: &SchemaRef) -> icelake::Result { + // Find the partition value column + let expect_partition_type = + self.partition_type.clone().try_into().map_err(|e| { + icelake::Error::new(icelake::ErrorKind::ArrowError, format!("{}", e)) + })?; + let partition_col_idx = schema + .fields() + .iter() + .position(|field|{ + if let DataType::Struct(s1) = field.data_type() && let DataType::Struct(s2) = &expect_partition_type { + s1.iter().zip_eq_fast(s2.iter()).all(|(f1,f2)|f1.data_type() == f2.data_type()) + } else { + false + } + }) + .ok_or_else(|| { + icelake::Error::new( + icelake::ErrorKind::ArrowError, + format!( + "Schema should contains partition type {:?}", + expect_partition_type + ), + ) + })?; + + // remove the partition type from schema. + let mut new_schema = schema.as_ref().clone(); + new_schema.remove(partition_col_idx); + + let arrow_partition_type_fields = + if let DataType::Struct(fields) = self.partition_type.clone().try_into()? { + fields + } else { + unreachable!() + }; + let row_converter = RowConverter::new(vec![SortField::new(DataType::Struct( + arrow_partition_type_fields.clone(), + ))]) + .map_err(|e| icelake::Error::new(icelake::ErrorKind::ArrowError, e.to_string()))?; + + Ok(PrecomputedPartitionedWriter { + inner_writers: HashMap::new(), + inner_buidler: self.inner, + schema: new_schema.into(), + partition_type: self.partition_type, + row_converter, + partition_col_idx, + }) + } +} + +/// Partition append only writer +pub struct PrecomputedPartitionedWriter { + inner_writers: HashMap, + partition_type: Any, + row_converter: RowConverter, + inner_buidler: B, + schema: SchemaRef, + partition_col_idx: usize, +} + +#[async_trait::async_trait] +impl IcebergWriter for PrecomputedPartitionedWriter { + type R = <::R as IcebergWriter>::R; + + /// Write a record batch. The `DataFileWriter` will create a new file when the current row num is greater than `target_file_row_num`. + async fn write(&mut self, mut batch: RecordBatch) -> icelake::Result<()> { + // Remove the partition value from batch. 
+ let partition_value = batch.remove_column(self.partition_col_idx); + let value_array = struct_to_anyvalue_array_with_type( + partition_value + .as_any() + .downcast_ref::() + .unwrap(), + self.partition_type.clone(), + )?; + + // Group the batch by row value. + // # TODO + // Maybe optimize the alloc and clone + let rows = self + .row_converter + .convert_columns(&[partition_value.clone()]) + .map_err(|e| icelake::Error::new(icelake::ErrorKind::ArrowError, e.to_string()))?; + let mut filters = HashMap::new(); + rows.into_iter().enumerate().for_each(|(row_id, row)| { + filters + .entry(row.owned()) + .or_insert_with(|| (vec![false; batch.num_rows()], row_id)) + .0[row_id] = true; + }); + + for (row, filter) in filters { + let filter_array: BooleanArray = filter.0.into(); + let partial_batch = filter_record_batch(&batch, &filter_array) + .expect("We should guarantee the filter array is valid"); + match self.inner_writers.entry(row) { + Entry::Occupied(mut writer) => { + writer.get_mut().0.write(partial_batch).await?; + } + Entry::Vacant(vacant) => { + let value = value_array.get(filter.1).unwrap().as_ref().unwrap().clone(); + if let AnyValue::Struct(value) = value { + let new_writer = self.inner_buidler.clone().build(&self.schema).await?; + vacant + .insert((new_writer, value)) + .0 + .write(partial_batch) + .await?; + } else { + unreachable!() + } + } + } + } + Ok(()) + } + + /// Complte the write and return the list of `DataFile` as result. + async fn flush(&mut self) -> icelake::Result> { + let mut res_vec = vec![]; + let inner_writers = std::mem::take(&mut self.inner_writers); + for (_key, (mut writer, value)) in inner_writers { + let mut res = writer.flush().await?; + res.iter_mut().for_each(|res| { + res.set_partition(Some(value.clone())); + }); + res_vec.extend(res); + } + Ok(res_vec) + } +} + +// #[cfg(test)] +// mod test { +// use icelake::types::{PartitionSpec, PartitionField}; +// use itertools::Itertools; + +// pub fn create_partition() -> PartitionSpec { +// PartitionSpec { +// spec_id: 1, +// fields: vec![PartitionField { +// source_column_id: 1, +// partition_field_id: 1, +// transform: crate::types::Transform::Identity, +// name: "col1".to_string(), +// }], +// } +// } + +// #[tokio::test] +// async fn test_partition_writer() { +// let schema = create_schema(2); +// let arrow_schema = create_arrow_schema(2); +// let partition_spec = create_partition(); +// let partition_type = Any::Struct(partition_spec.partition_type(&schema).unwrap().into()); + +// let to_write = create_batch( +// &arrow_schema, +// vec![ +// vec![1, 2, 3, 1, 2, 3, 1, 2, 3], +// vec![1, 1, 1, 2, 2, 2, 3, 3, 3], +// ], +// ); + +// let builder = FanoutPartitionedWriterBuilder::new( +// TestWriterBuilder {}, +// partition_type, +// partition_spec, +// ); +// let mut writer = builder.build(&arrow_schema).await.unwrap(); +// writer.write(to_write).await.unwrap(); + +// assert_eq!(writer.inner_writers.len(), 3); + +// let expect1 = create_batch(&arrow_schema, vec![vec![1, 1, 1], vec![1, 2, 3]]); +// let expect2 = create_batch(&arrow_schema, vec![vec![2, 2, 2], vec![1, 2, 3]]); +// let expect3 = create_batch(&arrow_schema, vec![vec![3, 3, 3], vec![1, 2, 3]]); +// let actual_res = writer +// .inner_writers +// .values() +// .map(|writer| writer.res()) +// .collect_vec(); +// assert!(actual_res.contains(&expect1)); +// assert!(actual_res.contains(&expect2)); +// assert!(actual_res.contains(&expect3)); +// } + +// // # NOTE +// // The delta writer will put the op vec in the last column, this test case test that 
the partition will +// // ignore the last column. +// #[tokio::test] +// async fn test_partition_delta_writer() { +// let schema = create_schema(2); +// let arrow_schema = create_arrow_schema(3); +// let partition_spec = create_partition(); +// let partition_type = Any::Struct(partition_spec.partition_type(&schema).unwrap().into()); + +// let builder = FanoutPartitionedWriterBuilder::new( +// TestWriterBuilder {}, +// partition_type, +// partition_spec, +// ); +// let mut writer = builder.build(&arrow_schema).await.unwrap(); + +// let to_write = create_batch( +// &arrow_schema, +// vec![ +// vec![1, 2, 3, 1, 2, 3, 1, 2, 3], +// vec![1, 1, 1, 2, 2, 2, 3, 3, 3], +// vec![3, 2, 1, 1, 3, 2, 2, 1, 3], +// ], +// ); +// writer.write(to_write).await.unwrap(); +// assert_eq!(writer.inner_writers.len(), 3); +// let expect1 = create_batch( +// &arrow_schema, +// vec![vec![1, 1, 1], vec![1, 2, 3], vec![3, 1, 2]], +// ); +// let expect2 = create_batch( +// &arrow_schema, +// vec![vec![2, 2, 2], vec![1, 2, 3], vec![2, 3, 1]], +// ); +// let expect3 = create_batch( +// &arrow_schema, +// vec![vec![3, 3, 3], vec![1, 2, 3], vec![1, 2, 3]], +// ); +// let actual_res = writer +// .inner_writers +// .values() +// .map(|writer| writer.res()) +// .collect_vec(); +// assert!(actual_res.contains(&expect1)); +// assert!(actual_res.contains(&expect2)); +// assert!(actual_res.contains(&expect3)); +// } +// } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index bb94d17b2f47b..ac7fff9677821 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -36,6 +36,7 @@ fixedbitset = "0.4.2" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } iana-time-zone = "0.1" +icelake = { workspace = true } itertools = "0.12" maplit = "1" md5 = "0.7.0" diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 341d6c697c725..6c00a8fee015b 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -792,6 +792,9 @@ impl TestCase { "test_table".into(), format_desc, None, + // # TODO + // Fixed me. It may should not use None here. 
+ None, ) { Ok(sink_plan) => { ret.sink_plan = Some(explain_plan(&sink_plan.into())); diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 73060b7a9f965..383a50e60df32 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -17,7 +17,10 @@ use std::rc::Rc; use std::sync::{Arc, LazyLock}; use anyhow::Context; +use arrow_schema::DataType as ArrowDataType; use either::Either; +use icelake::catalog::load_catalog; +use icelake::TableIdentifier; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; @@ -27,6 +30,7 @@ use risingwave_common::types::Datum; use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_common::{bail, bail_not_implemented, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; @@ -43,6 +47,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_mv::get_column_names; +use super::create_source::UPSTREAM_SOURCE_KEY; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; @@ -52,7 +57,9 @@ use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGene use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; -use crate::optimizer::plan_node::{generic, Explain, LogicalSource, StreamProject}; +use crate::optimizer::plan_node::{ + generic, Explain, LogicalSource, PartitionComputeInfo, StreamProject, +}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor}; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; @@ -98,6 +105,7 @@ pub fn gen_sink_plan( session: &SessionImpl, context: OptimizerContextRef, stmt: CreateSinkStatement, + partition_info: Option, ) -> Result { let db_name = session.database(); let (sink_schema_name, sink_table_name) = @@ -207,6 +215,7 @@ pub fn gen_sink_plan( sink_from_table_name, format_desc, target_table, + partition_info, )?; let sink_desc = sink_plan.sink_desc().clone(); @@ -271,6 +280,49 @@ pub fn gen_sink_plan( }) } +// # TODO +// We need return None for range partition +pub async fn get_partition_compute_info( + with_options: &WithOptions, +) -> Option { + let properties = HashMap::from_iter(with_options.clone().into_inner().into_iter()); + let connector = properties.get(UPSTREAM_SOURCE_KEY)?; + match connector.as_str() { + ICEBERG_SINK => { + let iceberg_config = IcebergConfig::from_hashmap(properties).ok()?; + let catalog = load_catalog(&iceberg_config.build_iceberg_configs().ok()?) + .await + .ok()?; + let table_id = TableIdentifier::new(iceberg_config.table_name.split('.')).ok()?; + let table = catalog.load_table(&table_id).await.ok()?; + let partition_spec = table + .current_table_metadata() + .current_partition_spec() + .ok()?; + + if partition_spec.is_unpartitioned() { + return None; + } + + let arrow_type: ArrowDataType = table.current_partition_type().ok()?.try_into().ok()?; + let partition_fields = table + .current_table_metadata() + .current_partition_spec() + .ok()? 
+ .fields + .iter() + .map(|f| (f.name.clone(), (&f.transform).to_string())) + .collect_vec(); + + Some(PartitionComputeInfo { + partition_type: arrow_type.try_into().ok()?, + partition_fields, + }) + } + _ => None, + } +} + pub async fn handle_create_sink( handle_args: HandlerArgs, stmt: CreateSinkStatement, @@ -285,6 +337,8 @@ pub async fn handle_create_sink( return Ok(resp); } + let partition_info = get_partition_compute_info(&handle_args.with_options).await; + let (sink, graph, target_table_catalog) = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); @@ -293,7 +347,7 @@ pub async fn handle_create_sink( sink_plan: plan, sink_catalog: sink, target_table_catalog, - } = gen_sink_plan(&session, context.clone(), stmt)?; + } = gen_sink_plan(&session, context.clone(), stmt, partition_info)?; let has_order_by = !query.order_by.is_empty(); if has_order_by { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 8e2dc051379f9..8f4d4b6de3b2d 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -24,7 +24,7 @@ use thiserror_ext::AsReport; use super::create_index::gen_create_index_plan; use super::create_mv::gen_create_mv_plan; -use super::create_sink::gen_sink_plan; +use super::create_sink::{gen_sink_plan, get_partition_compute_info}; use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; @@ -84,6 +84,13 @@ async fn do_handle_explain( let context = plan.ctx(); (Ok(plan), context) } + Statement::CreateSink { stmt } => { + let partition_info = get_partition_compute_info(context.with_options()).await; + let plan = gen_sink_plan(&session, context.into(), stmt, partition_info) + .map(|plan| plan.sink_plan)?; + let context = plan.ctx(); + (Ok(plan), context) + } // For other queries without `await` point, we can keep a copy of reference to the // `OptimizerContext` even if the planning fails. 
This enables us to log the partial @@ -110,10 +117,6 @@ async fn do_handle_explain( ) .map(|x| x.0), - Statement::CreateSink { stmt } => { - gen_sink_plan(&session, context.clone(), stmt).map(|plan| plan.sink_plan) - } - Statement::CreateIndex { name, table_name, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index f6de31e4b6990..7dd2377bc2233 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -58,8 +58,8 @@ use self::heuristic_optimizer::ApplyOrder; use self::plan_node::generic::{self, PhysicalPlanRef}; use self::plan_node::{ stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, LogicalSource, - StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, - ToStreamContext, + PartitionComputeInfo, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, + StreamWatermarkFilter, ToStreamContext, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -718,6 +718,7 @@ impl PlanRoot { sink_from_table_name: String, format_desc: Option, target_table: Option, + partition_info: Option, ) -> Result { let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?; @@ -734,6 +735,7 @@ impl PlanRoot { definition, properties, format_desc, + partition_info, ) } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 65a87af0c371a..4422a2fbd7604 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -856,9 +856,11 @@ mod stream_watermark_filter; mod derive; mod stream_cdc_table_scan; +mod stream_partition_compute; mod stream_share; mod stream_temporal_join; mod stream_union; +pub use stream_partition_compute::PartitionComputeInfo; pub mod utils; pub use batch_delete::BatchDelete; @@ -931,6 +933,7 @@ pub use stream_hop_window::StreamHopWindow; pub use stream_materialize::StreamMaterialize; pub use stream_now::StreamNow; pub use stream_over_window::StreamOverWindow; +pub use stream_partition_compute::StreamPartitionCompute; pub use stream_project::StreamProject; pub use stream_project_set::StreamProjectSet; pub use stream_row_id_gen::StreamRowIdGen; @@ -1056,6 +1059,7 @@ macro_rules! for_all_plan_nodes { , { Stream, EowcSort } , { Stream, OverWindow } , { Stream, FsFetch } + , { Stream, PartitionCompute } } }; } @@ -1168,6 +1172,7 @@ macro_rules! for_stream_plan_nodes { , { Stream, EowcSort } , { Stream, OverWindow } , { Stream, FsFetch } + , { Stream, PartitionCompute } } }; } diff --git a/src/frontend/src/optimizer/plan_node/stream_partition_compute.rs b/src/frontend/src/optimizer/plan_node/stream_partition_compute.rs new file mode 100644 index 0000000000000..2157c7218a91b --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/stream_partition_compute.rs @@ -0,0 +1,135 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use itertools::Itertools; +use pretty_xmlish::XmlNode; +use risingwave_common::catalog::Field; +use risingwave_common::types::DataType; +use risingwave_pb::stream_plan::stream_node::PbNodeBody; + +use super::expr_visitable::ExprVisitable; +use super::stream::prelude::{GenericPlanRef, PhysicalPlanRef}; +use super::stream::StreamPlanRef; +use super::utils::{childless_record, Distill}; +use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode}; +use crate::stream_fragmenter::BuildFragmentGraphState; +use crate::PlanRef; + +pub struct PartitionComputeInfo { + pub partition_type: DataType, + // (partition_field_name, partition_field_transform) + pub partition_fields: Vec<(String, String)>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PartitionField { + pub col_index: u64, + pub transform: String, +} + +/// [`StreamPartitionCompute`] used to compute the partition of the stream in advance. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct StreamPartitionCompute { + pub base: PlanBase, + input: PlanRef, + connector_name: String, + partition_fields: Vec, + partition_type: DataType, +} + +impl StreamPartitionCompute { + #[must_use] + pub fn new( + input: PlanRef, + connector_name: String, + partition_fields: Vec, + partition_type: DataType, + ) -> Self { + let mut schema = input.schema().clone(); + schema + .fields + .push(Field::with_name(partition_type.clone(), "_rw_partition")); + let mut watermark_columns = input.watermark_columns().clone(); + watermark_columns.grow(input.watermark_columns().len() + 1); + let base = PlanBase::new_stream( + input.ctx(), + schema, + input.stream_key().map(|v| v.to_vec()), + input.functional_dependency().clone(), + input.distribution().clone(), + input.append_only(), + input.emit_on_window_close(), + watermark_columns, + ); + Self { + base, + input, + connector_name, + partition_fields, + partition_type, + } + } +} + +impl PlanTreeNodeUnary for StreamPartitionCompute { + fn input(&self) -> PlanRef { + self.input.clone() + } + + fn clone_with_input(&self, input: PlanRef) -> Self { + Self::new( + input, + self.connector_name.clone(), + self.partition_fields.clone(), + self.partition_type.clone(), + ) + } +} + +impl_plan_tree_node_for_unary! 
{ StreamPartitionCompute } + +impl StreamNode for StreamPartitionCompute { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { + use risingwave_pb::stream_plan::*; + + PbNodeBody::PartitionCompute(PartitionComputeNode { + spec: Some( + risingwave_pb::stream_plan::partition_compute_node::PartitionSpec { + connector_type: self.connector_name.clone(), + fields: self + .partition_fields + .clone() + .into_iter() + .map(|field| { + risingwave_pb::stream_plan::partition_compute_node::PartitionField { + col_id: field.col_index, + transform: field.transform, + } + }) + .collect_vec(), + }, + ), + }) + } +} + +impl Distill for StreamPartitionCompute { + fn distill<'a>(&self) -> XmlNode<'a> { + childless_record("StreamPartitionCompute", vec![]) + } +} + +impl ExprRewritable for StreamPartitionCompute {} + +impl ExprVisitable for StreamPartitionCompute {} diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 68814531d9293..476b022cd38e2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -29,6 +29,7 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::iceberg::ICEBERG_SINK; use risingwave_connector::sink::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, @@ -39,8 +40,9 @@ use tracing::info; use super::derive::{derive_columns, derive_pk}; use super::generic::GenericPlanRef; use super::stream::prelude::*; +use super::stream_partition_compute::{PartitionComputeInfo, PartitionField}; use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder}; -use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, StreamNode, StreamPartitionCompute}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -90,6 +92,7 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result { let columns = derive_columns(input.schema(), out_names, &user_cols)?; let (input, sink) = Self::derive_sink_desc( @@ -104,6 +107,7 @@ impl StreamSink { definition, properties, format_desc, + partition_info, )?; // check and ensure that the sink connector is specified and supported @@ -127,9 +131,50 @@ impl StreamSink { Ok(Self::new(input, sink)) } + fn derive_iceberg_sink_distribution( + input: PlanRef, + partition_info: Option, + columns: &[ColumnCatalog], + connector_name: &str, + ) -> Result<(RequiredDist, PlanRef)> { + // For here, we need to add the plan node to compute the partition value, and add it as a extra column. 
+ if let Some(partition_info) = &partition_info { + let partition_fields = partition_info + .partition_fields + .iter() + .map(|(name, transform)| { + let idx = Self::find_column_idx_by_name(columns, name)?; + Ok(PartitionField { + col_index: idx as u64, + transform: transform.clone(), + }) + }) + .collect::>>()?; + let new_input: PlanRef = StreamPartitionCompute::new( + input, + connector_name.to_string(), + partition_fields, + partition_info.partition_type.clone(), + ) + .into(); + Ok(( + RequiredDist::shard_by_key( + new_input.schema().len(), + &[new_input.schema().len() - 1], + ), + new_input, + )) + } else { + Ok(( + RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()), + input, + )) + } + } + #[allow(clippy::too_many_arguments)] fn derive_sink_desc( - input: PlanRef, + mut input: PlanRef, user_distributed_by: RequiredDist, name: String, db_name: String, @@ -140,24 +185,33 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result<(PlanRef, SinkDesc)> { + let connector_name = properties.get(CONNECTOR_TYPE_KEY).ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + "connector not specified when create sink", + ))) + })?; + let sink_type = Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?; let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); - let downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + let mut downstream_pk = + Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), _ => { - match properties.get("connector") { - Some(s) if s == "deltalake" => { + match connector_name.to_lowercase().as_str() { + "deltalake" => { // iceberg with multiple parallelism will fail easily with concurrent commit // on metadata // TODO: reset iceberg sink to have multiple parallelism info!("setting iceberg sink parallelism to singleton"); RequiredDist::single() } - Some(s) if s == "jdbc" && sink_type == SinkType::Upsert => { + s if s == "jdbc" && sink_type == SinkType::Upsert => { if sink_type == SinkType::Upsert && downstream_pk.is_empty() { return Err(ErrorCode::SinkError(Box::new(Error::new( ErrorKind::InvalidInput, @@ -172,6 +226,22 @@ impl StreamSink { // lock contentions RequiredDist::hash_shard(downstream_pk.as_slice()) } + ICEBERG_SINK => { + // If user doesn't specify the downstream primary key, we use the stream key as the pk. + // # TODO + // Is `input.expect_stream_key()` == pk? + if sink_type.is_upsert() && downstream_pk.is_empty() { + downstream_pk = pk.iter().map(|k| k.column_index).collect_vec(); + } + let (required_dist, new_input) = Self::derive_iceberg_sink_distribution( + input, + partition_info, + &columns, + connector_name, + )?; + input = new_input; + required_dist + } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); if downstream_pk.is_empty() { @@ -313,19 +383,8 @@ impl StreamSink { if trimmed_key.is_empty() { continue; } - match columns - .iter() - .position(|col| col.column_desc.name == trimmed_key) - { - Some(index) => downstream_pk_indices.push(index), - None => { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!("Sink primary key column not found: {}. 
Please use ',' as the delimiter for different primary key columns.", trimmed_key), - ))) - .into()); - } - } + downstream_pk_indices + .push(Self::find_column_idx_by_name(columns, trimmed_key)?); } Ok(downstream_pk_indices) } @@ -337,6 +396,20 @@ impl StreamSink { } } + #[inline] + fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result { + columns + .iter() + .position(|col| col.column_desc.name == col_name) + .ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name) + ))) + .into() + }) + } + /// The table schema is: | epoch | seq id | row op | sink columns | /// Pk is: | epoch | seq id | fn infer_kv_log_store_table_catalog(&self) -> TableCatalog { diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs index 489c0bd4e453c..9329dff66fb90 100644 --- a/src/frontend/src/utils/stream_graph_formatter.rs +++ b/src/frontend/src/utils/stream_graph_formatter.rs @@ -317,7 +317,8 @@ impl StreamGraphFormatter { | stream_node::NodeBody::Values(_) | stream_node::NodeBody::Source(_) | stream_node::NodeBody::StreamFsFetch(_) - | stream_node::NodeBody::NoOp(_) => {} + | stream_node::NodeBody::NoOp(_) + | stream_node::NodeBody::PartitionCompute(_) => {} }; if self.verbose { diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index c24729dfbdb25..0be57ecf14f23 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -34,6 +34,7 @@ governor = { version = "0.6", default-features = false, features = [ "jitter", ] } hytra = "0.1.2" +icelake = { workspace = true } itertools = "0.12" local_stats_alloc = { path = "../utils/local_stats_alloc" } lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 04754d71807bb..0f6a34bd635a1 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -78,6 +78,7 @@ mod mview; mod no_op; mod now; mod over_window; +pub mod partition_compute; mod project; mod project_set; mod rearranged_chain; diff --git a/src/stream/src/executor/partition_compute/iceberg.rs b/src/stream/src/executor/partition_compute/iceberg.rs new file mode 100644 index 0000000000000..caccbbe98dccd --- /dev/null +++ b/src/stream/src/executor/partition_compute/iceberg.rs @@ -0,0 +1,78 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::fmt::{Debug, Formatter}; +use std::str::FromStr; +use std::sync::Arc; + +use icelake::types::{ + create_transform_function, BoxedTransformFunction, Transform as IcelakeTransform, +}; +use risingwave_common::array::ArrayRef; +use risingwave_connector::sink::iceberg::ICEBERG_SINK; + +use super::{Transform, TransformBuilder}; +use crate::error::{StreamError, StreamResult}; +use crate::executor::partition_compute::PartitionComputeField; + +#[derive(Debug)] +pub struct IcebergTransformBuilder; + +impl TransformBuilder for IcebergTransformBuilder { + fn build( + spec: risingwave_pb::stream_plan::partition_compute_node::PartitionSpec, + ) -> StreamResult> { + assert_eq!(spec.connector_type, ICEBERG_SINK); + spec.fields + .into_iter() + .map(|field| { + Ok(PartitionComputeField { + col_index: field.col_id as usize, + transform: Box::new(IcebergTransform::from_str(&field.transform)?), + }) + }) + .collect::>>() + } +} + +pub struct IcebergTransform { + inner: BoxedTransformFunction, +} + +impl Debug for IcebergTransform { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IcebergTransform").finish() + } +} + +impl FromStr for IcebergTransform { + type Err = StreamError; + + fn from_str(s: &str) -> StreamResult { + let transform = IcelakeTransform::from_str(s).unwrap(); + let inner = create_transform_function(&transform).unwrap(); + Ok(Self { inner }) + } +} + +impl Transform for IcebergTransform { + fn transform(&self, array: &ArrayRef) -> StreamResult { + // convert array as arrow array + let res = self + .inner + .transform(array.as_ref().try_into().unwrap()) + .unwrap(); + Ok(Arc::new((&res).try_into().unwrap())) + } +} diff --git a/src/stream/src/executor/partition_compute/mod.rs b/src/stream/src/executor/partition_compute/mod.rs new file mode 100644 index 0000000000000..c76f2d53f0367 --- /dev/null +++ b/src/stream/src/executor/partition_compute/mod.rs @@ -0,0 +1,270 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod iceberg; +use std::fmt::{Debug, Formatter}; + +use futures_async_stream::try_stream; +pub use iceberg::*; +use risingwave_common::array::{ArrayImpl, ArrayRef, StructArray}; +use risingwave_common::catalog::Schema; +use risingwave_pb::stream_plan::partition_compute_node::PartitionSpec; + +use super::{ + ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, ExecutorInfo, Message, + PkIndicesRef, *, +}; + +pub trait TransformBuilder: Debug + Send { + fn build(spec: PartitionSpec) -> StreamResult>; +} + +pub trait Transform: Debug + Send { + fn transform(&self, array: &ArrayRef) -> StreamResult; +} + +pub struct PartitionComputeField { + col_index: usize, + transform: Box, +} + +pub struct PartitionComputeExecutor { + _ctx: ActorContextRef, + info: ExecutorInfo, + input: BoxedExecutor, + partition_compute_fields: Vec, +} + +impl PartitionComputeExecutor { + pub fn new( + ctx: ActorContextRef, + info: ExecutorInfo, + input: Box, + partition_compute_fields: Vec, + ) -> Self { + Self { + _ctx: ctx, + info, + input, + partition_compute_fields, + } + } +} + +impl Debug for PartitionComputeExecutor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartitionComputeExecutor").finish() + } +} + +impl Executor for PartitionComputeExecutor { + fn schema(&self) -> &Schema { + &self.info.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.info.pk_indices + } + + fn identity(&self) -> &str { + &self.info.identity + } + + fn execute(self: Box) -> BoxedMessageStream { + self.execute_inner().boxed() + } +} + +impl PartitionComputeExecutor { + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_inner(self) { + let input = self.input.execute(); + #[for_await] + for msg in input { + let msg = msg?; + match msg { + Message::Watermark(w) => yield Message::Watermark(w), + Message::Chunk(chunk) => { + let (chunk, op) = chunk.compact().into_parts(); + // compute the partition column + let mut partition_columns = + Vec::with_capacity(self.partition_compute_fields.len()); + for field in &self.partition_compute_fields { + partition_columns.push( + field + .transform + .transform(chunk.column_at(field.col_index)) + .unwrap(), + ); + } + // build partition_columns + let struct_array = StructArray::new( + self.info.schema.fields()[self.info.schema.fields().len() - 1] + .data_type + .as_struct() + .clone(), + partition_columns, + chunk.visibility().clone(), + ); + // build new chunk + let (mut columns, bitmap) = chunk.into_parts(); + columns.push(Arc::new(ArrayImpl::Struct(struct_array))); + let new_chunk = StreamChunk::with_visibility(op, columns, bitmap); + yield Message::Chunk(new_chunk); + } + m => yield m, + } + } + } +} + +#[cfg(test)] +mod test { + use futures::StreamExt; + use risingwave_common::array::stream_chunk::StreamChunkTestExt; + use risingwave_common::array::StreamChunk; + use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::types::{DataType, StructType}; + use risingwave_pb::stream_plan::partition_compute_node::PartitionField; + + use super::super::test_utils::MockSource; + use super::super::*; + use super::*; + + #[tokio::test] + async fn test_iceberg_partition_compute() { + // source + let chunk1 = StreamChunk::from_pretty( + " I I TZ T + + 34 4 1970-01-01T00:00:00.000000000+00:00 iceberg + + 34 5 1971-02-01T01:00:00.000000000+00:00 risingwave + + 34 6 1972-03-01T02:00:00.000000000+00:00 delta", + ); + let chunk2 = StreamChunk::from_pretty( + " I I TZ T + + 34 8 1970-05-01T06:00:00.000000000+00:00 
iceberg + - 34 6 1970-06-01T07:00:00.000000000+00:00 kafka", + ); + let schema = Schema { + fields: vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Timestamptz), + Field::unnamed(DataType::Varchar), + ], + }; + let pk_indices = vec![0]; + let (mut tx, source) = MockSource::channel(schema, pk_indices); + + // build partition compute executor + let info = ExecutorInfo { + schema: Schema { + fields: vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Timestamptz), + Field::unnamed(DataType::Varchar), + Field::unnamed(DataType::Struct(StructType::new(vec![ + ("col0-identity", DataType::Int64), + ("col2-identity", DataType::Timestamptz), + ("col2-year", DataType::Int32), + ("col2-month", DataType::Int32), + ("col2-day", DataType::Int32), + ("col2-hour", DataType::Int32), + ("col0-bucket", DataType::Int32), + ("col3-truncate", DataType::Varchar), + ]))), + ], + }, + pk_indices: vec![4], + identity: "PartitionComputeExecutor".to_string(), + }; + let partition_compute = Box::new(PartitionComputeExecutor::new( + ActorContext::create(123), + info, + Box::new(source), + IcebergTransformBuilder::build(PartitionSpec { + connector_type: "iceberg".to_string(), + fields: vec![ + PartitionField { + col_id: 0, + transform: "identity".to_string(), + }, + PartitionField { + col_id: 2, + transform: "identity".to_string(), + }, + PartitionField { + col_id: 2, + transform: "year".to_string(), + }, + PartitionField { + col_id: 2, + transform: "month".to_string(), + }, + PartitionField { + col_id: 2, + transform: "day".to_string(), + }, + PartitionField { + col_id: 2, + transform: "hour".to_string(), + }, + PartitionField { + col_id: 0, + transform: "bucket[2017]".to_string(), + }, + PartitionField { + col_id: 3, + transform: "truncate[3]".to_string(), + }, + ], + }) + .unwrap(), + )); + let mut partition_compute = partition_compute.execute(); + + // test + tx.push_barrier(1, false); + let barrier = partition_compute.next().await.unwrap().unwrap(); + barrier.as_barrier().unwrap(); + + tx.push_chunk(chunk1); + tx.push_chunk(chunk2); + + let msg = partition_compute.next().await.unwrap().unwrap(); + assert_eq!( + *msg.as_chunk().unwrap(), + StreamChunk::from_pretty( + " I I TZ T + + 34 4 1970-01-01T00:00:00.000000000+00:00 iceberg (34,1970-01-01T00:00:00.000000000+00:00,0,0,0,0,1373,ice) + + 34 5 1971-02-01T01:00:00.000000000+00:00 risingwave (34,1971-02-01T01:00:00.000000000+00:00,1,13,396,9505,1373,ris) + + 34 6 1972-03-01T02:00:00.000000000+00:00 delta (34,1972-03-01T02:00:00.000000000+00:00,2,26,790,18962,1373,del)", + ) + ); + + let msg = partition_compute.next().await.unwrap().unwrap(); + assert_eq!( + *msg.as_chunk().unwrap(), + StreamChunk::from_pretty( + " I I TZ T + + 34 8 1970-05-01T06:00:00.000000000+00:00 iceberg (34,1970-05-01T06:00:00.000000000+00:00,0,4,120,2886,1373,ice) + - 34 6 1970-06-01T07:00:00.000000000+00:00 kafka (34,1970-06-01T07:00:00.000000000+00:00,0,5,151,3631,1373,kaf)" + ) + ); + + tx.push_barrier(2, true); + assert!(partition_compute.next().await.unwrap().unwrap().is_stop()); + } +} diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 86e8dd5ec47c3..32f398c6c5349 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -93,7 +93,13 @@ impl SinkExecutor { .iter() .map(|column| Field::from(&column.column_desc)) .collect(); - assert_eq!(input_schema.data_types(), info.schema.data_types()); + // # TODO + // Do we have more 
general way to check this? + if let Some(last_field) = info.schema.fields.last() && last_field.name == "_rw_partition"{ + assert_eq!(input_schema.data_types(), info.schema.data_types()[..info.schema.fields.len()-1]); + } else { + assert_eq!(input_schema.data_types(), info.schema.data_types()); + } Ok(Self { actor_context, diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 23e22794af26f..3e86e4fbbd63f 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -35,6 +35,7 @@ mod mview; mod no_op; mod now; mod over_window; +mod partition_compute; mod project; mod project_set; mod row_id_gen; @@ -93,6 +94,7 @@ use self::union::*; use self::watermark_filter::WatermarkFilterBuilder; use crate::error::StreamResult; use crate::executor::{BoxedExecutor, Executor, ExecutorInfo}; +use crate::from_proto::partition_compute::PartitionComputeExecutorBuilder; use crate::from_proto::values::ValuesExecutorBuilder; use crate::task::{ExecutorParams, LocalStreamManagerCore}; @@ -172,5 +174,6 @@ pub async fn create_executor( NodeBody::EowcOverWindow => EowcOverWindowExecutorBuilder, NodeBody::OverWindow => OverWindowExecutorBuilder, NodeBody::StreamFsFetch => FsFetchExecutorBuilder, + NodeBody::PartitionCompute => PartitionComputeExecutorBuilder } } diff --git a/src/stream/src/from_proto/partition_compute.rs b/src/stream/src/from_proto/partition_compute.rs new file mode 100644 index 0000000000000..e34cfe9860e33 --- /dev/null +++ b/src/stream/src/from_proto/partition_compute.rs @@ -0,0 +1,51 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::stream_plan::PartitionComputeNode; +use risingwave_storage::StateStore; + +use super::ExecutorBuilder; +use crate::error::StreamResult; +use crate::executor::partition_compute::{ + IcebergTransformBuilder, PartitionComputeExecutor, TransformBuilder, +}; +use crate::executor::BoxedExecutor; +use crate::task::{ExecutorParams, LocalStreamManagerCore}; + +pub struct PartitionComputeExecutorBuilder; + +impl ExecutorBuilder for PartitionComputeExecutorBuilder { + type Node = PartitionComputeNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &Self::Node, + _store: impl StateStore, + _stream: &mut LocalStreamManagerCore, + ) -> StreamResult { + let [input]: [_; 1] = params.input.try_into().unwrap(); + let spec = node.spec.as_ref().unwrap().clone(); + let connector_type = spec.connector_type.clone(); + let partition_compute_fields = match connector_type.as_str() { + "iceberg" => IcebergTransformBuilder::build(spec).unwrap(), + _ => unimplemented!("unsupported connector type: {}", spec.connector_type), + }; + Ok(Box::new(PartitionComputeExecutor::new( + params.actor_context, + params.info, + input, + partition_compute_fields, + ))) + } +}
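
Note on the sink-side fan-out step: the core of this draft on the writer side is `PrecomputedPartitionedWriter::write`, which takes the precomputed `_rw_partition` struct column out of each incoming `RecordBatch`, encodes its values with arrow-row, and routes every group of rows to its own inner writer. The standalone sketch below isolates just that grouping step using the same arrow-row / arrow-select 49 APIs the patch already depends on; the function name `group_by_partition` and its free-standing shape are illustrative only and not part of the patch.

use std::collections::HashMap;

use arrow_array::{BooleanArray, RecordBatch};
use arrow_row::{OwnedRow, RowConverter, SortField};
use arrow_schema::ArrowError;
use arrow_select::filter::filter_record_batch;

/// Split `batch` into one sub-batch per distinct value of the partition column.
fn group_by_partition(
    batch: &RecordBatch,
    partition_col_idx: usize,
) -> Result<HashMap<OwnedRow, RecordBatch>, ArrowError> {
    let partition_col = batch.column(partition_col_idx).clone();

    // Encode the partition struct values into byte-comparable rows so they can
    // serve as hash-map keys, mirroring what the writer in this patch does.
    let converter = RowConverter::new(vec![SortField::new(partition_col.data_type().clone())])?;
    let rows = converter.convert_columns(&[partition_col])?;

    // Build one boolean mask per distinct partition value.
    let mut masks: HashMap<OwnedRow, Vec<bool>> = HashMap::new();
    for (row_id, row) in rows.iter().enumerate() {
        let mask = masks
            .entry(row.owned())
            .or_insert_with(|| vec![false; batch.num_rows()]);
        mask[row_id] = true;
    }

    // Materialize a filtered sub-batch for each partition value.
    masks
        .into_iter()
        .map(|(key, mask)| {
            let mask: BooleanArray = mask.into();
            Ok((key, filter_record_batch(batch, &mask)?))
        })
        .collect()
}

Keying the map on `OwnedRow` works because arrow-row produces a byte-comparable encoding of each struct value, which is also why the writer can keep one inner writer per encoded row without re-deriving the partition value; the original `AnyValue` partition value is still kept alongside each writer so it can be attached to the emitted data files on flush.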