Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Jul 30, 2024
1 parent 950a09d commit 27a7017
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ var record = event.value();
switch (eventType) {
case HEARTBEAT:
{
var message = msgBuilder.build();
var message =
msgBuilder.setMsgType(CdcMessage.CdcMessageType.HEARTBEAT).build();
LOG.debug("heartbeat => {}", message.getOffset());
respBuilder.addEvents(message);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ table.include.list=${database.name}.${table.name:-*}
schema.history.internal.store.only.captured.tables.ddl=true
schema.history.internal.store.only.captured.databases.ddl=true
# default to disable schema change events
include.schema.changes=${debezium.include.schema.changes:-true}
include.schema.changes=${debezium.include.schema.changes:-false}
database.server.id=${server.id}
# default to use unencrypted connection
database.ssl.mode=${ssl.mode:-disabled}
Expand Down
9 changes: 5 additions & 4 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,10 @@ message SinkCoordinatorStreamResponse {
message CdcMessage {
enum CdcMessageType {
UNSPECIFIED = 0;
DATA = 1;
TRANSACTION_META = 2;
SCHEMA_CHANGE = 3;
HEARTBEAT = 1;
DATA = 2;
TRANSACTION_META = 3;
SCHEMA_CHANGE = 4;
}

// The value of the Debezium message
Expand All @@ -161,7 +162,7 @@ message CdcMessage {
string full_table_name = 4;
int64 source_ts_ms = 5;

// Deprecated: use `type` instead
// Deprecated: use `msg_type` instead
reserved "is_transaction_meta";
reserved 6;

Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl PlainParser {
&& let Some(data) = payload
{
match cdc_meta.msg_type {
CdcMessageType::Data => {
CdcMessageType::Data | CdcMessageType::Heartbeat => {
return self.parse_rows(key, Some(data), writer).await;
}
CdcMessageType::TransactionMeta => {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl PlainParser {
Err(err) => Err(err)?,
};
}
CdcMessageType::Unknown => {
CdcMessageType::Unspecified => {
unreachable!()
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/cdc/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::source::SourceMeta;

#[derive(Clone, Debug)]
pub enum CdcMessageType {
Unknown,
Unspecified,
Heartbeat,
Data,
TransactionMeta,
SchemaChange,
Expand All @@ -30,9 +31,10 @@ impl From<cdc_message::CdcMessageType> for CdcMessageType {
fn from(msg_type: cdc_message::CdcMessageType) -> Self {
match msg_type {
cdc_message::CdcMessageType::Data => CdcMessageType::Data,
cdc_message::CdcMessageType::Heartbeat => CdcMessageType::Heartbeat,
cdc_message::CdcMessageType::TransactionMeta => CdcMessageType::TransactionMeta,
cdc_message::CdcMessageType::SchemaChange => CdcMessageType::SchemaChange,
cdc_message::CdcMessageType::Unspecified => CdcMessageType::Unknown,
cdc_message::CdcMessageType::Unspecified => CdcMessageType::Unspecified,
}
}
}
Expand Down

0 comments on commit 27a7017

Please sign in to comment.