Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): add a parameter to control timeout of cdc source waiting time (#16598) #16624

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bytea_output
user cdc_source_wait_streaming_start_timeout
user client_encoding
user client_min_messages
user create_compaction_group_for_mv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
public class DbzConnectorConfig {
private static final Logger LOG = LoggerFactory.getLogger(DbzConnectorConfig.class);

public static final String WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS =
"cdc.source.wait.streaming.before.exit.seconds";
private static final String WAIT_FOR_STREAMING_START_TIMEOUT_SECS =
"cdc.source.wait.streaming.start.timeout";

/* Common configs */
public static final String HOST = "hostname";
Expand Down Expand Up @@ -89,6 +89,7 @@ private static Map<String, String> extractDebeziumProperties(
private final SourceTypeE sourceType;
private final Properties resolvedDbzProps;
private final boolean isBackfillSource;
private final int waitStreamingStartTimeout;

public long getSourceId() {
return sourceId;
Expand All @@ -106,6 +107,10 @@ public boolean isBackfillSource() {
return isBackfillSource;
}

/**
 * Returns the timeout applied while waiting for the streaming source to start.
 *
 * <p>The value is parsed in the constructor from the user property
 * {@code cdc.source.wait.streaming.start.timeout}, falling back to {@code 30} when the
 * property is absent. The property constant's name suggests the unit is seconds.
 *
 * @return the configured wait-for-streaming-start timeout
 */
public int getWaitStreamingStartTimeout() {
return waitStreamingStartTimeout;
}

public DbzConnectorConfig(
SourceTypeE source,
long sourceId,
Expand All @@ -119,6 +124,9 @@ public DbzConnectorConfig(
var isCdcBackfill =
null != userProps.get(SNAPSHOT_MODE_KEY)
&& userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL);
var waitStreamingStartTimeout =
Integer.parseInt(
userProps.getOrDefault(WAIT_FOR_STREAMING_START_TIMEOUT_SECS, "30"));

LOG.info(
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}",
Expand Down Expand Up @@ -255,6 +263,7 @@ public DbzConnectorConfig(
this.sourceType = source;
this.resolvedDbzProps = dbzProps;
this.isBackfillSource = isCdcBackfill;
this.waitStreamingStartTimeout = waitStreamingStartTimeout;
}

private Properties initiateDbConfig(String fileName, StringSubstitutor substitutor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,31 +103,30 @@ private static String quotePostgres(String identifier) {
return "\"" + identifier + "\"";
}

public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbServerName) {
public static boolean waitForStreamingRunning(
SourceTypeE sourceType, String dbServerName, int waitStreamingStartTimeout) {
// Wait for the streaming source to start (only for source types that support backfill)
LOG.info("Waiting for streaming source of {} to start", dbServerName);
if (sourceType == SourceTypeE.MYSQL) {
return waitForStreamingRunningInner("mysql", dbServerName);
return waitForStreamingRunningInner("mysql", dbServerName, waitStreamingStartTimeout);
} else if (sourceType == SourceTypeE.POSTGRES) {
return waitForStreamingRunningInner("postgres", dbServerName);
return waitForStreamingRunningInner(
"postgres", dbServerName, waitStreamingStartTimeout);
} else {
LOG.info("Unsupported backfill source, just return true for {}", dbServerName);
return true;
}
}

private static boolean waitForStreamingRunningInner(String connector, String dbServerName) {
int timeoutSecs =
Integer.parseInt(
System.getProperty(
DbzConnectorConfig.WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS));
private static boolean waitForStreamingRunningInner(
String connector, String dbServerName, int waitStreamingStartTimeout) {
int pollCount = 0;
while (!isStreamingRunning(connector, dbServerName, "streaming")) {
if (pollCount > timeoutSecs) {
if (pollCount > waitStreamingStartTimeout) {
LOG.error(
"Debezium streaming source of {} failed to start in timeout {}",
dbServerName,
timeoutSecs);
waitStreamingStartTimeout);
return false;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public boolean start() throws InterruptedException {
.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
startOk =
DbzSourceUtils.waitForStreamingRunning(
config.getSourceType(), databaseServerName);
config.getSourceType(),
databaseServerName,
config.getWaitStreamingStartTimeout());
}

running.set(true);
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ pub struct SessionConfig {
#[parameter(default = 0)]
lock_timeout: i32,

/// Timeout for a shareable CDC streaming source to finish starting up while the source is being created. Unit: seconds.
#[parameter(default = 30)]
cdc_source_wait_streaming_start_timeout: i32,

/// see <https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-ROW-SECURITY>.
/// Unused in RisingWave, support for compatibility.
#[parameter(default = true)]
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";

pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use risingwave_connector::schema::schema_registry::{
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::cdc::{
CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY,
CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR, MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, MONGODB_CDC_CONNECTOR,
MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR,
};
use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
Expand Down Expand Up @@ -1371,6 +1372,13 @@ pub async fn handle_create_source(
with_properties.insert(CDC_SHARING_MODE_KEY.into(), "true".into());
// enable transactional cdc
with_properties.insert(CDC_TRANSACTIONAL_KEY.into(), "true".into());
with_properties.insert(
CDC_WAIT_FOR_STREAMING_START_TIMEOUT.into(),
session
.config()
.cdc_source_wait_streaming_start_timeout()
.to_string(),
);
}

// must come after `handle_addition_columns`
Expand Down
3 changes: 1 addition & 2 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ impl JavaVmWrapper {
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size))
.option("-Dcdc.source.wait.streaming.before.exit.seconds=30");
.option(format!("-Xmx{}", jvm_heap_size));

tracing::info!("JVM args: {:?}", args_builder);
let jvm_args = args_builder.build().context("invalid jvm args")?;
Expand Down
Loading