diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh
index 76e0900c32fef..5127731256c6b 100755
--- a/ci/scripts/e2e-source-test.sh
+++ b/ci/scripts/e2e-source-test.sh
@@ -32,6 +32,7 @@ mkdir ./connector-node
 tar xf ./risingwave-connector.tar.gz -C ./connector-node

 echo "--- e2e, inline test"
+RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
 risedev ci-start ci-inline-source-test
 risedev slt './e2e_test/source_inline/**/*.slt'
 echo "--- Kill cluster"
@@ -55,7 +56,7 @@ createdb
 psql < ./e2e_test/source/cdc/postgres_cdc.sql

 echo "--- starting risingwave cluster"
-RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
+RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
 risedev ci-start ci-1cn-1fe-with-recovery

 echo "--- mongodb cdc test"
diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt
index 77c8b6b5448ca..25189eff09e6e 100644
--- a/e2e_test/source/cdc/cdc.check_new_rows.slt
+++ b/e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -15,7 +15,7 @@ select cnt from shipments_cnt;
 4

 query ITTTT
-select * from person_new order by id;
+SELECT id,name,email_address,credit_card,city from person_new order by id;
 ----
 1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne
 1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise
@@ -68,7 +68,7 @@ SELECT * from orders_test_cnt
 5

 query ITT
-SELECT * FROM rw.products_test order by id limit 3
+SELECT id,name,description FROM rw.products_test order by id limit 3
 ----
 101 RW Small 2-wheel scooter
 102 RW 12V car battery
diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt
index 480c707fb6f42..3563ee6ed2a4b 100644
--- a/e2e_test/source/cdc/cdc.share_stream.slt
+++ b/e2e_test/source/cdc/cdc.share_stream.slt
@@ -41,7 +41,7 @@ create table rw.products_test (
 id INT,
 name STRING,
 description STRING,
 PRIMARY KEY (id)
-) from mysql_mytest table 'mytest.products';
+) include timestamp as commit_ts from mysql_mytest table 'mytest.products';

 system ok
 mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'Milk','Milk is a white liquid food');
@@ -145,12 +145,25 @@ SELECT * from orders_test_cnt
 4

 query ITT
-SELECT * FROM rw.products_test order by id limit 3
+SELECT id,name,description FROM rw.products_test order by id limit 3
 ----
 101 scooter Small 2-wheel scooter
 102 car battery 12V car battery
 103 12-pack drill bits 12-pack of drill bits with sizes ranging from #40 to #3

+# commit_ts of historical records should be '1970-01-01 00:00:00+00:00'
+query I
+SELECT count(*) as cnt from rw.products_test where commit_ts = '1970-01-01 00:00:00+00:00'
+----
+9
+
+# commit_ts of new records should be greater than '1970-01-01 00:00:00+00:00'
+query TTT
+SELECT name,description FROM rw.products_test where commit_ts > '1970-01-01 00:00:00+00:00' order by id
+----
+Milk Milk is a white liquid food
+Juice 100ml Juice
+
 query ITTT
 SELECT order_id,order_date,customer_name,product_id FROM orders_test order by order_id limit 3
 ----
@@ -230,7 +243,7 @@ CREATE TABLE person_new (
 credit_card varchar,
 city varchar,
 PRIMARY KEY (id)
-) FROM pg_source TABLE 'public.person';
+) INCLUDE TIMESTAMP AS commit_ts FROM pg_source TABLE 'public.person';

 statement ok
 CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;
@@ -260,7 +273,7 @@ SELECT * from person_new_cnt
 6

 query ITTTT
-SELECT * from person_new order by id;
+SELECT id,name,email_address,credit_card,city from person_new order by id;
 ----
 1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne
 1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise
@@ -269,6 +282,22 @@ SELECT * from person_new order by id;
 1101 white myc@xpmpe.com 8157 6974 se
 1102 spencer wip@dkaap.com 9481 6270 angeles

+# historical data
+query ITTTT
+SELECT id,name,email_address,credit_card,city from person_new where commit_ts = '1970-01-01 00:00:00+00:00' order by id;
+----
+1000 vicky noris yplkvgz@qbxfg.com 7878 5821 1864 2539 cheyenne
+1001 peter white myckhsp@xpmpe.com 1781 2313 8157 6974 boise
+1002 sarah spencer wipvdbm@dkaap.com 3453 4987 9481 6270 los angeles
+
+# incremental data
+query ITTTT
+SELECT id,name,email_address,credit_card,city from person_new where commit_ts > '1970-01-01 00:00:00+00:00' order by id;
+----
+1100 noris ypl@qbxfg.com 1864 2539 enne
+1101 white myc@xpmpe.com 8157 6974 se
+1102 spencer wip@dkaap.com 9481 6270 angeles
+
 statement ok
 CREATE TABLE numeric_to_rw_int256_shared (
 id int,
diff --git a/e2e_test/source/cdc/mongodb/mongodb_basic.slt b/e2e_test/source/cdc/mongodb/mongodb_basic.slt
index f3a815df0572b..9eaad3cca41a3 100644
--- a/e2e_test/source/cdc/mongodb/mongodb_basic.slt
+++ b/e2e_test/source/cdc/mongodb/mongodb_basic.slt
@@ -2,7 +2,7 @@ control substitution on

 statement ok
-CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
+CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) INCLUDE TIMESTAMP as commit_ts WITH (
 connector = 'mongodb-cdc',
 mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
 collection.name = 'random_data.*'
@@ -24,5 +24,11 @@ select count(*) from normalized_users;
 ----
 55

+# historical data
+query I
+select count(*) from users where commit_ts = '1970-01-01 00:00:00+00:00';
+----
+55
+
 statement ok
 DROP TABLE users cascade
diff --git a/e2e_test/source/cdc_inline/postgres_create_drop.slt b/e2e_test/source/cdc_inline/postgres_create_drop.slt
index 334f1eb2c9cce..ad7425bab3df9 100644
--- a/e2e_test/source/cdc_inline/postgres_create_drop.slt
+++ b/e2e_test/source/cdc_inline/postgres_create_drop.slt
@@ -23,6 +23,13 @@ create table tt1 (v1 int,
 slot.name = 'tt1_slot',
 );

+sleep 3s
+
+query IT
+SELECT * FROM tt1;
+----
+1 2023-10-23 10:00:00+00:00
+
 statement ok
 drop table tt1;
diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java
index ded49e003d19a..c7e9f7035f1e7 100644
--- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java
+++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/TableSchema.java
@@ -20,12 +20,18 @@ import com.risingwave.proto.Data.DataType.TypeName;
 import com.risingwave.proto.PlanCommon;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class TableSchema {
+
+    static final Logger LOG = LoggerFactory.getLogger(TableSchema.class);
+
     private final List columnNames;
     private final Map columns;
     private final Map columnIndices;
@@ -80,16 +86,20 @@ public Object getFromRow(String columnName, SinkRow row) {
     }

     public static TableSchema fromProto(ConnectorServiceProto.TableSchema tableSchema) {
-        return new TableSchema(
-                tableSchema.getColumnsList().stream()
-
.map(PlanCommon.ColumnDesc::getName) - .collect(Collectors.toList()), - tableSchema.getColumnsList().stream() - .map(PlanCommon.ColumnDesc::getColumnType) - .collect(Collectors.toList()), - tableSchema.getPkIndicesList().stream() - .map(i -> tableSchema.getColumns(i).getName()) - .collect(Collectors.toList())); + // filter out additional columns + var instance = + new TableSchema( + tableSchema.getColumnsList().stream() + .map(PlanCommon.ColumnDesc::getName) + .collect(Collectors.toList()), + tableSchema.getColumnsList().stream() + .map(PlanCommon.ColumnDesc::getColumnType) + .collect(Collectors.toList()), + tableSchema.getPkIndicesList().stream() + .map(i -> tableSchema.getColumns(i).getName()) + .collect(Collectors.toList())); + LOG.info("table column names: {}", Arrays.toString(instance.getColumnNames())); + return instance; } public List getPrimaryKeys() { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index bd6720656811c..83c6d59fac921 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -153,8 +153,9 @@ private static boolean isStreamingRunning(String connector, String server, Strin mbeanServer.getAttribute( getStreamingMetricsObjectName(connector, server, contextName), "Connected"); - } catch (JMException ex) { - LOG.warn("Failed to get streaming metrics", ex); + } catch (JMException _ex) { + // ignore the exception, as it is expected when the streaming source + // (aka. binlog client) is not ready } return false; } diff --git a/src/common/src/catalog/external_table.rs b/src/common/src/catalog/external_table.rs index 3de38a29a499c..f9c6448358066 100644 --- a/src/common/src/catalog/external_table.rs +++ b/src/common/src/catalog/external_table.rs @@ -40,8 +40,6 @@ pub struct CdcTableDesc { /// Column indices for primary keys. 
pub stream_key: Vec, - pub value_indices: Vec, - /// properties will be passed into the `StreamScanNode` pub connect_properties: BTreeMap, } diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 76b2047cd4925..628c2ca22cf4b 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -183,7 +183,6 @@ async fn test_cdc_backfill() -> StreamResult<()> { table_schema.clone(), table_pk_order_types, table_pk_indices.clone(), - vec![0, 1], ); let actor_id = 0x1a; @@ -214,6 +213,11 @@ async fn test_cdc_backfill() -> StreamResult<()> { ) .await; + let output_columns = vec![ + ColumnDesc::named("id", ColumnId::new(1), DataType::Int64), // primary key + ColumnDesc::named("price", ColumnId::new(2), DataType::Float64), + ]; + let cdc_backfill = StreamExecutor::new( ExecutorInfo { schema: table_schema.clone(), @@ -225,6 +229,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { external_table, mock_offset_executor, vec![0, 1], + output_columns, None, Arc::new(StreamingMetrics::unused()), state_table, diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 59c47b06ee8b8..012b7214c7e70 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -28,6 +28,7 @@ use risingwave_pb::plan_common::{ }; use crate::error::ConnectorResult; +use crate::source::cdc::MONGODB_CDC_CONNECTOR; use crate::source::{ GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, @@ -55,9 +56,29 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock>> = + LazyLock::new(|| Some(HashSet::from(["timestamp"]))); + +pub fn get_supported_additional_columns( + connector_name: &str, + is_cdc_backfill: bool, +) -> Option<&HashSet<&'static str>> { + if is_cdc_backfill { + CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref() + } else { + COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name) + } +} + pub fn gen_default_addition_col_name( connector_name: &str, additional_col_type: &str, @@ -87,9 +108,10 @@ pub fn build_additional_column_catalog( inner_field_name: Option<&str>, data_type: Option<&str>, reject_unknown_connector: bool, + is_cdc_backfill_table: bool, ) -> ConnectorResult { let compatible_columns = match ( - COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name), + get_supported_additional_columns(connector_name, is_cdc_backfill_table), reject_unknown_connector, ) { (Some(compat_cols), _) => compat_cols, @@ -190,7 +212,7 @@ pub fn build_additional_column_catalog( /// ## Returns /// - `columns_exist`: whether 1. `partition`/`file` and 2. `offset` columns are included in `columns`. /// - `additional_columns`: The `ColumnCatalog` for `partition`/`file` and `offset` columns. 
-pub fn add_partition_offset_cols( +pub fn source_add_partition_offset_cols( columns: &[ColumnCatalog], connector_name: &str, ) -> ([bool; 2], [ColumnCatalog; 2]) { @@ -219,6 +241,7 @@ pub fn add_partition_offset_cols( None, None, false, + false, ) .unwrap(), ) diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 52d1e4e4a15a2..817c2a788f2be 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -198,6 +198,11 @@ mod tests { use std::sync::Arc; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + use risingwave_common::row::Row; + use risingwave_common::types::Timestamptz; + use risingwave_pb::plan_common::{ + additional_column, AdditionalColumn, AdditionalColumnTimestamp, + }; use super::*; use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl}; @@ -266,4 +271,67 @@ mod tests { _ => panic!("unexpected parse result: {:?}", res), } } + + #[tokio::test] + async fn test_parse_additional_columns() { + let columns = vec![ + ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64), + ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64), + ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar), + ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal), + ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date), + ColumnDesc::named_with_additional_column( + "commit_ts", + ColumnId::new(6), + DataType::Timestamptz, + AdditionalColumn { + column_type: Some(additional_column::ColumnType::Timestamp( + AdditionalColumnTimestamp {}, + )), + }, + ), + ]; + + let columns = columns + .iter() + .map(SourceColumnDesc::from) + .collect::>(); + + let props = SpecificParserConfig { + key_encoding_config: None, + encoding_config: EncodingProperties::Json(JsonProperties { + use_schema_registry: false, + timestamptz_handling: None, + }), + protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), + }; + let source_ctx = SourceContext { + connector_props: ConnectorProperties::PostgresCdc(Box::default()), + ..SourceContext::dummy() + }; + let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) + .await + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + + let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#; + + let res = parser + .parse_one_with_txn( + None, + Some(payload.as_bytes().to_vec()), + builder.row_writer(), + ) + .await; + match res { + Ok(ParseResult::Rows) => { + let chunk = builder.finish(); + for (_, row) in chunk.rows() { + let commit_ts = row.datum_at(5).unwrap().into_timestamptz(); + assert_eq!(commit_ts, Timestamptz::from_millis(1695277757000).unwrap()); + } + } + _ => panic!("unexpected parse result: {:?}", res), + } + } } diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 2de46a9aa5460..67d990f54a664 100644 --- 
a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -23,7 +23,7 @@ use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder; use crate::parser::unified::debezium::DebeziumChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer; use crate::parser::{ - AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, JsonProperties, ParserFormat, + AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, ParserFormat, SourceStreamChunkRowWriter, }; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -40,7 +40,7 @@ pub struct DebeziumMongoJsonParser { fn build_accessor_builder(config: EncodingProperties) -> anyhow::Result { match config { - EncodingProperties::MongoJson(_) => Ok(AccessBuilderImpl::DebeziumMongoJson( + EncodingProperties::MongoJson => Ok(AccessBuilderImpl::DebeziumMongoJson( DebeziumMongoJsonAccessBuilder::new()?, )), _ => bail!("unsupported encoding for DEBEZIUM_MONGO format"), @@ -72,15 +72,18 @@ impl DebeziumMongoJsonParser { .clone(); // _rw_{connector}_file/partition & _rw_{connector}_offset are created automatically. - if rw_columns.iter().filter(|desc| desc.is_visible()).count() != 2 { - bail!("Debezium Mongo needs no more columns except `_id` and `payload` in table"); + if rw_columns + .iter() + .filter(|desc| desc.is_visible() && desc.additional_column.column_type.is_none()) + .count() + != 2 + { + bail!("Debezium Mongo needs no more data columns except `_id` and `payload` in table"); } // encodings are fixed to MongoJson - let key_builder = - build_accessor_builder(EncodingProperties::MongoJson(JsonProperties::default()))?; - let payload_builder = - build_accessor_builder(EncodingProperties::MongoJson(JsonProperties::default()))?; + let key_builder = build_accessor_builder(EncodingProperties::MongoJson)?; + let payload_builder = build_accessor_builder(EncodingProperties::MongoJson)?; Ok(Self { rw_columns, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 87b6b5c4a782c..eacae886cb076 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -345,6 +345,35 @@ impl SourceStreamChunkRowWriter<'_> { &mut self, mut f: impl FnMut(&SourceColumnDesc) -> AccessResult, ) -> AccessResult<()> { + let mut parse_field = |desc: &SourceColumnDesc| { + match f(desc) { + Ok(output) => Ok(output), + + // Throw error for failed access to primary key columns. + Err(e) if desc.is_pk => Err(e), + // Ignore error for other columns and fill in `NULL` instead. 
+            Err(error) => {
+                // TODO: figure out a way to fill in not-null default value if user specifies one
+                // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
+                // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
+                // see #13105
+                static LOG_SUPPERSSER: LazyLock =
+                    LazyLock::new(LogSuppresser::default);
+                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
+                    tracing::warn!(
+                        error = %error.as_report(),
+                        split_id = self.row_meta.as_ref().map(|m| m.split_id),
+                        offset = self.row_meta.as_ref().map(|m| m.offset),
+                        column = desc.name,
+                        suppressed_count,
+                        "failed to parse non-pk column, padding with `NULL`"
+                    );
+                }
+                Ok(A::output_for(Datum::None))
+            }
+        }
+    };
+
     let mut wrapped_f = |desc: &SourceColumnDesc| {
         match (&desc.column_type, &desc.additional_column.column_type) {
             (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
@@ -370,14 +399,12 @@ impl SourceStreamChunkRowWriter<'_> {
                     .unwrap(), // handled all match cases in internal match, unwrap is safe
                 ));
             }
-            (_, &Some(AdditionalColumnType::Timestamp(_))) => {
-                return Ok(A::output_for(
-                    self.row_meta
-                        .as_ref()
-                        .and_then(|ele| extreact_timestamp_from_meta(ele.meta))
-                        .unwrap_or(None),
-                ))
-            }
+            (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
+                Some(row_meta) => Ok(A::output_for(
+                    extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None),
+                )),
+                None => parse_field(desc), // parse from payload
+            },
             (_, &Some(AdditionalColumnType::Partition(_))) => {
                 // the meta info does not involve spec connector
                 return Ok(A::output_for(
@@ -426,32 +453,7 @@ impl SourceStreamChunkRowWriter<'_> {
             }
             (_, _) => {
                 // For normal columns, call the user provided closure.
-                match f(desc) {
-                    Ok(output) => Ok(output),
-
-                    // Throw error for failed access to primary key columns.
-                    Err(e) if desc.is_pk => Err(e),
-                    // Ignore error for other columns and fill in `NULL` instead.
-                    Err(error) => {
-                        // TODO: figure out a way to fill in not-null default value if user specifies one
-                        // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
-                        // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
-                        // see #13105
-                        static LOG_SUPPERSSER: LazyLock =
-                            LazyLock::new(LogSuppresser::default);
-                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
-                            tracing::warn!(
-                                error = %error.as_report(),
-                                split_id = self.row_meta.as_ref().map(|m| m.split_id),
-                                offset = self.row_meta.as_ref().map(|m| m.offset),
-                                column = desc.name,
-                                suppressed_count,
-                                "failed to parse non-pk column, padding with `NULL`"
-                            );
-                        }
-                        Ok(A::output_for(Datum::None))
-                    }
-                }
+                parse_field(desc)
             }
         }
     };
@@ -1021,7 +1023,7 @@ pub enum EncodingProperties {
     Protobuf(ProtobufProperties),
     Csv(CsvProperties),
     Json(JsonProperties),
-    MongoJson(JsonProperties),
+    MongoJson,
     Bytes(BytesProperties),
     Native,
     /// Encoding can't be specified because the source will determines it. Now only used in Iceberg.
diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs
index 589fdfc8cfc45..966c5f167474c 100644
--- a/src/connector/src/parser/unified/debezium.rs
+++ b/src/connector/src/parser/unified/debezium.rs
@@ -12,12 +12,46 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use risingwave_common::types::{DataType, Datum, ScalarImpl};
+use risingwave_common::types::{DataType, Datum, Scalar, ScalarImpl, Timestamptz};
+use risingwave_pb::plan_common::additional_column::ColumnType;

 use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
 use crate::parser::TransactionControl;
 use crate::source::{ConnectorProperties, SourceColumnDesc};

+// Example of Debezium JSON value:
+// {
+//     "payload":
+//     {
+//         "before": null,
+//         "after":
+//         {
+//             "O_ORDERKEY": 5,
+//             "O_CUSTKEY": 44485,
+//             "O_ORDERSTATUS": "F",
+//             "O_TOTALPRICE": "144659.20",
+//             "O_ORDERDATE": "1994-07-30"
+//         },
+//         "source":
+//         {
+//             "version": "1.9.7.Final",
+//             "connector": "mysql",
+//             "name": "RW_CDC_1002",
+//             "ts_ms": 1695277757000,
+//             "db": "mydb",
+//             "sequence": null,
+//             "table": "orders",
+//             "server_id": 0,
+//             "gtid": null,
+//             "file": "binlog.000008",
+//             "pos": 3693,
+//             "row": 0,
+//         },
+//         "op": "r",
+//         "ts_ms": 1695277757017,
+//         "transaction": null
+//     }
+// }
 pub struct DebeziumChangeEvent {
     value_accessor: Option,
     key_accessor: Option,
@@ -26,6 +60,8 @@ pub struct DebeziumChangeEvent {
 const BEFORE: &str = "before";
 const AFTER: &str = "after";
+const SOURCE: &str = "source";
+const SOURCE_TS_MS: &str = "ts_ms";
 const OP: &str = "op";
 pub const TRANSACTION_STATUS: &str = "status";
 pub const TRANSACTION_ID: &str = "id";
@@ -150,11 +186,37 @@ where
             }
             // value should not be None.
-            ChangeEventOperation::Upsert => self
-                .value_accessor
-                .as_ref()
-                .unwrap()
-                .access(&[AFTER, &desc.name], Some(&desc.data_type)),
+            ChangeEventOperation::Upsert => {
+                // For upsert operations, if `desc` is an additional column, read its value from the `SOURCE` field instead.
+                desc.additional_column.column_type.as_ref().map_or_else(
+                    || {
+                        self.value_accessor
+                            .as_ref()
+                            .expect("value_accessor must be provided for upsert operation")
+                            .access(&[AFTER, &desc.name], Some(&desc.data_type))
+                    },
+                    |additional_column_type| {
+                        match additional_column_type {
+                            &ColumnType::Timestamp(_) => {
+                                // access payload.source.ts_ms
+                                let ts_ms = self
+                                    .value_accessor
+                                    .as_ref()
+                                    .expect("value_accessor must be provided for upsert operation")
+                                    .access(&[SOURCE, SOURCE_TS_MS], Some(&DataType::Int64))?;
+                                Ok(ts_ms.map(|scalar| {
+                                    Timestamptz::from_millis(scalar.into_int64())
+                                        .expect("source.ts_ms must be in milliseconds")
+                                        .to_scalar_value()
+                                }))
+                            }
+                            _ => Err(AccessError::UnsupportedAdditionalColumn {
+                                name: desc.name.clone(),
+                            }),
+                        }
+                    },
+                )
+            }
         }
     }
diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs
index 31667cd42e47d..f54d48e4e0b98 100644
--- a/src/connector/src/parser/unified/mod.rs
+++ b/src/connector/src/parser/unified/mod.rs
@@ -104,6 +104,9 @@ pub enum AccessError {
     #[error("Unsupported data type `{ty}`")]
     UnsupportedType { ty: String },

+    #[error("Unsupported additional column `{name}`")]
+    UnsupportedAdditionalColumn { name: String },
+
     /// Errors that are not categorized into variants above.
     #[error("{message}")]
     Uncategorized { message: String },
diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs
index 27eb7a8144bdd..3b416ef1309e9 100644
--- a/src/connector/src/parser/util.rs
+++ b/src/connector/src/parser/util.rs
@@ -130,6 +130,7 @@ pub(super) async fn bytes_from_url(
 pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option {
     match meta {
         SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(),
+        SourceMeta::DebeziumCdc(debezium_meta) => debezium_meta.extract_timestamp(),
         _ => None,
     }
 }
diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs
index 98018b6b6a113..bfe26c57a8341 100644
--- a/src/connector/src/source/cdc/enumerator/mod.rs
+++ b/src/connector/src/source/cdc/enumerator/mod.rs
@@ -27,7 +27,8 @@ use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, Valida
 use crate::error::ConnectorResult;
 use crate::source::cdc::{
-    CdcProperties, CdcSourceTypeTrait, Citus, DebeziumCdcSplit, Mongodb, Mysql, Postgres,
+    table_schema_exclude_additional_columns, CdcProperties, CdcSourceTypeTrait, Citus,
+    DebeziumCdcSplit, Mongodb, Mysql, Postgres,
 };
 use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};
@@ -77,7 +78,7 @@ where
             source_id: source_id as u64,
             source_type: props.get_source_type_pb() as _,
             properties: props.properties,
-            table_schema: Some(props.table_schema),
+            table_schema: Some(table_schema_exclude_additional_columns(&props.table_schema)),
             is_source_job: props.is_cdc_source_job,
             is_backfill_table: props.is_backfill_table,
         };
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index 4a1222a343e54..0ff35161e9ea9 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -99,6 +99,22 @@ pub struct CdcProperties {
     pub _phantom: PhantomData,
 }

+pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
+    TableSchema {
+        columns: table_schema
+            .columns
+            .iter()
+            .filter(|col| {
+                col.additional_column
+                    .as_ref()
+                    .is_some_and(|val| val.column_type.is_none())
+            })
+            .cloned()
+            .collect(),
+        pk_indices: table_schema.pk_indices.clone(),
+    }
+}
+
 impl TryFromHashmap for CdcProperties {
     fn try_from_hashmap(
         properties: HashMap,
diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs
index 5df937a83fbe8..f45a4e37e8ca8 100644
--- a/src/connector/src/source/cdc/source/message.rs
+++ b/src/connector/src/source/cdc/source/message.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use risingwave_common::types::{Datum, Scalar, Timestamptz}; use risingwave_pb::connector_service::CdcMessage; use crate::source::base::SourceMessage; @@ -26,6 +27,17 @@ pub struct DebeziumCdcMeta { pub is_transaction_meta: bool, } +impl DebeziumCdcMeta { + pub fn extract_timestamp(&self) -> Option { + Some( + Timestamptz::from_millis(self.source_ts_ms) + .unwrap() + .to_scalar_value(), + ) + .into() + } +} + impl From for SourceMessage { fn from(message: CdcMessage) -> Self { SourceMessage { diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs index cbd63a2a4906a..a652c4054db60 100644 --- a/src/connector/src/source/reader/desc.rs +++ b/src/connector/src/source/reader/desc.rs @@ -26,7 +26,7 @@ use risingwave_pb::plan_common::PbColumnCatalog; use super::fs_reader::FsSourceReader; use super::reader::SourceReader; use crate::error::ConnectorResult; -use crate::parser::additional_columns::add_partition_offset_cols; +use crate::parser::additional_columns::source_add_partition_offset_cols; use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig}; use crate::source::monitor::SourceMetrics; use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY}; @@ -98,7 +98,7 @@ impl SourceDescBuilder { .map(|c| ColumnCatalog::from(c.clone())) .collect_vec(); let (columns_exist, additional_columns) = - add_partition_offset_cols(&columns, &connector_name); + source_add_partition_offset_cols(&columns, &connector_name); let mut columns: Vec<_> = self .columns diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 49f3126f53b96..ffacf05faf90d 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -29,7 +29,7 @@ use risingwave_common::catalog::{ }; use risingwave_common::types::DataType; use risingwave_connector::parser::additional_columns::{ - build_additional_column_catalog, COMPATIBLE_ADDITIONAL_COLUMNS, + build_additional_column_catalog, get_supported_additional_columns, }; use risingwave_connector::parser::{ schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig, @@ -551,12 +551,11 @@ pub fn handle_addition_columns( with_properties: &HashMap, mut additional_columns: IncludeOption, columns: &mut Vec, + is_cdc_backfill_table: bool, ) -> Result<()> { let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source - if COMPATIBLE_ADDITIONAL_COLUMNS - .get(connector_name.as_str()) - .is_none() + if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none() && !additional_columns.is_empty() { return Err(RwError::from(ProtocolError(format!( @@ -595,6 +594,7 @@ pub fn handle_addition_columns( item.inner_field.as_deref(), data_type_name.as_deref(), true, + is_cdc_backfill_table, )?); } @@ -838,12 +838,6 @@ pub(crate) async fn bind_source_pk( } } (Format::DebeziumMongo, Encode::Json) => { - if !additional_column_names.is_empty() { - return Err(RwError::from(ProtocolError(format!( - "FORMAT DEBEZIUMMONGO forbids additional columns, but got {:?}", - additional_column_names - )))); - } if sql_defined_pk { sql_defined_pk_names } else { @@ -917,6 +911,7 @@ fn check_and_add_timestamp_column( None, None, true, + false, ) .unwrap(); catalog.is_hidden = true; @@ -1361,7 +1356,12 @@ pub async fn bind_create_source( )?; // add additional columns before bind pk, because `format upsert` requires the key column - 
handle_addition_columns(&with_properties, include_column_options, &mut columns)?; + handle_addition_columns( + &with_properties, + include_column_options, + &mut columns, + false, + )?; // compatible with the behavior that add a hidden column `_rw_kafka_timestamp` to each message from Kafka source if is_create_source { // must behind `handle_addition_columns` diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index 41415d5987676..4f3b81a20e630 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -56,7 +56,7 @@ use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ bind_columns_from_source, bind_connector_props, bind_create_source, bind_source_watermark, - UPSTREAM_SOURCE_KEY, + handle_addition_columns, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind}; @@ -741,7 +741,24 @@ pub(crate) fn gen_create_table_plan_for_cdc_source( mut col_id_gen: ColumnIdGenerator, on_conflict: Option, with_version_column: Option, + include_column_options: IncludeOption, ) -> Result<(PlanRef, PbTable)> { + if !constraints.iter().any(|c| { + matches!( + c, + TableConstraint::Unique { + is_primary: true, + .. + } + ) + }) { + return Err(ErrorCode::NotSupported( + "CDC table without primary key constraint is not supported".to_owned(), + "Please define a primary key".to_owned(), + ) + .into()); + } + let session = context.session_ctx().clone(); let db_name = session.database(); let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?; @@ -766,13 +783,16 @@ pub(crate) fn gen_create_table_plan_for_cdc_source( }; let mut columns = bind_sql_columns(&column_defs)?; + let with_properties = source.with_properties.clone().into_iter().collect(); + // append additional columns to the end + handle_addition_columns(&with_properties, include_column_options, &mut columns, true)?; for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) } let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; - let (columns, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names, true)?; + let (columns, pk_column_ids, _row_id_index) = bind_pk_on_relation(columns, pk_names, true)?; let definition = context.normalized_sql().to_owned(); @@ -802,7 +822,6 @@ pub(crate) fn gen_create_table_plan_for_cdc_source( pk: table_pk, columns: columns.iter().map(|c| c.column_desc.clone()).collect(), stream_key: pk_column_indices, - value_indices: (0..columns.len()).collect_vec(), connect_properties, }; @@ -912,6 +931,7 @@ pub(super) async fn handle_create_table_plan( &handler_args.with_options, source_schema, &include_column_options, + &cdc_table_info, )?; let ((plan, source, table), job_type) = @@ -998,6 +1018,7 @@ pub(super) async fn handle_create_table_plan( col_id_gen, on_conflict, with_version_column, + include_column_options, )?; ((plan, None, table), TableJobType::SharedCdcSource) @@ -1091,7 +1112,12 @@ pub fn check_create_table_with_source( with_options: &WithOptions, source_schema: Option, include_column_options: &IncludeOption, + cdc_table_info: &Option, ) -> Result> { + // skip check for cdc table + if cdc_table_info.is_some() { + return Ok(source_schema); + } let defined_source = with_options.inner().contains_key(UPSTREAM_SOURCE_KEY); if !include_column_options.is_empty() && !defined_source { return 
Err(ErrorCode::InvalidInputSyntax( diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 7a5e9effbac9c..865d444f8c154 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_connector::parser::additional_columns::add_partition_offset_cols; +use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{PbStreamSource, SourceNode}; @@ -46,8 +46,10 @@ impl StreamSource { if let Some(source_catalog) = &core.catalog && source_catalog.info.is_shared() { - let (columns_exist, additional_columns) = - add_partition_offset_cols(&core.column_catalog, &source_catalog.connector_name()); + let (columns_exist, additional_columns) = source_add_partition_offset_cols( + &core.column_catalog, + &source_catalog.connector_name(), + ); for (existed, mut c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { c.is_hidden = true; if !existed { diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 12fbbfd7a4b81..02f794fe55a9f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -21,7 +21,7 @@ use risingwave_common::catalog::Field; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::OrderType; -use risingwave_connector::parser::additional_columns::add_partition_offset_cols; +use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols; use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody}; use risingwave_pb::stream_plan::PbStreamNode; @@ -58,8 +58,10 @@ impl StreamSourceScan { if let Some(source_catalog) = &core.catalog && source_catalog.info.is_shared() { - let (columns_exist, additional_columns) = - add_partition_offset_cols(&core.column_catalog, &source_catalog.connector_name()); + let (columns_exist, additional_columns) = source_add_partition_offset_cols( + &core.column_catalog, + &source_catalog.connector_name(), + ); for (existed, mut c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { c.is_hidden = true; if !existed { diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 6ccff4874e751..584a74a0c03b6 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -20,12 +20,12 @@ use futures::stream::select_with_strategy; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::bail; -use risingwave_common::catalog::{ColumnDesc, ColumnId}; +use risingwave_common::catalog::ColumnDesc; use risingwave_common::row::RowExt; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::parser::{ - DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, - SourceStreamChunkBuilder, SpecificParserConfig, + ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, + ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, 
}; use risingwave_connector::source::cdc::external::CdcOffset; use risingwave_connector::source::{SourceColumnDesc, SourceContext}; @@ -56,9 +56,11 @@ pub struct CdcBackfillExecutor { upstream: Executor, /// The column indices need to be forwarded to the downstream from the upstream and table scan. - /// User may select a subset of columns from the upstream table. output_indices: Vec, + /// The schema of output chunk, including additional columns if any + output_columns: Vec, + /// State table of the `CdcBackfill` executor state_impl: CdcBackfillState, @@ -81,18 +83,19 @@ impl CdcBackfillExecutor { external_table: ExternalStorageTable, upstream: Executor, output_indices: Vec, + output_columns: Vec, progress: Option, metrics: Arc, state_table: StateTable, rate_limit_rps: Option, options: CdcScanOptions, ) -> Self { - let pk_in_output_indices = external_table.pk_in_output_indices().clone().unwrap(); + let pk_indices = external_table.pk_indices(); let upstream_table_id = external_table.table_id().table_id; let state_impl = CdcBackfillState::new( upstream_table_id, state_table, - pk_in_output_indices.len() + METADATA_STATE_LEN, + pk_indices.len() + METADATA_STATE_LEN, ); Self { @@ -100,6 +103,7 @@ impl CdcBackfillExecutor { external_table, upstream, output_indices, + output_columns, state_impl, progress, metrics, @@ -134,15 +138,21 @@ impl CdcBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { - // The primary key columns, in the output columns of the upstream_table scan. - let pk_in_output_indices = self.external_table.pk_in_output_indices().unwrap(); + // The indices to primary key columns + let pk_indices = self.external_table.pk_indices().to_vec(); let pk_order = self.external_table.pk_order_types().to_vec(); let upstream_table_id = self.external_table.table_id().table_id; let upstream_table_name = self.external_table.qualified_table_name(); - let upstream_table_schema = self.external_table.schema().clone(); let upstream_table_reader = UpstreamTableReader::new(self.external_table); + let additional_columns = self + .output_columns + .iter() + .filter(|col| col.additional_column.column_type.is_some()) + .cloned() + .collect_vec(); + let mut upstream = self.upstream.execute(); // Current position of the upstream_table storage primary key. @@ -158,7 +168,7 @@ impl CdcBackfillExecutor { // if not, we should bypass the backfill directly. let mut state_impl = self.state_impl; - let mut upstream = transform_upstream(upstream, &upstream_table_schema) + let mut upstream = transform_upstream(upstream, &self.output_columns) .boxed() .peekable(); @@ -259,7 +269,8 @@ impl CdcBackfillExecutor { let read_args = SnapshotReadArgs::new( current_pk_pos.clone(), self.rate_limit_rps, - pk_in_output_indices.clone(), + pk_indices.clone(), + additional_columns.clone(), ); let right_snapshot = pin!(upstream_table_reader @@ -423,8 +434,7 @@ impl CdcBackfillExecutor { // Raise the current position. // As snapshot read streams are ordered by pk, so we can // just use the last row to update `current_pos`. - current_pk_pos = - Some(get_new_pos(&chunk, &pk_in_output_indices)); + current_pk_pos = Some(get_new_pos(&chunk, &pk_indices)); tracing::trace!( "got a snapshot chunk: len {}, current_pk_pos {:?}", @@ -488,7 +498,7 @@ impl CdcBackfillExecutor { } Some(chunk) => { // Raise the current pk position. 
- current_pk_pos = Some(get_new_pos(&chunk, &pk_in_output_indices)); + current_pk_pos = Some(get_new_pos(&chunk, &pk_indices)); let row_count = chunk.cardinality() as u64; cur_barrier_snapshot_processed_rows += row_count; @@ -522,7 +532,7 @@ impl CdcBackfillExecutor { &offset_parse_func, chunk, current_pos, - &pk_in_output_indices, + &pk_indices, &pk_order, last_binlog_offset.clone(), )?, @@ -636,7 +646,7 @@ impl CdcBackfillExecutor { } #[try_stream(ok = Message, error = StreamExecutorError)] -pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { +pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) { let props = SpecificParserConfig { key_encoding_config: None, encoding_config: EncodingProperties::Json(JsonProperties { @@ -646,9 +656,16 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { // the cdc message is generated internally so the key must exist. protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()), }; + + // convert to source column desc to feed into parser + let columns_with_meta = output_columns + .iter() + .map(SourceColumnDesc::from) + .collect_vec(); + let mut parser = DebeziumParser::new( props, - get_rw_columns(schema), + columns_with_meta.clone(), Arc::new(SourceContext::dummy()), ) .await @@ -659,7 +676,7 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { for msg in upstream { let mut msg = msg?; if let Message::Chunk(chunk) = &mut msg { - let parsed_chunk = parse_debezium_chunk(&mut parser, chunk, schema).await?; + let parsed_chunk = parse_debezium_chunk(&mut parser, chunk).await?; let _ = std::mem::replace(chunk, parsed_chunk); } yield msg; @@ -669,14 +686,13 @@ pub async fn transform_upstream(upstream: BoxedMessageStream, schema: &Schema) { async fn parse_debezium_chunk( parser: &mut DebeziumParser, chunk: &StreamChunk, - schema: &Schema, ) -> StreamExecutorResult { // here we transform the input chunk in (payload varchar, _rw_offset varchar, _rw_table_name varchar) schema // to chunk with downstream table schema `info.schema` of MergeNode contains the schema of the // table job with `_rw_offset` in the end // see `gen_create_table_plan_for_cdc_source` for details - let column_descs = get_rw_columns(schema); - let mut builder = SourceStreamChunkBuilder::with_capacity(column_descs, chunk.capacity()); + let mut builder = + SourceStreamChunkBuilder::with_capacity(parser.columns().to_vec(), chunk.capacity()); // The schema of input chunk (payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id) // We should use the debezium parser to parse the first column, @@ -715,10 +731,10 @@ async fn parse_debezium_chunk( new_rows.push(combined); } - let data_types = schema - .fields + let data_types = parser + .columns() .iter() - .map(|field| field.data_type.clone()) + .map(|col| col.data_type.clone()) .chain(std::iter::once(DataType::Varchar)) // _rw_offset column .collect_vec(); @@ -728,21 +744,6 @@ async fn parse_debezium_chunk( )) } -fn get_rw_columns(schema: &Schema) -> Vec { - schema - .fields - .iter() - .map(|field| { - let column_desc = ColumnDesc::named( - field.name.clone(), - ColumnId::placeholder(), - field.data_type.clone(), - ); - SourceColumnDesc::from(&column_desc) - }) - .collect_vec() -} - impl Execute for CdcBackfillExecutor { fn execute(self: Box) -> BoxedMessageStream { self.execute_inner().boxed() @@ -755,7 +756,7 @@ mod tests { use futures::{pin_mut, StreamExt}; use 
risingwave_common::array::{DataChunk, Op, StreamChunk}; - use risingwave_common::catalog::{Field, Schema}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; use risingwave_common::types::{DataType, Datum, JsonbVal}; use risingwave_common::util::iter_util::ZipEqFast; @@ -798,16 +799,17 @@ mod tests { tx.push_chunk(chunk); let upstream = Box::new(source).execute(); - // schema of the CDC table - let rw_schema = Schema::new(vec![ - Field::with_name(DataType::Int64, "O_ORDERKEY"), // orderkey - Field::with_name(DataType::Int64, "O_CUSTKEY"), // custkey - Field::with_name(DataType::Varchar, "O_ORDERSTATUS"), // orderstatus - Field::with_name(DataType::Decimal, "O_TOTALPRICE"), // totalprice - Field::with_name(DataType::Date, "O_ORDERDATE"), // orderdate - ]); + // schema to the debezium parser + let columns = vec![ + ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64), + ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64), + ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar), + ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal), + ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date), + ColumnDesc::named("commit_ts", ColumnId::new(6), DataType::Timestamptz), + ]; - let parsed_stream = transform_upstream(upstream, &rw_schema); + let parsed_stream = transform_upstream(upstream, &columns); pin_mut!(parsed_stream); // the output chunk must contain the offset column if let Some(message) = parsed_stream.next().await { diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs index a09d8e2954d90..e486f8f9bfc4e 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs @@ -38,8 +38,6 @@ pub struct ExternalStorageTable { /// Indices of primary key. /// Note that the index is based on the all columns of the table. pk_indices: Vec, - - output_indices: Vec, } impl ExternalStorageTable { @@ -53,7 +51,6 @@ impl ExternalStorageTable { schema: Schema, pk_order_types: Vec, pk_indices: Vec, - output_indices: Vec, ) -> Self { Self { table_id, @@ -63,7 +60,6 @@ impl ExternalStorageTable { schema, pk_order_types, pk_indices, - output_indices, } } @@ -83,16 +79,6 @@ impl ExternalStorageTable { &self.pk_indices } - /// Get the indices of the primary key columns in the output columns. - /// - /// Returns `None` if any of the primary key columns is not in the output columns. 
- pub fn pk_in_output_indices(&self) -> Option> { - self.pk_indices - .iter() - .map(|&i| self.output_indices.iter().position(|&j| i == j)) - .collect() - } - pub fn schema_table_name(&self) -> SchemaTableName { SchemaTableName { schema_name: self.schema_name.clone(), diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index d81e9e46f3bb3..548b6a5feec74 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -21,9 +21,12 @@ use governor::clock::MonotonicClock; use governor::{Quota, RateLimiter}; use itertools::Itertools; use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::ColumnDesc; use risingwave_common::row::OwnedRow; +use risingwave_common::types::{Scalar, Timestamptz}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReader}; +use risingwave_pb::plan_common::additional_column::ColumnType; use super::external::ExternalStorageTable; use crate::common::rate_limit::limited_chunk_size; @@ -46,19 +49,22 @@ pub trait UpstreamTableRead { pub struct SnapshotReadArgs { pub current_pos: Option, pub rate_limit_rps: Option, - pub pk_in_output_indices: Vec, + pub pk_indices: Vec, + pub additional_columns: Vec, } impl SnapshotReadArgs { pub fn new( current_pos: Option, rate_limit_rps: Option, - pk_in_output_indices: Vec, + pk_indices: Vec, + additional_columns: Vec, ) -> Self { Self { current_pos, rate_limit_rps, - pk_in_output_indices, + pk_indices, + additional_columns, } } } @@ -80,6 +86,30 @@ impl UpstreamTableReader { } } +/// Append additional columns with value as null to the snapshot chunk +fn with_additional_columns( + snapshot_chunk: StreamChunk, + additional_columns: &[ColumnDesc], +) -> StreamChunk { + let (ops, mut columns, visibility) = snapshot_chunk.into_inner(); + for desc in additional_columns { + let mut builder = desc.data_type.create_array_builder(visibility.len()); + match desc.additional_column.column_type.as_ref().unwrap() { + // set default value for timestamp + &ColumnType::Timestamp(_) => builder.append_n( + visibility.len(), + Some(Timestamptz::default().to_scalar_value()), + ), + // set null for other additional columns + _ => { + builder.append_n_null(visibility.len()); + } + } + columns.push(builder.finish().into()); + } + StreamChunk::with_visibility(ops, columns, visibility) +} + impl UpstreamTableRead for UpstreamTableReader { #[try_stream(ok = Option, error = StreamExecutorError)] async fn snapshot_read_full_table(&self, args: SnapshotReadArgs, batch_size: u32) { @@ -139,11 +169,14 @@ impl UpstreamTableRead for UpstreamTableReader { let chunk = chunk?; let chunk_size = chunk.capacity(); read_count += chunk.cardinality(); - current_pk_pos = get_new_pos(&chunk, &read_args.pk_in_output_indices); + current_pk_pos = get_new_pos(&chunk, &read_args.pk_indices); if read_args.rate_limit_rps.is_none() || chunk_size == 0 { // no limit, or empty chunk - yield Some(chunk); + yield Some(with_additional_columns( + chunk, + &read_args.additional_columns, + )); continue; } else { // Apply rate limit, see `risingwave_stream::executor::source::apply_rate_limit` for more. 
@@ -160,7 +193,10 @@ impl UpstreamTableRead for UpstreamTableReader { .until_n_ready(NonZeroU32::new(chunk_size as u32).unwrap()) .await .unwrap(); - yield Some(chunk); + yield Some(with_additional_columns( + chunk, + &read_args.additional_columns, + )); } } diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 43ddffda00607..55698c44e5e7f 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -45,9 +45,9 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { let table_desc: &ExternalTableDesc = node.get_cdc_table_desc()?; - let table_schema: Schema = table_desc.columns.iter().map(Into::into).collect(); - assert_eq!(output_indices, (0..table_schema.len()).collect_vec()); - assert_eq!(table_schema.data_types(), params.info.schema.data_types()); + let output_schema: Schema = table_desc.columns.iter().map(Into::into).collect(); + assert_eq!(output_indices, (0..output_schema.len()).collect_vec()); + assert_eq!(output_schema.data_types(), params.info.schema.data_types()); let properties: HashMap = table_desc .connect_properties @@ -75,6 +75,19 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { ..Default::default() }); let table_type = CdcTableType::from_properties(&properties); + + // Filter out additional columns to construct the external table schema + let table_schema: Schema = table_desc + .columns + .iter() + .filter(|col| { + !col.additional_column + .as_ref() + .is_some_and(|a_col| a_col.column_type.is_some()) + }) + .map(Into::into) + .collect(); + let table_reader = table_type .create_table_reader( properties.clone(), @@ -92,7 +105,6 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { table_schema, table_pk_order_types, table_pk_indices, - output_indices.clone(), ); let vnodes = params.vnode_bitmap.map(Arc::new); @@ -101,11 +113,13 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { let state_table = StateTable::from_table_catalog(node.get_state_table()?, state_store, vnodes).await; + let output_columns = table_desc.columns.iter().map(Into::into).collect_vec(); let exec = CdcBackfillExecutor::new( params.actor_context.clone(), external_table, upstream, output_indices, + output_columns, None, params.executor_stats, state_table,