diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cb54c1606356..78574872d8f0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,6 +6,7 @@ repos: rev: v2.3.0 hooks: - id: end-of-file-fixer + exclude: 'src/frontend/planner_test/tests/testdata/.*' - id: trailing-whitespace - repo: https://github.com/crate-ci/typos rev: v1.23.1 diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index b55575d6eb1a..2fb2d2a8c064 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -36,7 +36,7 @@ python3 -m pip install --break-system-packages requests protobuf fastavro conflu apt-get -y install jq echo "--- e2e, inline test" -RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \ risedev ci-start ci-inline-source-test risedev slt './e2e_test/source_inline/**/*.slt' -j16 risedev slt './e2e_test/source_inline/**/*.slt.serial' diff --git a/e2e_test/source_inline/kafka/issue_19563.slt.serial b/e2e_test/source_inline/kafka/issue_19563.slt.serial index 91b795d6acf3..528fac073576 100644 --- a/e2e_test/source_inline/kafka/issue_19563.slt.serial +++ b/e2e_test/source_inline/kafka/issue_19563.slt.serial @@ -24,9 +24,9 @@ explain create materialized view mv1 as select v1 from kafkasource where v1 betw StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamDynamicFilter { predicate: (v1 <= $expr1), output: [v1, _row_id], cleaned_by_watermark: true } ├─StreamProject { exprs: [v1, _row_id], output_watermarks: [v1] } - │ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset], cleaned_by_watermark: true } - │ ├─StreamRowIdGen { row_id_index: 2 } - │ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } + │ └─StreamDynamicFilter { predicate: (v1 >= now), output_watermarks: [v1], output: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], cleaned_by_watermark: true } + │ ├─StreamRowIdGen { row_id_index: 4 } + │ │ └─StreamSourceScan { columns: [v1, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } │ └─StreamExchange { dist: Broadcast } │ └─StreamNow └─StreamExchange { dist: Broadcast } diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 78ca50f0c5e9..5edb050a5e48 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -281,9 +281,20 @@ pub fn build_additional_column_desc( pub fn source_add_partition_offset_cols( columns: &[ColumnCatalog], connector_name: &str, + skip_col_id: bool, ) -> ([bool; 2], [ColumnDesc; 2]) { let mut columns_exist = [false; 2]; + let mut last_column_id = max_column_id(columns); + let mut assign_col_id = || { + if skip_col_id { + // col id will be filled outside later. Here just use a placeholder. 
+ ColumnId::placeholder() + } else { + last_column_id = last_column_id.next(); + last_column_id + } + }; let additional_columns: Vec<_> = { let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS @@ -292,11 +303,10 @@ pub fn source_add_partition_offset_cols( ["partition", "file", "offset"] .iter() .filter_map(|col_type| { - last_column_id = last_column_id.next(); if compat_col_types.contains(col_type) { Some( build_additional_column_desc( - last_column_id, + assign_col_id(), connector_name, col_type, None, diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs index af607d2537ea..d64435ef3a3c 100644 --- a/src/connector/src/source/reader/desc.rs +++ b/src/connector/src/source/reader/desc.rs @@ -91,7 +91,7 @@ impl SourceDescBuilder { .map(|s| s.to_lowercase()) .unwrap(); let (columns_exist, additional_columns) = - source_add_partition_offset_cols(&self.columns, &connector_name); + source_add_partition_offset_cols(&self.columns, &connector_name, false); let mut columns: Vec<_> = self .columns diff --git a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml index 63daa9c43b2d..2b7f23dff932 100644 --- a/src/frontend/planner_test/tests/testdata/output/batch_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/batch_source.yaml @@ -3,11 +3,11 @@ select * from s logical_plan: |- LogicalProject { exprs: [id, value] } - └─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id] } + └─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [id, value] } - └─BatchKafkaScan { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], filter: (None, None) } + └─BatchKafkaScan { source: s, columns: [id, value, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) } create_source: format: plain encode: protobuf diff --git a/src/frontend/planner_test/tests/testdata/output/shared_source.yml b/src/frontend/planner_test/tests/testdata/output/shared_source.yml index 83fde26bfc7d..fa75610ff09e 100644 --- a/src/frontend/planner_test/tests/testdata/output/shared_source.yml +++ b/src/frontend/planner_test/tests/testdata/output/shared_source.yml @@ -68,12 +68,12 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [x, y] } - └─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) } + └─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) } stream_plan: |- StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [x, y, _row_id] } - └─StreamRowIdGen { row_id_index: 3 } - └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } + └─StreamRowIdGen { row_id_index: 5 } + └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } with_config_map: streaming_use_shared_source: 'true' - before: @@ -84,11 +84,11 @@ batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [x, y] } - └─BatchKafkaScan { source: s, columns: [x, y, _rw_kafka_timestamp, _row_id], filter: (None, None) } + └─BatchKafkaScan { source: s, columns: [x, y, 
_rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id], filter: (None, None) } stream_plan: |- StreamMaterialize { columns: [x, y, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } └─StreamProject { exprs: [x, y, _row_id] } - └─StreamRowIdGen { row_id_index: 3 } - └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _row_id, _rw_kafka_partition, _rw_kafka_offset] } + └─StreamRowIdGen { row_id_index: 5 } + └─StreamSourceScan { columns: [x, y, _rw_kafka_timestamp, _rw_kafka_partition, _rw_kafka_offset, _row_id] } with_config_map: streaming_use_shared_source: 'true' diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 9fbc557478a8..2fd87cc4dcae 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -30,8 +30,10 @@ use risingwave_common::catalog::{ use risingwave_common::license::Feature; use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; +use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::parser::additional_columns::{ build_additional_column_desc, get_supported_additional_columns, + source_add_partition_offset_cols, }; use risingwave_connector::parser::{ fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig, @@ -1476,6 +1478,7 @@ pub async fn bind_create_source_or_table_with_connector( col_id_gen: &mut ColumnIdGenerator, // `true` for "create source", `false` for "create table with connector" is_create_source: bool, + is_shared_non_cdc: bool, source_rate_limit: Option, ) -> Result<(SourceCatalog, DatabaseId, SchemaId)> { let session = &handler_args.session; @@ -1538,6 +1541,21 @@ pub async fn bind_create_source_or_table_with_connector( if is_create_source { // must behind `handle_addition_columns` check_and_add_timestamp_column(&with_properties, &mut columns); + + // For shared sources, we will include partition and offset cols in the SourceExecutor's *output*, to be used by the SourceBackfillExecutor. + // For shared CDC source, the schema is different. See debezium_cdc_source_schema, CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS + if is_shared_non_cdc { + let (columns_exist, additional_columns) = source_add_partition_offset_cols( + &columns, + &with_properties.get_connector().unwrap(), + true, // col_id filled below at col_id_gen.generate + ); + for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { + if !existed { + columns.push(ColumnCatalog::hidden(c)); + } + } + } } // resolve privatelink connection for Kafka @@ -1655,14 +1673,14 @@ pub async fn handle_create_source( let with_properties = bind_connector_props(&handler_args, &source_schema, true)?; let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); - let is_shared = create_cdc_source_job - || (with_properties.is_shareable_non_cdc_connector() - && session - .env() - .streaming_config() - .developer - .enable_shared_source - && session.config().streaming_use_shared_source()); + let is_shared_non_cdc = with_properties.is_shareable_non_cdc_connector() + && session + .env() + .streaming_config() + .developer + .enable_shared_source + && session.config().streaming_use_shared_source(); + let is_shared = create_cdc_source_job || is_shared_non_cdc; let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema)? 
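Note on the hunk above: for shared non-CDC sources, the partition/offset hidden columns are now added during binding. `source_add_partition_offset_cols` is called with `skip_col_id = true`, so the descriptors carry placeholder IDs, and the real IDs are assigned afterwards by `col_id_gen` together with all other columns. The snippet below is a minimal, self-contained sketch of that placeholder-then-assign pattern; the `ColumnId`, `ColumnDesc`, and `ColumnIdGenerator` shown here are simplified stand-ins for illustration, not the actual RisingWave definitions.

#[derive(Clone, Copy, Debug, PartialEq)]
struct ColumnId(i32);

impl ColumnId {
    // Stand-in for ColumnId::placeholder() in the diff.
    const PLACEHOLDER: ColumnId = ColumnId(-1);
}

#[derive(Debug)]
struct ColumnDesc {
    name: String,
    id: ColumnId,
}

struct ColumnIdGenerator {
    next: i32,
}

impl ColumnIdGenerator {
    fn generate(&mut self) -> ColumnId {
        let id = ColumnId(self.next);
        self.next += 1;
        id
    }
}

fn main() {
    // Columns produced at bind time with placeholder IDs (what `skip_col_id = true`
    // is assumed to mean in the helper above).
    let mut columns = vec![
        ColumnDesc { name: "v1".into(), id: ColumnId::PLACEHOLDER },
        ColumnDesc { name: "_rw_kafka_partition".into(), id: ColumnId::PLACEHOLDER },
        ColumnDesc { name: "_rw_kafka_offset".into(), id: ColumnId::PLACEHOLDER },
    ];

    // A single later pass assigns the real IDs, so user-defined and generated
    // hidden columns share one ID space instead of being numbered separately.
    let mut id_gen = ColumnIdGenerator { next: 1 };
    for col in &mut columns {
        col.id = id_gen.generate();
    }

    assert!(columns.iter().all(|c| c.id != ColumnId::PLACEHOLDER));
    println!("{columns:?}");
}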
@@ -1690,6 +1708,7 @@ pub async fn handle_create_source( stmt.include_column_options, &mut col_id_gen, true, + is_shared_non_cdc, overwrite_options.source_rate_limit, ) .await?; @@ -1762,8 +1781,7 @@ pub mod tests { use std::sync::Arc; use risingwave_common::catalog::{ - CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME, - ROWID_PREFIX, TABLE_NAME_COLUMN_NAME, + CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROWID_PREFIX, }; use risingwave_common::types::DataType; @@ -1917,15 +1935,29 @@ pub mod tests { .columns .iter() .map(|col| (col.name(), col.data_type().clone())) - .collect::>(); + .collect::>(); - let expected_columns = maplit::hashmap! { - ROWID_PREFIX => DataType::Serial, - "payload" => DataType::Jsonb, - OFFSET_COLUMN_NAME => DataType::Varchar, - TABLE_NAME_COLUMN_NAME => DataType::Varchar, - }; - assert_eq!(columns, expected_columns); + expect_test::expect![[r#" + [ + ( + "payload", + Jsonb, + ), + ( + "_rw_offset", + Varchar, + ), + ( + "_rw_table_name", + Varchar, + ), + ( + "_row_id", + Serial, + ), + ] + "#]] + .assert_debug_eq(&columns); } #[tokio::test] @@ -1954,16 +1986,41 @@ pub mod tests { .unwrap(); assert_eq!(source.name, "s"); - let columns = GET_COLUMN_FROM_CATALOG(source); - let expect_columns = maplit::hashmap! { - ROWID_PREFIX => DataType::Serial, - "v1" => DataType::Int32, - "_rw_kafka_key" => DataType::Bytea, - // todo: kafka connector will automatically derive the column - // will change to a required field in the include clause - "_rw_kafka_timestamp" => DataType::Timestamptz, - }; - assert_eq!(columns, expect_columns); + let columns = source + .columns + .iter() + .map(|col| (col.name(), col.data_type().clone())) + .collect::>(); + + expect_test::expect![[r#" + [ + ( + "v1", + Int32, + ), + ( + "_rw_kafka_key", + Bytea, + ), + ( + "_rw_kafka_timestamp", + Timestamptz, + ), + ( + "_rw_kafka_partition", + Varchar, + ), + ( + "_rw_kafka_offset", + Varchar, + ), + ( + "_row_id", + Serial, + ), + ] + "#]] + .assert_debug_eq(&columns); let sql = "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json" diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 000d0c7b6051..73fb65dbf3a4 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -509,6 +509,7 @@ pub(crate) async fn gen_create_table_plan_with_source( include_column_options, &mut col_id_gen, false, + false, rate_limit, ) .await?; diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index d8ec8b44d827..a4f4c0d52b7c 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -684,7 +684,6 @@ pub fn handle_show_create_object( #[cfg(test)] mod tests { - use std::collections::HashMap; use std::ops::Index; use futures_async_stream::for_await; @@ -720,36 +719,78 @@ mod tests { let sql = "show columns from t"; let mut pg_response = frontend.run_sql(sql).await.unwrap(); - let mut columns = HashMap::new(); + let mut columns = Vec::new(); #[for_await] for row_set in pg_response.values_stream() { let row_set = row_set.unwrap(); for row in row_set { - columns.insert( + columns.push(( std::str::from_utf8(row.index(0).as_ref().unwrap()) .unwrap() .to_string(), std::str::from_utf8(row.index(1).as_ref().unwrap()) .unwrap() .to_string(), - ); + )); } } - let expected_columns: HashMap = maplit::hashmap! 
{ - "id".into() => "integer".into(), - "country.zipcode".into() => "character varying".into(), - "zipcode".into() => "bigint".into(), - "country.city.address".into() => "character varying".into(), - "country.address".into() => "character varying".into(), - "country.city".into() => "test.City".into(), - "country.city.zipcode".into() => "character varying".into(), - "rate".into() => "real".into(), - "country".into() => "test.Country".into(), - "_rw_kafka_timestamp".into() => "timestamp with time zone".into(), - "_row_id".into() => "serial".into(), - }; - - assert_eq!(columns, expected_columns); + expect_test::expect![[r#" + [ + ( + "id", + "integer", + ), + ( + "country", + "test.Country", + ), + ( + "country.address", + "character varying", + ), + ( + "country.city", + "test.City", + ), + ( + "country.city.address", + "character varying", + ), + ( + "country.city.zipcode", + "character varying", + ), + ( + "country.zipcode", + "character varying", + ), + ( + "zipcode", + "bigint", + ), + ( + "rate", + "real", + ), + ( + "_rw_kafka_timestamp", + "timestamp with time zone", + ), + ( + "_rw_kafka_partition", + "character varying", + ), + ( + "_rw_kafka_offset", + "character varying", + ), + ( + "_row_id", + "serial", + ), + ] + "#]] + .assert_debug_eq(&columns); } } diff --git a/src/frontend/src/optimizer/optimizer_context.rs b/src/frontend/src/optimizer/optimizer_context.rs index 35c5a3ee9129..b1b35f43b66f 100644 --- a/src/frontend/src/optimizer/optimizer_context.rs +++ b/src/frontend/src/optimizer/optimizer_context.rs @@ -187,7 +187,7 @@ impl OptimizerContext { } let mut optimizer_trace = self.optimizer_trace.borrow_mut(); let string = str.into(); - tracing::trace!(target: "explain_trace", "{}", string); + tracing::trace!(target: "explain_trace", "\n{}", string); optimizer_trace.push(string); optimizer_trace.push("\n".to_string()); } diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index d7808e4be51c..909fa1e0d300 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -17,9 +17,6 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::ColumnCatalog; -use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{PbStreamSource, SourceNode}; @@ -40,24 +37,7 @@ pub struct StreamSource { } impl StreamSource { - pub fn new(mut core: generic::Source) -> Self { - // For shared sources, we will include partition and offset cols in the *output*, to be used by the SourceBackfillExecutor. - // XXX: If we don't add here, these cols are also added in source reader, but pruned in the SourceExecutor's output. - // Should we simply add them here for all sources for consistency? 
- if let Some(source_catalog) = &core.catalog - && source_catalog.info.is_shared() - { - let (columns_exist, additional_columns) = source_add_partition_offset_cols( - &core.column_catalog, - &source_catalog.connector_name(), - ); - for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { - if !existed { - core.column_catalog.push(ColumnCatalog::hidden(c)); - } - } - } - + pub fn new(core: generic::Source) -> Self { let base = PlanBase::new_stream_with_core( &core, Distribution::SomeShard, diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 83c79259952b..ade1a4b6f2fe 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -17,11 +17,9 @@ use std::rc::Rc; use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{ColumnCatalog, Field}; +use risingwave_common::catalog::Field; use risingwave_common::types::DataType; -use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::OrderType; -use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols; use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody}; use risingwave_pb::stream_plan::PbStreamNode; @@ -52,23 +50,7 @@ pub struct StreamSourceScan { impl_plan_tree_node_for_leaf! { StreamSourceScan } impl StreamSourceScan { - pub fn new(mut core: generic::Source) -> Self { - // XXX: do we need to include partition and offset cols here? It's needed by Backfill's input, but maybe not output? - // But the source's "schema" contains the hidden columns. - if let Some(source_catalog) = &core.catalog - && source_catalog.info.is_shared() - { - let (columns_exist, additional_columns) = source_add_partition_offset_cols( - &core.column_catalog, - &source_catalog.connector_name(), - ); - for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { - if !existed { - core.column_catalog.push(ColumnCatalog::hidden(c)); - } - } - } - + pub fn new(core: generic::Source) -> Self { let base = PlanBase::new_stream_with_core( &core, Distribution::SomeShard,
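Taken together, the last two hunks remove the on-the-fly column injection from `StreamSource::new` and `StreamSourceScan::new`; with the earlier change in `bind_create_source_or_table_with_connector`, the hidden `_rw_kafka_partition` / `_rw_kafka_offset` columns are added exactly once, when the shared source's catalog is bound, and `_row_id` ends up as the last column (hence the updated `row_id_index` values and plan-test snapshots). The sketch below mirrors the `(columns_exist, additional_columns)` pair plus the `zip_eq_fast` loop used in the diff; the types and helper name are simplified for illustration and are not the actual RisingWave API.

#[derive(Debug, Clone)]
struct ColumnCatalog {
    name: String,
    is_hidden: bool,
}

// Append each additional column whose `columns_exist` flag is false, so columns the
// user already declared via INCLUDE are never duplicated.
fn add_missing_hidden_cols(
    columns: &mut Vec<ColumnCatalog>,
    columns_exist: [bool; 2],
    additional: [ColumnCatalog; 2],
) {
    for (exists, col) in columns_exist.into_iter().zip(additional) {
        if !exists {
            columns.push(col);
        }
    }
}

fn main() {
    let mut columns = vec![ColumnCatalog { name: "v1".into(), is_hidden: false }];
    let additional = [
        ColumnCatalog { name: "_rw_kafka_partition".into(), is_hidden: true },
        ColumnCatalog { name: "_rw_kafka_offset".into(), is_hidden: true },
    ];

    // Neither hidden column exists yet, so both are appended; `_row_id` would then be
    // generated after them, matching the new column order in the plan tests.
    add_missing_hidden_cols(&mut columns, [false, false], additional);
    assert_eq!(
        columns.iter().map(|c| c.name.as_str()).collect::<Vec<_>>(),
        ["v1", "_rw_kafka_partition", "_rw_kafka_offset"],
    );
    println!("{columns:?}");
}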