From bc926bd5013ec403ea0dbeca5ba9e162b88dbc8b Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 22 Jan 2024 12:42:28 +0800 Subject: [PATCH] support precompute partition --- Cargo.lock | 5 +- Cargo.toml | 2 +- proto/stream_plan.proto | 1 + src/common/src/array/arrow/arrow_impl.rs | 9 +- src/connector/Cargo.toml | 2 + src/connector/src/sink/catalog/desc.rs | 5 + src/connector/src/sink/iceberg/mod.rs | 282 +++++++++++------- src/connector/src/sink/mod.rs | 2 + src/frontend/Cargo.toml | 1 + src/frontend/planner_test/src/lib.rs | 3 + src/frontend/src/handler/create_sink.rs | 98 +++++- src/frontend/src/handler/explain.rs | 13 +- src/frontend/src/optimizer/mod.rs | 7 +- src/frontend/src/optimizer/plan_node/mod.rs | 2 +- .../src/optimizer/plan_node/stream_sink.rs | 178 +++++++++-- src/stream/src/executor/sink.rs | 14 +- src/stream/src/from_proto/sink.rs | 1 + 17 files changed, 489 insertions(+), 136 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 090129a564581..c35c06a58dfae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5060,7 +5060,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=32c0bbf242f5c47b1e743f10577012fe7436c770#32c0bbf242f5c47b1e743f10577012fe7436c770" +source = "git+https://github.com/icelake-io/icelake?rev=3f61f900d6914d4a28c00c2af6374a4dda6d95d4#3f61f900d6914d4a28c00c2af6374a4dda6d95d4" dependencies = [ "anyhow", "apache-avro 0.17.0", @@ -8936,7 +8936,9 @@ dependencies = [ "anyhow", "apache-avro 0.16.0", "arrow-array 49.0.0", + "arrow-row 49.0.0", "arrow-schema 49.0.0", + "arrow-select 49.0.0", "async-nats", "async-trait", "auto_enums", @@ -9216,6 +9218,7 @@ dependencies = [ "futures", "futures-async-stream", "iana-time-zone", + "icelake", "itertools 0.12.0", "madsim-tokio", "madsim-tonic", diff --git a/Cargo.toml b/Cargo.toml index e0280fe3b9323..5f15df78762a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [ +icelake = { git = "https://github.com/icelake-io/icelake", rev = "3f61f900d6914d4a28c00c2af6374a4dda6d95d4", features = [ "prometheus", ] } arrow-array = "49" diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index e69a712c9e3d8..e3979ddc459a5 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -223,6 +223,7 @@ message SinkDesc { string sink_from_name = 12; catalog.SinkFormatDesc format_desc = 13; optional uint32 target_table = 14; + optional uint64 extra_partition_col_idx = 15; } enum SinkLogStoreType { diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 87f82377f9675..7274bb3f34e4d 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -133,6 +133,7 @@ macro_rules! converts_generic { fn try_from(array: &ArrayImpl) -> Result { match array { $($ArrayImplPattern(a) => Ok(Arc::new(<$ArrowType>::try_from(a)?)),)* + ArrayImpl::Timestamptz(a) => Ok(Arc::new(arrow_array::TimestampMicrosecondArray::try_from(a)?. with_timezone_utc())), _ => todo!("unsupported array"), } } @@ -152,6 +153,13 @@ macro_rules! 
converts_generic { .unwrap() .try_into()?, )),)* + Timestamp(Microsecond, Some(_)) => Ok(ArrayImpl::Timestamptz( + array + .as_any() + .downcast_ref::() + .unwrap() + .try_into()?, + )), t => Err(ArrayError::from_arrow(format!("unsupported data type: {t:?}"))), } } @@ -173,7 +181,6 @@ converts_generic! { { arrow_array::Decimal256Array, Decimal256(_, _), ArrayImpl::Int256 }, { arrow_array::Date32Array, Date32, ArrayImpl::Date }, { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, None), ArrayImpl::Timestamp }, - { arrow_array::TimestampMicrosecondArray, Timestamp(Microsecond, Some(_)), ArrayImpl::Timestamptz }, { arrow_array::Time64MicrosecondArray, Time64(Microsecond), ArrayImpl::Time }, { arrow_array::IntervalMonthDayNanoArray, Interval(MonthDayNano), ArrayImpl::Interval }, { arrow_array::StructArray, Struct(_), ArrayImpl::Struct }, diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 3f469e5ad65ae..2ed27a2dcc576 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -22,7 +22,9 @@ apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "d0846a16c "xz", ] } arrow-array = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } async-nats = "0.33" async-trait = "0.1" auto_enums = { version = "0.8", features = ["futures03"] } diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs index 513a39ae90e31..33abc4ce52570 100644 --- a/src/connector/src/sink/catalog/desc.rs +++ b/src/connector/src/sink/catalog/desc.rs @@ -67,6 +67,10 @@ pub struct SinkDesc { /// Id of the target table for sink into table. pub target_table: Option, + + /// Indicate whether the sink accepts the data chunk with extra partition column. 
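    /// (Illustrative, with hypothetical columns: for a sink whose input schema is
    /// `[user_id, event_at]` and whose Iceberg table is partitioned, the upstream
    /// `StreamProject` appends one extra struct column holding the precomputed
    /// partition values, so the chunk becomes `[user_id, event_at, _rw_partition]`
    /// and `extra_partition_col_idx` is `Some(2)`.)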
+ /// For more detil of partition column, see `PartitionComputeInfo` + pub extra_partition_col_idx: Option, } impl SinkDesc { @@ -123,6 +127,7 @@ impl SinkDesc { db_name: self.db_name.clone(), sink_from_name: self.sink_from_name.clone(), target_table: self.target_table.map(|table_id| table_id.table_id()), + extra_partition_col_idx: self.extra_partition_col_idx.map(|idx| idx as u64), } } } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 856c8deea0f40..ce9b525d957b4 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -20,7 +20,9 @@ use std::fmt::Debug; use std::ops::Deref; use anyhow::anyhow; -use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef}; +use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, +}; use async_trait::async_trait; use icelake::catalog::{ load_catalog, load_iceberg_base_catalog_config, BaseCatalogConfig, CatalogRef, CATALOG_NAME, @@ -48,7 +50,6 @@ use url::Url; use with_options::WithOptions; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; -use self::prometheus::monitored_partition_writer::MonitoredFanoutPartitionedWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -177,7 +178,7 @@ impl IcebergConfig { self.catalog_type.as_deref().unwrap_or("storage") } - fn build_iceberg_configs(&self) -> Result> { + pub fn build_iceberg_configs(&self) -> Result> { let mut iceberg_configs = HashMap::new(); let catalog_type = self.catalog_type().to_string(); @@ -346,6 +347,55 @@ impl IcebergConfig { } } +pub async fn create_catalog(config: &IcebergConfig) -> Result { + match config.catalog_type() { + "storage" | "rest" => { + let iceberg_configs = config.build_iceberg_configs()?; + let catalog = load_catalog(&iceberg_configs) + .await + .map_err(|e| SinkError::Iceberg(anyhow!(e)))?; + Ok(catalog) + } + catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => { + // Create java catalog + let (base_catalog_config, java_catalog_props) = config.build_jni_catalog_configs()?; + let catalog_impl = match catalog_type { + "hive" => "org.apache.iceberg.hive.HiveCatalog", + "sql" => "org.apache.iceberg.jdbc.JdbcCatalog", + "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", + "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog", + _ => unreachable!(), + }; + + jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props) + } + _ => { + Err(SinkError::Iceberg(anyhow!( + "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`", + config.catalog_type() + ))) + } + } +} + +pub async fn create_table(config: &IcebergConfig) -> Result { + let catalog = create_catalog(config) + .await + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; + + let table_id = TableIdentifier::new( + vec![config.database_name.as_str()] + .into_iter() + .chain(config.table_name.split('.')), + ) + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?; + + catalog + .load_table(&table_id) + .await + .map_err(|err| SinkError::Iceberg(anyhow!(err))) +} + pub struct IcebergSink { config: IcebergConfig, param: SinkParam, @@ -371,54 +421,8 @@ impl Debug 
for IcebergSink { } impl IcebergSink { - async fn create_catalog(&self) -> Result { - match self.config.catalog_type() { - "storage" | "rest" => { - let iceberg_configs = self.config.build_iceberg_configs()?; - let catalog = load_catalog(&iceberg_configs) - .await - .map_err(|e| SinkError::Iceberg(anyhow!(e)))?; - Ok(catalog) - } - catalog_type if catalog_type == "hive" || catalog_type == "sql" || catalog_type == "glue" || catalog_type == "dynamodb" => { - // Create java catalog - let (base_catalog_config, java_catalog_props) = self.config.build_jni_catalog_configs()?; - let catalog_impl = match catalog_type { - "hive" => "org.apache.iceberg.hive.HiveCatalog", - "sql" => "org.apache.iceberg.jdbc.JdbcCatalog", - "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", - "dynamodb" => "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog", - _ => unreachable!(), - }; - - jni_catalog::JniCatalog::build(base_catalog_config, "risingwave", catalog_impl, java_catalog_props) - } - _ => { - Err(SinkError::Iceberg(anyhow!( - "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`", - self.config.catalog_type() - ))) - } - } - } - - async fn create_table(&self) -> Result
{
-        let catalog = self
-            .create_catalog()
-            .await
-            .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;
-
-        let table_id = TableIdentifier::new(
-            vec![self.config.database_name.as_str()]
-                .into_iter()
-                .chain(self.config.table_name.split('.')),
-        )
-        .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?;
-
-        let table = catalog
-            .load_table(&table_id)
-            .await
-            .map_err(|err| SinkError::Iceberg(anyhow!(err)))?;
+    async fn create_validated_table(&self) -> Result<Table>
{ + let table = create_table(&self.config).await?; let sink_schema = self.param.schema(); let iceberg_schema = table @@ -475,12 +479,12 @@ impl Sink for IcebergSink { const SINK_NAME: &'static str = ICEBERG_SINK; async fn validate(&self) -> Result<()> { - let _ = self.create_table().await?; + let _ = self.create_validated_table().await?; Ok(()) } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - let table = self.create_table().await?; + let table = self.create_validated_table().await?; let inner = if let Some(unique_column_ids) = &self.unique_column_ids { IcebergWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await? } else { @@ -505,7 +509,7 @@ impl Sink for IcebergSink { } async fn new_coordinator(&self) -> Result { - let table = self.create_table().await?; + let table = self.create_validated_table().await?; let partition_type = table.current_partition_type()?; Ok(IcebergSinkCommitter { @@ -526,6 +530,31 @@ enum IcebergWriterEnum { } impl IcebergWriter { + fn schema_with_extra_partition_col(table: &Table, idx: usize) -> Result { + let schema = table.current_arrow_schema()?; + + let mut fields = schema.fields().to_vec(); + let partition_type = + if let ArrowDataType::Struct(s) = table.current_partition_type()?.try_into()? { + let fields = Fields::from( + s.into_iter() + .enumerate() + .map(|(id, field)| { + ArrowField::new(format!("f{id}"), field.data_type().clone(), true) + }) + .collect::>(), + ); + ArrowDataType::Struct(fields) + } else { + unimplemented!() + }; + fields.insert( + idx, + ArrowField::new("_rw_partition", partition_type, false).into(), + ); + Ok(ArrowSchema::new(fields).into()) + } + pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; @@ -537,30 +566,54 @@ impl IcebergWriter { .iceberg_rolling_unflushed_data_file .clone(), )); - let partition_data_file_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); - let dispatch_builder = builder_helper - .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; - // wrap a layer with collect write metrics - let prometheus_builder = PrometheusWriterBuilder::new( - dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), - ); - let schema = table.current_arrow_schema()?; - let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); - Ok(Self { - inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), - schema, - }) + if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + let partition_data_file_builder = builder_helper.precompute_partition_writer_builder( + data_file_builder.clone(), + extra_partition_col_idx, + )?; + let dispatch_builder = builder_helper + .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + 
inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), + schema, + }) + } else { + let partition_data_file_builder = + builder_helper.fanout_partition_writer_builder(data_file_builder.clone())?; + let dispatch_builder = builder_helper + .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = table.current_arrow_schema()?; + let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), + schema, + }) + } } pub async fn new_upsert( @@ -592,30 +645,55 @@ impl IcebergWriter { equality_delete_builder, unique_column_ids, ); - let partition_delta_builder = MonitoredFanoutPartitionedWriterBuilder::new( - builder_helper.fanout_partition_writer_builder(delta_builder.clone())?, - writer_param.sink_metrics.iceberg_partition_num.clone(), - ); - let dispatch_builder = - builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; - // wrap a layer with collect write metrics - let prometheus_builder = PrometheusWriterBuilder::new( - dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), - ); - let schema = table.current_arrow_schema()?; - let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); - Ok(Self { - inner_writer: IcebergWriterEnum::Upsert(inner_writer), - schema, - }) + if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + let partition_delta_builder = builder_helper.precompute_partition_writer_builder( + delta_builder.clone(), + extra_partition_col_idx, + )?; + let dispatch_builder = + builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::Upsert(inner_writer), + schema, + }) + } else { + let partition_delta_builder = + builder_helper.fanout_partition_writer_builder(delta_builder.clone())?; + let dispatch_builder = + builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; + // wrap a layer with collect write metrics + let prometheus_builder = PrometheusWriterBuilder::new( + dispatch_builder, + WriterMetrics::new( + writer_param.sink_metrics.iceberg_write_qps.deref().clone(), + writer_param + .sink_metrics + .iceberg_write_latency + .deref() + .clone(), + ), + ); + let schema = table.current_arrow_schema()?; + let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); + Ok(Self { + inner_writer: IcebergWriterEnum::Upsert(inner_writer), + schema, + }) + } } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index be8d252ded5e4..70da9bb8533f4 100644 --- 
a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -270,6 +270,7 @@ pub struct SinkWriterParam { pub vnode_bitmap: Option, pub meta_client: Option, pub sink_metrics: SinkMetrics, + pub extra_partition_col_idx: Option, } #[derive(Clone)] @@ -303,6 +304,7 @@ impl SinkWriterParam { vnode_bitmap: Default::default(), meta_client: Default::default(), sink_metrics: SinkMetrics::for_test(), + extra_partition_col_idx: Default::default(), } } } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 3c9a1b62f94dc..97f9454effff7 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -38,6 +38,7 @@ fixedbitset = "0.4.2" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = { workspace = true } iana-time-zone = "0.1" +icelake = { workspace = true } itertools = "0.12" maplit = "1" md5 = "0.7.0" diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index f3c7d9250ec44..46a69992e1996 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -833,6 +833,9 @@ impl TestCase { format_desc, false, None, + // # TODO + // Fixed me. It may should not use None here. + None, ) { Ok(sink_plan) => { ret.sink_plan = Some(explain_plan(&sink_plan.into())); diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index f3d9053571558..c6650e6ef774d 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -17,6 +17,7 @@ use std::rc::Rc; use std::sync::{Arc, LazyLock}; use anyhow::Context; +use arrow_schema::DataType as ArrowDataType; use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; @@ -27,6 +28,7 @@ use risingwave_common::types::Datum; use risingwave_common::util::value_encoding::DatumFromProtoExt; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::iceberg::{create_table, IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; @@ -43,6 +45,7 @@ use risingwave_sqlparser::ast::{ use risingwave_sqlparser::parser::Parser; use super::create_mv::get_column_names; +use super::create_source::UPSTREAM_SOURCE_KEY; use super::RwPgResponse; use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; @@ -52,13 +55,15 @@ use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGene use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; use crate::handler::HandlerArgs; -use crate::optimizer::plan_node::{generic, Explain, LogicalSource, StreamProject}; +use crate::optimizer::plan_node::{ + generic, IcebergPartitionInfo, LogicalSource, PartitionComputeInfo, StreamProject, +}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor}; use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; use crate::utils::resolve_privatelink_in_with_option; -use crate::{Planner, TableCatalog, WithOptions}; +use crate::{Explain, Planner, TableCatalog, WithOptions}; pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { let table_factor = TableFactor::Table { @@ -98,6 +103,7 @@ pub fn gen_sink_plan( session: &SessionImpl, context: 
OptimizerContextRef, stmt: CreateSinkStatement, + partition_info: Option, ) -> Result { let db_name = session.database(); let (sink_schema_name, sink_table_name) = @@ -227,6 +233,7 @@ pub fn gen_sink_plan( format_desc, without_backfill, target_table, + partition_info, )?; let sink_desc = sink_plan.sink_desc().clone(); @@ -291,6 +298,89 @@ pub fn gen_sink_plan( }) } +// This function is used to return partition compute info for a sink. More details refer in `PartitionComputeInfo`. +// Return: +// `Some(PartitionComputeInfo)` if the sink need to compute partition. +// `None` if the sink does not need to compute partition. +pub async fn get_partition_compute_info( + with_options: &WithOptions, +) -> Result> { + let properties = HashMap::from_iter(with_options.clone().into_inner().into_iter()); + let connector = properties.get(UPSTREAM_SOURCE_KEY).ok_or_else(|| { + RwError::from(ErrorCode::SinkError( + format!("missing field {UPSTREAM_SOURCE_KEY} in sink config").into(), + )) + })?; + match connector.as_str() { + ICEBERG_SINK => { + let iceberg_config = IcebergConfig::from_hashmap(properties)?; + get_partition_compute_info_for_iceberg(&iceberg_config).await + } + _ => Ok(None), + } +} + +async fn get_partition_compute_info_for_iceberg( + iceberg_config: &IcebergConfig, +) -> Result> { + let table = create_table(iceberg_config).await?; + let partition_spec = table + .current_table_metadata() + .current_partition_spec() + .map_err(|err| { + RwError::from(ErrorCode::SinkError( + format!("Failed to get icberg partition spec: {}", err).into(), + )) + })?; + + if partition_spec.is_unpartitioned() { + return Ok(None); + } + + // Separate the partition spec into two parts: sparse partition and range partition. + // Sparse partition means that the data distribution is more sparse at a given time. + // Range partition means that the data distribution is likely same at a given time. + // Only compute the partition and shuffle by them for the sparse partition. + let has_sparse_partition = partition_spec.fields.iter().any(|f| match f.transform { + // Sparse partition + // # TODO + // `Identity`` transform may need to depends on the input type. + icelake::types::Transform::Identity + | icelake::types::Transform::Truncate(_) + | icelake::types::Transform::Bucket(_) => true, + // Range partition + icelake::types::Transform::Year + | icelake::types::Transform::Month + | icelake::types::Transform::Day + | icelake::types::Transform::Hour + | icelake::types::Transform::Void => false, + }); + + if !has_sparse_partition { + return Ok(None); + } + + let arrow_type: ArrowDataType = table + .current_partition_type() + .map_err(|err| RwError::from(ErrorCode::SinkError(err.into())))? 
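    // Illustrative (hypothetical spec): `bucket(16, user_id), day(event_at)` contains a
    // sparse field (`bucket`), so the partition value is precomputed and the data is
    // shuffled by it; a spec with only `day(event_at)` is range-only and yields `None`.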
+ .try_into() + .map_err(|_| { + RwError::from(ErrorCode::SinkError( + "Fail to convert iceberg partition type to arrow type".into(), + )) + })?; + let partition_fields = partition_spec + .fields + .iter() + .map(|f| (f.name.clone(), f.transform)) + .collect_vec(); + + Ok(Some(PartitionComputeInfo::Iceberg(IcebergPartitionInfo { + partition_type: arrow_type.into(), + partition_fields, + }))) +} + pub async fn handle_create_sink( handle_args: HandlerArgs, stmt: CreateSinkStatement, @@ -305,6 +395,8 @@ pub async fn handle_create_sink( return Ok(resp); } + let partition_info = get_partition_compute_info(&handle_args.with_options).await?; + let (sink, graph, target_table_catalog) = { let context = Rc::new(OptimizerContext::from_handler_args(handle_args)); @@ -313,7 +405,7 @@ pub async fn handle_create_sink( sink_plan: plan, sink_catalog: sink, target_table_catalog, - } = gen_sink_plan(&session, context.clone(), stmt)?; + } = gen_sink_plan(&session, context.clone(), stmt, partition_info)?; let has_order_by = !query.order_by.is_empty(); if has_order_by { diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index b7981cf7aec6e..e5489bf258f8b 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -24,7 +24,7 @@ use thiserror_ext::AsReport; use super::create_index::gen_create_index_plan; use super::create_mv::gen_create_mv_plan; -use super::create_sink::gen_sink_plan; +use super::create_sink::{gen_sink_plan, get_partition_compute_info}; use super::create_table::ColumnIdGenerator; use super::query::gen_batch_plan_by_statement; use super::util::SourceSchemaCompatExt; @@ -88,6 +88,13 @@ async fn do_handle_explain( let context = plan.ctx(); (Ok(plan), context) } + Statement::CreateSink { stmt } => { + let partition_info = get_partition_compute_info(context.with_options()).await?; + let plan = gen_sink_plan(&session, context.into(), stmt, partition_info) + .map(|plan| plan.sink_plan)?; + let context = plan.ctx(); + (Ok(plan), context) + } // For other queries without `await` point, we can keep a copy of reference to the // `OptimizerContext` even if the planning fails. This enables us to log the partial @@ -122,10 +129,6 @@ async fn do_handle_explain( "A created VIEW is just an alias. Instead, use EXPLAIN on the queries which reference the view.".into() ).into()); } - Statement::CreateSink { stmt } => { - gen_sink_plan(&session, context.clone(), stmt).map(|plan| plan.sink_plan) - } - Statement::CreateIndex { name, table_name, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index cc66d1341b4f7..7cd7c943df46c 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -60,8 +60,8 @@ use self::heuristic_optimizer::ApplyOrder; use self::plan_node::generic::{self, PhysicalPlanRef}; use self::plan_node::{ stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, LogicalSource, - StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter, - ToStreamContext, + PartitionComputeInfo, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, + StreamWatermarkFilter, ToStreamContext, }; #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; @@ -775,6 +775,7 @@ impl PlanRoot { } /// Optimize and generate a create sink plan. 
+ #[allow(clippy::too_many_arguments)] pub fn gen_sink_plan( &mut self, sink_name: String, @@ -786,6 +787,7 @@ impl PlanRoot { format_desc: Option, without_backfill: bool, target_table: Option, + partition_info: Option, ) -> Result { let stream_scan_type = if without_backfill { StreamScanType::UpstreamOnly @@ -816,6 +818,7 @@ impl PlanRoot { definition, properties, format_desc, + partition_info, ) } diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index a0475c4ae092e..248bb59f4edde 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -943,7 +943,7 @@ pub use stream_project_set::StreamProjectSet; pub use stream_row_id_gen::StreamRowIdGen; pub use stream_share::StreamShare; pub use stream_simple_agg::StreamSimpleAgg; -pub use stream_sink::StreamSink; +pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink}; pub use stream_sort::StreamEowcSort; pub use stream_source::StreamSource; pub use stream_stateless_simple_agg::StreamStatelessSimpleAgg; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 8f7abc66fb455..6ce295546d020 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -17,27 +17,32 @@ use std::io::{Error, ErrorKind}; use anyhow::anyhow; use fixedbitset::FixedBitSet; +use icelake::types::Transform; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnCatalog, Field, TableId}; use risingwave_common::constants::log_store::v1::{KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::session_config::sink_decouple::SinkDecouple; +use risingwave_common::types::{DataType, StructType}; use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::desc::SinkDesc; use risingwave_connector::sink::catalog::{SinkFormat, SinkFormatDesc, SinkId, SinkType}; +use risingwave_connector::sink::iceberg::ICEBERG_SINK; use risingwave_connector::sink::trivial::TABLE_SINK; use risingwave_connector::sink::{ SinkError, CONNECTOR_TYPE_KEY, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; +use risingwave_pb::expr::expr_node::Type; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::{derive_columns, derive_pk}; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef}; use super::stream::prelude::*; use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder}; -use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; +use super::{ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProject}; +use crate::expr::{Expr, ExprImpl, FunctionCall, InputRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::PlanTreeNodeUnary; use crate::optimizer::property::{Distribution, Order, RequiredDist}; @@ -46,6 +51,102 @@ use crate::{TableCatalog, WithOptions}; const DOWNSTREAM_PK_KEY: &str = "primary_key"; +/// ## Why we need `PartitionComputeInfo`? +/// +/// For some sink, it will write the data into different file based on the partition value. E.g. iceberg sink() +/// For this kind of sink, the file num can be reduced if we can shuffle the data based on the partition value. More details can be found in . 
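/// (Illustrative, assumed numbers: with sink parallelism 4 and a `bucket(16, ...)` partition,
/// un-shuffled input lets every writer see every bucket, so up to 4 * 16 = 64 data files may
/// be produced per flush; shuffling by the precomputed partition value routes each bucket to
/// a single writer, capping it at roughly 16.)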
+/// So if the `PartitionComputeInfo` provided, we will create a `StreamProject` node to compute the partition value and shuffle the data based on the partition value before the sink. +/// +/// ## What is `PartitionComputeInfo`? +/// The `PartitionComputeInfo` contains the information about partition compute. The stream sink will use +/// these information to create the corresponding expression in `StreamProject` node. +/// +/// #TODO +/// Maybe we should move this in sink? +pub enum PartitionComputeInfo { + Iceberg(IcebergPartitionInfo), +} + +impl PartitionComputeInfo { + pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result { + match self { + PartitionComputeInfo::Iceberg(info) => info.convert_to_expression(columns), + } + } +} + +pub struct IcebergPartitionInfo { + pub partition_type: DataType, + // (partition_field_name, partition_field_transform) + pub partition_fields: Vec<(String, Transform)>, +} + +impl IcebergPartitionInfo { + #[inline] + fn transform_to_expression( + transform: &Transform, + col_id: usize, + columns: &[ColumnCatalog], + ) -> Result { + match transform { + Transform::Identity => Ok(ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()).into(), + )), + Transform::Void => Ok(ExprImpl::literal_null( + columns[col_id].column_desc.data_type.clone(), + )), + _ => Ok(ExprImpl::FunctionCall( + FunctionCall::new( + Type::IcebergTransform, + vec![ + ExprImpl::literal_varchar(transform.to_string()), + ExprImpl::InputRef( + InputRef::new(col_id, columns[col_id].column_desc.data_type.clone()) + .into(), + ), + ], + ) + .unwrap() + .into(), + )), + } + } + + pub fn convert_to_expression(self, columns: &[ColumnCatalog]) -> Result { + let child_exprs = self + .partition_fields + .into_iter() + .map(|(field_name, transform)| { + let col_id = find_column_idx_by_name(columns, &field_name)?; + Self::transform_to_expression(&transform, col_id, columns) + }) + .collect::>>()?; + let return_type = DataType::Struct(StructType::new( + child_exprs + .iter() + .enumerate() + .map(|(id, expr)| (format!("f{id}"), expr.return_type())) + .collect_vec(), + )); + Ok(ExprImpl::FunctionCall( + FunctionCall::new_unchecked(Type::Row, child_exprs, return_type).into(), + )) + } +} +#[inline] +fn find_column_idx_by_name(columns: &[ColumnCatalog], col_name: &str) -> Result { + columns + .iter() + .position(|col| col.column_desc.name == col_name) + .ok_or_else(|| { + ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!("Sink primary key column not found: {}. Please use ',' as the delimiter for different primary key columns.", col_name) + ))) + .into() + }) +} + /// [`StreamSink`] represents a table/connector sink at the very end of the graph. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamSink { @@ -87,6 +188,7 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result { let columns = derive_columns(input.schema(), out_names, &user_cols)?; let (input, sink) = Self::derive_sink_desc( @@ -101,6 +203,7 @@ impl StreamSink { definition, properties, format_desc, + partition_info, )?; let unsupported_sink = @@ -133,9 +236,42 @@ impl StreamSink { Ok(Self::new(input, sink)) } + fn derive_iceberg_sink_distribution( + input: PlanRef, + partition_info: Option, + columns: &[ColumnCatalog], + ) -> Result<(RequiredDist, PlanRef, Option)> { + // For here, we need to add the plan node to compute the partition value, and add it as a extra column. 
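        // Illustrative sketch (hypothetical table partitioned by `bucket(16, user_id), day(event_at)`):
        // the appended expression is roughly
        //   ROW(IcebergTransform('bucket[16]', user_id), IcebergTransform('day', event_at))
        // exposed as the extra `_rw_partition` column, and the plan becomes
        //   StreamProject(..., _rw_partition) -> Exchange(hash on _rw_partition) -> StreamSink,
        // with `extra_partition_col_idx` in the sink desc pointing at the appended column.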
+ if let Some(partition_info) = partition_info { + let input_fields = input.schema().fields(); + + let mut exprs: Vec<_> = input_fields + .iter() + .enumerate() + .map(|(idx, field)| InputRef::new(idx, field.data_type.clone()).into()) + .collect(); + + // Add the partition compute expression to the end of the exprs + exprs.push(partition_info.convert_to_expression(columns)?); + let partition_col_idx = exprs.len() - 1; + let project = StreamProject::new(generic::Project::new(exprs.clone(), input)); + Ok(( + RequiredDist::shard_by_key(project.schema().len(), &[partition_col_idx]), + project.into(), + Some(partition_col_idx), + )) + } else { + Ok(( + RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()), + input, + None, + )) + } + } + #[allow(clippy::too_many_arguments)] fn derive_sink_desc( - input: PlanRef, + mut input: PlanRef, user_distributed_by: RequiredDist, name: String, db_name: String, @@ -146,12 +282,14 @@ impl StreamSink { definition: String, properties: WithOptions, format_desc: Option, + partition_info: Option, ) -> Result<(PlanRef, SinkDesc)> { let sink_type = Self::derive_sink_type(input.append_only(), &properties, format_desc.as_ref())?; let (pk, _) = derive_pk(input.clone(), user_order_by, &columns); - let downstream_pk = Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; - + let mut downstream_pk = + Self::parse_downstream_pk(&columns, properties.get(DOWNSTREAM_PK_KEY))?; + let mut extra_partition_col_idx = None; let required_dist = match input.distribution() { Distribution::Single => RequiredDist::single(), _ => { @@ -171,6 +309,21 @@ impl StreamSink { // lock contentions RequiredDist::hash_shard(downstream_pk.as_slice()) } + Some(s) if s == ICEBERG_SINK => { + // If user doesn't specify the downstream primary key, we use the stream key as the pk. + if sink_type.is_upsert() && downstream_pk.is_empty() { + downstream_pk = pk.iter().map(|k| k.column_index).collect_vec(); + } + let (required_dist, new_input, partition_col_idx) = + Self::derive_iceberg_sink_distribution( + input, + partition_info, + &columns, + )?; + input = new_input; + extra_partition_col_idx = partition_col_idx; + required_dist + } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); if downstream_pk.is_empty() { @@ -206,6 +359,7 @@ impl StreamSink { sink_type, format_desc, target_table, + extra_partition_col_idx, }; Ok((input, sink_desc)) } @@ -312,19 +466,7 @@ impl StreamSink { if trimmed_key.is_empty() { continue; } - match columns - .iter() - .position(|col| col.column_desc.name == trimmed_key) - { - Some(index) => downstream_pk_indices.push(index), - None => { - return Err(ErrorCode::SinkError(Box::new(Error::new( - ErrorKind::InvalidInput, - format!("Sink primary key column not found: {}. 
Please use ',' as the delimiter for different primary key columns.", trimmed_key), - ))) - .into()); - } - } + downstream_pk_indices.push(find_column_idx_by_name(columns, trimmed_key)?); } Ok(downstream_pk_indices) } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 5aca2842f5ad6..bf0f049cf4eb0 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -90,11 +90,21 @@ impl SinkExecutor { log_store_factory: F, ) -> StreamExecutorResult { let sink = build_sink(sink_param.clone())?; - let input_schema: Schema = columns + let sink_input_schema: Schema = columns .iter() .map(|column| Field::from(&column.column_desc)) .collect(); - assert_eq!(input_schema.data_types(), info.schema.data_types()); + + if let Some(col_dix) = sink_writer_param.extra_partition_col_idx { + // Remove the partition column from the schema. + assert_eq!(sink_input_schema.data_types(), { + let mut data_type = info.schema.data_types(); + data_type.remove(col_dix); + data_type + }); + } else { + assert_eq!(sink_input_schema.data_types(), info.schema.data_types()); + } Ok(Self { actor_context, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index fb05faf90dd8e..2837564eefad8 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -117,6 +117,7 @@ impl ExecutorBuilder for SinkExecutorBuilder { vnode_bitmap: params.vnode_bitmap.clone(), meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient), sink_metrics, + extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize), }; let log_store_identity = format!(