Skip to content

Commit

Permalink
fix(mysql-cdc): enable heartbeat message to keep the offset up to dat…
Browse files Browse the repository at this point in the history
…e with upstream (#12868) (#12886)

Co-authored-by: StrikeW <[email protected]>
  • Loading branch information
github-actions[bot] and StrikeW authored Oct 17, 2023
1 parent 98512f0 commit b25a080
Show file tree
Hide file tree
Showing 24 changed files with 203 additions and 62 deletions.
2 changes: 1 addition & 1 deletion e2e_test/source/cdc/mysql_cdc.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP DATABASE IF EXISTS mydb;
DROP DATABASE IF EXISTS `my@db`;
CREATE DATABASE `my@db`;

USE `my@db`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,45 +72,58 @@ public void handleBatch(
List<ChangeEvent<SourceRecord, SourceRecord>> events,
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
var builder = GetEventStreamResponse.newBuilder();
var respBuilder = GetEventStreamResponse.newBuilder();
for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
var record = event.value();
if (isHeartbeatEvent(record)) {
// skip heartbeat events
continue;
}
// ignore null record
if (record.value() == null) {
committer.markProcessed(event);
continue;
}
byte[] payload =
converter.fromConnectData(record.topic(), record.valueSchema(), record.value());

// serialize the offset to a JSON, so that kernel doesn't need to
// aware the layout of it
boolean isHeartbeat = isHeartbeatEvent(record);
DebeziumOffset offset =
new DebeziumOffset(record.sourcePartition(), record.sourceOffset());
new DebeziumOffset(
record.sourcePartition(), record.sourceOffset(), isHeartbeat);
// serialize the offset to a JSON, so that kernel doesn't need to
// aware its layout
String offsetStr = "";
try {
byte[] serialized = DebeziumOffsetSerializer.INSTANCE.serialize(offset);
offsetStr = new String(serialized, StandardCharsets.UTF_8);
} catch (IOException e) {
LOG.warn("failed to serialize debezium offset", e);
}
var message =

var msgBuilder =
CdcMessage.newBuilder()
.setOffset(offsetStr)
.setPartition(String.valueOf(sourceId))
.setPayload(new String(payload, StandardCharsets.UTF_8))
.build();
LOG.debug("record => {}", message.getPayload());
builder.addEvents(message);
committer.markProcessed(event);
.setPartition(String.valueOf(sourceId));

if (isHeartbeat) {
var message = msgBuilder.build();
LOG.debug("heartbeat => {}", message.getOffset());
respBuilder.addEvents(message);
} else {
// ignore null record
if (record.value() == null) {
committer.markProcessed(event);
continue;
}
byte[] payload =
converter.fromConnectData(
record.topic(), record.valueSchema(), record.value());

msgBuilder.setPayload(new String(payload, StandardCharsets.UTF_8)).build();
var message = msgBuilder.build();
LOG.debug("record => {}", message.getPayload());

respBuilder.addEvents(message);
committer.markProcessed(event);
}
}
builder.setSourceId(sourceId);
var response = builder.build();
outputChannel.put(response);

// skip empty batch
if (respBuilder.getEventsCount() > 0) {
respBuilder.setSourceId(sourceId);
var response = respBuilder.build();
outputChannel.put(response);
}

committer.markBatchFinished();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void start(long channelPtr) {
config.getSourceType().toString(),
String.valueOf(config.getSourceId()),
resp.getEventsCount());
LOG.info(
LOG.debug(
"Engine#{}: emit one chunk {} events to network ",
config.getSourceId(),
resp.getEventsCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ database.server.id=${server.id}
# set connector timezone to UTC(+00:00)
database.connectionTimeZone=+00:00

# default heartbeat interval 60 seconds
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
heartbeat.topics.prefix=${debezium.heartbeat.topics.prefix:-RW_CDC_HeartBeat_}
name=${hostname}:${port}:${database.name}.${table.name}

provide.transaction.metadata=${transactional:-false}
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ public void testLines() throws InterruptedException, SQLException {
int count = 0;
while (eventStream.hasNext()) {
List<CdcMessage> messages = eventStream.next().getEventsList();
for (CdcMessage ignored : messages) {
count++;
for (CdcMessage msg : messages) {
if (!msg.getPayload().isBlank()) {
count++;
}
}
if (count == 10000) {
if (count >= 10000) {
return count;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ public void testLines() throws Exception {
while (eventStream.hasNext()) {
List<ConnectorServiceProto.CdcMessage> messages =
eventStream.next().getEventsList();
for (ConnectorServiceProto.CdcMessage ignored : messages) {
count++;
for (ConnectorServiceProto.CdcMessage msg : messages) {
if (!msg.getPayload().isBlank()) {
count++;
}
}
if (count == 10000) {
if (count >= 10000) {
return count;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SourceTestClient {
static final Logger LOG = LoggerFactory.getLogger(SourceTestClient.class.getName());

// default port for connector service
static final int DEFAULT_PORT = 50051;
static final int DEFAULT_PORT = 60051;
private final ConnectorServiceGrpc.ConnectorServiceBlockingStub blockingStub;

public Properties sqlStmts = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ public class DebeziumOffset implements Serializable {

public Map<String, ?> sourcePartition;
public Map<String, ?> sourceOffset;
public boolean isHeartbeat;

public DebeziumOffset() {}

public DebeziumOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
public DebeziumOffset(
Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, boolean isHeartbeat) {
this.sourcePartition = sourcePartition;
this.sourceOffset = sourceOffset;
this.isHeartbeat = isHeartbeat;
}

public void setSourcePartition(Map<String, ?> sourcePartition) {
Expand All @@ -59,10 +62,16 @@ public void setSourceOffset(Map<String, ?> sourceOffset) {
this.sourceOffset = sourceOffset;
}

public void setHeartbeat(boolean heartbeat) {
isHeartbeat = heartbeat;
}

@Override
public String toString() {
return "DebeziumOffset{"
+ "sourcePartition="
+ "isHeartbeat="
+ isHeartbeat
+ ", sourcePartition="
+ sourcePartition
+ ", sourceOffset="
+ sourceOffset
Expand Down
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl MockOffsetGenExecutor {
txid: None,
tx_usec: None,
},
is_heartbeat: false,
};

self.start_offset += 1;
Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use crate::parser::canal::operators::*;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::unified::ChangeEventOperation;
use crate::parser::{ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter};
use crate::parser::{
ByteStreamSourceParser, JsonProperties, ParserFormat, SourceStreamChunkRowWriter,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

const DATA: &str = "data";
Expand Down Expand Up @@ -121,6 +123,10 @@ impl ByteStreamSourceParser for CanalJsonParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::CanalJson
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::types::{Date, Decimal, Time, Timestamp, Timestamptz};
use super::unified::{AccessError, AccessResult};
use super::{ByteStreamSourceParser, CsvProperties};
use crate::only_parse_payload;
use crate::parser::SourceStreamChunkRowWriter;
use crate::parser::{ParserFormat, SourceStreamChunkRowWriter};
use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};

macro_rules! parse {
Expand Down Expand Up @@ -152,6 +152,10 @@ impl ByteStreamSourceParser for CsvParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Csv
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties,
ParseResult, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
ParseResult, ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter,
SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -127,6 +128,10 @@ impl ByteStreamSourceParser for DebeziumParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Debezium
}

#[allow(clippy::unused_async)] // false positive for `async_trait`
async fn parse_one<'a>(
&'a mut self,
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/parser/debezium/mongo_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::only_parse_payload;
use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjection};
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::parser::{ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -112,6 +112,10 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::DebeziumMongo
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use crate::parser::schema_registry::handle_sr_list;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::parser::{
AccessBuilder, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -181,6 +183,10 @@ impl ByteStreamSourceParser for JsonParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Json
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/parser/maxwell/maxwell_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::only_parse_payload;
use crate::parser::unified::maxwell::MaxwellChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserFormat,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
Expand Down Expand Up @@ -74,6 +74,10 @@ impl ByteStreamSourceParser for MaxwellParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Maxwell
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
24 changes: 24 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,18 @@ pub enum ParseResult {
TransactionControl(TransactionControl),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
CanalJson,
Csv,
Json,
Maxwell,
Debezium,
DebeziumMongo,
Upsert,
Plain,
}

/// `ByteStreamSourceParser` is a new message parser, the parser should consume
/// the input data stream and return a stream of parsed msgs.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
Expand All @@ -405,6 +417,9 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
/// The source context, used to report parsing error.
fn source_ctx(&self) -> &SourceContext;

/// The format of the specific parser.
fn parser_format(&self) -> ParserFormat;

/// Parse one record from the given `payload` and write rows to the `writer`.
///
/// Returns error if **any** of the rows in the message failed to parse.
Expand Down Expand Up @@ -512,6 +527,15 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream

for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
if parser.parser_format() == ParserFormat::Debezium {
tracing::debug!("heartbeat message {}, skip parser", msg.offset);
// empty payload means a heartbeat in cdc source
// heartbeat message offset should not overwrite data messages offset
split_offset_mapping
.entry(msg.split_id)
.or_insert(msg.offset.clone());
}

continue;
}
let parse_span = tracing::info_span!(
Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::{
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::only_parse_payload;
use crate::parser::ParserFormat;
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -75,6 +76,10 @@ impl ByteStreamSourceParser for PlainParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Plain
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
Loading

0 comments on commit b25a080

Please sign in to comment.