From b64a0ac7eceb220f09026e16b86c2d84f477606f Mon Sep 17 00:00:00 2001
From: Kexiang Wang <kx.wang@hotmail.com>
Date: Wed, 8 May 2024 04:02:02 +0800
Subject: [PATCH] feat(cdc): add a parameter to control timeout of cdc source
 waiting time (#16598)

---
 e2e_test/batch/catalog/pg_settings.slt.part   |  1 +
 .../source/common/DbzConnectorConfig.java     | 13 +++++++++++--
 .../source/common/DbzSourceUtils.java         | 19 +++++++++----------
 .../source/core/DbzCdcEngineRunner.java       |  4 +++-
 src/common/src/session_config/mod.rs          |  4 ++++
 src/connector/src/source/cdc/mod.rs           |  1 +
 src/frontend/src/handler/create_source.rs     | 10 +++++++++-
 src/jni_core/src/jvm_runtime.rs               |  3 +--
 8 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part
index d9af757ba4c36..2a130de04c19c 100644
--- a/e2e_test/batch/catalog/pg_settings.slt.part
+++ b/e2e_test/batch/catalog/pg_settings.slt.part
@@ -19,6 +19,7 @@ user background_ddl
 user batch_enable_distributed_dml
 user batch_parallelism
 user bytea_output
+user cdc_source_wait_streaming_start_timeout
 user client_encoding
 user client_min_messages
 user create_compaction_group_for_mv
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java
index a84c8722ab590..a5804974fb29c 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java
@@ -31,8 +31,8 @@
 public class DbzConnectorConfig {
     private static final Logger LOG = LoggerFactory.getLogger(DbzConnectorConfig.class);
 
-    public static final String WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS =
-            "cdc.source.wait.streaming.before.exit.seconds";
+    private static final String WAIT_FOR_STREAMING_START_TIMEOUT_SECS =
+            "cdc.source.wait.streaming.start.timeout";
 
     /* Common configs */
     public static final String HOST = "hostname";
@@ -89,6 +89,7 @@ private static Map<String, String> extractDebeziumProperties(
     private final SourceTypeE sourceType;
     private final Properties resolvedDbzProps;
     private final boolean isBackfillSource;
+    private final int waitStreamingStartTimeout;
 
     public long getSourceId() {
         return sourceId;
@@ -106,6 +107,10 @@ public boolean isBackfillSource() {
         return isBackfillSource;
     }
 
+    public int getWaitStreamingStartTimeout() {
+        return waitStreamingStartTimeout;
+    }
+
     public DbzConnectorConfig(
             SourceTypeE source,
             long sourceId,
@@ -119,6 +124,9 @@ public DbzConnectorConfig(
         var isCdcBackfill =
                 null != userProps.get(SNAPSHOT_MODE_KEY)
                         && userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL);
+        var waitStreamingStartTimeout =
+                Integer.parseInt(
+                        userProps.getOrDefault(WAIT_FOR_STREAMING_START_TIMEOUT_SECS, "30"));
 
         LOG.info(
                 "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}",
@@ -255,6 +263,7 @@ public DbzConnectorConfig(
         this.sourceType = source;
         this.resolvedDbzProps = dbzProps;
         this.isBackfillSource = isCdcBackfill;
+        this.waitStreamingStartTimeout = waitStreamingStartTimeout;
     }
 
     private Properties initiateDbConfig(String fileName, StringSubstitutor substitutor) {
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
index a4a16f010ad63..bd6720656811c 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
@@ -103,31 +103,30 @@ private static String quotePostgres(String identifier) {
         return "\"" + identifier + "\"";
     }
 
-    public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbServerName) {
+    public static boolean waitForStreamingRunning(
+            SourceTypeE sourceType, String dbServerName, int waitStreamingStartTimeout) {
         // Wait for streaming source of source that supported backfill
         LOG.info("Waiting for streaming source of {} to start", dbServerName);
         if (sourceType == SourceTypeE.MYSQL) {
-            return waitForStreamingRunningInner("mysql", dbServerName);
+            return waitForStreamingRunningInner("mysql", dbServerName, waitStreamingStartTimeout);
         } else if (sourceType == SourceTypeE.POSTGRES) {
-            return waitForStreamingRunningInner("postgres", dbServerName);
+            return waitForStreamingRunningInner(
+                    "postgres", dbServerName, waitStreamingStartTimeout);
         } else {
             LOG.info("Unsupported backfill source, just return true for {}", dbServerName);
             return true;
         }
     }
 
-    private static boolean waitForStreamingRunningInner(String connector, String dbServerName) {
-        int timeoutSecs =
-                Integer.parseInt(
-                        System.getProperty(
-                                DbzConnectorConfig.WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS));
+    private static boolean waitForStreamingRunningInner(
+            String connector, String dbServerName, int waitStreamingStartTimeout) {
         int pollCount = 0;
         while (!isStreamingRunning(connector, dbServerName, "streaming")) {
-            if (pollCount > timeoutSecs) {
+            if (pollCount > waitStreamingStartTimeout) {
                 LOG.error(
                         "Debezium streaming source of {} failed to start in timeout {}",
                         dbServerName,
-                        timeoutSecs);
+                        waitStreamingStartTimeout);
                 return false;
             }
             try {
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
index a237e250bf06f..a64c11745cf91 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java
@@ -145,7 +145,9 @@ public boolean start() throws InterruptedException {
                             .getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
             startOk =
                     DbzSourceUtils.waitForStreamingRunning(
-                            config.getSourceType(), databaseServerName);
+                            config.getSourceType(),
+                            databaseServerName,
+                            config.getWaitStreamingStartTimeout());
         }
 
         running.set(true);
diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs
index 4ee7617ee751d..572cecc0299c5 100644
--- a/src/common/src/session_config/mod.rs
+++ b/src/common/src/session_config/mod.rs
@@ -236,6 +236,10 @@ pub struct SessionConfig {
     #[parameter(default = 0)]
     lock_timeout: i32,
 
+    /// Timeout for waiting for a shared CDC source to start streaming when the source is being created. Unit: seconds.
+    #[parameter(default = 30)]
+    cdc_source_wait_streaming_start_timeout: i32,
+
     /// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
     /// Unused in RisingWave, support for compatibility.
     #[parameter(default = true)]
diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs
index e58fa0010f5ce..4a1222a343e54 100644
--- a/src/connector/src/source/cdc/mod.rs
+++ b/src/connector/src/source/cdc/mod.rs
@@ -44,6 +44,7 @@ pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
 pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
 // We enable transaction for shared cdc source by default
 pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
+pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
 
 pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
 pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 0830cdb5392de..12d615fed4e1b 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -40,7 +40,8 @@ use risingwave_connector::schema::schema_registry::{
 use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::cdc::{
     CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
-    CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
+    CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR,
+    MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
 };
 use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
 use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
@@ -1371,6 +1372,13 @@ pub async fn handle_create_source(
         with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
         // enable transactional cdc
         with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
+        with_properties.insert(
+            CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
+            session
+                .config()
+                .cdc_source_wait_streaming_start_timeout()
+                .to_string(),
+        );
     }
 
     // must behind `handle_addition_columns`
diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs
index c369ad7b38939..5950cf7685ea7 100644
--- a/src/jni_core/src/jvm_runtime.rs
+++ b/src/jni_core/src/jvm_runtime.rs
@@ -107,8 +107,7 @@ impl JavaVmWrapper {
             .option("-Dis_embedded_connector=true")
             .option(format!("-Djava.class.path={}", class_vec.join(":")))
             .option("-Xms16m")
-            .option(format!("-Xmx{}", jvm_heap_size))
-            .option("-Dcdc.source.wait.streaming.before.exit.seconds=30");
+            .option(format!("-Xmx{}", jvm_heap_size));
 
         tracing::info!("JVM args: {:?}", args_builder);
         let jvm_args = args_builder.build().context("invalid jvm args")?;