feat(cdc): support transaction for shared cdc source (#14375)
StrikeW authored and committed on Jan 8, 2024
1 parent f10f7b5 commit bce261b
Showing 17 changed files with 551 additions and 78 deletions.
DbzCdcEngine.java
@@ -14,7 +14,8 @@
 
 package com.risingwave.connector.source.core;
 
-import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_HEARTBEAT_PREFIX;
+import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX;
+import static io.debezium.schema.AbstractTopicNamingStrategy.*;
 
 import com.risingwave.connector.api.source.CdcEngine;
 import com.risingwave.proto.ConnectorServiceProto;
@@ -36,11 +37,14 @@ public DbzCdcEngine(
             long sourceId,
             Properties config,
             DebeziumEngine.CompletionCallback completionCallback) {
-        var dbzHeartbeatPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name());
+        var heartbeatTopicPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name());
+        var topicPrefix = config.getProperty(TOPIC_PREFIX.name());
+        var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC);
         var consumer =
                 new DbzCdcEventConsumer(
                         sourceId,
-                        dbzHeartbeatPrefix,
+                        heartbeatTopicPrefix,
+                        transactionTopic,
                         new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));
 
         // Builds a debezium engine but not start it
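Debezium publishes transaction markers (BEGIN/END) on a dedicated topic derived from the connector's topic.prefix, which is what the transactionTopic variable above computes. A minimal sketch of the naming rule, assuming Debezium's default transaction-topic suffix "transaction" (the value behind DEFAULT_TRANSACTION_TOPIC); the prefix RW_CDC_1001 is a made-up example:

    // Sketch of the transaction topic naming rule (Rust, for illustration).
    // Assumes Debezium's default suffix "transaction"; the prefix is hypothetical.
    fn transaction_topic(topic_prefix: &str) -> String {
        format!("{}.transaction", topic_prefix)
    }

    fn main() {
        assert_eq!(transaction_topic("RW_CDC_1001"), "RW_CDC_1001.transaction");
    }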
DbzCdcEventConsumer.java
@@ -34,6 +34,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+enum EventType {
+    HEARTBEAT,
+    TRANSACTION,
+    DATA,
+}
+
 public class DbzCdcEventConsumer
         implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
     static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class);
@@ -42,14 +48,18 @@ public class DbzCdcEventConsumer
     private final long sourceId;
     private final JsonConverter converter;
     private final String heartbeatTopicPrefix;
+    private final String transactionTopic;
 
     DbzCdcEventConsumer(
             long sourceId,
             String heartbeatTopicPrefix,
-            BlockingQueue<GetEventStreamResponse> store) {
+            String transactionTopic,
+            BlockingQueue<GetEventStreamResponse> queue) {
         this.sourceId = sourceId;
-        this.outputChannel = store;
+        this.outputChannel = queue;
         this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+        this.transactionTopic = transactionTopic;
+        LOG.info("heartbeat topic: {}, trnx topic: {}", heartbeatTopicPrefix, transactionTopic);
 
         // The default JSON converter will output the schema field in the JSON which is unnecessary
         // to source parser, we use a customized JSON converter to avoid outputting the `schema`
@@ -64,13 +74,28 @@ public class DbzCdcEventConsumer
         this.converter = jsonConverter;
     }
 
+    private EventType getEventType(SourceRecord record) {
+        if (isHeartbeatEvent(record)) {
+            return EventType.HEARTBEAT;
+        } else if (isTransactionMetaEvent(record)) {
+            return EventType.TRANSACTION;
+        } else {
+            return EventType.DATA;
+        }
+    }
+
     private boolean isHeartbeatEvent(SourceRecord record) {
         String topic = record.topic();
         return topic != null
                 && heartbeatTopicPrefix != null
                 && topic.startsWith(heartbeatTopicPrefix);
     }
 
+    private boolean isTransactionMetaEvent(SourceRecord record) {
+        String topic = record.topic();
+        return topic != null && topic.equals(transactionTopic);
+    }
+
     @Override
     public void handleBatch(
             List<ChangeEvent<SourceRecord, SourceRecord>> events,
@@ -79,10 +104,12 @@ public void handleBatch(
         var respBuilder = GetEventStreamResponse.newBuilder();
         for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
             var record = event.value();
-            boolean isHeartbeat = isHeartbeatEvent(record);
+            EventType eventType = getEventType(record);
             DebeziumOffset offset =
                     new DebeziumOffset(
-                            record.sourcePartition(), record.sourceOffset(), isHeartbeat);
+                            record.sourcePartition(),
+                            record.sourceOffset(),
+                            (eventType == EventType.HEARTBEAT));
             // serialize the offset to a JSON, so that kernel doesn't need to
             // aware its layout
             String offsetStr = "";
@@ -98,43 +125,68 @@ var record = event.value();
                             .setOffset(offsetStr)
                             .setPartition(String.valueOf(sourceId));
 
-            if (isHeartbeat) {
-                var message = msgBuilder.build();
-                LOG.debug("heartbeat => {}", message.getOffset());
-                respBuilder.addEvents(message);
-            } else {
-
-                // Topic naming conventions
-                // - PG: serverName.schemaName.tableName
-                // - MySQL: serverName.databaseName.tableName
-                // We can extract the full table name from the topic
-                var fullTableName = record.topic().substring(record.topic().indexOf('.') + 1);
-
-                // ignore null record
-                if (record.value() == null) {
-                    committer.markProcessed(event);
-                    continue;
-                }
-                // get upstream event time from the "source" field
-                var sourceStruct = ((Struct) record.value()).getStruct("source");
-                long sourceTsMs =
-                        sourceStruct == null
-                                ? System.currentTimeMillis()
-                                : sourceStruct.getInt64("ts_ms");
-                byte[] payload =
-                        converter.fromConnectData(
-                                record.topic(), record.valueSchema(), record.value());
-                msgBuilder
-                        .setFullTableName(fullTableName)
-                        .setPayload(new String(payload, StandardCharsets.UTF_8))
-                        .setSourceTsMs(sourceTsMs)
-                        .build();
-                var message = msgBuilder.build();
-                LOG.debug("record => {}", message.getPayload());
-
-                respBuilder.addEvents(message);
-                committer.markProcessed(event);
-            }
+            switch (eventType) {
+                case HEARTBEAT:
+                    {
+                        var message = msgBuilder.build();
+                        LOG.debug("heartbeat => {}", message.getOffset());
+                        respBuilder.addEvents(message);
+                        break;
+                    }
+                case TRANSACTION:
+                    {
+                        long trxTs = ((Struct) record.value()).getInt64("ts_ms");
+                        byte[] payload =
+                                converter.fromConnectData(
+                                        record.topic(), record.valueSchema(), record.value());
+                        var message =
+                                msgBuilder
+                                        .setIsTransactionMeta(true)
+                                        .setPayload(new String(payload, StandardCharsets.UTF_8))
+                                        .setSourceTsMs(trxTs)
+                                        .build();
+                        LOG.debug("transaction => {}", message);
+                        respBuilder.addEvents(message);
+                        break;
+                    }
+                case DATA:
+                    {
+                        // Topic naming conventions
+                        // - PG: serverName.schemaName.tableName
+                        // - MySQL: serverName.databaseName.tableName
+                        // We can extract the full table name from the topic
+                        var fullTableName =
+                                record.topic().substring(record.topic().indexOf('.') + 1);
+
+                        // ignore null record
+                        if (record.value() == null) {
+                            break;
+                        }
+                        // get upstream event time from the "source" field
+                        var sourceStruct = ((Struct) record.value()).getStruct("source");
+                        long sourceTsMs =
+                                sourceStruct == null
+                                        ? System.currentTimeMillis()
+                                        : sourceStruct.getInt64("ts_ms");
+                        byte[] payload =
+                                converter.fromConnectData(
+                                        record.topic(), record.valueSchema(), record.value());
+                        var message =
+                                msgBuilder
+                                        .setFullTableName(fullTableName)
+                                        .setPayload(new String(payload, StandardCharsets.UTF_8))
+                                        .setSourceTsMs(sourceTsMs)
+                                        .build();
+                        LOG.debug("record => {}", message.getPayload());
+                        respBuilder.addEvents(message);
+                        break;
+                    }
+                default:
+                    break;
+            }
+
+            // mark the event as processed
+            committer.markProcessed(event);
         }
 
         // skip empty batch
1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -160,6 +160,7 @@ message CdcMessage {
   string offset = 3;
   string full_table_name = 4;
   int64 source_ts_ms = 5;
+  bool is_transaction_meta = 6;
 }
 
 enum SourceType {
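The new flag lets the kernel route a CdcMessage without inspecting its payload: transaction metadata goes to the transaction-control path, everything else to the row parser. A hedged sketch of that branching; the struct is a hand-written mirror of the fields visible above, not the prost-generated type:

    // Hand-written mirror of the CdcMessage fields shown above (illustration
    // only). Transaction-meta events carry Debezium's BEGIN/END JSON in the
    // payload and set is_transaction_meta.
    struct CdcMessage {
        offset: String,            // = 3
        full_table_name: String,   // = 4
        source_ts_ms: i64,         // = 5
        is_transaction_meta: bool, // = 6
    }

    fn route(msg: &CdcMessage) -> &'static str {
        if msg.is_transaction_meta {
            "transaction control path" // BEGIN/END markers
        } else {
            "row event path" // regular change events for full_table_name
        }
    }

    fn main() {
        let msg = CdcMessage {
            offset: "{}".to_string(),
            full_table_name: String::new(),
            source_ts_ms: 1704269323180,
            is_transaction_meta: true,
        };
        assert_eq!(route(&msg), "transaction control path");
    }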
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
@@ -146,6 +146,7 @@ impl SourceExecutor {
             self.metrics,
             self.source_ctrl_opts.clone(),
             None,
+            ConnectorProperties::default(),
         ));
         let stream = self
             .connector_source
1 change: 1 addition & 0 deletions src/connector/src/parser/avro/parser.rs
@@ -288,6 +288,7 @@ mod test {
             )?),
             rw_columns: Vec::default(),
             source_ctx: Default::default(),
+            transaction_meta_builder: None,
         })
     }

77 changes: 76 additions & 1 deletion src/connector/src/parser/debezium/debezium_parser.rs
@@ -109,7 +109,9 @@ impl DebeziumParser {
             Err(err) => {
                 // Only try to access transaction control message if the row operation access failed
                 // to make it a fast path.
-                if let Ok(transaction_control) = row_op.transaction_control() {
+                if let Ok(transaction_control) =
+                    row_op.transaction_control(&self.source_ctx.connector_props)
+                {
                     Ok(ParseResult::TransactionControl(transaction_control))
                 } else {
                     Err(err)?
@@ -151,3 +153,76 @@
         self.parse_inner(key, payload, writer).await
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::ops::Deref;
+    use std::sync::Arc;
+
+    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
+
+    use super::*;
+    use crate::parser::{SourceStreamChunkBuilder, TransactionControl};
+    use crate::source::{ConnectorProperties, DataType};
+
+    #[tokio::test]
+    async fn test_parse_transaction_metadata() {
+        let schema = vec![
+            ColumnCatalog {
+                column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb),
+                is_hidden: false,
+            },
+            ColumnCatalog::offset_column(),
+            ColumnCatalog::cdc_table_name_column(),
+        ];
+
+        let columns = schema
+            .iter()
+            .map(|c| SourceColumnDesc::from(&c.column_desc))
+            .collect::<Vec<_>>();
+
+        let props = SpecificParserConfig {
+            key_encoding_config: None,
+            encoding_config: EncodingProperties::Json(JsonProperties {
+                use_schema_registry: false,
+            }),
+            protocol_config: ProtocolProperties::Debezium,
+        };
+        let mut source_ctx = SourceContext::default();
+        source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
+        let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
+            .await
+            .unwrap();
+        let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+
+        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
+        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
+        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
+        let res = parser
+            .parse_one_with_txn(
+                None,
+                Some(begin_msg.as_bytes().to_vec()),
+                builder.row_writer(),
+            )
+            .await;
+        match res {
+            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
+                assert_eq!(id.deref(), "35352");
+            }
+            _ => panic!("unexpected parse result: {:?}", res),
+        }
+        let res = parser
+            .parse_one_with_txn(
+                None,
+                Some(commit_msg.as_bytes().to_vec()),
+                builder.row_writer(),
+            )
+            .await;
+        match res {
+            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
+                assert_eq!(id.deref(), "35352");
+            }
+            _ => panic!("unexpected parse result: {:?}", res),
+        }
+    }
+}
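Note what the assertions check: the BEGIN id "35352:3962948040" and the END id "35352:3962950064" both normalize to "35352". For Postgres, transaction_control() evidently keeps only the txID part, so BEGIN and END of the same transaction compare equal even though their LSNs differ. A sketch of that normalization (the helper name is hypothetical):

    // Hypothetical helper mirroring the normalization the test implies:
    // Postgres ids arrive as "txID:LSN"; only the txID identifies the
    // transaction, since the LSN differs between BEGIN and END.
    fn pg_txn_id(raw: &str) -> &str {
        raw.split(':').next().unwrap_or(raw)
    }

    fn main() {
        assert_eq!(pg_txn_id("35352:3962948040"), "35352");
        assert_eq!(pg_txn_id("35352:3962950064"), "35352");
    }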
5 changes: 4 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -158,7 +158,7 @@ pub struct SourceStreamChunkRowWriter<'a> {
 /// The meta data of the original message for a row writer.
 ///
 /// Extracted from the `SourceMessage`.
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, Debug)]
 pub struct MessageMeta<'a> {
     meta: &'a SourceMeta,
     split_id: &'a str,
@@ -665,13 +665,15 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                     if let Some(Transaction { id: current_id, .. }) = &current_transaction {
                         tracing::warn!(current_id, id, "already in transaction");
                     }
+                    tracing::debug!("begin upstream transaction: id={}", id);
                     current_transaction = Some(Transaction { id, len: 0 });
                 }
                 TransactionControl::Commit { id } => {
                     let current_id = current_transaction.as_ref().map(|t| &t.id);
                     if current_id != Some(&id) {
                         tracing::warn!(?current_id, id, "transaction id mismatch");
                     }
+                    tracing::debug!("commit upstream transaction: id={}", id);
                     current_transaction = None;
                 }
             }
@@ -692,6 +694,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
             // If we are not in a transaction, we should yield the chunk now.
             if current_transaction.is_none() {
                 yield_asap = false;
+
                 yield StreamChunkWithState {
                     chunk: builder.take(0),
                     split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),
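Taken together with the hunks above: while current_transaction is Some, into_chunk_stream keeps appending rows to the builder instead of yielding, and the buffered chunk is only emitted once the transaction commits (or when no transaction is open). A reduced model of that bookkeeping, with the async stream and chunk builder stripped away; the function names are illustrative, not the actual API:

    // Reduced model of the transaction state machine in into_chunk_stream.
    struct Transaction {
        id: String,
        len: usize, // rows buffered since BEGIN
    }

    enum Control {
        Begin { id: String },
        Commit { id: String },
    }

    // Returns true when the caller may yield the buffered chunk.
    fn apply(current: &mut Option<Transaction>, ctl: Control) -> bool {
        match ctl {
            Control::Begin { id } => {
                if current.is_some() {
                    eprintln!("already in transaction"); // mirrors tracing::warn!
                }
                *current = Some(Transaction { id, len: 0 });
                false
            }
            Control::Commit { id } => {
                if current.as_ref().map(|t| t.id.as_str()) != Some(id.as_str()) {
                    eprintln!("transaction id mismatch");
                }
                *current = None;
                true // transaction closed: buffered rows can be emitted
            }
        }
    }

    fn main() {
        let mut cur = None;
        assert!(!apply(&mut cur, Control::Begin { id: "35352".into() }));
        assert!(apply(&mut cur, Control::Commit { id: "35352".into() }));
    }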
(The remaining changed files of the 17 in this commit are not shown here.)
