feat(cdc): support transaction for shared cdc source #14375

Merged · 7 commits · Jan 8, 2024
DbzCdcEngine.java
@@ -14,7 +14,8 @@

 package com.risingwave.connector.source.core;
 
-import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_HEARTBEAT_PREFIX;
+import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX;
+import static io.debezium.schema.AbstractTopicNamingStrategy.*;
 
 import com.risingwave.connector.api.source.CdcEngine;
 import com.risingwave.proto.ConnectorServiceProto;
@@ -36,11 +37,14 @@ public DbzCdcEngine(
             long sourceId,
             Properties config,
             DebeziumEngine.CompletionCallback completionCallback) {
-        var dbzHeartbeatPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name());
+        var heartbeatTopicPrefix = config.getProperty(TOPIC_HEARTBEAT_PREFIX.name());
+        var topicPrefix = config.getProperty(TOPIC_PREFIX.name());
+        var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC);
         var consumer =
                 new DbzCdcEventConsumer(
                         sourceId,
-                        dbzHeartbeatPrefix,
+                        heartbeatTopicPrefix,
+                        transactionTopic,
                         new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));
 
         // Build a Debezium engine but do not start it yet
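For context on the `transactionTopic` derivation above: with Debezium's default topic naming strategy, transaction metadata events are published to `<topic.prefix>.transaction`, and `DEFAULT_TRANSACTION_TOPIC` is the literal `"transaction"`. A minimal sketch of the same derivation, with a hypothetical prefix value:

```rust
// Minimal sketch (hypothetical `topic.prefix` value): reconstructs the
// transaction metadata topic name the same way the Java code above does
// with String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC).
fn main() {
    let topic_prefix = "rw_cdc_1"; // hypothetical; comes from the connector config
    let transaction_topic = format!("{}.{}", topic_prefix, "transaction");
    assert_eq!(transaction_topic, "rw_cdc_1.transaction");
    println!("transaction topic: {transaction_topic}");
}
```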
DbzCdcEventConsumer.java
@@ -34,6 +34,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+enum EventType {
+    HEARTBEAT,
+    TRANSACTION,
+    DATA,
+}
+
 public class DbzCdcEventConsumer
         implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
     static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class);
@@ -42,14 +48,18 @@ public class DbzCdcEventConsumer
     private final long sourceId;
     private final JsonConverter converter;
     private final String heartbeatTopicPrefix;
+    private final String transactionTopic;
 
     DbzCdcEventConsumer(
             long sourceId,
             String heartbeatTopicPrefix,
-            BlockingQueue<GetEventStreamResponse> store) {
+            String transactionTopic,
+            BlockingQueue<GetEventStreamResponse> queue) {
         this.sourceId = sourceId;
-        this.outputChannel = store;
+        this.outputChannel = queue;
         this.heartbeatTopicPrefix = heartbeatTopicPrefix;
+        this.transactionTopic = transactionTopic;
+        LOG.info("heartbeat topic: {}, txn topic: {}", heartbeatTopicPrefix, transactionTopic);
 
         // The default JSON converter will output the schema field in the JSON, which is
         // unnecessary for the source parser; we use a customized JSON converter to avoid outputting the `schema`
@@ -64,13 +74,28 @@ public class DbzCdcEventConsumer
         this.converter = jsonConverter;
     }
 
+    private EventType getEventType(SourceRecord record) {
+        if (isHeartbeatEvent(record)) {
+            return EventType.HEARTBEAT;
+        } else if (isTransactionMetaEvent(record)) {
+            return EventType.TRANSACTION;
+        } else {
+            return EventType.DATA;
+        }
+    }
+
     private boolean isHeartbeatEvent(SourceRecord record) {
         String topic = record.topic();
         return topic != null
                 && heartbeatTopicPrefix != null
                 && topic.startsWith(heartbeatTopicPrefix);
     }
 
+    private boolean isTransactionMetaEvent(SourceRecord record) {
+        String topic = record.topic();
+        return topic != null && topic.equals(transactionTopic);
+    }
+
     @Override
     public void handleBatch(
             List<ChangeEvent<SourceRecord, SourceRecord>> events,
@@ -79,10 +104,12 @@ public void handleBatch(
         var respBuilder = GetEventStreamResponse.newBuilder();
         for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
             var record = event.value();
-            boolean isHeartbeat = isHeartbeatEvent(record);
+            EventType eventType = getEventType(record);
             DebeziumOffset offset =
                     new DebeziumOffset(
-                            record.sourcePartition(), record.sourceOffset(), isHeartbeat);
+                            record.sourcePartition(),
+                            record.sourceOffset(),
+                            (eventType == EventType.HEARTBEAT));
             // serialize the offset to a JSON, so that the kernel doesn't need to
             // be aware of its layout
             String offsetStr = "";
@@ -98,43 +125,68 @@ var record = event.value();
                             .setOffset(offsetStr)
                             .setPartition(String.valueOf(sourceId));
 
-            if (isHeartbeat) {
-                var message = msgBuilder.build();
-                LOG.debug("heartbeat => {}", message.getOffset());
-                respBuilder.addEvents(message);
-            } else {
-
-                // Topic naming conventions
-                // - PG: serverName.schemaName.tableName
-                // - MySQL: serverName.databaseName.tableName
-                // We can extract the full table name from the topic
-                var fullTableName = record.topic().substring(record.topic().indexOf('.') + 1);
-
-                // ignore null record
-                if (record.value() == null) {
-                    committer.markProcessed(event);
-                    continue;
-                }
-                // get upstream event time from the "source" field
-                var sourceStruct = ((Struct) record.value()).getStruct("source");
-                long sourceTsMs =
-                        sourceStruct == null
-                                ? System.currentTimeMillis()
-                                : sourceStruct.getInt64("ts_ms");
-                byte[] payload =
-                        converter.fromConnectData(
-                                record.topic(), record.valueSchema(), record.value());
-                msgBuilder
-                        .setFullTableName(fullTableName)
-                        .setPayload(new String(payload, StandardCharsets.UTF_8))
-                        .setSourceTsMs(sourceTsMs)
-                        .build();
-                var message = msgBuilder.build();
-                LOG.debug("record => {}", message.getPayload());
-
-                respBuilder.addEvents(message);
-                committer.markProcessed(event);
-            }
+            switch (eventType) {
+                case HEARTBEAT:
+                    {
+                        var message = msgBuilder.build();
+                        LOG.debug("heartbeat => {}", message.getOffset());
+                        respBuilder.addEvents(message);
+                        break;
+                    }
+                case TRANSACTION:
+                    {
+                        long trxTs = ((Struct) record.value()).getInt64("ts_ms");
+                        byte[] payload =
+                                converter.fromConnectData(
+                                        record.topic(), record.valueSchema(), record.value());
+                        var message =
+                                msgBuilder
+                                        .setIsTransactionMeta(true)
+                                        .setPayload(new String(payload, StandardCharsets.UTF_8))
+                                        .setSourceTsMs(trxTs)
+                                        .build();
+                        LOG.debug("transaction => {}", message);
+                        respBuilder.addEvents(message);
+                        break;
+                    }
+                case DATA:
+                    {
+                        // Topic naming conventions
+                        // - PG: serverName.schemaName.tableName
+                        // - MySQL: serverName.databaseName.tableName
+                        // We can extract the full table name from the topic
+                        var fullTableName =
+                                record.topic().substring(record.topic().indexOf('.') + 1);
+
+                        // ignore null record
+                        if (record.value() == null) {
+                            break;
+                        }
+                        // get upstream event time from the "source" field
+                        var sourceStruct = ((Struct) record.value()).getStruct("source");
+                        long sourceTsMs =
+                                sourceStruct == null
+                                        ? System.currentTimeMillis()
+                                        : sourceStruct.getInt64("ts_ms");
+                        byte[] payload =
+                                converter.fromConnectData(
+                                        record.topic(), record.valueSchema(), record.value());
+                        var message =
+                                msgBuilder
+                                        .setFullTableName(fullTableName)
+                                        .setPayload(new String(payload, StandardCharsets.UTF_8))
+                                        .setSourceTsMs(sourceTsMs)
+                                        .build();
+                        LOG.debug("record => {}", message.getPayload());
+                        respBuilder.addEvents(message);
+                        break;
+                    }
+                default:
+                    break;
+            }
 
+            // mark the event as processed
+            committer.markProcessed(event);
         }

         // skip empty batch
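The payloads the `TRANSACTION` branch forwards are Debezium transaction metadata events. Judging from the parser test added further down, a `BEGIN` event carries the transaction id and timestamp, while an `END` event additionally carries per-table event counts. A sketch of what reaches the kernel (payload strings taken from that test; the surrounding code is illustrative only):

```rust
// Illustrative only: these JSON payloads mirror the Debezium transaction
// metadata messages used in the DebeziumParser test below. BEGIN marks the
// start of an upstream transaction; END marks its commit and reports how
// many events each table contributed.
fn main() {
    let begin = r#"{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}"#;
    let end = r#"{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}"#;
    for payload in [begin, end] {
        println!("transaction meta => {payload}");
    }
}
```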
1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -160,6 +160,7 @@ message CdcMessage {
   string offset = 3;
   string full_table_name = 4;
   int64 source_ts_ms = 5;
+  bool is_transaction_meta = 6;
 }

 enum SourceType {
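With the new field, a consumer of `CdcMessage` can distinguish transaction boundaries from data rows. A hedged sketch using a hand-written mirror of the message (not the actual generated protobuf type):

```rust
// Hand-written mirror of CdcMessage for illustration; the real type is
// generated from connector_service.proto.
struct CdcMessage {
    payload: String,
    offset: String,
    full_table_name: String,
    source_ts_ms: i64,
    is_transaction_meta: bool,
}

// Classify a message by the new flag: transaction metadata (BEGIN/END)
// versus an ordinary data change event.
fn describe(msg: &CdcMessage) -> &'static str {
    if msg.is_transaction_meta {
        "transaction metadata (BEGIN/END)"
    } else {
        "data change event"
    }
}

fn main() {
    let msg = CdcMessage {
        payload: r#"{"status":"BEGIN","id":"35352:3962948040"}"#.into(),
        offset: String::new(),
        full_table_name: String::new(),
        source_ts_ms: 1704269323180,
        is_transaction_meta: true,
    };
    println!("{}", describe(&msg));
}
```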
1 change: 1 addition & 0 deletions src/batch/src/executor/source.rs
@@ -146,6 +146,7 @@ impl SourceExecutor {
             self.metrics,
             self.source_ctrl_opts.clone(),
             None,
+            ConnectorProperties::default(),
         ));
         let stream = self
             .connector_source
1 change: 1 addition & 0 deletions src/connector/src/parser/avro/parser.rs
@@ -288,6 +288,7 @@ mod test {
             )?),
             rw_columns: Vec::default(),
             source_ctx: Default::default(),
+            transaction_meta_builder: None,
         })
     }
 
77 changes: 76 additions & 1 deletion src/connector/src/parser/debezium/debezium_parser.rs
@@ -109,7 +109,9 @@ impl DebeziumParser {
             Err(err) => {
                 // Only try to access the transaction control message if the row operation
                 // access failed, to keep the common path fast.
-                if let Ok(transaction_control) = row_op.transaction_control() {
+                if let Ok(transaction_control) =
+                    row_op.transaction_control(&self.source_ctx.connector_props)
+                {
                     Ok(ParseResult::TransactionControl(transaction_control))
                 } else {
                     Err(err)?
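The intent of the change above, restated as a standalone sketch: parsing a payload as a row operation is the common case, so the transaction-control interpretation is only attempted after the row access fails, and the original error is surfaced if neither works. All names below are illustrative stand-ins, not the crate's real types:

```rust
// Illustrative stand-ins for the parser's real types.
enum TransactionControl { Begin, Commit }
enum ParseResult { Rows, TransactionControl(TransactionControl) }

fn try_row_op(payload: &[u8]) -> Result<ParseResult, String> {
    // Pretend any payload starting with '{' parses as a row operation.
    if payload.first() == Some(&b'{') { Ok(ParseResult::Rows) } else { Err("not a row".into()) }
}

fn try_transaction_control(payload: &[u8]) -> Result<TransactionControl, String> {
    if payload.starts_with(b"BEGIN") { Ok(TransactionControl::Begin) }
    else if payload.starts_with(b"END") { Ok(TransactionControl::Commit) }
    else { Err("not transaction control".into()) }
}

// Fast path: try the row operation first; fall back to transaction control
// only on failure, and keep the original error if both fail.
fn parse(payload: &[u8]) -> Result<ParseResult, String> {
    match try_row_op(payload) {
        Ok(rows) => Ok(rows),
        Err(err) => match try_transaction_control(payload) {
            Ok(ctl) => Ok(ParseResult::TransactionControl(ctl)),
            Err(_) => Err(err),
        },
    }
}

fn main() {
    assert!(matches!(parse(b"{...}"), Ok(ParseResult::Rows)));
    assert!(matches!(
        parse(b"BEGIN"),
        Ok(ParseResult::TransactionControl(TransactionControl::Begin))
    ));
    assert!(parse(b"???").is_err());
}
```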
@@ -151,3 +153,76 @@ impl ByteStreamSourceParser for DebeziumParser {
         self.parse_inner(key, payload, writer).await
     }
 }

+#[cfg(test)]
+mod tests {
+    use std::ops::Deref;
+    use std::sync::Arc;
+
+    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
+
+    use super::*;
+    use crate::parser::{SourceStreamChunkBuilder, TransactionControl};
+    use crate::source::{ConnectorProperties, DataType};
+
+    #[tokio::test]
+    async fn test_parse_transaction_metadata() {
+        let schema = vec![
+            ColumnCatalog {
+                column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb),
+                is_hidden: false,
+            },
+            ColumnCatalog::offset_column(),
+            ColumnCatalog::cdc_table_name_column(),
+        ];
+
+        let columns = schema
+            .iter()
+            .map(|c| SourceColumnDesc::from(&c.column_desc))
+            .collect::<Vec<_>>();
+
+        let props = SpecificParserConfig {
+            key_encoding_config: None,
+            encoding_config: EncodingProperties::Json(JsonProperties {
+                use_schema_registry: false,
+            }),
+            protocol_config: ProtocolProperties::Debezium,
+        };
+        let mut source_ctx = SourceContext::default();
+        source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
+        let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
+            .await
+            .unwrap();
+        let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+
+        // "id":"35352:3962948040" is the Postgres transaction ID and the LSN of the
+        // given operation, separated by a colon, i.e. the format is txID:LSN
+        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
+        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
+        let res = parser
+            .parse_one_with_txn(
+                None,
+                Some(begin_msg.as_bytes().to_vec()),
+                builder.row_writer(),
+            )
+            .await;
+        match res {
+            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
+                assert_eq!(id.deref(), "35352");
+            }
+            _ => panic!("unexpected parse result: {:?}", res),
+        }
+        let res = parser
+            .parse_one_with_txn(
+                None,
+                Some(commit_msg.as_bytes().to_vec()),
+                builder.row_writer(),
+            )
+            .await;
+        match res {
+            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
+                assert_eq!(id.deref(), "35352");
+            }
+            _ => panic!("unexpected parse result: {:?}", res),
+        }
+    }
+}
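A note on the assertions above: the BEGIN id is "35352:3962948040" and the END id is "35352:3962950064", yet both asserts expect "35352". Since the Postgres id format is txID:LSN, the parser evidently keeps only the transaction-ID part, so BEGIN and COMMIT of the same transaction compare equal even though their LSNs differ. A sketch of that normalization (the function name is hypothetical, not the crate's API):

```rust
// Hypothetical helper illustrating the id normalization the test implies:
// keep only the Postgres transaction ID before the colon, dropping the LSN.
fn normalize_pg_txn_id(id: &str) -> &str {
    id.split(':').next().unwrap_or(id)
}

fn main() {
    assert_eq!(normalize_pg_txn_id("35352:3962948040"), "35352"); // BEGIN
    assert_eq!(normalize_pg_txn_id("35352:3962950064"), "35352"); // END/COMMIT
}
```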
5 changes: 4 additions & 1 deletion src/connector/src/parser/mod.rs
@@ -158,7 +158,7 @@ pub struct SourceStreamChunkRowWriter<'a> {
 /// The meta data of the original message for a row writer.
 ///
 /// Extracted from the `SourceMessage`.
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, Debug)]
 pub struct MessageMeta<'a> {
     meta: &'a SourceMeta,
     split_id: &'a str,
@@ -665,13 +665,15 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                     if let Some(Transaction { id: current_id, .. }) = &current_transaction {
                         tracing::warn!(current_id, id, "already in transaction");
                     }
+                    tracing::debug!("begin upstream transaction: id={}", id);
                     current_transaction = Some(Transaction { id, len: 0 });
                 }
                 TransactionControl::Commit { id } => {
                     let current_id = current_transaction.as_ref().map(|t| &t.id);
                     if current_id != Some(&id) {
                         tracing::warn!(?current_id, id, "transaction id mismatch");
                     }
+                    tracing::debug!("commit upstream transaction: id={}", id);
                     current_transaction = None;
                 }
             }
@@ -692,6 +694,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
             // If we are not in a transaction, we should yield the chunk now.
             if current_transaction.is_none() {
                 yield_asap = false;
+
                 yield StreamChunkWithState {
                     chunk: builder.take(0),
                     split_offset_mapping: Some(std::mem::take(&mut split_offset_mapping)),