diff --git a/e2e_test/source/cdc/cdc.check_new_rows.slt b/e2e_test/source/cdc/cdc.check_new_rows.slt
index cccf0272d9ff4..271cbb56cc2f1 100644
--- a/e2e_test/source/cdc/cdc.check_new_rows.slt
+++ b/e2e_test/source/cdc/cdc.check_new_rows.slt
@@ -52,12 +52,12 @@ select v1, v2, v3 from mytable order by v1;
 query I
 SELECT * from products_test_cnt
 ----
-12
+13
 
 query I
 SELECT * from orders_test_cnt
 ----
-4
+5
 
 query ITT
 SELECT * FROM products_test order by id limit 3
diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt
index 91afac4f1893e..1e0e4637d5017 100644
--- a/e2e_test/source/cdc/cdc.share_stream.slt
+++ b/e2e_test/source/cdc/cdc.share_stream.slt
@@ -34,6 +34,10 @@ create table products_test ( id INT,
 PRIMARY KEY (id)
 ) from mysql_mytest table 'mytest.products';
 
+system ok
+mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'Milk','Milk is a white liquid food');
+    INSERT INTO orders VALUES (default, '2023-11-28 15:08:22', 'Bob', 10.52, 100, false);"
+
 # check the fragment distribution
 query TT
 select distribution_type,flags from rw_fragments order by fragment_id;
@@ -70,7 +74,7 @@ statement ok
 create materialized view orders_test_cnt as select count(*) as cnt from orders_test;
 
 system ok
-mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Milk', '100ml Milk');"
+mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"
 
 sleep 5s
 
@@ -78,12 +82,12 @@ sleep 5s
 query I
 SELECT * from products_test_cnt
 ----
-10
+11
 
 query I
 SELECT * from orders_test_cnt
 ----
-3
+4
 
 query ITT
 SELECT * FROM products_test order by id limit 3
diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/CdcEngineRunner.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/CdcEngineRunner.java
index 92648fa228081..bfcd2d3843143 100644
--- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/CdcEngineRunner.java
+++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/source/CdcEngineRunner.java
@@ -15,7 +15,7 @@
 package com.risingwave.connector.api.source;
 
 public interface CdcEngineRunner {
-    void start() throws Exception;
+    boolean start() throws Exception;
 
     void stop() throws Exception;
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java
index e2755370727de..2f27579082396 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java
@@ -79,10 +79,9 @@ private static Map<String, String> extractDebeziumProperties(
     }
 
     private final long sourceId;
-
     private final SourceTypeE sourceType;
-
     private final Properties resolvedDbzProps;
+    private final boolean isBackfillSource;
 
     public long getSourceId() {
         return sourceId;
@@ -96,6 +95,10 @@ public Properties getResolvedDebeziumProps() {
         return resolvedDbzProps;
     }
 
+    public boolean isBackfillSource() {
+        return isBackfillSource;
+    }
+
     public DbzConnectorConfig(
             SourceTypeE source,
             long sourceId,
@@ -190,6 +193,7 @@ public DbzConnectorConfig(
         this.sourceId = sourceId;
        this.sourceType = source;
         this.resolvedDbzProps = dbzProps;
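+        // Record whether this source is created for CDC backfill, so the engine
+        // runner knows whether it must wait for the Debezium streaming source to start.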
+        this.isBackfillSource = isCdcBackfill;
     }
 
     private void adjustConfigForSharedCdcStream(Properties dbzProps) {
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
new file mode 100644
index 0000000000000..d5e09b03825c9
--- /dev/null
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
@@ -0,0 +1,90 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.risingwave.connector.source.common;
+
+import com.risingwave.connector.api.source.SourceTypeE;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbzSourceUtils {
+    static final Logger LOG = LoggerFactory.getLogger(DbzSourceUtils.class);
+
+    public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbServerName) {
+        // Right now we only need to wait for the MySQL source, as it is the only source
+        // that supports backfill. Once backfill is supported for other sources, we will
+        // need to wait for them as well.
+        if (sourceType == SourceTypeE.MYSQL) {
+            LOG.info("Waiting for streaming source of {} to start", dbServerName);
+            return waitForStreamingRunningInner("mysql", dbServerName);
+        } else {
+            LOG.info("Backfill is not supported for source of {}, skip waiting", dbServerName);
+            return true;
+        }
+    }
+
+    private static boolean waitForStreamingRunningInner(String connector, String dbServerName) {
+        int maxPollCount = 10;
+        while (!isStreamingRunning(connector, dbServerName, "streaming")) {
+            maxPollCount--;
+            if (maxPollCount == 0) {
+                LOG.error("Debezium streaming source of {} failed to start", dbServerName);
+                return false;
+            }
+            try {
+                TimeUnit.SECONDS.sleep(1); // poll interval
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while waiting for streaming source to start", e);
+            }
+        }
+
+        LOG.info("Debezium streaming source of {} started", dbServerName);
+        return true;
+    }
+
+    // Copied from the Debezium test suite: io.debezium.embedded.AbstractConnectorTest.
+    // Note: although this method is recommended by the community
+    // (https://debezium.zulipchat.com/#narrow/stream/302529-community-general/topic/.E2.9C.94.20Embedded.20engine.20has.20started.20StreamingChangeEventSource/near/405121659),
+    // it is not fully reliable: the JMX metric is marked as true before the binlog client
+    // starts, and the client may still fail to connect to the upstream database.
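+    // For illustration: for a MySQL source whose database.server.name is "mysql_mytest"
+    // (server name hypothetical), this reads the "Connected" attribute of the MBean named
+    // "debezium.mysql:type=connector-metrics,context=streaming,server=mysql_mytest".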
+    private static boolean isStreamingRunning(String connector, String server, String contextName) {
+        final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        try {
+            return (boolean)
+                    mbeanServer.getAttribute(
+                            getStreamingMetricsObjectName(connector, server, contextName),
+                            "Connected");
+        } catch (JMException ex) {
+            LOG.warn("Failed to get streaming metrics", ex);
+        }
+        return false;
+    }
+
+    private static ObjectName getStreamingMetricsObjectName(
+            String connector, String server, String context) throws MalformedObjectNameException {
+        return new ObjectName(
+                "debezium."
+                        + connector
+                        + ":type=connector-metrics,context="
+                        + context
+                        + ",server="
+                        + server);
+    }
+}
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
index 8137da8b7fea1..4f4a13f3027a8 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java
@@ -31,19 +31,24 @@ public class DbzCdcEngine implements CdcEngine {
     private final long id;
 
     /** If config is not valid will throw exceptions */
-    public DbzCdcEngine(long id, Properties config, DebeziumEngine.CompletionCallback callback) {
+    public DbzCdcEngine(
+            long sourceId,
+            Properties config,
+            DebeziumEngine.CompletionCallback completionCallback) {
         var dbzHeartbeatPrefix = config.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name());
         var consumer =
                 new DbzCdcEventConsumer(
-                        id, dbzHeartbeatPrefix, new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));
+                        sourceId,
+                        dbzHeartbeatPrefix,
+                        new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));
 
         // Builds a debezium engine but not start it
-        this.id = id;
+        this.id = sourceId;
         this.consumer = consumer;
         this.engine =
                 DebeziumEngine.create(Connect.class)
                         .using(config)
-                        .using(callback)
+                        .using(completionCallback)
                         .notifying(consumer)
                         .build();
     }
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
index de71296dcc491..caf8ed4b87285 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
@@ -16,6 +16,7 @@
 
 import com.risingwave.connector.api.source.*;
 import com.risingwave.connector.source.common.DbzConnectorConfig;
+import com.risingwave.connector.source.common.DbzSourceUtils;
 import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
 import io.grpc.stub.StreamObserver;
 import java.util.concurrent.ExecutorService;
@@ -31,13 +32,7 @@ public class DbzCdcEngineRunner implements CdcEngineRunner {
     private final ExecutorService executor;
     private final AtomicBoolean running = new AtomicBoolean(false);
     private final CdcEngine engine;
-
-    private DbzCdcEngineRunner(CdcEngine engine) {
-        this.executor =
-                Executors.newSingleThreadExecutor(
-                        r -> new Thread(r, "rw-dbz-engine-runner-" + engine.getId()));
-        this.engine = engine;
-    }
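+    // Keep the connector config so that start() can check isBackfillSource() and
+    // read the resolved Debezium properties.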
+    private final DbzConnectorConfig config;
 
     public static CdcEngineRunner newCdcEngineRunner(
             DbzConnectorConfig config, StreamObserver<GetEventStreamResponse> responseObserver) {
@@ -62,14 +57,14 @@ public static CdcEngineRunner newCdcEngineRunner(
                             }
                         });
 
-            runner = new DbzCdcEngineRunner(engine);
+            runner = new DbzCdcEngineRunner(engine, config);
         } catch (Exception e) {
             LOG.error("failed to create the CDC engine", e);
         }
         return runner;
     }
 
-    public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
+    public static CdcEngineRunner create(DbzConnectorConfig config) {
         DbzCdcEngineRunner runner = null;
         try {
             var sourceId = config.getSourceId();
@@ -89,32 +84,44 @@ public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
                             }
                         });
 
-            runner = new DbzCdcEngineRunner(engine);
+            runner = new DbzCdcEngineRunner(engine, config);
         } catch (Exception e) {
             LOG.error("failed to create the CDC engine", e);
         }
         return runner;
     }
 
+    // Private: instances are created via the factory methods above.
+    private DbzCdcEngineRunner(CdcEngine engine, DbzConnectorConfig config) {
+        this.executor =
+                Executors.newSingleThreadExecutor(
+                        r -> new Thread(r, "rw-dbz-engine-runner-" + engine.getId()));
+        this.engine = engine;
+        this.config = config;
+    }
+
     /** Start to run the cdc engine */
-    public void start() throws InterruptedException {
+    public boolean start() throws InterruptedException {
         if (isRunning()) {
             LOG.info("engine#{} already started", engine.getId());
-            return;
+            return true;
         }
 
-        // put a handshake message to notify the Source executor
-        var controlInfo =
-                GetEventStreamResponse.ControlInfo.newBuilder().setHandshakeOk(true).build();
-        engine.getOutputChannel()
-                .put(
-                        GetEventStreamResponse.newBuilder()
-                                .setSourceId(engine.getId())
-                                .setControl(controlInfo)
-                                .build());
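+        // The handshake message is now sent by the caller (e.g. JniDbzSourceHandler)
+        // based on the boolean returned by this method.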
         executor.execute(engine);
+
+        boolean startOk = true;
+        // For a backfill source, wait for the streaming source to start before proceeding
+        if (config.isBackfillSource()) {
+            var databaseServerName =
+                    config.getResolvedDebeziumProps().getProperty("database.server.name");
+            startOk =
+                    DbzSourceUtils.waitForStreamingRunning(
+                            config.getSourceType(), databaseServerName);
+        }
+
         running.set(true);
         LOG.info("engine#{} started", engine.getId());
+        return startOk;
     }
 
     public void stop() throws Exception {
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
index 7f8708fedf591..bbf99d3677926 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java
@@ -14,11 +14,13 @@
 
 package com.risingwave.connector.source.core;
 
+import com.risingwave.connector.api.source.CdcEngineRunner;
 import com.risingwave.connector.api.source.SourceTypeE;
 import com.risingwave.connector.source.common.DbzConnectorConfig;
 import com.risingwave.java.binding.Binding;
 import com.risingwave.metrics.ConnectorNodeMetrics;
 import com.risingwave.proto.ConnectorServiceProto;
+import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -27,7 +29,7 @@
 
 /** handler for starting a debezium source connectors for jni */
 public class JniDbzSourceHandler {
-    static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class);
+    static final Logger LOG = LoggerFactory.getLogger(JniDbzSourceHandler.class);
 
     private final DbzConnectorConfig config;
 
@@ -61,16 +63,22 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
     }
 
     public void start(long channelPtr) {
-        var runner = DbzCdcEngineRunner.newCdcEngineRunner(config);
+        var runner = DbzCdcEngineRunner.create(config);
         if (runner == null) {
             return;
         }
 
         try {
             // Start the engine
-            runner.start();
-            LOG.info("Start consuming events of table {}", config.getSourceId());
+            var startOk = runner.start();
+            if (!sendHandshakeMessage(runner, channelPtr, startOk)) {
+                LOG.error(
+                        "Failed to send handshake message to channel. sourceId={}",
+                        config.getSourceId());
+                return;
+            }
+            LOG.info("Start consuming events of table {}", config.getSourceId());
 
             while (runner.isRunning()) {
                 // check whether the send queue has room for new messages
                 // Thread will block on the channel to get output from engine
@@ -107,4 +115,25 @@ public void start(long channelPtr) {
             }
         }
     }
+
+    private boolean sendHandshakeMessage(CdcEngineRunner runner, long channelPtr, boolean startOk)
+            throws Exception {
+        // Send a handshake message to notify the source executor. If the handshake is
+        // not ok, the split reader will return an error to the source actor.
+        var controlInfo =
+                GetEventStreamResponse.ControlInfo.newBuilder().setHandshakeOk(startOk).build();
+        var handshakeMsg =
+                GetEventStreamResponse.newBuilder()
+                        .setSourceId(config.getSourceId())
+                        .setControl(controlInfo)
+                        .build();
+        var success = Binding.sendCdcSourceMsgToChannel(channelPtr, handshakeMsg.toByteArray());
+        if (!success) {
+            LOG.info("Engine#{}: JNI sender broken, stopping the engine", config.getSourceId());
+            runner.stop();
+        }
+        return success;
+    }
 }
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index 26cfa944ae4a3..1fb93beb54563 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -143,11 +143,15 @@ impl SplitReader for CdcSplitReader {
             }
         });
 
+        // wait for the handshake message
         if let Some(res) = rx.recv().await {
             let resp: GetEventStreamResponse = res?;
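+            // The first message must carry control info with `handshake_ok`; anything
+            // else means the connector failed to start.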
{:?}", resp); + false + } }; if !inited { return Err(anyhow!("failed to start cdc connector")); diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs index 34d510bb22416..f867c29623a06 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/external.rs @@ -542,6 +542,10 @@ mod tests { #[test] fn test_mysql_filter_expr() { + let cols = vec!["id".to_string()]; + let expr = MySqlExternalTableReader::filter_expression(&cols); + assert_eq!(expr, "(id > :id)"); + let cols = vec!["aa".to_string(), "bb".to_string(), "cc".to_string()]; let expr = MySqlExternalTableReader::filter_expression(&cols); assert_eq!( diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 44127aee88d74..a8e9337ed0703 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use anyhow::anyhow; use either::Either; use futures::stream::select_with_strategy; -use futures::{pin_mut, stream, StreamExt, TryStreamExt}; +use futures::{pin_mut, stream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{DataChunk, StreamChunk}; @@ -136,7 +136,6 @@ impl CdcBackfillExecutor { // Poll the upstream to get the first barrier. let first_barrier = expect_first_barrier(&mut upstream).await?; - let init_epoch = first_barrier.epoch.prev; // Check whether this parallelism has been assigned splits, // if not, we should bypass the backfill directly. @@ -206,37 +205,19 @@ impl CdcBackfillExecutor { upstream.peekable() }; - tracing::debug!(?upstream_table_id, ?shared_cdc_source, "start cdc backfill"); state_impl.init_epoch(first_barrier.epoch); // restore backfill state let state = state_impl.restore_state().await?; current_pk_pos = state.current_pk_pos.clone(); - // If the snapshot is empty, we don't need to backfill. - let is_snapshot_empty: bool = { - if state.is_finished { - // It is finished, so just assign a value to avoid accessing storage table again. - false - } else { - let args = SnapshotReadArgs::new(init_epoch, None, false, self.chunk_size); - let snapshot = upstream_table_reader.snapshot_read(args); - pin_mut!(snapshot); - snapshot.try_next().await?.unwrap().is_none() - } - }; - - // | backfill_is_finished | snapshot_empty | need_to_backfill | - // | t | t/f | f | - // | f | t | f | - // | f | f | t | - let to_backfill = !state.is_finished && !is_snapshot_empty; + let to_backfill = !state.is_finished; // The first barrier message should be propagated. yield Message::Barrier(first_barrier); // Keep track of rows from the snapshot. 
+        let to_backfill = !state.is_finished;
 
         // The first barrier message should be propagated.
         yield Message::Barrier(first_barrier);
 
         // Keep track of rows from the snapshot.
-        let mut total_snapshot_row_count: u64 = 0;
+        let mut total_snapshot_row_count = state.row_count as u64;
         let mut snapshot_read_epoch;
 
         let mut last_binlog_offset: Option<CdcOffset> = state
@@ -245,6 +226,15 @@ impl CdcBackfillExecutor {
 
         let mut consumed_binlog_offset: Option<CdcOffset> = None;
 
+        tracing::info!(
+            upstream_table_id,
+            shared_cdc_source,
+            ?current_pk_pos,
+            is_finished = state.is_finished,
+            snapshot_row_count = total_snapshot_row_count,
+            "start cdc backfill"
+        );
+
         // CDC Backfill Algorithm:
         //
         // When the first barrier comes from upstream:
@@ -269,11 +259,29 @@ impl CdcBackfillExecutor {
             // otherwise the upstream changelog may be blocked by the snapshot read stream
             let _ = Pin::new(&mut upstream).peek().await;
 
-            tracing::info!(
-                upstream_table_id,
-                initial_binlog_offset = ?last_binlog_offset,
-                "start the bacfill loop");
+            // wait for a barrier to make sure the backfill starts after the upstream source
+            #[for_await]
+            for msg in upstream.by_ref() {
+                match msg? {
+                    Message::Barrier(barrier) => {
+                        // commit state just to bump the epoch of the state table
+                        state_impl.commit_state(barrier.epoch).await?;
+                        yield Message::Barrier(barrier);
+                        break;
+                    }
+                    Message::Chunk(ref chunk) => {
+                        last_binlog_offset = get_cdc_chunk_last_offset(
+                            upstream_table_reader.inner().table_reader(),
+                            chunk,
+                        )?;
+                    }
+                    Message::Watermark(_) => {
+                        // Ignore watermark
+                    }
+                }
+            }
+
+            tracing::info!(upstream_table_id, initial_binlog_offset = ?last_binlog_offset, ?current_pk_pos, "start cdc backfill loop");
 
             'backfill_loop: loop {
                 let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];
@@ -471,27 +479,13 @@ impl CdcBackfillExecutor {
                     }
                 }
             }
-        } else if is_snapshot_empty {
-            tracing::info!(
-                upstream_table_id,
-                initial_binlog_offset = ?last_binlog_offset,
-                "upstream snapshot is empty, mark backfill is done and persist current binlog offset");
-
-            // The snapshot is empty, just set backfill to finished
-            state_impl
-                .mutate_state(
-                    current_pk_pos,
-                    last_binlog_offset,
-                    total_snapshot_row_count,
-                    true,
-                )
-                .await?;
         }
 
         // drop reader to release db connection
         drop(upstream_table_reader);
 
         tracing::info!(
+            upstream_table_id,
             "CdcBackfill has already finished and forward messages directly to the downstream"
         );
diff --git a/src/stream/src/executor/backfill/cdc/state.rs b/src/stream/src/executor/backfill/cdc/state.rs
index d9a52d8e68d76..f62504415c9d1 100644
--- a/src/stream/src/executor/backfill/cdc/state.rs
+++ b/src/stream/src/executor/backfill/cdc/state.rs
@@ -159,18 +159,17 @@ impl MultiBackfillState {
     }
 
     pub async fn mutate_state(&mut self, record: &CdcStateRecord) -> StreamExecutorResult<()> {
-        let Some(current_pk_pos) = &record.current_pk_pos else {
-            return Ok(());
-        };
-
         // schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
         let state = self.cached_state.as_mut_slice();
         let split_id = Some(ScalarImpl::from(self.split_id.clone()));
+        let state_len = state.len();
         state[0] = split_id.clone();
-        state[1..=current_pk_pos.len()].clone_from_slice(current_pk_pos.as_inner());
-        state[current_pk_pos.len() + 1] = Some(record.is_finished.into());
-        state[current_pk_pos.len() + 2] = Some(record.row_count.into());
-        state[current_pk_pos.len() + 3] = record.last_cdc_offset.clone().map(|cdc_offset| {
+        if let Some(current_pk_pos) = &record.current_pk_pos {
+            state[1..=current_pk_pos.len()].clone_from_slice(current_pk_pos.as_inner());
+        }
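+        // The finished flag, row count and cdc offset always occupy the last three
+        // columns, so index them from the end; pk columns are written only when a
+        // backfill position is known.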
+        state[state_len - 3] = Some(record.is_finished.into());
+        state[state_len - 2] = Some(record.row_count.into());
+        state[state_len - 1] = record.last_cdc_offset.clone().map(|cdc_offset| {
             let json = serde_json::to_value(cdc_offset).unwrap();
             ScalarImpl::Jsonb(JsonbVal::from(json))
         });