Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mysql-cdc): enable heartbeat message to keep the offset up to date with upstream #12868

Merged
merged 6 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e_test/source/cdc/mysql_cdc.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DROP DATABASE IF EXISTS mydb;
DROP DATABASE IF EXISTS `my@db`;
CREATE DATABASE `my@db`;

USE `my@db`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,45 +72,58 @@ public void handleBatch(
List<ChangeEvent<SourceRecord, SourceRecord>> events,
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
var builder = GetEventStreamResponse.newBuilder();
var respBuilder = GetEventStreamResponse.newBuilder();
for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
var record = event.value();
if (isHeartbeatEvent(record)) {
// skip heartbeat events
continue;
}
// ignore null record
if (record.value() == null) {
committer.markProcessed(event);
continue;
}
byte[] payload =
converter.fromConnectData(record.topic(), record.valueSchema(), record.value());

// serialize the offset to a JSON, so that kernel doesn't need to
// aware the layout of it
boolean isHeartbeat = isHeartbeatEvent(record);
DebeziumOffset offset =
new DebeziumOffset(record.sourcePartition(), record.sourceOffset());
new DebeziumOffset(
record.sourcePartition(), record.sourceOffset(), isHeartbeat);
// serialize the offset to JSON, so that the kernel doesn't need to
// be aware of its layout
String offsetStr = "";
try {
byte[] serialized = DebeziumOffsetSerializer.INSTANCE.serialize(offset);
offsetStr = new String(serialized, StandardCharsets.UTF_8);
} catch (IOException e) {
LOG.warn("failed to serialize debezium offset", e);
}
var message =

var msgBuilder =
CdcMessage.newBuilder()
.setOffset(offsetStr)
.setPartition(String.valueOf(sourceId))
.setPayload(new String(payload, StandardCharsets.UTF_8))
.build();
LOG.debug("record => {}", message.getPayload());
builder.addEvents(message);
committer.markProcessed(event);
.setPartition(String.valueOf(sourceId));

if (isHeartbeat) {
var message = msgBuilder.build();
LOG.debug("heartbeat => {}", message.getOffset());
respBuilder.addEvents(message);
} else {
// ignore null record
if (record.value() == null) {
committer.markProcessed(event);
continue;
}
byte[] payload =
converter.fromConnectData(
record.topic(), record.valueSchema(), record.value());

msgBuilder.setPayload(new String(payload, StandardCharsets.UTF_8)).build();
var message = msgBuilder.build();
LOG.debug("record => {}", message.getPayload());

respBuilder.addEvents(message);
committer.markProcessed(event);
}
}
builder.setSourceId(sourceId);
var response = builder.build();
outputChannel.put(response);

// skip empty batch
if (respBuilder.getEventsCount() > 0) {
respBuilder.setSourceId(sourceId);
var response = respBuilder.build();
outputChannel.put(response);
}

committer.markBatchFinished();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void start(long channelPtr) {
config.getSourceType().toString(),
String.valueOf(config.getSourceId()),
resp.getEventsCount());
LOG.info(
LOG.debug(
"Engine#{}: emit one chunk {} events to network ",
config.getSourceId(),
resp.getEventsCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ database.server.id=${server.id}
# set connector timezone to UTC(+00:00)
database.connectionTimeZone=+00:00

# default heartbeat interval: 60 seconds (60000 ms)
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
heartbeat.topics.prefix=${debezium.heartbeat.topics.prefix:-RW_CDC_HeartBeat_}
name=${hostname}:${port}:${database.name}.${table.name}

provide.transaction.metadata=${transactional:-false}
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ public void testLines() throws InterruptedException, SQLException {
int count = 0;
while (eventStream.hasNext()) {
List<CdcMessage> messages = eventStream.next().getEventsList();
for (CdcMessage ignored : messages) {
count++;
for (CdcMessage msg : messages) {
if (!msg.getPayload().isBlank()) {
count++;
}
}
if (count == 10000) {
if (count >= 10000) {
return count;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ public void testLines() throws Exception {
while (eventStream.hasNext()) {
List<ConnectorServiceProto.CdcMessage> messages =
eventStream.next().getEventsList();
for (ConnectorServiceProto.CdcMessage ignored : messages) {
count++;
for (ConnectorServiceProto.CdcMessage msg : messages) {
if (!msg.getPayload().isBlank()) {
count++;
}
}
if (count == 10000) {
if (count >= 10000) {
return count;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SourceTestClient {
static final Logger LOG = LoggerFactory.getLogger(SourceTestClient.class.getName());

// default port for connector service
static final int DEFAULT_PORT = 50051;
static final int DEFAULT_PORT = 60051;
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
private final ConnectorServiceGrpc.ConnectorServiceBlockingStub blockingStub;

public Properties sqlStmts = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ public class DebeziumOffset implements Serializable {

public Map<String, ?> sourcePartition;
public Map<String, ?> sourceOffset;
public boolean isHeartbeat;

public DebeziumOffset() {}

public DebeziumOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
public DebeziumOffset(
Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, boolean isHeartbeat) {
this.sourcePartition = sourcePartition;
this.sourceOffset = sourceOffset;
this.isHeartbeat = isHeartbeat;
}

public void setSourcePartition(Map<String, ?> sourcePartition) {
Expand All @@ -59,10 +62,16 @@ public void setSourceOffset(Map<String, ?> sourceOffset) {
this.sourceOffset = sourceOffset;
}

public void setHeartbeat(boolean heartbeat) {
isHeartbeat = heartbeat;
}

@Override
public String toString() {
return "DebeziumOffset{"
+ "sourcePartition="
+ "isHeartbeat="
+ isHeartbeat
+ ", sourcePartition="
+ sourcePartition
+ ", sourceOffset="
+ sourceOffset
Expand Down
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl MockOffsetGenExecutor {
txid: None,
tx_usec: None,
},
is_heartbeat: false,
};

self.start_offset += 1;
Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use crate::parser::canal::operators::*;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::unified::ChangeEventOperation;
use crate::parser::{ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter};
use crate::parser::{
ByteStreamSourceParser, JsonProperties, ParserFormat, SourceStreamChunkRowWriter,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

const DATA: &str = "data";
Expand Down Expand Up @@ -121,6 +123,10 @@ impl ByteStreamSourceParser for CanalJsonParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::CanalJson
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::types::{Date, Decimal, Time, Timestamp, Timestamptz};
use super::unified::{AccessError, AccessResult};
use super::{ByteStreamSourceParser, CsvProperties};
use crate::only_parse_payload;
use crate::parser::SourceStreamChunkRowWriter;
use crate::parser::{ParserFormat, SourceStreamChunkRowWriter};
use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};

macro_rules! parse {
Expand Down Expand Up @@ -152,6 +152,10 @@ impl ByteStreamSourceParser for CsvParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Csv
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/parser/debezium/debezium_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::parser::unified::debezium::DebeziumChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties,
ParseResult, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
ParseResult, ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter,
SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

Expand Down Expand Up @@ -127,6 +128,10 @@ impl ByteStreamSourceParser for DebeziumParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Debezium
}

#[allow(clippy::unused_async)] // false positive for `async_trait`
async fn parse_one<'a>(
&'a mut self,
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/parser/debezium/mongo_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::only_parse_payload;
use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjection};
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::parser::{ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -112,6 +112,10 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::DebeziumMongo
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use crate::parser::schema_registry::handle_sr_list;
use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter};
use crate::parser::{
AccessBuilder, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -181,6 +183,10 @@ impl ByteStreamSourceParser for JsonParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Json
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/parser/maxwell/maxwell_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::only_parse_payload;
use crate::parser::unified::maxwell::MaxwellChangeEvent;
use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
use crate::parser::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserFormat,
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
Expand Down Expand Up @@ -74,6 +74,10 @@ impl ByteStreamSourceParser for MaxwellParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Maxwell
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
24 changes: 24 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,18 @@ pub enum ParseResult {
TransactionControl(TransactionControl),
}

/// Identifies which kind of source parser is in use.
///
/// Returned by `ByteStreamSourceParser::parser_format` so that code which is
/// generic over the parser can branch on the parser kind (for example, to give
/// Debezium heartbeat messages special treatment) without knowing the concrete
/// parser type.
///
/// All variants are field-less, so the type is `Copy` and has a total
/// equality relation (`Eq`) in addition to `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ParserFormat {
    /// Reported by the Canal JSON parser.
    CanalJson,
    /// Reported by the CSV parser.
    Csv,
    /// Reported by the JSON parser.
    Json,
    /// Reported by the Maxwell parser.
    Maxwell,
    /// Reported by the Debezium parser.
    Debezium,
    /// Reported by the Debezium MongoDB JSON parser.
    DebeziumMongo,
    /// NOTE(review): presumably reported by the upsert parser; its impl is not
    /// visible here, so confirm.
    Upsert,
    /// Reported by the plain parser.
    Plain,
}

/// `ByteStreamSourceParser` is a new message parser. The parser should consume
/// the input data stream and return a stream of parsed msgs.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
Expand All @@ -405,6 +417,9 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
/// The source context, used to report parsing error.
fn source_ctx(&self) -> &SourceContext;

/// The format of the specific parser.
fn parser_format(&self) -> ParserFormat;

/// Parse one record from the given `payload` and write rows to the `writer`.
///
/// Returns error if **any** of the rows in the message failed to parse.
Expand Down Expand Up @@ -512,6 +527,15 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream

for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
if parser.parser_format() == ParserFormat::Debezium {
tracing::debug!("heartbeat message {}, skip parser", msg.offset);
// an empty payload means a heartbeat in a CDC source;
// a heartbeat message's offset must not overwrite a data message's offset
split_offset_mapping
.entry(msg.split_id)
.or_insert(msg.offset.clone());
Comment on lines +534 to +536
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should apply to all kinds of messages that have empty keys and payloads? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect this patch to affect only direct CDC use cases, because I am not sure whether it would break other connectors.

}

continue;
}
let parse_span = tracing::info_span!(
Expand Down
5 changes: 5 additions & 0 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::{
SourceStreamChunkRowWriter, SpecificParserConfig,
};
use crate::only_parse_payload;
use crate::parser::ParserFormat;
use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};

#[derive(Debug)]
Expand Down Expand Up @@ -75,6 +76,10 @@ impl ByteStreamSourceParser for PlainParser {
&self.source_ctx
}

fn parser_format(&self) -> ParserFormat {
ParserFormat::Plain
}

async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
Expand Down
Loading
Loading