From b5aedaae170f8a87478c8047ca6ae43923869376 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 3 Jan 2024 17:16:18 +0800 Subject: [PATCH 1/6] WIP: eval transaction --- e2e_test/source/cdc/cdc.share_stream.slt | 15 +- e2e_test/source/cdc/postgres_cdc.sql | 33 +++++ .../connector/source/core/DbzCdcEngine.java | 10 +- .../source/core/DbzCdcEventConsumer.java | 135 ++++++++++++------ .../src/main/resources/debezium.properties | 2 +- .../tracing/TracingSlf4jAdapter.java | 14 +- src/config/test-transaction.toml | 17 +++ .../src/parser/debezium/debezium_parser.rs | 68 +++++++++ src/connector/src/parser/mod.rs | 28 +++- src/connector/src/parser/unified/debezium.rs | 5 +- src/connector/src/source/cdc/source/reader.rs | 2 +- .../plan_node/stream_cdc_table_scan.rs | 13 ++ 12 files changed, 286 insertions(+), 56 deletions(-) create mode 100644 src/config/test-transaction.toml diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index babfb685a945..a4ef61a717ea 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -193,7 +193,7 @@ CREATE TABLE person_new ( ) FROM pg_source TABLE 'person'; statement ok -CREATE TABLE person_new ( +CREATE TABLE person_tx ( id int, name varchar, email_address varchar, @@ -202,6 +202,19 @@ CREATE TABLE person_new ( PRIMARY KEY (id) ) FROM pg_source TABLE 'public.person'; +CREATE TABLE orders_tx ( + order_id INTEGER, + order_date DATE, + customer_name VARCHAR, + price DECIMAL, + product_id INTEGER, + order_status BOOLEAN, + PRIMARY KEY (order_id) +) FROM pg_source TABLE 'public.orders_tx'; + +create materialized view mv1 as select 'person' as table_name, count(*) as count from person_tx union all select 'orders' as table_name, count(*) as count from orders_tx; + + statement ok CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 43dba14950b3..3f52363d9565 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -14,6 +14,28 @@ VALUES (default,10001,'Beijing','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false); +CREATE TABLE orders_tx ( + order_id SERIAL NOT NULL PRIMARY KEY, + order_date DATE NOT NULL, + customer_name VARCHAR(255) NOT NULL, + price DECIMAL NOT NULL, + product_id INTEGER NOT NULL, + order_status BOOLEAN NOT NULL +); +ALTER SEQUENCE public.orders_tx_order_id_seq RESTART WITH 10001; + +INSERT INTO orders_tx +VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), + (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false); + +INSERT INTO orders_tx +VALUES (default, '2023-12-30 19:11:09', '张三', 15.00, 105, true), + (default, '2020-07-30 12:00:30', '李四', 25.25, 106, false); + +INSERT INTO orders_tx +VALUES (default, '2024-01-01 17:00:00', 'Sam', 1000.20, 110, false); + + CREATE TABLE person ( "id" int, "name" varchar(64), @@ -30,6 +52,17 @@ INSERT INTO person VALUES (1000, 'vicky noris', 'yplkvgz@qbxfg.com', '7878 5821 INSERT INTO person VALUES (1001, 'peter white', 'myckhsp@xpmpe.com', '1781 2313 8157 6974', 'boise'); INSERT INTO person VALUES (1002, 'sarah spencer', 'wipvdbm@dkaap.com', '3453 4987 9481 6270', 'los angeles'); +INSERT INTO person VALUES (1003, 'Kafka', 'ypl@qbxfg.com', '1864 2539', 'Shanghai'); +INSERT INTO person VALUES (1100, 'noris', 'ypl@qbxfg.com', '1864 2539', 'enne'); +INSERT INTO person VALUES (1101, 'white', 'myc@xpmpe.com', '8157 6974', 'se'); + + +SELECT 
COUNT(*) AS person_count
+FROM person_tx
+UNION ALL
+SELECT COUNT(*) AS order_count
+FROM orders_tx;
+
 create schema abs;
 create table abs.t1 (v1 int primary key, v2 double precision, v3 varchar, v4 numeric);
 create publication my_publicaton for table abs.t1 (v1, v3);
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
index 311d329ffeb5..61d1f6284a67 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
@@ -14,7 +14,8 @@
 package com.risingwave.connector.source.core;
 
-import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_HEARTBEAT_PREFIX;
+import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX;
+import static io.debezium.schema.AbstractTopicNamingStrategy.*;
 
 import com.risingwave.connector.api.source.CdcEngine;
 import com.risingwave.proto.ConnectorServiceProto;
@@ -36,11 +37,14 @@ public DbzCdcEngine(
             long sourceId,
             Properties config,
             DebeziumEngine.CompletionCallback completionCallback) {
-        var dbzHeartbeatPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name());
+        var heartbeatTopicPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name());
+        var topicPrefix = config.getProperty(TOPIC_PREFIX.name());
+        var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC);
         var consumer =
                 new DbzCdcEventConsumer(
                         sourceId,
-                        dbzHeartbeatPrefix,
+                        heartbeatTopicPrefix,
+                        transactionTopic,
                         new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));
 
         // Builds a debezium engine but not start it
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
index ac46691780c3..1f7bd94081bf 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
@@ -34,6 +34,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+enum EventType {
+    HEARTBEAT,
+    TRANSACTION,
+    DATA,
+}
+
 public class DbzCdcEventConsumer
         implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
     static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class);
@@ -42,14 +48,18 @@ public class DbzCdcEventConsumer
     private final long sourceId;
     private final JsonConverter converter;
     private final String heartbeatTopicPrefix;
+    private final String transactionTopic;
 
     DbzCdcEventConsumer(
             long sourceId,
             String heartbeatTopicPrefix,
-            BlockingQueue<GetEventStreamResponse> store) {
+            String transactionTopic,
+            BlockingQueue<GetEventStreamResponse> queue) {
         this.sourceId = sourceId;
-        this.outputChannel = store;
+        this.outputChannel = queue;
         this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+        this.transactionTopic = transactionTopic;
+        LOG.info("heartbeat topic: {}, txn topic: {}", heartbeatTopicPrefix, transactionTopic);
 
         // The default JSON converter will output the schema field in the JSON which is unnecessary
         // to source parser, we use a customized JSON converter to avoid outputting the `schema`
@@ -64,6 +74,17 @@ public class
DbzCdcEventConsumer this.converter = jsonConverter; } + private EventType getEventType(SourceRecord record) { + LOG.info("event topic: {}", record.topic()); + if (isHeartbeatEvent(record)) { + return EventType.HEARTBEAT; + } else if (isTransactionMetaEvent(record)) { + return EventType.TRANSACTION; + } else { + return EventType.DATA; + } + } + private boolean isHeartbeatEvent(SourceRecord record) { String topic = record.topic(); return topic != null @@ -71,18 +92,26 @@ private boolean isHeartbeatEvent(SourceRecord record) { && topic.startsWith(heartbeatTopicPrefix); } + private boolean isTransactionMetaEvent(SourceRecord record) { + String topic = record.topic(); + return topic != null && topic.equals(transactionTopic); + } + @Override public void handleBatch( List> events, DebeziumEngine.RecordCommitter> committer) throws InterruptedException { var respBuilder = GetEventStreamResponse.newBuilder(); + LOG.info("event batch size => {}", events.size()); for (ChangeEvent event : events) { var record = event.value(); - boolean isHeartbeat = isHeartbeatEvent(record); + EventType eventType = getEventType(record); DebeziumOffset offset = new DebeziumOffset( - record.sourcePartition(), record.sourceOffset(), isHeartbeat); + record.sourcePartition(), + record.sourceOffset(), + (eventType == EventType.HEARTBEAT)); // serialize the offset to a JSON, so that kernel doesn't need to // aware its layout String offsetStr = ""; @@ -98,43 +127,69 @@ var record = event.value(); .setOffset(offsetStr) .setPartition(String.valueOf(sourceId)); - if (isHeartbeat) { - var message = msgBuilder.build(); - LOG.debug("heartbeat => {}", message.getOffset()); - respBuilder.addEvents(message); - } else { - - // Topic naming conventions - // - PG: serverName.schemaName.tableName - // - MySQL: serverName.databaseName.tableName - // We can extract the full table name from the topic - var fullTableName = record.topic().substring(record.topic().indexOf('.') + 1); - - // ignore null record - if (record.value() == null) { - committer.markProcessed(event); - continue; - } - // get upstream event time from the "source" field - var sourceStruct = ((Struct) record.value()).getStruct("source"); - long sourceTsMs = - sourceStruct == null - ? 
System.currentTimeMillis() - : sourceStruct.getInt64("ts_ms"); - byte[] payload = - converter.fromConnectData( - record.topic(), record.valueSchema(), record.value()); - msgBuilder - .setFullTableName(fullTableName) - .setPayload(new String(payload, StandardCharsets.UTF_8)) - .setSourceTsMs(sourceTsMs) - .build(); - var message = msgBuilder.build(); - LOG.debug("record => {}", message.getPayload()); - - respBuilder.addEvents(message); - committer.markProcessed(event); + switch (eventType) { + case HEARTBEAT: + { + var message = msgBuilder.build(); + LOG.debug("heartbeat => {}", message.getOffset()); + respBuilder.addEvents(message); + + break; + } + case TRANSACTION: + { + long trxTs = ((Struct) record.value()).getInt64("ts_ms"); + byte[] payload = + converter.fromConnectData( + record.topic(), record.valueSchema(), record.value()); + var message = + msgBuilder + .setPayload(new String(payload, StandardCharsets.UTF_8)) + .setSourceTsMs(trxTs) + .build(); + LOG.info("transaction => {}", message.getPayload()); + respBuilder.addEvents(message); + + break; + } + case DATA: + { + // Topic naming conventions + // - PG: serverName.schemaName.tableName + // - MySQL: serverName.databaseName.tableName + // We can extract the full table name from the topic + var fullTableName = + record.topic().substring(record.topic().indexOf('.') + 1); + + // ignore null record + if (record.value() == null) { + break; + } + // get upstream event time from the "source" field + var sourceStruct = ((Struct) record.value()).getStruct("source"); + long sourceTsMs = + sourceStruct == null + ? System.currentTimeMillis() + : sourceStruct.getInt64("ts_ms"); + byte[] payload = + converter.fromConnectData( + record.topic(), record.valueSchema(), record.value()); + var message = + msgBuilder + .setFullTableName(fullTableName) + .setPayload(new String(payload, StandardCharsets.UTF_8)) + .setSourceTsMs(sourceTsMs) + .build(); + LOG.debug("record => {}", message.getPayload()); + respBuilder.addEvents(message); + break; + } + default: + break; } + + // mark the event as processed + committer.markProcessed(event); } // skip empty batch diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties index 22b7f0c4689c..a6af436cedcc 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties @@ -9,7 +9,7 @@ datetime.type=com.risingwave.connector.cdc.debezium.converters.DatetimeTypeConve # parse decimal in the "precise" mode decimal.handling.mode=${debezium.decimal.handling.mode:-string} interval.handling.mode=string -max.batch.size=${debezium.max.batch.size:-1024} +max.batch.size=${debezium.max.batch.size:-5} max.queue.size=${debezium.max.queue.size:-8192} time.precision.mode=adaptive_time_microseconds # The maximum number of retries on connection errors before failing diff --git a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java index 43bea387900e..243f8742178d 100644 --- a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java +++ b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java @@ -23,6 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import 
org.slf4j.Logger; import org.slf4j.Marker; +import org.slf4j.helpers.FormattingTuple; +import org.slf4j.helpers.MessageFormatter; public class TracingSlf4jAdapter implements Logger { @@ -57,7 +59,6 @@ public void trace(String format, Object arg) { @Override public void trace(String format, Object arg1, Object arg2) { - new ParameterizedMessage(format, arg1, arg2).getFormattedMessage(); TracingSlf4jImpl.event( name, TracingSlf4jImpl.TRACE, @@ -394,10 +395,13 @@ public void error(String format, Object arg1, Object arg2) { @Override public void error(String format, Object... arguments) { - TracingSlf4jImpl.event( - name, - TracingSlf4jImpl.ERROR, - new ParameterizedMessage(format, arguments).getFormattedMessage()); + FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments); + if (ft.getThrowable() != null) { + this.error(ft.getMessage(), ft.getThrowable()); + } else { + var pm = new ParameterizedMessage(ft.getMessage(), ft.getArgArray()); + TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, pm.getFormattedMessage()); + } } @Override diff --git a/src/config/test-transaction.toml b/src/config/test-transaction.toml new file mode 100644 index 000000000000..cfcd5a2fc69f --- /dev/null +++ b/src/config/test-transaction.toml @@ -0,0 +1,17 @@ +[streaming.developer] +stream_enable_executor_row_count = false +stream_connector_message_buffer_size = 16 +stream_unsafe_extreme_cache_size = 10 +stream_chunk_size = 5 + +[system] +barrier_interval_ms = 2000 +checkpoint_frequency = 1 +sstable_size_mb = 256 +parallel_compact_size_mb = 512 +block_size_kb = 64 +bloom_false_positive = 0.001 +backup_storage_url = "memory" +backup_storage_directory = "backup" +max_concurrent_creating_streaming_jobs = 1 +pause_on_next_bootstrap = false diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index b0a18e8c930d..33d230647bba 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -151,3 +151,71 @@ impl ByteStreamSourceParser for DebeziumParser { self.parse_inner(key, payload, writer).await } } + +#[cfg(test)] +mod tests { + use std::ops::Deref; + use std::str::FromStr; + + use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{JsonbVal, ScalarImpl}; + + use super::*; + use crate::parser::{SourceStreamChunkBuilder, TransactionControl}; + use crate::source::DataType; + + #[tokio::test] + async fn test_parse_transaction_metadata() { + let schema = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + + let columns = schema + .iter() + .map(|c| SourceColumnDesc::from(&c.column_desc)) + .collect::>(); + + let mut parser = DebeziumParser::new_for_test(columns.clone()).await.unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + + // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. 
the format is txID:LSN + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + + let res = parser + .parse_one_with_txn( + None, + Some(begin_msg.as_bytes().to_vec()), + builder.row_writer(), + ) + .await; + println!("res: {:?}", res); + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => { + assert_eq!(id.deref(), "35352"); + } + _ => panic!("unexpected parse result: {:?}", res), + } + + let res = parser + .parse_one_with_txn( + None, + Some(commit_msg.as_bytes().to_vec()), + builder.row_writer(), + ) + .await; + println!("res: {:?}", res); + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => { + assert_eq!(id.deref(), "35352"); + } + _ => panic!("unexpected parse result: {:?}", res), + } + } +} diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 8e878f19ef12..60795c7e10f8 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::LazyLock; +use std::time::Duration; use anyhow::anyhow; use auto_enums::auto_enum; @@ -575,10 +576,15 @@ async fn into_chunk_stream(mut parser: P, data_stream ); *len = 0; // reset `len` while keeping `id` yield_asap = false; - yield StreamChunkWithState { + let parsed_chunk = StreamChunkWithState { chunk: builder.take(batch_len), split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), }; + tracing::info!( + "transaction is larger than max row limit, emit parsed chunk: {} rows", + parsed_chunk.chunk.cardinality() + ); + yield parsed_chunk; } else { // Normal transaction. After the transaction is committed, we should yield the last // batch immediately, so set `yield_asap` to true. @@ -665,6 +671,7 @@ async fn into_chunk_stream(mut parser: P, data_stream if let Some(Transaction { id: current_id, .. }) = ¤t_transaction { tracing::warn!(current_id, id, "already in transaction"); } + tracing::info!("begin upstream transaction: id={}", id); current_transaction = Some(Transaction { id, len: 0 }); } TransactionControl::Commit { id } => { @@ -672,6 +679,7 @@ async fn into_chunk_stream(mut parser: P, data_stream if current_id != Some(&id) { tracing::warn!(?current_id, id, "transaction id mismatch"); } + tracing::info!("commit upstream transaction: id={}", id); current_transaction = None; } } @@ -680,22 +688,36 @@ async fn into_chunk_stream(mut parser: P, data_stream // chunk now. if current_transaction.is_none() && yield_asap { yield_asap = false; - yield StreamChunkWithState { + let parsed_chunk = StreamChunkWithState { chunk: builder.take(batch_len - (i + 1)), split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), }; + tracing::info!( + "Not in a trxn and yield_asap, emit parsed chunk: {} rows", + parsed_chunk.chunk.cardinality() + ); + yield parsed_chunk; } } } + tracing::info!("parsed one message, sleep for 1 second"); + // FAKE: process 1msg needs 1 sec + tokio::time::sleep(Duration::from_secs(1)).await; } // If we are not in a transaction, we should yield the chunk now. 
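    // (While a transaction is open, its rows stay buffered in `builder` across
    // source batches: they are flushed on `TransactionControl::Commit`, or early
    // once the transaction grows beyond the maximum chunk size.)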
if current_transaction.is_none() { yield_asap = false; - yield StreamChunkWithState { + + let schunk = StreamChunkWithState { chunk: builder.take(0), split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), }; + tracing::info!( + "Not in a trxn, emit parsed chunk: {} rows", + schunk.chunk.cardinality() + ); + yield schunk; } } } diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 6163cfe5c486..db80367ff804 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -67,12 +67,13 @@ where accessor.access(&[TRANSACTION_STATUS], Some(&DataType::Varchar))?, accessor.access(&[TRANSACTION_ID], Some(&DataType::Varchar))?, ) { + let (tx_id, _) = id.split_once(':').unwrap(); match status.as_ref() { DEBEZIUM_TRANSACTION_STATUS_BEGIN => { - return Ok(TransactionControl::Begin { id }) + return Ok(TransactionControl::Begin { id: tx_id.into() }) } DEBEZIUM_TRANSACTION_STATUS_COMMIT => { - return Ok(TransactionControl::Commit { id }) + return Ok(TransactionControl::Commit { id: tx_id.into() }) } _ => {} } diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index cbc39ee76290..66cac9d6e564 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -205,7 +205,7 @@ impl CommonSplitReader for CdcSplitReader { while let Some(result) = rx.recv().await { let GetEventStreamResponse { events, .. } = result?; - tracing::trace!("receive events {:?}", events.len()); + tracing::debug!("receive {} cdc events ", events.len()); metrics .connector_source_rows_received .with_label_values(&[source_type.as_str_name(), &source_id]) diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index c26a1ed41aaf..dc711a26243b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -326,6 +326,7 @@ mod tests { async fn test_cdc_filter_expr() { let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap(); let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap(); + let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap(); let row1 = OwnedRow::new(vec![ Some(t1_json.into()), Some(r#"{"file": "1.binlog", "pos": 100}"#.into()), @@ -335,6 +336,11 @@ mod tests { Some(r#"{"file": "2.binlog", "pos": 100}"#.into()), ]); + let row3 = OwnedRow::new(vec![ + Some(trx_json.into()), + Some(r#"{"file": "3.binlog", "pos": 100}"#.into()), + ]); + let filter_expr = 
StreamCdcTableScan::build_cdc_filter_expr("t1"); assert_eq!( filter_expr.eval_row(&row1).await.unwrap(), @@ -344,5 +350,12 @@ mod tests { filter_expr.eval_row(&row2).await.unwrap(), Some(ScalarImpl::Bool(false)) ); + assert_eq!( + filter_expr + .eval_row(&row3) + .await + .unwrap_or(Some(ScalarImpl::Bool(false))), + Some(ScalarImpl::Bool(false)) + ) } } From 21a60b36b94272e7f8c998ebae3ca0b8381a0721 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 4 Jan 2024 18:43:32 +0800 Subject: [PATCH 2/6] support transaction for shared cdc source --- .../source/core/DbzCdcEventConsumer.java | 3 +- .../tracing/TracingSlf4jAdapter.java | 11 +- proto/connector_service.proto | 1 + src/config/test-transaction.toml | 17 - src/connector/src/parser/avro/parser.rs | 1 + .../src/parser/debezium/debezium_parser.rs | 5 - src/connector/src/parser/mod.rs | 6 +- src/connector/src/parser/plain_parser.rs | 291 +++++++++++++++++- src/connector/src/parser/unified/debezium.rs | 58 ++-- .../src/source/cdc/source/message.rs | 3 + 10 files changed, 329 insertions(+), 67 deletions(-) delete mode 100644 src/config/test-transaction.toml diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index 1f7bd94081bf..427c29d9e1e1 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -144,10 +144,11 @@ var record = event.value(); record.topic(), record.valueSchema(), record.value()); var message = msgBuilder + .setIsTransactionMeta(true) .setPayload(new String(payload, StandardCharsets.UTF_8)) .setSourceTsMs(trxTs) .build(); - LOG.info("transaction => {}", message.getPayload()); + LOG.debug("transaction => {}", message); respBuilder.addEvents(message); break; diff --git a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java index bf9e95c75821..d5592285896b 100644 --- a/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java +++ b/java/connector-node/tracing/src/main/java/com/risingwave/tracing/TracingSlf4jAdapter.java @@ -23,8 +23,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.slf4j.Logger; import org.slf4j.Marker; -import org.slf4j.helpers.FormattingTuple; -import org.slf4j.helpers.MessageFormatter; public class TracingSlf4jAdapter implements Logger { @@ -54,13 +52,8 @@ private void logIfEnabled(int level, String format, Object arg1, Object arg2) { private void logIfEnabled(int level, String format, Object... 
arguments) { if (TracingSlf4jImpl.isEnabled(level)) { - FormattingTuple ft = MessageFormatter.arrayFormat(format, arguments); - if (ft.getThrowable() != null) { - this.error(ft.getMessage(), ft.getThrowable()); - } else { - var pm = new ParameterizedMessage(ft.getMessage(), ft.getArgArray()); - TracingSlf4jImpl.event(name, TracingSlf4jImpl.ERROR, pm.getFormattedMessage()); - } + TracingSlf4jImpl.event( + name, level, new ParameterizedMessage(format, arguments).getFormattedMessage()); } } diff --git a/proto/connector_service.proto b/proto/connector_service.proto index ddda62c6aace..c7603cbb3f50 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -160,6 +160,7 @@ message CdcMessage { string offset = 3; string full_table_name = 4; int64 source_ts_ms = 5; + bool is_transaction_meta = 6; } enum SourceType { diff --git a/src/config/test-transaction.toml b/src/config/test-transaction.toml deleted file mode 100644 index cfcd5a2fc69f..000000000000 --- a/src/config/test-transaction.toml +++ /dev/null @@ -1,17 +0,0 @@ -[streaming.developer] -stream_enable_executor_row_count = false -stream_connector_message_buffer_size = 16 -stream_unsafe_extreme_cache_size = 10 -stream_chunk_size = 5 - -[system] -barrier_interval_ms = 2000 -checkpoint_frequency = 1 -sstable_size_mb = 256 -parallel_compact_size_mb = 512 -block_size_kb = 64 -bloom_false_positive = 0.001 -backup_storage_url = "memory" -backup_storage_directory = "backup" -max_concurrent_creating_streaming_jobs = 1 -pause_on_next_bootstrap = false diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 28e37fd0e093..10e000a4fdab 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -288,6 +288,7 @@ mod test { )?), rw_columns: Vec::default(), source_ctx: Default::default(), + transaction_meta_builder: None, }) } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 33d230647bba..1ee0835349c9 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -155,11 +155,8 @@ impl ByteStreamSourceParser for DebeziumParser { #[cfg(test)] mod tests { use std::ops::Deref; - use std::str::FromStr; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; - use risingwave_common::row::OwnedRow; - use risingwave_common::types::{JsonbVal, ScalarImpl}; use super::*; use crate::parser::{SourceStreamChunkBuilder, TransactionControl}; @@ -195,7 +192,6 @@ mod tests { builder.row_writer(), ) .await; - println!("res: {:?}", res); match res { Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => { assert_eq!(id.deref(), "35352"); @@ -210,7 +206,6 @@ mod tests { builder.row_writer(), ) .await; - println!("res: {:?}", res); match res { Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => { assert_eq!(id.deref(), "35352"); diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 60795c7e10f8..594c08cd4c69 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::LazyLock; -use std::time::Duration; use anyhow::anyhow; use auto_enums::auto_enum; @@ -159,7 +158,7 @@ pub struct SourceStreamChunkRowWriter<'a> { /// The meta data of the original message for a row writer. /// /// Extracted from the `SourceMessage`. 
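/// Carries the connector-specific `SourceMeta` (for Debezium CDC this includes
/// the transaction-metadata flag) together with the split id and offset of the
/// originating message.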
-#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub struct MessageMeta<'a> { meta: &'a SourceMeta, split_id: &'a str, @@ -700,9 +699,6 @@ async fn into_chunk_stream(mut parser: P, data_stream } } } - tracing::info!("parsed one message, sleep for 1 second"); - // FAKE: process 1msg needs 1 sec - tokio::time::sleep(Duration::from_secs(1)).await; } // If we are not in a transaction, we should yield the chunk now. diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index f113c279f2ef..da8ac9ba1648 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -20,12 +20,14 @@ use super::{ SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::parser::bytes_parser::BytesAccessBuilder; +use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder; +use crate::parser::unified::debezium::parse_transaction_meta; use crate::parser::unified::upsert::UpsertChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer_with_op; use crate::parser::unified::{AccessImpl, ChangeEventOperation}; use crate::parser::upsert_parser::get_key_column_name; -use crate::parser::{BytesProperties, ParserFormat}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +use crate::parser::{BytesProperties, ParseResult, ParserFormat}; +use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta}; #[derive(Debug)] pub struct PlainParser { @@ -33,6 +35,8 @@ pub struct PlainParser { pub payload_builder: AccessBuilderImpl, pub(crate) rw_columns: Vec, pub source_ctx: SourceContextRef, + // parsing transaction metadata for shared cdc source + pub transaction_meta_builder: Option, } impl PlainParser { @@ -64,11 +68,16 @@ impl PlainParser { ))); } }; + + let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson( + DebeziumJsonAccessBuilder::new()?, + )); Ok(Self { key_builder, payload_builder, rw_columns, source_ctx, + transaction_meta_builder, }) } @@ -77,7 +86,25 @@ impl PlainParser { key: Option>, payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result<()> { + ) -> Result { + // if the message is transaction metadata, parse it and return + if let Some(msg_meta) = writer.row_meta + && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.meta + && cdc_meta.is_transaction_meta + && let Some(data) = payload + { + let accessor = self + .transaction_meta_builder + .as_mut() + .expect("expect transaction metadata access builder") + .generate_accessor(data) + .await?; + return match parse_transaction_meta(&accessor) { + Ok(transaction_control) => Ok(ParseResult::TransactionControl(transaction_control)), + Err(err) => Err(err)?, + }; + } + // reuse upsert component but always insert let mut row_op: UpsertChangeEvent, AccessImpl<'_, '_>> = UpsertChangeEvent::default(); @@ -94,8 +121,14 @@ impl PlainParser { row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?); } - apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op) - .map_err(Into::into) + Ok( + apply_row_operation_on_stream_chunk_writer_with_op( + row_op, + &mut writer, + change_event_op, + ) + .map(|_| ParseResult::Rows)?, + ) } } @@ -113,11 +146,257 @@ impl ByteStreamSourceParser for PlainParser { } async fn parse_one<'a>( + &'a mut self, + _key: Option>, + _payload: Option>, + _writer: SourceStreamChunkRowWriter<'a>, + ) -> Result<()> { + unreachable!("should call `parse_one_with_txn` instead") + } + + async fn 
parse_one_with_txn<'a>( &'a mut self, key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result<()> { + ) -> Result { + tracing::info!("parse_one_with_txn"); self.parse_inner(key, payload, writer).await } } + +#[cfg(test)] +mod tests { + use std::ops::Deref; + use std::sync::Arc; + + use futures::executor::block_on; + use futures::StreamExt; + use futures_async_stream::try_stream; + use itertools::Itertools; + use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + + use super::*; + use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl}; + use crate::source::cdc::DebeziumCdcMeta; + use crate::source::{DataType, SourceMessage, SplitId}; + + #[try_stream(ok = Vec, error = anyhow::Error)] + async fn source_message_stream(transactional: bool) { + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + let data_batches = vec![ + vec![ + r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + ], + vec![ + r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, 
"version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + ], + ]; + for (i, batch) in data_batches.iter().enumerate() { + let mut source_msg_batch = vec![]; + if i == 0 { + // put begin message at first + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: transactional, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(begin_msg.as_bytes().to_vec()), + }); + } + // put data messages + for data_msg in batch { + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: false, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(data_msg.as_bytes().to_vec()), + }); + } + if i == data_batches.len() - 1 { + // put commit message at last + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: transactional, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(commit_msg.as_bytes().to_vec()), + }); + } + yield source_msg_batch; + } + } + + #[tokio::test] + async fn test_emit_transactional_chunk() { + let schema = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + + let columns = schema + .iter() + .map(|c| SourceColumnDesc::from(&c.column_desc)) + .collect::>(); + + // format plain encode json parser + let parser = PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + columns.clone(), + Arc::new(SourceContext::default()), + ) + .await + .unwrap(); + + let mut transactional = false; + // for untransactional source, we expect emit a chunk for each message batch + let message_stream = source_message_stream(transactional); + let chunk_stream = crate::parser::into_chunk_stream(parser, message_stream.boxed()); + let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) + .into_iter() + .collect(); + let output = output + .unwrap() + .into_iter() + .filter(|c| c.chunk.cardinality() > 0) + .enumerate() + .map(|(i, c)| { + if i == 0 { + // begin + 3 data messages + assert_eq!(4, c.chunk.cardinality()); + } + if i == 1 { + // 2 data messages + 1 end + assert_eq!(3, c.chunk.cardinality()); + } + c.chunk + }) + .collect_vec(); + + // 2 chunks for 2 message batches + assert_eq!(2, output.len()); + + // format plain encode json parser + let parser = PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + columns.clone(), + 
Arc::new(SourceContext::default()), + ) + .await + .unwrap(); + + // for transactional source, we expect emit a single chunk for the transaction + transactional = true; + let message_stream = source_message_stream(transactional); + let chunk_stream = crate::parser::into_chunk_stream(parser, message_stream.boxed()); + let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) + .into_iter() + .collect(); + let output = output + .unwrap() + .into_iter() + .filter(|c| c.chunk.cardinality() > 0) + .map(|c| { + // 5 data messages in a single chunk + assert_eq!(5, c.chunk.cardinality()); + c.chunk + }) + .collect_vec(); + + // a single transactional chunk + assert_eq!(1, output.len()); + } + + #[tokio::test] + async fn test_parse_transaction_metadata() { + let schema = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + + let columns = schema + .iter() + .map(|c| SourceColumnDesc::from(&c.column_desc)) + .collect::>(); + + // format plain encode json parser + let mut parser = PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + columns.clone(), + Arc::new(SourceContext::default()), + ) + .await + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + + // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + + let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: true, + }); + let msg_meta = MessageMeta { + meta: &cdc_meta, + split_id: "1001", + offset: "", + }; + + let res = parser + .parse_one_with_txn( + None, + Some(begin_msg.as_bytes().to_vec()), + builder.row_writer().with_meta(msg_meta), + ) + .await; + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => { + assert_eq!(id.deref(), "35352"); + } + _ => panic!("unexpected parse result: {:?}", res), + } + let res = parser + .parse_one_with_txn( + None, + Some(commit_msg.as_bytes().to_vec()), + builder.row_writer().with_meta(msg_meta), + ) + .await; + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => { + assert_eq!(id.deref(), "35352"); + } + _ => panic!("unexpected parse result: {:?}", res), + } + + let output = builder.take(10); + assert_eq!(0, output.cardinality()); + } +} diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index db80367ff804..4e28f924d0d1 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use anyhow::anyhow; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use super::{Access, AccessError, ChangeEvent, ChangeEventOperation}; @@ -26,8 +27,8 @@ pub struct DebeziumChangeEvent { const BEFORE: &str = "before"; const AFTER: &str = "after"; const OP: &str = "op"; -const TRANSACTION_STATUS: &str = "status"; -const TRANSACTION_ID: &str = "id"; +pub const TRANSACTION_STATUS: &str = "status"; +pub const TRANSACTION_ID: &str = "id"; pub const DEBEZIUM_READ_OP: &str = "r"; pub const DEBEZIUM_CREATE_OP: &str = "c"; @@ -37,6 +38,31 @@ pub const DEBEZIUM_DELETE_OP: &str = "d"; pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN"; pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END"; +pub fn parse_transaction_meta( + accessor: &impl Access, +) -> std::result::Result { + if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( + accessor.access(&[TRANSACTION_STATUS], Some(&DataType::Varchar))?, + accessor.access(&[TRANSACTION_ID], Some(&DataType::Varchar))?, + ) { + let (tx_id, _) = id.split_once(':').unwrap(); + match status.as_ref() { + DEBEZIUM_TRANSACTION_STATUS_BEGIN => { + return Ok(TransactionControl::Begin { id: tx_id.into() }) + } + DEBEZIUM_TRANSACTION_STATUS_COMMIT => { + return Ok(TransactionControl::Commit { id: tx_id.into() }) + } + _ => {} + } + } + + Err(AccessError::Undefined { + name: "transaction status".into(), + path: TRANSACTION_STATUS.into(), + }) +} + impl DebeziumChangeEvent where A: Access, @@ -62,28 +88,12 @@ where /// /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details. pub(crate) fn transaction_control(&self) -> Result { - if let Some(accessor) = &self.value_accessor { - if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( - accessor.access(&[TRANSACTION_STATUS], Some(&DataType::Varchar))?, - accessor.access(&[TRANSACTION_ID], Some(&DataType::Varchar))?, - ) { - let (tx_id, _) = id.split_once(':').unwrap(); - match status.as_ref() { - DEBEZIUM_TRANSACTION_STATUS_BEGIN => { - return Ok(TransactionControl::Begin { id: tx_id.into() }) - } - DEBEZIUM_TRANSACTION_STATUS_COMMIT => { - return Ok(TransactionControl::Commit { id: tx_id.into() }) - } - _ => {} - } - } - } - - Err(AccessError::Undefined { - name: "transaction status".into(), - path: Default::default(), - }) + let Some(accessor) = &self.value_accessor else { + return Err(AccessError::Other(anyhow!( + "value_accessor must be provided to parse transaction metadata" + ))); + }; + parse_transaction_meta(accessor) } } diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs index 04518f7088b4..28fe52c52cd1 100644 --- a/src/connector/src/source/cdc/source/message.rs +++ b/src/connector/src/source/cdc/source/message.rs @@ -22,6 +22,8 @@ pub struct DebeziumCdcMeta { pub full_table_name: String, // extracted from `payload.source.ts_ms`, the time that the change event was made in the database pub source_ts_ms: i64, + // Whether the message is a transaction metadata + pub is_transaction_meta: bool, } impl From for SourceMessage { @@ -38,6 +40,7 @@ impl From for SourceMessage { meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { full_table_name: message.full_table_name, source_ts_ms: message.source_ts_ms, + is_transaction_meta: message.is_transaction_meta, }), } } From aac2b39cede2998d5ba9fec90f1ca6f55728ee38 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 4 Jan 2024 20:30:54 +0800 Subject: [PATCH 3/6] minor --- 
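For the Postgres connector, Debezium emits transaction ids of the form
`txID:LSN` (e.g. the `35352:3962948040` payloads in the tests above), and
`parse_transaction_meta` keys `Begin`/`Commit` on the `txID` part only. A
minimal standalone sketch of that extraction, assuming the same `txID:LSN`
shape (illustrative only; unlike the patch, it falls back to the whole id
instead of unwrapping when no colon is present):

fn extract_tx_id(id: &str) -> &str {
    // "35352:3962948040" -> "35352"; ids without a colon are returned as-is.
    id.split_once(':').map_or(id, |(tx_id, _)| tx_id)
}

fn main() {
    assert_eq!(extract_tx_id("35352:3962948040"), "35352");
    assert_eq!(extract_tx_id("35352"), "35352");
}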
e2e_test/source/cdc/cdc.share_stream.slt | 15 +-- e2e_test/source/cdc/postgres_cdc.sql | 33 ----- .../source/core/DbzCdcEventConsumer.java | 4 - .../src/main/resources/debezium.properties | 2 +- .../src/parser/debezium/debezium_parser.rs | 2 - src/connector/src/parser/mod.rs | 25 +--- src/connector/src/parser/plain_parser.rs | 126 +++++++++--------- 7 files changed, 70 insertions(+), 137 deletions(-) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index a4ef61a717ea..babfb685a945 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -193,7 +193,7 @@ CREATE TABLE person_new ( ) FROM pg_source TABLE 'person'; statement ok -CREATE TABLE person_tx ( +CREATE TABLE person_new ( id int, name varchar, email_address varchar, @@ -202,19 +202,6 @@ CREATE TABLE person_tx ( PRIMARY KEY (id) ) FROM pg_source TABLE 'public.person'; -CREATE TABLE orders_tx ( - order_id INTEGER, - order_date DATE, - customer_name VARCHAR, - price DECIMAL, - product_id INTEGER, - order_status BOOLEAN, - PRIMARY KEY (order_id) -) FROM pg_source TABLE 'public.orders_tx'; - -create materialized view mv1 as select 'person' as table_name, count(*) as count from person_tx union all select 'orders' as table_name, count(*) as count from orders_tx; - - statement ok CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index 3f52363d9565..43dba14950b3 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -14,28 +14,6 @@ VALUES (default,10001,'Beijing','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false); -CREATE TABLE orders_tx ( - order_id SERIAL NOT NULL PRIMARY KEY, - order_date DATE NOT NULL, - customer_name VARCHAR(255) NOT NULL, - price DECIMAL NOT NULL, - product_id INTEGER NOT NULL, - order_status BOOLEAN NOT NULL -); -ALTER SEQUENCE public.orders_tx_order_id_seq RESTART WITH 10001; - -INSERT INTO orders_tx -VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), - (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false); - -INSERT INTO orders_tx -VALUES (default, '2023-12-30 19:11:09', '张三', 15.00, 105, true), - (default, '2020-07-30 12:00:30', '李四', 25.25, 106, false); - -INSERT INTO orders_tx -VALUES (default, '2024-01-01 17:00:00', 'Sam', 1000.20, 110, false); - - CREATE TABLE person ( "id" int, "name" varchar(64), @@ -52,17 +30,6 @@ INSERT INTO person VALUES (1000, 'vicky noris', 'yplkvgz@qbxfg.com', '7878 5821 INSERT INTO person VALUES (1001, 'peter white', 'myckhsp@xpmpe.com', '1781 2313 8157 6974', 'boise'); INSERT INTO person VALUES (1002, 'sarah spencer', 'wipvdbm@dkaap.com', '3453 4987 9481 6270', 'los angeles'); -INSERT INTO person VALUES (1003, 'Kafka', 'ypl@qbxfg.com', '1864 2539', 'Shanghai'); -INSERT INTO person VALUES (1100, 'noris', 'ypl@qbxfg.com', '1864 2539', 'enne'); -INSERT INTO person VALUES (1101, 'white', 'myc@xpmpe.com', '8157 6974', 'se'); - - -SELECT COUNT(*) AS person_count -FROM person_tx -UNION ALL -SELECT COUNT(*) AS order_count -FROM orders_tx; - create schema abs; create table abs.t1 (v1 int primary key, v2 double precision, v3 varchar, v4 numeric); create publication my_publicaton for table abs.t1 (v1, v3); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java 
b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index 427c29d9e1e1..f0880d52c8b5 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -75,7 +75,6 @@ public class DbzCdcEventConsumer } private EventType getEventType(SourceRecord record) { - LOG.info("event topic: {}", record.topic()); if (isHeartbeatEvent(record)) { return EventType.HEARTBEAT; } else if (isTransactionMetaEvent(record)) { @@ -103,7 +102,6 @@ public void handleBatch( DebeziumEngine.RecordCommitter> committer) throws InterruptedException { var respBuilder = GetEventStreamResponse.newBuilder(); - LOG.info("event batch size => {}", events.size()); for (ChangeEvent event : events) { var record = event.value(); EventType eventType = getEventType(record); @@ -133,7 +131,6 @@ var record = event.value(); var message = msgBuilder.build(); LOG.debug("heartbeat => {}", message.getOffset()); respBuilder.addEvents(message); - break; } case TRANSACTION: @@ -150,7 +147,6 @@ var record = event.value(); .build(); LOG.debug("transaction => {}", message); respBuilder.addEvents(message); - break; } case DATA: diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties index a6af436cedcc..22b7f0c4689c 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/debezium.properties @@ -9,7 +9,7 @@ datetime.type=com.risingwave.connector.cdc.debezium.converters.DatetimeTypeConve # parse decimal in the "precise" mode decimal.handling.mode=${debezium.decimal.handling.mode:-string} interval.handling.mode=string -max.batch.size=${debezium.max.batch.size:-5} +max.batch.size=${debezium.max.batch.size:-1024} max.queue.size=${debezium.max.queue.size:-8192} time.precision.mode=adaptive_time_microseconds # The maximum number of retries on connection errors before failing diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 1ee0835349c9..5427dddd4b98 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -184,7 +184,6 @@ mod tests { // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. 
the format is txID:LSN let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; - let res = parser .parse_one_with_txn( None, @@ -198,7 +197,6 @@ mod tests { } _ => panic!("unexpected parse result: {:?}", res), } - let res = parser .parse_one_with_txn( None, diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 594c08cd4c69..1c165b45660e 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -575,15 +575,10 @@ async fn into_chunk_stream(mut parser: P, data_stream ); *len = 0; // reset `len` while keeping `id` yield_asap = false; - let parsed_chunk = StreamChunkWithState { + yield StreamChunkWithState { chunk: builder.take(batch_len), split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), }; - tracing::info!( - "transaction is larger than max row limit, emit parsed chunk: {} rows", - parsed_chunk.chunk.cardinality() - ); - yield parsed_chunk; } else { // Normal transaction. After the transaction is committed, we should yield the last // batch immediately, so set `yield_asap` to true. @@ -670,7 +665,7 @@ async fn into_chunk_stream(mut parser: P, data_stream if let Some(Transaction { id: current_id, .. }) = ¤t_transaction { tracing::warn!(current_id, id, "already in transaction"); } - tracing::info!("begin upstream transaction: id={}", id); + tracing::debug!("begin upstream transaction: id={}", id); current_transaction = Some(Transaction { id, len: 0 }); } TransactionControl::Commit { id } => { @@ -678,7 +673,7 @@ async fn into_chunk_stream(mut parser: P, data_stream if current_id != Some(&id) { tracing::warn!(?current_id, id, "transaction id mismatch"); } - tracing::info!("commit upstream transaction: id={}", id); + tracing::debug!("commit upstream transaction: id={}", id); current_transaction = None; } } @@ -687,15 +682,10 @@ async fn into_chunk_stream(mut parser: P, data_stream // chunk now. 
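                // (`yield_asap` is only set after the transaction committed
                // mid-batch; emitting here releases the buffered rows without
                // waiting for the remainder of the batch.)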
if current_transaction.is_none() && yield_asap { yield_asap = false; - let parsed_chunk = StreamChunkWithState { + yield StreamChunkWithState { chunk: builder.take(batch_len - (i + 1)), split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), }; - tracing::info!( - "Not in a trxn and yield_asap, emit parsed chunk: {} rows", - parsed_chunk.chunk.cardinality() - ); - yield parsed_chunk; } } } @@ -705,15 +695,10 @@ async fn into_chunk_stream(mut parser: P, data_stream if current_transaction.is_none() { yield_asap = false; - let schunk = StreamChunkWithState { + yield StreamChunkWithState { chunk: builder.take(0), split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), }; - tracing::info!( - "Not in a trxn, emit parsed chunk: {} rows", - schunk.chunk.cardinality() - ); - yield schunk; } } } diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index da8ac9ba1648..0df7c05bb7e1 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -181,69 +181,6 @@ mod tests { use crate::source::cdc::DebeziumCdcMeta; use crate::source::{DataType, SourceMessage, SplitId}; - #[try_stream(ok = Vec, error = anyhow::Error)] - async fn source_message_stream(transactional: bool) { - let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; - let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; - let data_batches = vec![ - vec![ - r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, - r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, - r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} 
}"#, - ], - vec![ - r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, - r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35392, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, - ], - ]; - for (i, batch) in data_batches.iter().enumerate() { - let mut source_msg_batch = vec![]; - if i == 0 { - // put begin message at first - source_msg_batch.push(SourceMessage { - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: "orders".to_string(), - source_ts_ms: 0, - is_transaction_meta: transactional, - }), - split_id: SplitId::from("1001"), - offset: "0".into(), - key: None, - payload: Some(begin_msg.as_bytes().to_vec()), - }); - } - // put data messages - for data_msg in batch { - source_msg_batch.push(SourceMessage { - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: "orders".to_string(), - source_ts_ms: 0, - is_transaction_meta: false, - }), - split_id: SplitId::from("1001"), - offset: "0".into(), - key: None, - payload: Some(data_msg.as_bytes().to_vec()), - }); - } - if i == data_batches.len() - 1 { - // put commit message at last - source_msg_batch.push(SourceMessage { - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: "orders".to_string(), - source_ts_ms: 0, - is_transaction_meta: transactional, - }), - split_id: SplitId::from("1001"), - offset: "0".into(), - key: None, - payload: Some(commit_msg.as_bytes().to_vec()), - }); - } - yield source_msg_batch; - } - } - #[tokio::test] async fn test_emit_transactional_chunk() { let schema = vec![ @@ -328,6 +265,69 @@ mod tests { assert_eq!(1, output.len()); } + #[try_stream(ok = Vec, error = anyhow::Error)] + async fn source_message_stream(transactional: bool) { + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + let data_batches = vec![ + vec![ + r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", 
"snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + ], + vec![ + r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + ], + ]; + for (i, batch) in data_batches.iter().enumerate() { + let mut source_msg_batch = vec![]; + if i == 0 { + // put begin message at first + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: transactional, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(begin_msg.as_bytes().to_vec()), + }); + } + // put data messages + for data_msg in batch { + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: false, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(data_msg.as_bytes().to_vec()), + }); + } + if i == data_batches.len() - 1 { + // put 
commit message at last + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: transactional, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(commit_msg.as_bytes().to_vec()), + }); + } + yield source_msg_batch; + } + } + #[tokio::test] async fn test_parse_transaction_metadata() { let schema = vec![ From e0f36a3a78dc327b6889382b509ac431c6e8d371 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 4 Jan 2024 21:39:39 +0800 Subject: [PATCH 4/6] parse transaction meta based on connector type --- src/batch/src/executor/source.rs | 6 +++ .../src/parser/debezium/debezium_parser.rs | 18 ++++++++- src/connector/src/parser/plain_parser.rs | 22 +++++++---- src/connector/src/parser/unified/debezium.rs | 37 ++++++++++++++----- src/connector/src/source/base.rs | 5 +++ src/source/src/connector_source.rs | 3 ++ src/source/src/source_desc.rs | 5 +++ .../src/executor/source/fetch_executor.rs | 1 + .../src/executor/source/fs_source_executor.rs | 7 ++++ .../src/executor/source/source_executor.rs | 3 +- src/stream/src/from_proto/source/fs_fetch.rs | 6 +++ .../src/from_proto/source/trad_source.rs | 12 +++--- 12 files changed, 100 insertions(+), 25 deletions(-) diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index a60398ef12f7..2c0ba069e3d8 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -63,6 +63,10 @@ impl BoxedExecutorBuilder for SourceExecutor { // prepare connector source let source_props: HashMap = HashMap::from_iter(source_node.with_properties.clone()); + let connector = source_props + .get("connector") + .map(|c| c.to_ascii_lowercase()) + .unwrap_or_default(); let config = ConnectorProperties::extract(source_props, false).map_err(BatchError::connector)?; @@ -84,6 +88,7 @@ impl BoxedExecutorBuilder for SourceExecutor { .get_config() .developer .connector_message_buffer_size, + connector, }; let source_ctrl_opts = SourceCtrlOpts { chunk_size: source.context().get_config().developer.chunk_size, @@ -146,6 +151,7 @@ impl SourceExecutor { self.metrics, self.source_ctrl_opts.clone(), None, + "".into(), )); let stream = self .connector_source diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 5427dddd4b98..59c89ce6e3c8 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -109,7 +109,9 @@ impl DebeziumParser { Err(err) => { // Only try to access transaction control message if the row operation access failed // to make it a fast path. - if let Ok(transaction_control) = row_op.transaction_control() { + if let Ok(transaction_control) = + row_op.transaction_control(&self.source_ctx.connector) + { Ok(ParseResult::TransactionControl(transaction_control)) } else { Err(err)? 
@@ -155,6 +157,7 @@ impl ByteStreamSourceParser for DebeziumParser { #[cfg(test)] mod tests { use std::ops::Deref; + use std::sync::Arc; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; @@ -178,7 +181,18 @@ mod tests { .map(|c| SourceColumnDesc::from(&c.column_desc)) .collect::>(); - let mut parser = DebeziumParser::new_for_test(columns.clone()).await.unwrap(); + let props = SpecificParserConfig { + key_encoding_config: None, + encoding_config: EncodingProperties::Json(JsonProperties { + use_schema_registry: false, + }), + protocol_config: ProtocolProperties::Debezium, + }; + let mut source_ctx = SourceContext::default(); + source_ctx.connector = "postgres-cdc".into(); + let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) + .await + .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 0df7c05bb7e1..e61f5bc600da 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -99,7 +99,7 @@ impl PlainParser { .expect("expect transaction metadata access builder") .generate_accessor(data) .await?; - return match parse_transaction_meta(&accessor) { + return match parse_transaction_meta(&accessor, &self.source_ctx.connector) { Ok(transaction_control) => Ok(ParseResult::TransactionControl(transaction_control)), Err(err) => Err(err)?, }; @@ -197,11 +197,14 @@ mod tests { .map(|c| SourceColumnDesc::from(&c.column_desc)) .collect::>(); + let mut source_ctx = SourceContext::default(); + source_ctx.connector = "postgres-cdc".into(); + let source_ctx = Arc::new(source_ctx); // format plain encode json parser let parser = PlainParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, columns.clone(), - Arc::new(SourceContext::default()), + source_ctx.clone(), ) .await .unwrap(); @@ -238,7 +241,7 @@ mod tests { let parser = PlainParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, columns.clone(), - Arc::new(SourceContext::default()), + source_ctx, ) .await .unwrap(); @@ -345,18 +348,20 @@ mod tests { .collect::>(); // format plain encode json parser + let mut source_ctx = SourceContext::default(); + source_ctx.connector = "mysql-cdc".into(); let mut parser = PlainParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, columns.clone(), - Arc::new(SourceContext::default()), + Arc::new(source_ctx), ) .await .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. 
the format is txID:LSN - let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; - let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta { full_table_name: "orders".to_string(), @@ -369,6 +374,7 @@ mod tests { offset: "", }; + let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"; let res = parser .parse_one_with_txn( None, @@ -378,7 +384,7 @@ mod tests { .await; match res { Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => { - assert_eq!(id.deref(), "35352"); + assert_eq!(id.deref(), expect_tx_id); } _ => panic!("unexpected parse result: {:?}", res), } @@ -391,7 +397,7 @@ mod tests { .await; match res { Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => { - assert_eq!(id.deref(), "35352"); + assert_eq!(id.deref(), expect_tx_id); } _ => panic!("unexpected parse result: {:?}", res), } diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 4e28f924d0d1..117623f88b57 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -38,21 +38,37 @@ pub const DEBEZIUM_DELETE_OP: &str = "d"; pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN"; pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END"; +const PG_CDC: &str = "postgres-cdc"; +const MYSQL_CDC: &str = "mysql-cdc"; + pub fn parse_transaction_meta( accessor: &impl Access, + connector: &str, ) -> std::result::Result { if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( accessor.access(&[TRANSACTION_STATUS], Some(&DataType::Varchar))?, accessor.access(&[TRANSACTION_ID], Some(&DataType::Varchar))?, ) { - let (tx_id, _) = id.split_once(':').unwrap(); + // The id field has different meanings for different databases: + // PG: txID:LSN + // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23) match status.as_ref() { - DEBEZIUM_TRANSACTION_STATUS_BEGIN => { - return Ok(TransactionControl::Begin { id: tx_id.into() }) - } - DEBEZIUM_TRANSACTION_STATUS_COMMIT => { - return Ok(TransactionControl::Commit { id: tx_id.into() }) - } + DEBEZIUM_TRANSACTION_STATUS_BEGIN => match connector { + PG_CDC => { + let (tx_id, _) = id.split_once(':').unwrap(); + return Ok(TransactionControl::Begin { id: tx_id.into() }); + } + MYSQL_CDC => return Ok(TransactionControl::Begin { id }), + _ => {} + }, + DEBEZIUM_TRANSACTION_STATUS_COMMIT => match connector { + PG_CDC => { + let (tx_id, _) = id.split_once(':').unwrap(); + return Ok(TransactionControl::Commit { id: tx_id.into() }); + } + MYSQL_CDC => return Ok(TransactionControl::Commit { id }), + _ => {} + }, _ => {} } } @@ -87,13 +103,16 @@ where /// Returns the transaction metadata if exists. 
/// /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details. - pub(crate) fn transaction_control(&self) -> Result { + pub(crate) fn transaction_control( + &self, + connector: &str, + ) -> Result { let Some(accessor) = &self.value_accessor else { return Err(AccessError::Other(anyhow!( "value_accessor must be provided to parse transaction metadata" ))); }; - parse_transaction_meta(accessor) + parse_transaction_meta(accessor, connector) } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 1a4a594d4428..1d80aee6ca21 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -159,6 +159,7 @@ pub struct SourceContext { pub source_info: SourceInfo, pub metrics: Arc, pub source_ctrl_opts: SourceCtrlOpts, + pub connector: String, error_suppressor: Option>>, } impl SourceContext { @@ -169,6 +170,7 @@ impl SourceContext { metrics: Arc, source_ctrl_opts: SourceCtrlOpts, connector_client: Option, + connector: String, ) -> Self { Self { connector_client, @@ -180,6 +182,7 @@ impl SourceContext { metrics, source_ctrl_opts, error_suppressor: None, + connector, } } @@ -191,6 +194,7 @@ impl SourceContext { source_ctrl_opts: SourceCtrlOpts, connector_client: Option, error_suppressor: Arc>, + connector: String, ) -> Self { let mut ctx = Self::new( actor_id, @@ -199,6 +203,7 @@ impl SourceContext { metrics, source_ctrl_opts, connector_client, + connector, ); ctx.error_suppressor = Some(error_suppressor); ctx diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index ecb1846c0124..96570e37b6e4 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -46,6 +46,7 @@ pub struct ConnectorSource { pub columns: Vec, pub parser_config: SpecificParserConfig, pub connector_message_buffer_size: usize, + pub connector: String, } #[derive(Clone, Debug)] @@ -63,6 +64,7 @@ impl ConnectorSource { columns: Vec, connector_message_buffer_size: usize, parser_config: SpecificParserConfig, + connector: String, ) -> Result { let config = ConnectorProperties::extract(properties, false) .map_err(|e| ConnectorError(e.into()))?; @@ -72,6 +74,7 @@ impl ConnectorSource { columns, parser_config, connector_message_buffer_size, + connector, }) } diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index aed602aa71ff..1b85acf9e4db 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -56,6 +56,7 @@ pub struct SourceDescBuilder { connector_params: ConnectorParams, connector_message_buffer_size: usize, pk_indices: Vec, + connector: String, } impl SourceDescBuilder { @@ -69,6 +70,7 @@ impl SourceDescBuilder { connector_params: ConnectorParams, connector_message_buffer_size: usize, pk_indices: Vec, + connector: String, ) -> Self { Self { columns, @@ -79,6 +81,7 @@ impl SourceDescBuilder { connector_params, connector_message_buffer_size, pk_indices, + connector, } } @@ -107,6 +110,7 @@ impl SourceDescBuilder { columns.clone(), self.connector_message_buffer_size, psrser_config, + self.connector, )?; Ok(SourceDesc { @@ -200,6 +204,7 @@ pub mod test_utils { connector_params: Default::default(), connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, pk_indices, + connector: "test".to_string(), } } } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 32cd08e9c87f..88b7cf30606c 100644 --- 
a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -171,6 +171,7 @@ impl FsFetchExecutor { self.source_ctrl_opts.clone(), self.connector_params.connector_client.clone(), self.actor_ctx.error_suppressor.clone(), + source_desc.source.connector.clone(), ) } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 85967a253ba9..552a03646f7d 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -96,6 +96,12 @@ impl FsSourceExecutor { .iter() .map(|column_desc| column_desc.column_id) .collect_vec(); + let connector = source_desc + .source + .properties + .get("connector") + .map(|c| c.to_ascii_lowercase()) + .unwrap_or_default(); let source_ctx = SourceContext::new_with_suppressor( self.actor_ctx.id, self.stream_source_core.source_id, @@ -104,6 +110,7 @@ impl FsSourceExecutor { self.source_ctrl_opts.clone(), None, self.actor_ctx.error_suppressor.clone(), + connector, ); source_desc .source diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 433d54431ced..ff81527ad815 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -25,7 +25,7 @@ use risingwave_connector::source::{ BoxSourceWithStateStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData, StreamChunkWithState, }; -use risingwave_connector::ConnectorParams; +use risingwave_connector::{dispatch_source_prop, ConnectorParams}; use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; use risingwave_storage::StateStore; use thiserror_ext::AsReport; @@ -106,6 +106,7 @@ impl SourceExecutor { self.source_ctrl_opts.clone(), self.connector_params.connector_client.clone(), self.actor_ctx.error_suppressor.clone(), + source_desc.source.connector.clone(), ); source_desc .source diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index dc6b718f566e..c90cf4344d54 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -48,6 +48,11 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { let source_name = source.source_name.clone(); let source_info = source.get_info()?; let properties = ConnectorProperties::extract(source.with_properties.clone(), false)?; + let connector = source + .with_properties + .get("connector") + .map(|c| c.to_ascii_lowercase()) + .unwrap_or_default(); let source_desc_builder = SourceDescBuilder::new( source.columns.clone(), params.env.source_metrics(), @@ -57,6 +62,7 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { params.env.connector_params(), params.env.config().developer.connector_message_buffer_size, params.info.pk_indices.clone(), + connector, ); let source_ctrl_opts = SourceCtrlOpts { chunk_size: params.env.config().developer.chunk_size, diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 8234188e9a4f..2d5480ce4ea5 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -83,6 +83,12 @@ impl ExecutorBuilder for SourceExecutorBuilder { } } + let connector = source + .with_properties + .get("connector") + .map(|c| c.to_ascii_lowercase()) + .unwrap_or_default(); + let source_desc_builder = SourceDescBuilder::new( source_columns.clone(), 
params.env.source_metrics(), @@ -101,6 +107,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { // We should consdier add back the "pk_column_ids" field removed by #8841 in // StreamSource params.info.pk_indices.clone(), + connector.clone(), ); let source_ctrl_opts = SourceCtrlOpts { @@ -125,11 +132,6 @@ impl ExecutorBuilder for SourceExecutorBuilder { state_table_handler, ); - let connector = source - .with_properties - .get("connector") - .map(|c| c.to_ascii_lowercase()) - .unwrap_or_default(); let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str()); let is_fs_v2_connector = ConnectorProperties::is_new_fs_connector_hash_map(&source.with_properties); From 1b0b01d413bc47a852f8a8afd709df412f4f40e5 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 5 Jan 2024 13:07:26 +0800 Subject: [PATCH 5/6] refactor --- src/batch/src/executor/source.rs | 7 +----- .../src/parser/debezium/debezium_parser.rs | 6 ++--- src/connector/src/parser/plain_parser.rs | 8 +++---- src/connector/src/parser/unified/debezium.rs | 23 ++++++++----------- src/connector/src/source/base.rs | 16 +++++++++---- src/connector/src/source/cdc/source/reader.rs | 2 +- src/connector/src/source/test_source.rs | 2 +- src/source/src/connector_source.rs | 3 --- src/source/src/source_desc.rs | 5 ---- .../src/executor/source/fetch_executor.rs | 2 +- .../src/executor/source/fs_source_executor.rs | 8 +------ .../src/executor/source/source_executor.rs | 4 ++-- src/stream/src/from_proto/source/fs_fetch.rs | 6 ----- .../src/from_proto/source/trad_source.rs | 12 ++++------ 14 files changed, 40 insertions(+), 64 deletions(-) diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 2c0ba069e3d8..2714d5335b90 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -63,10 +63,6 @@ impl BoxedExecutorBuilder for SourceExecutor { // prepare connector source let source_props: HashMap = HashMap::from_iter(source_node.with_properties.clone()); - let connector = source_props - .get("connector") - .map(|c| c.to_ascii_lowercase()) - .unwrap_or_default(); let config = ConnectorProperties::extract(source_props, false).map_err(BatchError::connector)?; @@ -88,7 +84,6 @@ impl BoxedExecutorBuilder for SourceExecutor { .get_config() .developer .connector_message_buffer_size, - connector, }; let source_ctrl_opts = SourceCtrlOpts { chunk_size: source.context().get_config().developer.chunk_size, @@ -151,7 +146,7 @@ impl SourceExecutor { self.metrics, self.source_ctrl_opts.clone(), None, - "".into(), + ConnectorProperties::default(), )); let stream = self .connector_source diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index 59c89ce6e3c8..0f79677860f8 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -110,7 +110,7 @@ impl DebeziumParser { // Only try to access transaction control message if the row operation access failed // to make it a fast path. 
if let Ok(transaction_control) = - row_op.transaction_control(&self.source_ctx.connector) + row_op.transaction_control(&self.source_ctx.connector_props) { Ok(ParseResult::TransactionControl(transaction_control)) } else { @@ -163,7 +163,7 @@ mod tests { use super::*; use crate::parser::{SourceStreamChunkBuilder, TransactionControl}; - use crate::source::DataType; + use crate::source::{ConnectorProperties, DataType}; #[tokio::test] async fn test_parse_transaction_metadata() { @@ -189,7 +189,7 @@ mod tests { protocol_config: ProtocolProperties::Debezium, }; let mut source_ctx = SourceContext::default(); - source_ctx.connector = "postgres-cdc".into(); + source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default()); let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) .await .unwrap(); diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index e61f5bc600da..4f15234be837 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -99,7 +99,7 @@ impl PlainParser { .expect("expect transaction metadata access builder") .generate_accessor(data) .await?; - return match parse_transaction_meta(&accessor, &self.source_ctx.connector) { + return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props) { Ok(transaction_control) => Ok(ParseResult::TransactionControl(transaction_control)), Err(err) => Err(err)?, }; @@ -179,7 +179,7 @@ mod tests { use super::*; use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl}; use crate::source::cdc::DebeziumCdcMeta; - use crate::source::{DataType, SourceMessage, SplitId}; + use crate::source::{ConnectorProperties, DataType, SourceMessage, SplitId}; #[tokio::test] async fn test_emit_transactional_chunk() { @@ -198,7 +198,7 @@ mod tests { .collect::>(); let mut source_ctx = SourceContext::default(); - source_ctx.connector = "postgres-cdc".into(); + source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default()); let source_ctx = Arc::new(source_ctx); // format plain encode json parser let parser = PlainParser::new( @@ -349,7 +349,7 @@ mod tests { // format plain encode json parser let mut source_ctx = SourceContext::default(); - source_ctx.connector = "mysql-cdc".into(); + source_ctx.connector_props = ConnectorProperties::MysqlCdc(Box::default()); let mut parser = PlainParser::new( SpecificParserConfig::DEFAULT_PLAIN_JSON, columns.clone(), diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 117623f88b57..7291b1b35973 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -17,7 +17,7 @@ use risingwave_common::types::{DataType, Datum, ScalarImpl}; use super::{Access, AccessError, ChangeEvent, ChangeEventOperation}; use crate::parser::TransactionControl; -use crate::source::SourceColumnDesc; +use crate::source::{ConnectorProperties, SourceColumnDesc}; pub struct DebeziumChangeEvent { value_accessor: Option, @@ -38,12 +38,9 @@ pub const DEBEZIUM_DELETE_OP: &str = "d"; pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN"; pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END"; -const PG_CDC: &str = "postgres-cdc"; -const MYSQL_CDC: &str = "mysql-cdc"; - pub fn parse_transaction_meta( accessor: &impl Access, - connector: &str, + connector_props: &ConnectorProperties, ) -> std::result::Result { if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( 
accessor.access(&[TRANSACTION_STATUS], Some(&DataType::Varchar))?, @@ -53,20 +50,20 @@ pub fn parse_transaction_meta( // PG: txID:LSN // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23) match status.as_ref() { - DEBEZIUM_TRANSACTION_STATUS_BEGIN => match connector { - PG_CDC => { + DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props { + ConnectorProperties::PostgresCdc(_) => { let (tx_id, _) = id.split_once(':').unwrap(); return Ok(TransactionControl::Begin { id: tx_id.into() }); } - MYSQL_CDC => return Ok(TransactionControl::Begin { id }), + ConnectorProperties::MysqlCdc(_) => return Ok(TransactionControl::Begin { id }), _ => {} }, - DEBEZIUM_TRANSACTION_STATUS_COMMIT => match connector { - PG_CDC => { + DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props { + ConnectorProperties::PostgresCdc(_) => { let (tx_id, _) = id.split_once(':').unwrap(); return Ok(TransactionControl::Commit { id: tx_id.into() }); } - MYSQL_CDC => return Ok(TransactionControl::Commit { id }), + ConnectorProperties::MysqlCdc(_) => return Ok(TransactionControl::Commit { id }), _ => {} }, _ => {} @@ -105,14 +102,14 @@ where /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details. pub(crate) fn transaction_control( &self, - connector: &str, + connector_props: &ConnectorProperties, ) -> Result { let Some(accessor) = &self.value_accessor else { return Err(AccessError::Other(anyhow!( "value_accessor must be provided to parse transaction metadata" ))); }; - parse_transaction_meta(accessor, connector) + parse_transaction_meta(accessor, connector_props) } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 1d80aee6ca21..80c70c59842d 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -159,7 +159,7 @@ pub struct SourceContext { pub source_info: SourceInfo, pub metrics: Arc, pub source_ctrl_opts: SourceCtrlOpts, - pub connector: String, + pub connector_props: ConnectorProperties, error_suppressor: Option>>, } impl SourceContext { @@ -170,7 +170,7 @@ impl SourceContext { metrics: Arc, source_ctrl_opts: SourceCtrlOpts, connector_client: Option, - connector: String, + connector_props: ConnectorProperties, ) -> Self { Self { connector_client, @@ -182,7 +182,7 @@ impl SourceContext { metrics, source_ctrl_opts, error_suppressor: None, - connector, + connector_props, } } @@ -194,7 +194,7 @@ impl SourceContext { source_ctrl_opts: SourceCtrlOpts, connector_client: Option, error_suppressor: Arc>, - connector: String, + connector_props: ConnectorProperties, ) -> Self { let mut ctx = Self::new( actor_id, @@ -203,7 +203,7 @@ impl SourceContext { metrics, source_ctrl_opts, connector_client, - connector, + connector_props, ); ctx.error_suppressor = Some(error_suppressor); ctx @@ -387,6 +387,12 @@ pub trait SplitReader: Sized + Send { for_all_sources!(impl_connector_properties); +impl Default for ConnectorProperties { + fn default() -> Self { + ConnectorProperties::Test(Box::default()) + } +} + impl ConnectorProperties { pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap) -> bool { with_properties diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 7d68d6ab6ffb..cb9c7dae3d11 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -203,7 +203,7 @@ impl CommonSplitReader for CdcSplitReader { while 
let Some(result) = rx.recv().await { let GetEventStreamResponse { events, .. } = result?; - tracing::debug!("receive {} cdc events ", events.len()); + tracing::trace!("receive {} cdc events ", events.len()); metrics .connector_source_rows_received .with_label_values(&[source_type.as_str_name(), &source_id]) diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs index 26c51c63540b..6c10ff9934ee 100644 --- a/src/connector/src/source/test_source.rs +++ b/src/connector/src/source/test_source.rs @@ -115,7 +115,7 @@ pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard { pub const TEST_CONNECTOR: &str = "test"; -#[derive(Clone, Debug, WithOptions)] +#[derive(Clone, Debug, Default, WithOptions)] pub struct TestSourceProperties { properties: HashMap, } diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 96570e37b6e4..ecb1846c0124 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -46,7 +46,6 @@ pub struct ConnectorSource { pub columns: Vec, pub parser_config: SpecificParserConfig, pub connector_message_buffer_size: usize, - pub connector: String, } #[derive(Clone, Debug)] @@ -64,7 +63,6 @@ impl ConnectorSource { columns: Vec, connector_message_buffer_size: usize, parser_config: SpecificParserConfig, - connector: String, ) -> Result { let config = ConnectorProperties::extract(properties, false) .map_err(|e| ConnectorError(e.into()))?; @@ -74,7 +72,6 @@ impl ConnectorSource { columns, parser_config, connector_message_buffer_size, - connector, }) } diff --git a/src/source/src/source_desc.rs b/src/source/src/source_desc.rs index 1b85acf9e4db..aed602aa71ff 100644 --- a/src/source/src/source_desc.rs +++ b/src/source/src/source_desc.rs @@ -56,7 +56,6 @@ pub struct SourceDescBuilder { connector_params: ConnectorParams, connector_message_buffer_size: usize, pk_indices: Vec, - connector: String, } impl SourceDescBuilder { @@ -70,7 +69,6 @@ impl SourceDescBuilder { connector_params: ConnectorParams, connector_message_buffer_size: usize, pk_indices: Vec, - connector: String, ) -> Self { Self { columns, @@ -81,7 +79,6 @@ impl SourceDescBuilder { connector_params, connector_message_buffer_size, pk_indices, - connector, } } @@ -110,7 +107,6 @@ impl SourceDescBuilder { columns.clone(), self.connector_message_buffer_size, psrser_config, - self.connector, )?; Ok(SourceDesc { @@ -204,7 +200,6 @@ pub mod test_utils { connector_params: Default::default(), connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, pk_indices, - connector: "test".to_string(), } } } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 88b7cf30606c..0b002eec6d84 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -171,7 +171,7 @@ impl FsFetchExecutor { self.source_ctrl_opts.clone(), self.connector_params.connector_client.clone(), self.actor_ctx.error_suppressor.clone(), - source_desc.source.connector.clone(), + source_desc.source.config.clone(), ) } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 552a03646f7d..6275ef5d116f 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -96,12 +96,6 @@ impl FsSourceExecutor { .iter() .map(|column_desc| column_desc.column_id) .collect_vec(); - let connector = 
source_desc - .source - .properties - .get("connector") - .map(|c| c.to_ascii_lowercase()) - .unwrap_or_default(); let source_ctx = SourceContext::new_with_suppressor( self.actor_ctx.id, self.stream_source_core.source_id, @@ -110,7 +104,7 @@ impl FsSourceExecutor { self.source_ctrl_opts.clone(), None, self.actor_ctx.error_suppressor.clone(), - connector, + source_desc.source.config.clone(), ); source_desc .source diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index ff81527ad815..1bb61789f135 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -25,7 +25,7 @@ use risingwave_connector::source::{ BoxSourceWithStateStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData, StreamChunkWithState, }; -use risingwave_connector::{dispatch_source_prop, ConnectorParams}; +use risingwave_connector::ConnectorParams; use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; use risingwave_storage::StateStore; use thiserror_ext::AsReport; @@ -106,7 +106,7 @@ impl SourceExecutor { self.source_ctrl_opts.clone(), self.connector_params.connector_client.clone(), self.actor_ctx.error_suppressor.clone(), - source_desc.source.connector.clone(), + source_desc.source.config.clone(), ); source_desc .source diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index c90cf4344d54..dc6b718f566e 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -48,11 +48,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { let source_name = source.source_name.clone(); let source_info = source.get_info()?; let properties = ConnectorProperties::extract(source.with_properties.clone(), false)?; - let connector = source - .with_properties - .get("connector") - .map(|c| c.to_ascii_lowercase()) - .unwrap_or_default(); let source_desc_builder = SourceDescBuilder::new( source.columns.clone(), params.env.source_metrics(), @@ -62,7 +57,6 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { params.env.connector_params(), params.env.config().developer.connector_message_buffer_size, params.info.pk_indices.clone(), - connector, ); let source_ctrl_opts = SourceCtrlOpts { chunk_size: params.env.config().developer.chunk_size, diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 2d5480ce4ea5..8234188e9a4f 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -83,12 +83,6 @@ impl ExecutorBuilder for SourceExecutorBuilder { } } - let connector = source - .with_properties - .get("connector") - .map(|c| c.to_ascii_lowercase()) - .unwrap_or_default(); - let source_desc_builder = SourceDescBuilder::new( source_columns.clone(), params.env.source_metrics(), @@ -107,7 +101,6 @@ impl ExecutorBuilder for SourceExecutorBuilder { // We should consdier add back the "pk_column_ids" field removed by #8841 in // StreamSource params.info.pk_indices.clone(), - connector.clone(), ); let source_ctrl_opts = SourceCtrlOpts { @@ -132,6 +125,11 @@ impl ExecutorBuilder for SourceExecutorBuilder { state_table_handler, ); + let connector = source + .with_properties + .get("connector") + .map(|c| c.to_ascii_lowercase()) + .unwrap_or_default(); let is_fs_connector = FS_CONNECTORS.contains(&connector.as_str()); let is_fs_v2_connector = 
ConnectorProperties::is_new_fs_connector_hash_map(&source.with_properties);

From 475d870789e2989e1cd0274b8028ea9d93b4d16b Mon Sep 17 00:00:00 2001
From: StrikeW
Date: Fri, 5 Jan 2024 16:24:02 +0800
Subject: [PATCH 6/6] minor: expect NULL for row3 in the cdc filter test instead of masking errors with unwrap_or

---
 .../src/optimizer/plan_node/stream_cdc_table_scan.rs | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
index dc711a26243b..24bc2dd5f0b6 100644
--- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -350,12 +350,6 @@ mod tests {
         filter_expr.eval_row(&row2).await.unwrap(),
         Some(ScalarImpl::Bool(false))
     );
-    assert_eq!(
-        filter_expr
-            .eval_row(&row3)
-            .await
-            .unwrap_or(Some(ScalarImpl::Bool(false))),
-        Some(ScalarImpl::Bool(false))
-    )
+    assert_eq!(filter_expr.eval_row(&row3).await.unwrap(), None)
 }
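
A note on the id formats, since patches 4 and 5 hinge on them: Debezium's transaction metadata carries an `id` whose shape depends on the upstream database. Postgres emits `txID:LSN`, and because the LSN differs between the BEGIN and END events of one transaction (`35352:3962948040` vs `35352:3962950064` in the test fixtures), only the part before the colon can serve as the transaction key. MySQL emits a GTID such as `3E11FA47-71CA-11E1-9E33-C80AA9429562:23`, which must be kept whole. The sketch below restates that extraction rule as standalone, runnable Rust; `CdcKind` and `extract_tx_id` are illustrative names for this note only, not the crate's actual `parse_transaction_meta` API.

// A standalone sketch of the connector-specific id-extraction rule.
// `CdcKind` and `extract_tx_id` are illustrative names, not the real API.
#[derive(Clone, Copy)]
enum CdcKind {
    Postgres, // Debezium id is "txID:LSN", e.g. "35352:3962948040"
    Mysql,    // Debezium id is a GTID, e.g. "3E11FA47-...-C80AA9429562:23"
}

fn extract_tx_id(kind: CdcKind, id: &str) -> &str {
    match kind {
        // Postgres: the LSN after the colon varies between the BEGIN and
        // END events of a transaction, so key on the txID prefix only.
        CdcKind::Postgres => id.split_once(':').map_or(id, |(tx_id, _)| tx_id),
        // MySQL: the whole GTID identifies the transaction; the ":23"
        // suffix is its sequence number, not a separable offset.
        CdcKind::Mysql => id,
    }
}

fn main() {
    assert_eq!(extract_tx_id(CdcKind::Postgres, "35352:3962948040"), "35352");
    assert_eq!(
        extract_tx_id(CdcKind::Mysql, "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"),
        "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"
    );
}

Patch 5's move from comparing connector-name strings ("postgres-cdc", "mysql-cdc") to matching on the `ConnectorProperties` enum serves the same logic but avoids re-deriving the connector name from raw `with_properties` at every call site, and it hands the parser the already-parsed typed config (`source_desc.source.config`) instead of threading an extra string through `SourceDescBuilder` and `ConnectorSource`.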
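
For reference, the `into_chunk_stream` changes at the top of this section implement the buffering half of the feature: rows that arrive while a transaction is open stay in the chunk builder until the END event, while rows outside any transaction are yielded as soon as possible. Below is a minimal, self-contained simulation of that policy; `Msg` and the `Vec<String>` chunks are stand-ins for the real `SourceMessage` and `StreamChunkWithState`, and the sketch elides the builder-capacity and `yield_asap` bookkeeping.

// Simplified model: buffer rows inside a transaction, flush on commit,
// emit immediately when no transaction is open. Names are illustrative.
enum Msg {
    Begin(String),
    Data(String),
    Commit(String),
}

fn chunks(stream: Vec<Msg>) -> Vec<Vec<String>> {
    let mut out = Vec::new();
    let mut buf = Vec::new();
    let mut current_txn: Option<String> = None;
    for msg in stream {
        match msg {
            Msg::Begin(id) => current_txn = Some(id),
            Msg::Data(row) => {
                buf.push(row);
                // Not in a transaction: yield the parsed rows right away.
                if current_txn.is_none() {
                    out.push(std::mem::take(&mut buf));
                }
            }
            Msg::Commit(_) => {
                // Commit closes the transaction and flushes its buffer, so
                // one upstream transaction becomes one output chunk.
                current_txn = None;
                if !buf.is_empty() {
                    out.push(std::mem::take(&mut buf));
                }
            }
        }
    }
    out
}

fn main() {
    let stream = vec![
        Msg::Begin("35352".into()),
        Msg::Data("a1".into()),
        Msg::Data("a2".into()),
        Msg::Commit("35352".into()),
        Msg::Data("a3".into()),
    ];
    assert_eq!(chunks(stream), vec![vec!["a1", "a2"], vec!["a3"]]);
}

Flushing on commit is what a cross-table view such as the `mv1` count union over `person_tx` and `orders_tx` in the e2e script is set up to observe, assuming the downstream applies each chunk atomically; that assumption is mine, not something these patches assert.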