diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql
index 89e5274ac3ee..81d7a46f1787 100644
--- a/e2e_test/source/cdc/mysql_cdc.sql
+++ b/e2e_test/source/cdc/mysql_cdc.sql
@@ -1,4 +1,4 @@
-DROP DATABASE IF EXISTS mydb;
+DROP DATABASE IF EXISTS `my@db`;
 CREATE DATABASE `my@db`;
 USE `my@db`;
 
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
index 9ec96c90f41f..9f3fc3d17e39 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
@@ -72,25 +72,15 @@ public void handleBatch(
             List<ChangeEvent<SourceRecord, SourceRecord>> events,
             DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
             throws InterruptedException {
-        var builder = GetEventStreamResponse.newBuilder();
+        var respBuilder = GetEventStreamResponse.newBuilder();
         for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
             var record = event.value();
-            if (isHeartbeatEvent(record)) {
-                // skip heartbeat events
-                continue;
-            }
-            // ignore null record
-            if (record.value() == null) {
-                committer.markProcessed(event);
-                continue;
-            }
-            byte[] payload =
-                    converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
-
-            // serialize the offset to a JSON, so that kernel doesn't need to
-            // aware the layout of it
+            boolean isHeartbeat = isHeartbeatEvent(record);
             DebeziumOffset offset =
-                    new DebeziumOffset(record.sourcePartition(), record.sourceOffset());
+                    new DebeziumOffset(
+                            record.sourcePartition(), record.sourceOffset(), isHeartbeat);
+            // serialize the offset to a JSON, so that the kernel doesn't need to
+            // be aware of its layout
             String offsetStr = "";
             try {
                 byte[] serialized = DebeziumOffsetSerializer.INSTANCE.serialize(offset);
@@ -98,19 +88,42 @@ var record = event.value();
             } catch (IOException e) {
                 LOG.warn("failed to serialize debezium offset", e);
             }
 
-            var message =
+            var msgBuilder =
                     CdcMessage.newBuilder()
                             .setOffset(offsetStr)
-                            .setPartition(String.valueOf(sourceId))
-                            .setPayload(new String(payload, StandardCharsets.UTF_8))
-                            .build();
-            LOG.debug("record => {}", message.getPayload());
-            builder.addEvents(message);
-            committer.markProcessed(event);
+                            .setPartition(String.valueOf(sourceId));
+
+            if (isHeartbeat) {
+                var message = msgBuilder.build();
+                LOG.debug("heartbeat => {}", message.getOffset());
+                respBuilder.addEvents(message);
+            } else {
+                // ignore null record
+                if (record.value() == null) {
+                    committer.markProcessed(event);
+                    continue;
+                }
+                byte[] payload =
+                        converter.fromConnectData(
+                                record.topic(), record.valueSchema(), record.value());
+
+                msgBuilder.setPayload(new String(payload, StandardCharsets.UTF_8));
+                var message = msgBuilder.build();
+                LOG.debug("record => {}", message.getPayload());
+
+                respBuilder.addEvents(message);
+                committer.markProcessed(event);
+            }
         }
-        builder.setSourceId(sourceId);
-        var response = builder.build();
-        outputChannel.put(response);
+
+        // skip empty batch
+        if (respBuilder.getEventsCount() > 0) {
+            respBuilder.setSourceId(sourceId);
+            var response = respBuilder.build();
+            outputChannel.put(response);
+        }
+
         committer.markBatchFinished();
     }
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
index b590fdf3da8b..73e8875f4581 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
@@ -78,7 +78,7 @@ public void start(long channelPtr) {
                         config.getSourceType().toString(),
                         String.valueOf(config.getSourceId()),
                         resp.getEventsCount());
-                LOG.info(
+                LOG.debug(
                         "Engine#{}: emit one chunk {} events to network ",
                         config.getSourceId(),
                         resp.getEventsCount());
diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties
index 6a269dc064a5..cc64723b66b6 100644
--- a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties
+++ b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties
@@ -20,6 +20,9 @@ database.server.id=${server.id}
 # set connector timezone to UTC(+00:00)
 database.connectionTimeZone=+00:00
+# default heartbeat interval 60 seconds
+heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000}
+heartbeat.topics.prefix=${debezium.heartbeat.topics.prefix:-RW_CDC_HeartBeat_}
 
 name=${hostname}:${port}:${database.name}.${table.name}
 provide.transaction.metadata=${transactional:-false}
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java
index 294ebffcc70e..92028255d28e 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/MySQLSourceTest.java
@@ -124,10 +124,12 @@ public void testLines() throws InterruptedException, SQLException {
                     int count = 0;
                     while (eventStream.hasNext()) {
                         List<CdcMessage> messages = eventStream.next().getEventsList();
-                        for (CdcMessage ignored : messages) {
-                            count++;
+                        for (CdcMessage msg : messages) {
+                            if (!msg.getPayload().isBlank()) {
+                                count++;
+                            }
                         }
-                        if (count == 10000) {
+                        if (count >= 10000) {
                             return count;
                         }
                     }
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java
index b673f533948e..4fff4e7f50ad 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java
@@ -130,10 +130,12 @@ public void testLines() throws Exception {
                     while (eventStream.hasNext()) {
                         List<ConnectorServiceProto.CdcMessage> messages =
                                 eventStream.next().getEventsList();
-                        for (ConnectorServiceProto.CdcMessage ignored : messages) {
-                            count++;
+                        for (ConnectorServiceProto.CdcMessage msg : messages) {
+                            if (!msg.getPayload().isBlank()) {
+                                count++;
+                            }
                         }
-                        if (count == 10000) {
+                        if (count >= 10000) {
                             return count;
                         }
                     }
diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java
index 738d0f850a39..359746bc90f7 100644
--- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java
+++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java
@@ -41,7 +41,7 @@ public class SourceTestClient {
     static final Logger LOG = LoggerFactory.getLogger(SourceTestClient.class.getName());
 
     // default port for connector service
-    static final int DEFAULT_PORT = 50051;
+    static final int DEFAULT_PORT = 60051;
     private final ConnectorServiceGrpc.ConnectorServiceBlockingStub blockingStub;
 
     public Properties sqlStmts = new Properties();
diff --git a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/DebeziumOffset.java b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/DebeziumOffset.java
index 6555df7d8649..670765105cf6 100644
--- a/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/DebeziumOffset.java
+++ b/java/connector-node/risingwave-source-cdc/src/main/java/com/risingwave/connector/cdc/debezium/internal/DebeziumOffset.java
@@ -43,12 +43,15 @@ public class DebeziumOffset implements Serializable {
 
     public Map<String, ?> sourcePartition;
     public Map<String, ?> sourceOffset;
+    public boolean isHeartbeat;
 
     public DebeziumOffset() {}
 
-    public DebeziumOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
+    public DebeziumOffset(
+            Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, boolean isHeartbeat) {
         this.sourcePartition = sourcePartition;
         this.sourceOffset = sourceOffset;
+        this.isHeartbeat = isHeartbeat;
     }
 
     public void setSourcePartition(Map<String, ?> sourcePartition) {
@@ -59,10 +62,16 @@ public void setSourceOffset(Map<String, ?> sourceOffset) {
         this.sourceOffset = sourceOffset;
     }
 
+    public void setHeartbeat(boolean heartbeat) {
+        isHeartbeat = heartbeat;
+    }
+
     @Override
     public String toString() {
         return "DebeziumOffset{"
-                + "sourcePartition="
+                + "isHeartbeat="
+                + isHeartbeat
+                + ", sourcePartition="
                 + sourcePartition
                 + ", sourceOffset="
                 + sourceOffset
diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs
index fbc7b419f13d..b3e39ece9500 100644
--- a/src/compute/tests/cdc_tests.rs
+++ b/src/compute/tests/cdc_tests.rs
@@ -83,6 +83,7 @@ impl MockOffsetGenExecutor {
                 txid: None,
                 tx_usec: None,
             },
+            is_heartbeat: false,
         };
 
         self.start_offset += 1;
diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs
index f6ddbac07eca..f238af2053ce 100644
--- a/src/connector/src/parser/canal/simd_json_parser.rs
+++ b/src/connector/src/parser/canal/simd_json_parser.rs
@@ -22,7 +22,9 @@ use crate::parser::canal::operators::*;
 use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
 use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
 use crate::parser::unified::ChangeEventOperation;
-use crate::parser::{ByteStreamSourceParser, JsonProperties, SourceStreamChunkRowWriter};
+use crate::parser::{
+    ByteStreamSourceParser, JsonProperties, ParserFormat, SourceStreamChunkRowWriter,
+};
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
 const DATA: &str = "data";
@@ -121,6 +123,10 @@ impl ByteStreamSourceParser for CanalJsonParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::CanalJson
+    }
+
     async fn parse_one<'a>(
         &'a mut self,
         _key: Option<Vec<u8>>,
diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs
index da8882eaf1ed..6513b293239b 100644
--- a/src/connector/src/parser/csv_parser.rs
+++ b/src/connector/src/parser/csv_parser.rs
@@ -19,7 +19,7 @@ use risingwave_common::types::{Date, Decimal, Time, Timestamp, Timestamptz};
 use super::unified::{AccessError, AccessResult};
 use super::{ByteStreamSourceParser, CsvProperties};
 use crate::only_parse_payload;
-use crate::parser::SourceStreamChunkRowWriter;
+use crate::parser::{ParserFormat, SourceStreamChunkRowWriter};
 use crate::source::{DataType, SourceColumnDesc, SourceContext, SourceContextRef};
 
 macro_rules! parse {
@@ -152,6 +152,10 @@ impl ByteStreamSourceParser for CsvParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::Csv
+    }
+
     async fn parse_one<'a>(
         &'a mut self,
         _key: Option<Vec<u8>>,
diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs
index 0fab32bc1adb..58ac1d85857c 100644
--- a/src/connector/src/parser/debezium/debezium_parser.rs
+++ b/src/connector/src/parser/debezium/debezium_parser.rs
@@ -22,7 +22,8 @@ use crate::parser::unified::debezium::DebeziumChangeEvent;
 use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
 use crate::parser::{
     AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, JsonProperties,
-    ParseResult, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
+    ParseResult, ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter,
+    SpecificParserConfig,
 };
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
@@ -127,6 +128,10 @@ impl ByteStreamSourceParser for DebeziumParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::Debezium
+    }
+
     #[allow(clippy::unused_async)] // false positive for `async_trait`
     async fn parse_one<'a>(
         &'a mut self,
diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs
index cd10fb2dc21f..1217a49c42c9 100644
--- a/src/connector/src/parser/debezium/mongo_json_parser.rs
+++ b/src/connector/src/parser/debezium/mongo_json_parser.rs
@@ -23,7 +23,7 @@ use crate::only_parse_payload;
 use crate::parser::unified::debezium::{DebeziumChangeEvent, MongoProjection};
 use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
 use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
-use crate::parser::{ByteStreamSourceParser, SourceStreamChunkRowWriter};
+use crate::parser::{ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter};
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
 #[derive(Debug)]
@@ -112,6 +112,10 @@ impl ByteStreamSourceParser for DebeziumMongoJsonParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::DebeziumMongo
+    }
+
     async fn parse_one<'a>(
         &'a mut self,
         _key: Option<Vec<u8>>,
diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs
index c173599f7d50..5d269337b0d5 100644
--- a/src/connector/src/parser/json_parser.rs
+++ b/src/connector/src/parser/json_parser.rs
@@ -32,7 +32,9 @@ use crate::parser::schema_registry::handle_sr_list;
 use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
 use crate::parser::unified::util::apply_row_accessor_on_stream_chunk_writer;
 use crate::parser::unified::AccessImpl;
-use crate::parser::{AccessBuilder, ByteStreamSourceParser, SourceStreamChunkRowWriter};
+use crate::parser::{
+    AccessBuilder, ByteStreamSourceParser, ParserFormat, SourceStreamChunkRowWriter,
+};
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
 #[derive(Debug)]
@@ -181,6 +183,10 @@ impl ByteStreamSourceParser for JsonParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::Json
+    }
+
     async fn parse_one<'a>(
         &'a mut self,
         _key: Option<Vec<u8>>,
diff --git a/src/connector/src/parser/maxwell/maxwell_parser.rs b/src/connector/src/parser/maxwell/maxwell_parser.rs
index 8bee0bbfef1e..6fa2dab9eaa7 100644
--- a/src/connector/src/parser/maxwell/maxwell_parser.rs
+++ b/src/connector/src/parser/maxwell/maxwell_parser.rs
@@ -18,7 +18,7 @@ use crate::only_parse_payload;
 use crate::parser::unified::maxwell::MaxwellChangeEvent;
 use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
 use crate::parser::{
-    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
+    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParserFormat,
     SourceStreamChunkRowWriter, SpecificParserConfig,
 };
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
@@ -74,6 +74,10 @@ impl ByteStreamSourceParser for MaxwellParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::Maxwell
+    }
+
     async fn parse_one<'a>(
         &'a mut self,
         _key: Option<Vec<u8>>,
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 45c62609f5bf..2ff661447d1b 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -396,6 +396,18 @@ pub enum ParseResult {
     TransactionControl(TransactionControl),
 }
 
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum ParserFormat {
+    CanalJson,
+    Csv,
+    Json,
+    Maxwell,
+    Debezium,
+    DebeziumMongo,
+    Upsert,
+    Plain,
+}
+
 /// `ByteStreamSourceParser` is a new message parser, the parser should consume
 /// the input data stream and return a stream of parsed msgs.
 pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
@@ -405,6 +417,9 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
     /// The source context, used to report parsing error.
     fn source_ctx(&self) -> &SourceContext;
 
+    /// The format of the specific parser.
+    fn parser_format(&self) -> ParserFormat;
+
     /// Parse one record from the given `payload` and write rows to the `writer`.
     ///
     /// Returns error if **any** of the rows in the message failed to parse.
@@ -512,6 +527,15 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
 
         for (i, msg) in batch.into_iter().enumerate() {
             if msg.key.is_none() && msg.payload.is_none() {
+                if parser.parser_format() == ParserFormat::Debezium {
+                    tracing::debug!("heartbeat message {}, skip parser", msg.offset);
+                    // an empty payload means a heartbeat in the cdc source;
+                    // a heartbeat offset should not overwrite a data message's offset
+                    split_offset_mapping
+                        .entry(msg.split_id)
+                        .or_insert(msg.offset.clone());
+                }
+
                 continue;
             }
 
             let parse_span = tracing::info_span!(
diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs
index 0b42b1e66ab7..defb7ef54a1e 100644
--- a/src/connector/src/parser/plain_parser.rs
+++ b/src/connector/src/parser/plain_parser.rs
@@ -21,6 +21,7 @@ use super::{
     SourceStreamChunkRowWriter, SpecificParserConfig,
 };
 use crate::only_parse_payload;
+use crate::parser::ParserFormat;
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
 #[derive(Debug)]
@@ -75,6 +76,10 @@ impl ByteStreamSourceParser for PlainParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::Plain
+    }
+
     async fn parse_one<'a>(
         &'a mut self,
         _key: Option<Vec<u8>>,
diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs
index edc250947d75..214775851103 100644
--- a/src/connector/src/parser/upsert_parser.rs
+++ b/src/connector/src/parser/upsert_parser.rs
@@ -25,6 +25,7 @@ use super::{
     SourceStreamChunkRowWriter, SpecificParserConfig,
 };
 use crate::extract_key_config;
+use crate::parser::ParserFormat;
 use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
 
 #[derive(Debug)]
@@ -130,6 +131,10 @@ impl ByteStreamSourceParser for UpsertParser {
         &self.source_ctx
     }
 
+    fn parser_format(&self) -> ParserFormat {
+        ParserFormat::Upsert
+    }
+
     async fn parse_one<'a>(
         &'a mut self,
         key: Option<Vec<u8>>,
diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs
index 71046df258d6..f3890377e0bc 100644
--- a/src/connector/src/source/cdc/source/message.rs
+++ b/src/connector/src/source/cdc/source/message.rs
@@ -21,7 +21,11 @@ impl From<CdcMessage> for SourceMessage {
     fn from(message: CdcMessage) -> Self {
         SourceMessage {
             key: None,
-            payload: Some(message.payload.as_bytes().to_vec()),
+            payload: if message.payload.is_empty() {
+                None // heartbeat message
+            } else {
+                Some(message.payload.as_bytes().to_vec())
+            },
             offset: message.offset,
             split_id: message.partition.into(),
             meta: SourceMeta::Empty,
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index d6210b4146af..0c2fb2b3b268 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -161,7 +161,7 @@ impl CommonSplitReader for CdcSplitReader {
         });
 
         while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await {
-            tracing::debug!("receive events {:?}", events.len());
+            tracing::trace!("receive events {:?}", events.len());
             self.source_ctx
                 .metrics
                 .connector_source_rows_received
diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs
index 3bd24992f75f..1041f8adec69 100644
--- a/src/connector/src/source/cdc/split.rs
+++ b/src/connector/src/source/cdc/split.rs
@@ -74,12 +74,17 @@ impl MySqlCdcSplit {
                     self.inner.split_id
                 )
             })?;
-            snapshot_done = match dbz_offset.source_offset.snapshot {
-                Some(val) => !val,
-                None => true,
-            };
+
+            // a heartbeat event should not update the `snapshot_done` flag
+            if !dbz_offset.is_heartbeat {
+                snapshot_done = match dbz_offset.source_offset.snapshot {
+                    Some(val) => !val,
+                    None => true,
+                };
+            }
         }
         self.inner.start_offset = Some(start_offset);
+        // if snapshot_done is already true, it won't be updated
        self.inner.snapshot_done = snapshot_done;
         Ok(())
     }
@@ -109,10 +114,14 @@ impl PostgresCdcSplit {
                     self.inner.split_id
                 )
             })?;
-            snapshot_done = dbz_offset
-                .source_offset
-                .last_snapshot_record
-                .unwrap_or(false);
+
+            // a heartbeat event should not update the `snapshot_done` flag
+            if !dbz_offset.is_heartbeat {
+                snapshot_done = dbz_offset
+                    .source_offset
+                    .last_snapshot_record
+                    .unwrap_or(false);
+            }
         }
         self.inner.start_offset = Some(start_offset);
         // if snapshot_done is already true, it won't be updated
diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs
index 6297624283f1..9eff3991a4d4 100644
--- a/src/connector/src/source/external.rs
+++ b/src/connector/src/source/external.rs
@@ -160,6 +160,8 @@ pub struct DebeziumOffset {
     pub source_partition: HashMap<String, String>,
     #[serde(rename = "sourceOffset")]
     pub source_offset: DebeziumSourceOffset,
+    #[serde(rename = "isHeartbeat")]
+    pub is_heartbeat: bool,
 }
 
 #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -554,11 +556,11 @@ mod tests {
 
     #[test]
     fn test_mysql_binlog_offset() {
-        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true } }"#;
-        let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true } }"#;
-        let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true } }"#;
-        let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true } }"#;
-        let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true } }"#;
+        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
+        let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#;
+        let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#;
+        let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
+        let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
 
         let off0 = CdcOffset::MySql(MySqlOffset::parse_str(off0_str).unwrap());
         let off1 = CdcOffset::MySql(MySqlOffset::parse_str(off1_str).unwrap());
diff --git a/src/stream/src/executor/backfill/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc_backfill.rs
index b383f2f5bc79..c17aad1d2d62 100644
--- a/src/stream/src/executor/backfill/cdc_backfill.rs
+++ b/src/stream/src/executor/backfill/cdc_backfill.rs
@@ -48,7 +48,7 @@ use crate::executor::{
 };
 use crate::task::{ActorId, CreateMviewProgress};
 
-const BACKFILL_STATE_KEY_SUFFIX: &str = "_backfill";
+pub const BACKFILL_STATE_KEY_SUFFIX: &str = "_backfill";
 
 pub struct CdcBackfillExecutor {
     actor_ctx: ActorContextRef,
@@ -349,6 +349,11 @@ impl CdcBackfillExecutor {
                             break;
                         }
                         Message::Chunk(chunk) => {
+                            // skip empty upstream chunk
+                            if chunk.cardinality() == 0 {
+                                continue;
+                            }
+
                             let chunk_binlog_offset = get_cdc_chunk_last_offset(
                                 upstream_table_reader.inner().table_reader(),
                                 &chunk,
@@ -528,6 +533,9 @@ impl CdcBackfillExecutor {
                 "server".to_string() => server
             },
             source_offset,
+            // an upstream heartbeat event would not be emitted to the cdc backfill
+            // executor, since we don't parse heartbeat events in the source parser
+            is_heartbeat: false,
         }
     });
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index f1ee9f0c90d4..838c2d8c59c4 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -36,7 +36,7 @@ use risingwave_storage::StateStore;
 
 use crate::common::table::state_table::StateTable;
 use crate::executor::error::StreamExecutorError;
-use crate::executor::StreamExecutorResult;
+use crate::executor::{StreamExecutorResult, BACKFILL_STATE_KEY_SUFFIX};
 
 const COMPLETE_SPLIT_PREFIX: &str = "SsGLdzRDqBuKzMf9bDap";
 
@@ -203,16 +203,41 @@
         &mut self,
         stream_source_split: &SplitImpl,
     ) -> StreamExecutorResult<Option<SplitImpl>> {
-        Ok(match self.get(stream_source_split.id()).await? {
+        let split_id = stream_source_split.id();
+        Ok(match self.get(split_id.clone()).await? {
             None => None,
             Some(row) => match row.datum_at(1) {
                 Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
-                    Some(SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?)
+                    let mut split_impl = SplitImpl::restore_from_json(jsonb_ref.to_owned_scalar())?;
+                    if let SplitImpl::MysqlCdc(ref mut split) = split_impl && let Some(mysql_split) = split.mysql_split.as_mut() {
+                        // if snapshot_done is not set, we should check whether the backfill is finished
+                        if !mysql_split.inner.snapshot_done {
+                            mysql_split.inner.snapshot_done = self.recover_cdc_snapshot_state(split_id).await?;
+                        }
+                    }
+                    Some(split_impl)
                 }
                 _ => unreachable!(),
             },
         })
     }
+
+    async fn recover_cdc_snapshot_state(
+        &mut self,
+        split_id: SplitId,
+    ) -> StreamExecutorResult<bool> {
+        let mut key = split_id.to_string();
+        key.push_str(BACKFILL_STATE_KEY_SUFFIX);
+
+        let flag = match self.get(key.into()).await? {
+            Some(row) => match row.datum_at(1) {
+                Some(ScalarRefImpl::Jsonb(jsonb_ref)) => jsonb_ref.as_bool()?,
+                _ => unreachable!("invalid cdc backfill persistent state"),
+            },
+            None => false,
+        };
+        Ok(flag)
+    }
 }
 
 // align with schema defined in `LogicalSource::infer_internal_table_catalog`. The function is used