diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index e7bf5341732ba..40288f5848b16 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -22,7 +22,6 @@ echo "--- Run integration tests in deterministic simulation mode" seq "$TEST_NUM" | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ cargo nextest run \ --no-fail-fast \ - --no-capture \ --cargo-metadata target/nextest/cargo-metadata.json \ --binaries-metadata target/nextest/binaries-metadata.json \ $TEST_PATTERN 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log" diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index e8a0fa32f1010..5ee988110ca56 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -388,7 +388,7 @@ steps: # Ddl statements will randomly run with background_ddl. - label: "background_ddl, arrangement_backfill recovery test (madsim)" key: "background-ddl-arrangement-backfill-recovery-test-deterministic" - command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 65m ci/scripts/deterministic-recovery-test.sh" + command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 70m ci/scripts/deterministic-recovery-test.sh" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation" diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index 2ba6d54084218..4899b7d152d82 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -44,13 +44,7 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; let internal_tables = session.run("show internal tables").await?; diff --git a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs index af3cab0ad53f4..0307fc671e024 100644 --- a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs +++ b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs @@ -40,13 +40,7 @@ async fn test_sink_decouple_err_isolation() -> Result<()> { session.run("set sink_decouple = true").await?; session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; test_sink.set_err_rate(0.002); @@ -87,13 +81,7 @@ async fn test_sink_error_event_logs() -> Result<()> { session.run("set sink_decouple = true").await?; session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; test_sink.store.wait_for_err(1).await?; diff --git a/src/tests/simulation/tests/integration_tests/sink/recovery.rs b/src/tests/simulation/tests/integration_tests/sink/recovery.rs index 54b27de6eb2c2..124f0b0d9fe5d 100644 --- a/src/tests/simulation/tests/integration_tests/sink/recovery.rs +++ b/src/tests/simulation/tests/integration_tests/sink/recovery.rs @@ -71,12 +71,7 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; let count = test_source.id_list.len(); diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index 71bc7a998783d..9ecff238fb035 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -73,13 +73,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; let mut sink_fragments = cluster .locate_fragments([identity_contains("Sink")]) diff --git a/src/tests/simulation/tests/integration_tests/sink/utils.rs b/src/tests/simulation/tests/integration_tests/sink/utils.rs index bef5bdfa35d0b..9c93b388dca5d 100644 --- a/src/tests/simulation/tests/integration_tests/sink/utils.rs +++ b/src/tests/simulation/tests/integration_tests/sink/utils.rs @@ -39,6 +39,7 @@ use risingwave_connector::source::test_source::{ registry_test_source, BoxSource, TestSourceRegistryGuard, TestSourceSplit, }; use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration}; +use tokio::task::yield_now; use tokio::time::sleep; use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert}; @@ -244,6 +245,14 @@ impl SimulationTestSink { let err_rate = u32::MAX as f64 * err_rate; self.err_rate.store(err_rate as _, Relaxed); } + + pub async fn wait_initial_parallelism(&self, parallelism: usize) -> Result<()> { + while self.parallelism_counter.load(Relaxed) < parallelism { + yield_now().await; + } + assert_eq!(self.parallelism_counter.load(Relaxed), parallelism); + Ok(()) + } } pub fn build_stream_chunk(