diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index d9af757ba4c36..2a130de04c19c 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -19,6 +19,7 @@ user background_ddl user batch_enable_distributed_dml user batch_parallelism user bytea_output +user cdc_source_wait_streaming_start_timeout user client_encoding user client_min_messages user create_compaction_group_for_mv diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index a84c8722ab590..a5804974fb29c 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -31,8 +31,8 @@ public class DbzConnectorConfig { private static final Logger LOG = LoggerFactory.getLogger(DbzConnectorConfig.class); - public static final String WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS = - "cdc.source.wait.streaming.before.exit.seconds"; + private static final String WAIT_FOR_STREAMING_START_TIMEOUT_SECS = + "cdc.source.wait.streaming.start.timeout"; /* Common configs */ public static final String HOST = "hostname"; @@ -89,6 +89,7 @@ private static Map extractDebeziumProperties( private final SourceTypeE sourceType; private final Properties resolvedDbzProps; private final boolean isBackfillSource; + private final int waitStreamingStartTimeout; public long getSourceId() { return sourceId; @@ -106,6 +107,10 @@ public boolean isBackfillSource() { return isBackfillSource; } + public int getWaitStreamingStartTimeout() { + return waitStreamingStartTimeout; + } + public DbzConnectorConfig( SourceTypeE source, long sourceId, @@ -119,6 +124,9 @@ public DbzConnectorConfig( var isCdcBackfill = null != userProps.get(SNAPSHOT_MODE_KEY) && userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL); + var waitStreamingStartTimeout = + Integer.parseInt( + userProps.getOrDefault(WAIT_FOR_STREAMING_START_TIMEOUT_SECS, "30")); LOG.info( "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}", @@ -255,6 +263,7 @@ public DbzConnectorConfig( this.sourceType = source; this.resolvedDbzProps = dbzProps; this.isBackfillSource = isCdcBackfill; + this.waitStreamingStartTimeout = waitStreamingStartTimeout; } private Properties initiateDbConfig(String fileName, StringSubstitutor substitutor) { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index a4a16f010ad63..bd6720656811c 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -103,31 +103,30 @@ private static String quotePostgres(String identifier) { return "\"" + identifier + "\""; } - public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbServerName) { + public static boolean waitForStreamingRunning( + SourceTypeE sourceType, String dbServerName, int waitStreamingStartTimeout) { // Wait for streaming source of source that supported backfill LOG.info("Waiting for streaming source of {} to start", dbServerName); if (sourceType == SourceTypeE.MYSQL) { - return waitForStreamingRunningInner("mysql", dbServerName); + return waitForStreamingRunningInner("mysql", dbServerName, waitStreamingStartTimeout); } else if (sourceType == SourceTypeE.POSTGRES) { - return waitForStreamingRunningInner("postgres", dbServerName); + return waitForStreamingRunningInner( + "postgres", dbServerName, waitStreamingStartTimeout); } else { LOG.info("Unsupported backfill source, just return true for {}", dbServerName); return true; } } - private static boolean waitForStreamingRunningInner(String connector, String dbServerName) { - int timeoutSecs = - Integer.parseInt( - System.getProperty( - DbzConnectorConfig.WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS)); + private static boolean waitForStreamingRunningInner( + String connector, String dbServerName, int waitStreamingStartTimeout) { int pollCount = 0; while (!isStreamingRunning(connector, dbServerName, "streaming")) { - if (pollCount > timeoutSecs) { + if (pollCount > waitStreamingStartTimeout) { LOG.error( "Debezium streaming source of {} failed to start in timeout {}", dbServerName, - timeoutSecs); + waitStreamingStartTimeout); return false; } try { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java index a237e250bf06f..a64c11745cf91 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngineRunner.java @@ -145,7 +145,9 @@ public boolean start() throws InterruptedException { .getProperty(CommonConnectorConfig.TOPIC_PREFIX.name()); startOk = DbzSourceUtils.waitForStreamingRunning( - config.getSourceType(), databaseServerName); + config.getSourceType(), + databaseServerName, + config.getWaitStreamingStartTimeout()); } running.set(true); diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 4ee7617ee751d..572cecc0299c5 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -236,6 +236,10 @@ pub struct SessionConfig { #[parameter(default = 0)] lock_timeout: i32, + /// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds. + #[parameter(default = 30)] + cdc_source_wait_streaming_start_timeout: i32, + /// see . /// Unused in RisingWave, support for compatibility. #[parameter(default = true)] diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index e58fa0010f5ce..4a1222a343e54 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -44,6 +44,7 @@ pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval"; pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size"; // We enable transaction for shared cdc source by default pub const CDC_TRANSACTIONAL_KEY: &str = "transactional"; +pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout"; pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME; pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 0830cdb5392de..12d615fed4e1b 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -40,7 +40,8 @@ use risingwave_connector::schema::schema_registry::{ use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::cdc::{ CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, - CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, + CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, + MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, }; use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; @@ -1371,6 +1372,13 @@ pub async fn handle_create_source( with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into()); // enable transactional cdc with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into()); + with_properties.insert( + CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(), + session + .config() + .cdc_source_wait_streaming_start_timeout() + .to_string(), + ); } // must behind `handle_addition_columns` diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs index c369ad7b38939..5950cf7685ea7 100644 --- a/src/jni_core/src/jvm_runtime.rs +++ b/src/jni_core/src/jvm_runtime.rs @@ -107,8 +107,7 @@ impl JavaVmWrapper { .option("-Dis_embedded_connector=true") .option(format!("-Djava.class.path={}", class_vec.join(":"))) .option("-Xms16m") - .option(format!("-Xmx{}", jvm_heap_size)) - .option("-Dcdc.source.wait.streaming.before.exit.seconds=30"); + .option(format!("-Xmx{}", jvm_heap_size)); tracing::info!("JVM args: {:?}", args_builder); let jvm_args = args_builder.build().context("invalid jvm args")?;