From bce261b39ffa9b69e4acf7b397b2e50ee31d2d4d Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 8 Jan 2024 11:22:41 +0800 Subject: [PATCH] feat(cdc): support transaction for shared cdc source (#14375) --- .../connector/source/core/DbzCdcEngine.java | 10 +- .../source/core/DbzCdcEventConsumer.java | 132 +++++--- proto/connector_service.proto | 1 + src/batch/src/executor/source.rs | 1 + src/connector/src/parser/avro/parser.rs | 1 + .../src/parser/debezium/debezium_parser.rs | 77 ++++- src/connector/src/parser/mod.rs | 5 +- src/connector/src/parser/plain_parser.rs | 297 +++++++++++++++++- src/connector/src/parser/unified/debezium.rs | 77 +++-- src/connector/src/source/base.rs | 11 + .../src/source/cdc/source/message.rs | 3 + src/connector/src/source/cdc/source/reader.rs | 2 +- src/connector/src/source/test_source.rs | 2 +- .../plan_node/stream_cdc_table_scan.rs | 7 + .../src/executor/source/fetch_executor.rs | 1 + .../src/executor/source/fs_source_executor.rs | 1 + .../src/executor/source/source_executor.rs | 1 + 17 files changed, 551 insertions(+), 78 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java index 311d329ffeb57..61d1f6284a67f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java @@ -14,7 +14,8 @@ package com.risingwave.connector.source.core; -import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_HEARTBEAT_PREFIX; +import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX; +import static io.debezium.schema.AbstractTopicNamingStrategy.*; import com.risingwave.connector.api.source.CdcEngine; import 
com.risingwave.proto.ConnectorServiceProto; @@ -36,11 +37,14 @@ public DbzCdcEngine( long sourceId, Properties config, DebeziumEngine.CompletionCallback completionCallback) { - var dbzHeartbeatPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name()); + var heartbeatTopicPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name()); + var topicPrefix = config.getProperty(TOPIC_PREFIX.name()); + var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC); var consumer = new DbzCdcEventConsumer( sourceId, - dbzHeartbeatPrefix, + heartbeatTopicPrefix, + transactionTopic, new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY)); // Builds a debezium engine but not start it diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java index ac46691780c39..f0880d52c8b57 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java @@ -34,6 +34,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +enum EventType { + HEARTBEAT, + TRANSACTION, + DATA, +} + public class DbzCdcEventConsumer implements DebeziumEngine.ChangeConsumer> { static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class); @@ -42,14 +48,18 @@ public class DbzCdcEventConsumer private final long sourceId; private final JsonConverter converter; private final String heartbeatTopicPrefix; + private final String transactionTopic; DbzCdcEventConsumer( long sourceId, String heartbeatTopicPrefix, - BlockingQueue store) { + String transactionTopic, + BlockingQueue queue) { this.sourceId = sourceId; - this.outputChannel = store; + this.outputChannel = queue; 
this.heartbeatTopicPrefix = heartbeatTopicPrefix; + this.transactionTopic = transactionTopic; + LOG.info("heartbeat topic: {}, trnx topic: {}", heartbeatTopicPrefix, transactionTopic); // The default JSON converter will output the schema field in the JSON which is unnecessary // to source parser, we use a customized JSON converter to avoid outputting the `schema` @@ -64,6 +74,16 @@ public class DbzCdcEventConsumer this.converter = jsonConverter; } + private EventType getEventType(SourceRecord record) { + if (isHeartbeatEvent(record)) { + return EventType.HEARTBEAT; + } else if (isTransactionMetaEvent(record)) { + return EventType.TRANSACTION; + } else { + return EventType.DATA; + } + } + private boolean isHeartbeatEvent(SourceRecord record) { String topic = record.topic(); return topic != null @@ -71,6 +91,11 @@ private boolean isHeartbeatEvent(SourceRecord record) { && topic.startsWith(heartbeatTopicPrefix); } + private boolean isTransactionMetaEvent(SourceRecord record) { + String topic = record.topic(); + return topic != null && topic.equals(transactionTopic); + } + @Override public void handleBatch( List> events, @@ -79,10 +104,12 @@ public void handleBatch( var respBuilder = GetEventStreamResponse.newBuilder(); for (ChangeEvent event : events) { var record = event.value(); - boolean isHeartbeat = isHeartbeatEvent(record); + EventType eventType = getEventType(record); DebeziumOffset offset = new DebeziumOffset( - record.sourcePartition(), record.sourceOffset(), isHeartbeat); + record.sourcePartition(), + record.sourceOffset(), + (eventType == EventType.HEARTBEAT)); // serialize the offset to a JSON, so that kernel doesn't need to // aware its layout String offsetStr = ""; @@ -98,43 +125,68 @@ var record = event.value(); .setOffset(offsetStr) .setPartition(String.valueOf(sourceId)); - if (isHeartbeat) { - var message = msgBuilder.build(); - LOG.debug("heartbeat => {}", message.getOffset()); - respBuilder.addEvents(message); - } else { - - // Topic naming 
conventions - // - PG: serverName.schemaName.tableName - // - MySQL: serverName.databaseName.tableName - // We can extract the full table name from the topic - var fullTableName = record.topic().substring(record.topic().indexOf('.') + 1); - - // ignore null record - if (record.value() == null) { - committer.markProcessed(event); - continue; - } - // get upstream event time from the "source" field - var sourceStruct = ((Struct) record.value()).getStruct("source"); - long sourceTsMs = - sourceStruct == null - ? System.currentTimeMillis() - : sourceStruct.getInt64("ts_ms"); - byte[] payload = - converter.fromConnectData( - record.topic(), record.valueSchema(), record.value()); - msgBuilder - .setFullTableName(fullTableName) - .setPayload(new String(payload, StandardCharsets.UTF_8)) - .setSourceTsMs(sourceTsMs) - .build(); - var message = msgBuilder.build(); - LOG.debug("record => {}", message.getPayload()); - - respBuilder.addEvents(message); - committer.markProcessed(event); + switch (eventType) { + case HEARTBEAT: + { + var message = msgBuilder.build(); + LOG.debug("heartbeat => {}", message.getOffset()); + respBuilder.addEvents(message); + break; + } + case TRANSACTION: + { + long trxTs = ((Struct) record.value()).getInt64("ts_ms"); + byte[] payload = + converter.fromConnectData( + record.topic(), record.valueSchema(), record.value()); + var message = + msgBuilder + .setIsTransactionMeta(true) + .setPayload(new String(payload, StandardCharsets.UTF_8)) + .setSourceTsMs(trxTs) + .build(); + LOG.debug("transaction => {}", message); + respBuilder.addEvents(message); + break; + } + case DATA: + { + // Topic naming conventions + // - PG: serverName.schemaName.tableName + // - MySQL: serverName.databaseName.tableName + // We can extract the full table name from the topic + var fullTableName = + record.topic().substring(record.topic().indexOf('.') + 1); + + // ignore null record + if (record.value() == null) { + break; + } + // get upstream event time from the "source" 
field + var sourceStruct = ((Struct) record.value()).getStruct("source"); + long sourceTsMs = + sourceStruct == null + ? System.currentTimeMillis() + : sourceStruct.getInt64("ts_ms"); + byte[] payload = + converter.fromConnectData( + record.topic(), record.valueSchema(), record.value()); + var message = + msgBuilder + .setFullTableName(fullTableName) + .setPayload(new String(payload, StandardCharsets.UTF_8)) + .setSourceTsMs(sourceTsMs) + .build(); + LOG.debug("record => {}", message.getPayload()); + respBuilder.addEvents(message); + break; + } + default: + break; } + + // mark the event as processed + committer.markProcessed(event); } // skip empty batch diff --git a/proto/connector_service.proto b/proto/connector_service.proto index ddda62c6aace5..c7603cbb3f504 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -160,6 +160,7 @@ message CdcMessage { string offset = 3; string full_table_name = 4; int64 source_ts_ms = 5; + bool is_transaction_meta = 6; } enum SourceType { diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index a60398ef12f7b..2714d5335b906 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -146,6 +146,7 @@ impl SourceExecutor { self.metrics, self.source_ctrl_opts.clone(), None, + ConnectorProperties::default(), )); let stream = self .connector_source diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 28e37fd0e0935..10e000a4fdab7 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -288,6 +288,7 @@ mod test { )?), rw_columns: Vec::default(), source_ctx: Default::default(), + transaction_meta_builder: None, }) } diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index b0a18e8c930d1..0f79677860f8d 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ 
b/src/connector/src/parser/debezium/debezium_parser.rs @@ -109,7 +109,9 @@ impl DebeziumParser { Err(err) => { // Only try to access transaction control message if the row operation access failed // to make it a fast path. - if let Ok(transaction_control) = row_op.transaction_control() { + if let Ok(transaction_control) = + row_op.transaction_control(&self.source_ctx.connector_props) + { Ok(ParseResult::TransactionControl(transaction_control)) } else { Err(err)? @@ -151,3 +153,76 @@ impl ByteStreamSourceParser for DebeziumParser { self.parse_inner(key, payload, writer).await } } + +#[cfg(test)] +mod tests { + use std::ops::Deref; + use std::sync::Arc; + + use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + + use super::*; + use crate::parser::{SourceStreamChunkBuilder, TransactionControl}; + use crate::source::{ConnectorProperties, DataType}; + + #[tokio::test] + async fn test_parse_transaction_metadata() { + let schema = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + + let columns = schema + .iter() + .map(|c| SourceColumnDesc::from(&c.column_desc)) + .collect::>(); + + let props = SpecificParserConfig { + key_encoding_config: None, + encoding_config: EncodingProperties::Json(JsonProperties { + use_schema_registry: false, + }), + protocol_config: ProtocolProperties::Debezium, + }; + let mut source_ctx = SourceContext::default(); + source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default()); + let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) + .await + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + + // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. 
the format is txID:LSN + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + let res = parser + .parse_one_with_txn( + None, + Some(begin_msg.as_bytes().to_vec()), + builder.row_writer(), + ) + .await; + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => { + assert_eq!(id.deref(), "35352"); + } + _ => panic!("unexpected parse result: {:?}", res), + } + let res = parser + .parse_one_with_txn( + None, + Some(commit_msg.as_bytes().to_vec()), + builder.row_writer(), + ) + .await; + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => { + assert_eq!(id.deref(), "35352"); + } + _ => panic!("unexpected parse result: {:?}", res), + } + } +} diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 8e878f19ef123..1c165b45660e9 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -158,7 +158,7 @@ pub struct SourceStreamChunkRowWriter<'a> { /// The meta data of the original message for a row writer. /// /// Extracted from the `SourceMessage`. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub struct MessageMeta<'a> { meta: &'a SourceMeta, split_id: &'a str, @@ -665,6 +665,7 @@ async fn into_chunk_stream(mut parser: P, data_stream if let Some(Transaction { id: current_id, .. 
}) = &current_transaction { tracing::warn!(current_id, id, "already in transaction"); } + tracing::debug!("begin upstream transaction: id={}", id); current_transaction = Some(Transaction { id, len: 0 }); } TransactionControl::Commit { id } => { @@ -672,6 +673,7 @@ async fn into_chunk_stream(mut parser: P, data_stream if current_id != Some(&id) { tracing::warn!(?current_id, id, "transaction id mismatch"); } + tracing::debug!("commit upstream transaction: id={}", id); current_transaction = None; } } @@ -692,6 +694,7 @@ async fn into_chunk_stream(mut parser: P, data_stream // If we are not in a transaction, we should yield the chunk now. if current_transaction.is_none() { yield_asap = false; + yield StreamChunkWithState { chunk: builder.take(0), split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)), diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index f113c279f2ef6..4f15234be8371 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -20,12 +20,14 @@ use super::{ SourceStreamChunkRowWriter, SpecificParserConfig, }; use crate::parser::bytes_parser::BytesAccessBuilder; +use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder; +use crate::parser::unified::debezium::parse_transaction_meta; use crate::parser::unified::upsert::UpsertChangeEvent; use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer_with_op; use crate::parser::unified::{AccessImpl, ChangeEventOperation}; use crate::parser::upsert_parser::get_key_column_name; -use crate::parser::{BytesProperties, ParserFormat}; -use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; +use crate::parser::{BytesProperties, ParseResult, ParserFormat}; +use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta}; #[derive(Debug)] pub struct PlainParser { @@ -33,6 +35,8 @@ pub struct PlainParser { pub payload_builder: AccessBuilderImpl, pub(crate) 
rw_columns: Vec, pub source_ctx: SourceContextRef, + // parsing transaction metadata for shared cdc source + pub transaction_meta_builder: Option, } impl PlainParser { @@ -64,11 +68,16 @@ impl PlainParser { ))); } }; + + let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson( + DebeziumJsonAccessBuilder::new()?, + )); Ok(Self { key_builder, payload_builder, rw_columns, source_ctx, + transaction_meta_builder, }) } @@ -77,7 +86,25 @@ impl PlainParser { key: Option>, payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result<()> { + ) -> Result { + // if the message is transaction metadata, parse it and return + if let Some(msg_meta) = writer.row_meta + && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.meta + && cdc_meta.is_transaction_meta + && let Some(data) = payload + { + let accessor = self + .transaction_meta_builder + .as_mut() + .expect("expect transaction metadata access builder") + .generate_accessor(data) + .await?; + return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props) { + Ok(transaction_control) => Ok(ParseResult::TransactionControl(transaction_control)), + Err(err) => Err(err)?, + }; + } + // reuse upsert component but always insert let mut row_op: UpsertChangeEvent, AccessImpl<'_, '_>> = UpsertChangeEvent::default(); @@ -94,8 +121,14 @@ impl PlainParser { row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?); } - apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op) - .map_err(Into::into) + Ok( + apply_row_operation_on_stream_chunk_writer_with_op( + row_op, + &mut writer, + change_event_op, + ) + .map(|_| ParseResult::Rows)?, + ) } } @@ -113,11 +146,263 @@ impl ByteStreamSourceParser for PlainParser { } async fn parse_one<'a>( + &'a mut self, + _key: Option>, + _payload: Option>, + _writer: SourceStreamChunkRowWriter<'a>, + ) -> Result<()> { + unreachable!("should call `parse_one_with_txn` instead") + } + + async fn 
parse_one_with_txn<'a>( &'a mut self, key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, - ) -> Result<()> { + ) -> Result { + tracing::info!("parse_one_with_txn"); self.parse_inner(key, payload, writer).await } } + +#[cfg(test)] +mod tests { + use std::ops::Deref; + use std::sync::Arc; + + use futures::executor::block_on; + use futures::StreamExt; + use futures_async_stream::try_stream; + use itertools::Itertools; + use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + + use super::*; + use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl}; + use crate::source::cdc::DebeziumCdcMeta; + use crate::source::{ConnectorProperties, DataType, SourceMessage, SplitId}; + + #[tokio::test] + async fn test_emit_transactional_chunk() { + let schema = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + + let columns = schema + .iter() + .map(|c| SourceColumnDesc::from(&c.column_desc)) + .collect::>(); + + let mut source_ctx = SourceContext::default(); + source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default()); + let source_ctx = Arc::new(source_ctx); + // format plain encode json parser + let parser = PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + columns.clone(), + source_ctx.clone(), + ) + .await + .unwrap(); + + let mut transactional = false; + // for untransactional source, we expect emit a chunk for each message batch + let message_stream = source_message_stream(transactional); + let chunk_stream = crate::parser::into_chunk_stream(parser, message_stream.boxed()); + let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) + .into_iter() + .collect(); + let output = output + .unwrap() + .into_iter() + .filter(|c| c.chunk.cardinality() > 0) + .enumerate() + .map(|(i, c)| { + if i == 0 { + // 
begin + 3 data messages + assert_eq!(4, c.chunk.cardinality()); + } + if i == 1 { + // 2 data messages + 1 end + assert_eq!(3, c.chunk.cardinality()); + } + c.chunk + }) + .collect_vec(); + + // 2 chunks for 2 message batches + assert_eq!(2, output.len()); + + // format plain encode json parser + let parser = PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + columns.clone(), + source_ctx, + ) + .await + .unwrap(); + + // for transactional source, we expect emit a single chunk for the transaction + transactional = true; + let message_stream = source_message_stream(transactional); + let chunk_stream = crate::parser::into_chunk_stream(parser, message_stream.boxed()); + let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) + .into_iter() + .collect(); + let output = output + .unwrap() + .into_iter() + .filter(|c| c.chunk.cardinality() > 0) + .map(|c| { + // 5 data messages in a single chunk + assert_eq!(5, c.chunk.cardinality()); + c.chunk + }) + .collect_vec(); + + // a single transactional chunk + assert_eq!(1, output.len()); + } + + #[try_stream(ok = Vec, error = anyhow::Error)] + async fn source_message_stream(transactional: bool) { + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + let data_batches = vec![ + vec![ + r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": 
"[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + ], + vec![ + r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, 
"transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#, + ], + ]; + for (i, batch) in data_batches.iter().enumerate() { + let mut source_msg_batch = vec![]; + if i == 0 { + // put begin message at first + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: transactional, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(begin_msg.as_bytes().to_vec()), + }); + } + // put data messages + for data_msg in batch { + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: false, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(data_msg.as_bytes().to_vec()), + }); + } + if i == data_batches.len() - 1 { + // put commit message at last + source_msg_batch.push(SourceMessage { + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: transactional, + }), + split_id: SplitId::from("1001"), + offset: "0".into(), + key: None, + payload: Some(commit_msg.as_bytes().to_vec()), + }); + } + yield source_msg_batch; + } + } + + #[tokio::test] + 
async fn test_parse_transaction_metadata() { + let schema = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + + let columns = schema + .iter() + .map(|c| SourceColumnDesc::from(&c.column_desc)) + .collect::>(); + + // format plain encode json parser + let mut source_ctx = SourceContext::default(); + source_ctx.connector_props = ConnectorProperties::MysqlCdc(Box::default()); + let mut parser = PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + columns.clone(), + Arc::new(source_ctx), + ) + .await + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + + // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN + let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; + let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; + + let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta { + full_table_name: "orders".to_string(), + source_ts_ms: 0, + is_transaction_meta: true, + }); + let msg_meta = MessageMeta { + meta: &cdc_meta, + split_id: "1001", + offset: "", + }; + + let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23"; + let res = parser + .parse_one_with_txn( + None, + Some(begin_msg.as_bytes().to_vec()), + builder.row_writer().with_meta(msg_meta), + ) + .await; + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => { + assert_eq!(id.deref(), expect_tx_id); + } + _ => panic!("unexpected parse 
result: {:?}", res), + } + let res = parser + .parse_one_with_txn( + None, + Some(commit_msg.as_bytes().to_vec()), + builder.row_writer().with_meta(msg_meta), + ) + .await; + match res { + Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => { + assert_eq!(id.deref(), expect_tx_id); + } + _ => panic!("unexpected parse result: {:?}", res), + } + + let output = builder.take(10); + assert_eq!(0, output.cardinality()); + } +} diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 6163cfe5c486c..7291b1b359735 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use anyhow::anyhow; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use super::{Access, AccessError, ChangeEvent, ChangeEventOperation}; use crate::parser::TransactionControl; -use crate::source::SourceColumnDesc; +use crate::source::{ConnectorProperties, SourceColumnDesc}; pub struct DebeziumChangeEvent { value_accessor: Option, @@ -26,8 +27,8 @@ pub struct DebeziumChangeEvent { const BEFORE: &str = "before"; const AFTER: &str = "after"; const OP: &str = "op"; -const TRANSACTION_STATUS: &str = "status"; -const TRANSACTION_ID: &str = "id"; +pub const TRANSACTION_STATUS: &str = "status"; +pub const TRANSACTION_ID: &str = "id"; pub const DEBEZIUM_READ_OP: &str = "r"; pub const DEBEZIUM_CREATE_OP: &str = "c"; @@ -37,6 +38,44 @@ pub const DEBEZIUM_DELETE_OP: &str = "d"; pub const DEBEZIUM_TRANSACTION_STATUS_BEGIN: &str = "BEGIN"; pub const DEBEZIUM_TRANSACTION_STATUS_COMMIT: &str = "END"; +pub fn parse_transaction_meta( + accessor: &impl Access, + connector_props: &ConnectorProperties, +) -> std::result::Result { + if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( + accessor.access(&[TRANSACTION_STATUS], 
Some(&DataType::Varchar))?, + accessor.access(&[TRANSACTION_ID], Some(&DataType::Varchar))?, + ) { + // The id field has different meanings for different databases: + // PG: txID:LSN + // MySQL: source_id:transaction_id (e.g. 3E11FA47-71CA-11E1-9E33-C80AA9429562:23) + match status.as_ref() { + DEBEZIUM_TRANSACTION_STATUS_BEGIN => match *connector_props { + ConnectorProperties::PostgresCdc(_) => { + let (tx_id, _) = id.split_once(':').unwrap(); + return Ok(TransactionControl::Begin { id: tx_id.into() }); + } + ConnectorProperties::MysqlCdc(_) => return Ok(TransactionControl::Begin { id }), + _ => {} + }, + DEBEZIUM_TRANSACTION_STATUS_COMMIT => match *connector_props { + ConnectorProperties::PostgresCdc(_) => { + let (tx_id, _) = id.split_once(':').unwrap(); + return Ok(TransactionControl::Commit { id: tx_id.into() }); + } + ConnectorProperties::MysqlCdc(_) => return Ok(TransactionControl::Commit { id }), + _ => {} + }, + _ => {} + } + } + + Err(AccessError::Undefined { + name: "transaction status".into(), + path: TRANSACTION_STATUS.into(), + }) +} + impl DebeziumChangeEvent where A: Access, @@ -61,28 +100,16 @@ where /// Returns the transaction metadata if exists. /// /// See the [doc](https://debezium.io/documentation/reference/2.3/connectors/postgresql.html#postgresql-transaction-metadata) of Debezium for more details. 
- pub(crate) fn transaction_control(&self) -> Result { - if let Some(accessor) = &self.value_accessor { - if let (Some(ScalarImpl::Utf8(status)), Some(ScalarImpl::Utf8(id))) = ( - accessor.access(&[TRANSACTION_STATUS], Some(&DataType::Varchar))?, - accessor.access(&[TRANSACTION_ID], Some(&DataType::Varchar))?, - ) { - match status.as_ref() { - DEBEZIUM_TRANSACTION_STATUS_BEGIN => { - return Ok(TransactionControl::Begin { id }) - } - DEBEZIUM_TRANSACTION_STATUS_COMMIT => { - return Ok(TransactionControl::Commit { id }) - } - _ => {} - } - } - } - - Err(AccessError::Undefined { - name: "transaction status".into(), - path: Default::default(), - }) + pub(crate) fn transaction_control( + &self, + connector_props: &ConnectorProperties, + ) -> Result { + let Some(accessor) = &self.value_accessor else { + return Err(AccessError::Other(anyhow!( + "value_accessor must be provided to parse transaction metadata" + ))); + }; + parse_transaction_meta(accessor, connector_props) } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 1a4a594d44285..80c70c59842d5 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -159,6 +159,7 @@ pub struct SourceContext { pub source_info: SourceInfo, pub metrics: Arc, pub source_ctrl_opts: SourceCtrlOpts, + pub connector_props: ConnectorProperties, error_suppressor: Option>>, } impl SourceContext { @@ -169,6 +170,7 @@ impl SourceContext { metrics: Arc, source_ctrl_opts: SourceCtrlOpts, connector_client: Option, + connector_props: ConnectorProperties, ) -> Self { Self { connector_client, @@ -180,6 +182,7 @@ impl SourceContext { metrics, source_ctrl_opts, error_suppressor: None, + connector_props, } } @@ -191,6 +194,7 @@ impl SourceContext { source_ctrl_opts: SourceCtrlOpts, connector_client: Option, error_suppressor: Arc>, + connector_props: ConnectorProperties, ) -> Self { let mut ctx = Self::new( actor_id, @@ -199,6 +203,7 @@ impl SourceContext { metrics, source_ctrl_opts, 
connector_client, + connector_props, ); ctx.error_suppressor = Some(error_suppressor); ctx @@ -382,6 +387,12 @@ pub trait SplitReader: Sized + Send { for_all_sources!(impl_connector_properties); +impl Default for ConnectorProperties { + fn default() -> Self { + ConnectorProperties::Test(Box::default()) + } +} + impl ConnectorProperties { pub fn is_new_fs_connector_b_tree_map(with_properties: &BTreeMap) -> bool { with_properties diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs index 04518f7088b4b..28fe52c52cd1e 100644 --- a/src/connector/src/source/cdc/source/message.rs +++ b/src/connector/src/source/cdc/source/message.rs @@ -22,6 +22,8 @@ pub struct DebeziumCdcMeta { pub full_table_name: String, // extracted from `payload.source.ts_ms`, the time that the change event was made in the database pub source_ts_ms: i64, + // Whether the message is a transaction metadata + pub is_transaction_meta: bool, } impl From for SourceMessage { @@ -38,6 +40,7 @@ impl From for SourceMessage { meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { full_table_name: message.full_table_name, source_ts_ms: message.source_ts_ms, + is_transaction_meta: message.is_transaction_meta, }), } } diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index cbc39ee76290b..fae70f1e8d04b 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -205,7 +205,7 @@ impl CommonSplitReader for CdcSplitReader { while let Some(result) = rx.recv().await { let GetEventStreamResponse { events, .. 
} = result?; - tracing::trace!("receive events {:?}", events.len()); + tracing::trace!("receive {} cdc events ", events.len()); metrics .connector_source_rows_received .with_label_values(&[source_type.as_str_name(), &source_id]) diff --git a/src/connector/src/source/test_source.rs b/src/connector/src/source/test_source.rs index 26c51c63540b9..6c10ff9934eef 100644 --- a/src/connector/src/source/test_source.rs +++ b/src/connector/src/source/test_source.rs @@ -115,7 +115,7 @@ pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard { pub const TEST_CONNECTOR: &str = "test"; -#[derive(Clone, Debug, WithOptions)] +#[derive(Clone, Debug, Default, WithOptions)] pub struct TestSourceProperties { properties: HashMap, } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index c26a1ed41aaf0..24bc2dd5f0b60 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -326,6 +326,7 @@ mod tests { async fn test_cdc_filter_expr() { let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap(); let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, 
"transaction": null }"#).unwrap(); + let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap(); let row1 = OwnedRow::new(vec![ Some(t1_json.into()), Some(r#"{"file": "1.binlog", "pos": 100}"#.into()), @@ -335,6 +336,11 @@ mod tests { Some(r#"{"file": "2.binlog", "pos": 100}"#.into()), ]); + let row3 = OwnedRow::new(vec![ + Some(trx_json.into()), + Some(r#"{"file": "3.binlog", "pos": 100}"#.into()), + ]); + let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("t1"); assert_eq!( filter_expr.eval_row(&row1).await.unwrap(), @@ -344,5 +350,6 @@ mod tests { filter_expr.eval_row(&row2).await.unwrap(), Some(ScalarImpl::Bool(false)) ); + assert_eq!(filter_expr.eval_row(&row3).await.unwrap(), None) } } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 32cd08e9c87fe..0b002eec6d846 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -171,6 +171,7 @@ impl FsFetchExecutor { self.source_ctrl_opts.clone(), self.connector_params.connector_client.clone(), self.actor_ctx.error_suppressor.clone(), + source_desc.source.config.clone(), ) } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 85967a253ba91..6275ef5d116f6 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -104,6 +104,7 @@ impl FsSourceExecutor { self.source_ctrl_opts.clone(), None, self.actor_ctx.error_suppressor.clone(), + source_desc.source.config.clone(), ); source_desc .source diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 433d54431ced6..1bb61789f1359 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ 
b/src/stream/src/executor/source/source_executor.rs @@ -106,6 +106,7 @@ impl SourceExecutor { self.source_ctrl_opts.clone(), self.connector_params.connector_client.clone(), self.actor_ctx.error_suppressor.clone(), + source_desc.source.config.clone(), ); source_desc .source