Skip to content

Commit

Permalink
feat(cdc): add a parameter to control timeout of cdc source waiting t…
Browse files Browse the repository at this point in the history
…ime (#16598)
  • Loading branch information
KeXiangWang authored and KeXiangWang committed May 7, 2024
1 parent 4e4c37c commit 521a014
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 16 deletions.
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bytea_output
user cdc_source_wait_streaming_start_timeout
user client_encoding
user client_min_messages
user create_compaction_group_for_mv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
public class DbzConnectorConfig {
private static final Logger LOG = LoggerFactory.getLogger(DbzConnectorConfig.class);

public static final String WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS =
"cdc.source.wait.streaming.before.exit.seconds";
private static final String WAIT_FOR_STREAMING_START_TIMEOUT_SECS =
"cdc.source.wait.streaming.start.timeout";

/* Common configs */
public static final String HOST = "hostname";
Expand Down Expand Up @@ -89,6 +89,7 @@ private static Map<String, String> extractDebeziumProperties(
private final SourceTypeE sourceType;
private final Properties resolvedDbzProps;
private final boolean isBackfillSource;
private final int waitStreamingStartTimeout;

public long getSourceId() {
return sourceId;
Expand All @@ -106,6 +107,10 @@ public boolean isBackfillSource() {
return isBackfillSource;
}

public int getWaitStreamingStartTimeout() {
return waitStreamingStartTimeout;
}

public DbzConnectorConfig(
SourceTypeE source,
long sourceId,
Expand All @@ -119,6 +124,9 @@ public DbzConnectorConfig(
var isCdcBackfill =
null != userProps.get(SNAPSHOT_MODE_KEY)
&& userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL);
var waitStreamingStartTimeout =
Integer.parseInt(
userProps.getOrDefault(WAIT_FOR_STREAMING_START_TIMEOUT_SECS, "30"));

LOG.info(
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}",
Expand Down Expand Up @@ -255,6 +263,7 @@ public DbzConnectorConfig(
this.sourceType = source;
this.resolvedDbzProps = dbzProps;
this.isBackfillSource = isCdcBackfill;
this.waitStreamingStartTimeout = waitStreamingStartTimeout;
}

private Properties initiateDbConfig(String fileName, StringSubstitutor substitutor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,31 +103,30 @@ private static String quotePostgres(String identifier) {
return "\"" + identifier + "\"";
}

public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbServerName) {
public static boolean waitForStreamingRunning(
SourceTypeE sourceType, String dbServerName, int waitStreamingStartTimeout) {
// Wait for streaming source of source that supported backfill
LOG.info("Waiting for streaming source of {} to start", dbServerName);
if (sourceType == SourceTypeE.MYSQL) {
return waitForStreamingRunningInner("mysql", dbServerName);
return waitForStreamingRunningInner("mysql", dbServerName, waitStreamingStartTimeout);
} else if (sourceType == SourceTypeE.POSTGRES) {
return waitForStreamingRunningInner("postgres", dbServerName);
return waitForStreamingRunningInner(
"postgres", dbServerName, waitStreamingStartTimeout);
} else {
LOG.info("Unsupported backfill source, just return true for {}", dbServerName);
return true;
}
}

private static boolean waitForStreamingRunningInner(String connector, String dbServerName) {
int timeoutSecs =
Integer.parseInt(
System.getProperty(
DbzConnectorConfig.WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS));
private static boolean waitForStreamingRunningInner(
String connector, String dbServerName, int waitStreamingStartTimeout) {
int pollCount = 0;
while (!isStreamingRunning(connector, dbServerName, "streaming")) {
if (pollCount > timeoutSecs) {
if (pollCount > waitStreamingStartTimeout) {
LOG.error(
"Debezium streaming source of {} failed to start in timeout {}",
dbServerName,
timeoutSecs);
waitStreamingStartTimeout);
return false;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public boolean start() throws InterruptedException {
.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
startOk =
DbzSourceUtils.waitForStreamingRunning(
config.getSourceType(), databaseServerName);
config.getSourceType(),
databaseServerName,
config.getWaitStreamingStartTimeout());
}

running.set(true);
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ pub struct SessionConfig {
#[parameter(default = 0)]
lock_timeout: i32,

/// For limiting the startup time of a shareable CDC streaming source when the source is being created. Unit: seconds.
#[parameter(default = 30)]
cdc_source_wait_streaming_start_timeout: i32,

/// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
/// Unused in RisingWave, support for compatibility.
#[parameter(default = true)]
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";

pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use risingwave_connector::schema::schema_registry::{
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::cdc::{
CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR,
MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
Expand Down Expand Up @@ -1371,6 +1372,13 @@ pub async fn handle_create_source(
with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
// enable transactional cdc
with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
with_properties.insert(
CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
session
.config()
.cdc_source_wait_streaming_start_timeout()
.to_string(),
);
}

// must behind `handle_addition_columns`
Expand Down
3 changes: 1 addition & 2 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ impl JavaVmWrapper {
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size))
.option("-Dcdc.source.wait.streaming.before.exit.seconds=30");
.option(format!("-Xmx{}", jvm_heap_size));

tracing::info!("JVM args: {:?}", args_builder);
let jvm_args = args_builder.build().context("invalid jvm args")?;
Expand Down

0 comments on commit 521a014

Please sign in to comment.