From b0d719f1f10551f9cdbeb1b40cdd2b9b0247b4a1 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 18 Nov 2024 14:36:26 +0800 Subject: [PATCH 1/3] fix: (reproduce) incorrect madsim initial sink parallelism --- ci/scripts/deterministic-it-test.sh | 1 + .../tests/integration_tests/sink/basic.rs | 8 +++++++- .../integration_tests/sink/err_isolation.rs | 16 ++++++++++++++-- .../tests/integration_tests/sink/recovery.rs | 7 ++++++- .../tests/integration_tests/sink/scale.rs | 8 +++++++- 5 files changed, 35 insertions(+), 5 deletions(-) diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index 40288f5848b1..e7bf5341732b 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -22,6 +22,7 @@ echo "--- Run integration tests in deterministic simulation mode" seq "$TEST_NUM" | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ cargo nextest run \ --no-fail-fast \ + --no-capture \ --cargo-metadata target/nextest/cargo-metadata.json \ --binaries-metadata target/nextest/binaries-metadata.json \ $TEST_PATTERN 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log" diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index 8ba8982ce4d7..be6212f836df 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -44,7 +44,13 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - assert_eq!(6, test_sink.parallelism_counter.load(Relaxed)); + + if test_sink.parallelism_counter.load(Relaxed) != 6 { + return Err(anyhow!( + "incorrect initial parallelism: {} ", + test_sink.parallelism_counter.load(Relaxed) + )); + } let internal_tables = session.run("show internal tables").await?; diff --git a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs index 124653946b87..a427c256a22a 100644 --- a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs +++ b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs @@ -40,7 +40,13 @@ async fn test_sink_decouple_err_isolation() -> Result<()> { session.run("set sink_decouple = true").await?; session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - assert_eq!(6, test_sink.parallelism_counter.load(Relaxed)); + + if test_sink.parallelism_counter.load(Relaxed) != 6 { + return Err(anyhow!( + "incorrect initial parallelism: {} ", + test_sink.parallelism_counter.load(Relaxed) + )); + } test_sink.set_err_rate(0.002); @@ -81,7 +87,13 @@ async fn test_sink_error_event_logs() -> Result<()> { session.run("set sink_decouple = true").await?; session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - assert_eq!(6, test_sink.parallelism_counter.load(Relaxed)); + + if test_sink.parallelism_counter.load(Relaxed) != 6 { + return Err(anyhow!( + "incorrect initial parallelism: {} ", + test_sink.parallelism_counter.load(Relaxed) + )); + } test_sink.store.wait_for_err(1).await?; diff --git a/src/tests/simulation/tests/integration_tests/sink/recovery.rs b/src/tests/simulation/tests/integration_tests/sink/recovery.rs index 6b4f71d7d508..df047961e36d 100644 --- a/src/tests/simulation/tests/integration_tests/sink/recovery.rs +++ b/src/tests/simulation/tests/integration_tests/sink/recovery.rs @@ -71,7 +71,12 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - assert_eq!(6, test_sink.parallelism_counter.load(Relaxed)); + if test_sink.parallelism_counter.load(Relaxed) != 6 { + return Err(anyhow!( + "incorrect initial parallelism: {} ", + test_sink.parallelism_counter.load(Relaxed) + )); + } let count = test_source.id_list.len(); diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index 99c3b7e9ebc5..5427a3452ec7 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -73,7 +73,13 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - assert_eq!(6, test_sink.parallelism_counter.load(Relaxed)); + + if test_sink.parallelism_counter.load(Relaxed) != 6 { + return Err(anyhow!( + "incorrect initial parallelism: {} ", + test_sink.parallelism_counter.load(Relaxed) + )); + } let mut sink_fragments = cluster .locate_fragments([identity_contains("Sink")]) From 1411059515d61f54a4704adf66dbd11d2af9f3f2 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 18 Nov 2024 14:47:16 +0800 Subject: [PATCH 2/3] fix compile --- src/tests/simulation/tests/integration_tests/sink/basic.rs | 2 +- .../simulation/tests/integration_tests/sink/err_isolation.rs | 4 ++-- src/tests/simulation/tests/integration_tests/sink/recovery.rs | 2 +- src/tests/simulation/tests/integration_tests/sink/scale.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index be6212f836df..2ba6d5408421 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -46,7 +46,7 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> { session.run(CREATE_SINK).await?; if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow!( + return Err(anyhow::anyhow!( "incorrect initial parallelism: {} ", test_sink.parallelism_counter.load(Relaxed) )); diff --git a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs index a427c256a22a..af3cab0ad53f 100644 --- a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs +++ b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs @@ -42,7 +42,7 @@ async fn test_sink_decouple_err_isolation() -> Result<()> { session.run(CREATE_SINK).await?; if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow!( + return Err(anyhow::anyhow!( "incorrect initial parallelism: {} ", test_sink.parallelism_counter.load(Relaxed) )); @@ -89,7 +89,7 @@ async fn test_sink_error_event_logs() -> Result<()> { session.run(CREATE_SINK).await?; if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow!( + return Err(anyhow::anyhow!( "incorrect initial parallelism: {} ", test_sink.parallelism_counter.load(Relaxed) )); diff --git a/src/tests/simulation/tests/integration_tests/sink/recovery.rs b/src/tests/simulation/tests/integration_tests/sink/recovery.rs index df047961e36d..54b27de6eb2c 100644 --- a/src/tests/simulation/tests/integration_tests/sink/recovery.rs +++ b/src/tests/simulation/tests/integration_tests/sink/recovery.rs @@ -72,7 +72,7 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> { session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow!( + return Err(anyhow::anyhow!( "incorrect initial parallelism: {} ", test_sink.parallelism_counter.load(Relaxed) )); diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index 5427a3452ec7..71bc7a998783 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -75,7 +75,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> { session.run(CREATE_SINK).await?; if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow!( + return Err(anyhow::anyhow!( "incorrect initial parallelism: {} ", test_sink.parallelism_counter.load(Relaxed) )); From e417e80a402755f8355e56885905e6ab5fddfb1b Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 18 Nov 2024 15:48:17 +0800 Subject: [PATCH 3/3] fix --- ci/scripts/deterministic-it-test.sh | 1 - ci/workflows/main-cron.yml | 2 +- .../tests/integration_tests/sink/basic.rs | 8 +------- .../integration_tests/sink/err_isolation.rs | 16 ++-------------- .../tests/integration_tests/sink/recovery.rs | 7 +------ .../tests/integration_tests/sink/scale.rs | 8 +------- .../tests/integration_tests/sink/utils.rs | 9 +++++++++ 7 files changed, 15 insertions(+), 36 deletions(-) diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index e7bf5341732b..40288f5848b1 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -22,7 +22,6 @@ echo "--- Run integration tests in deterministic simulation mode" seq "$TEST_NUM" | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ cargo nextest run \ --no-fail-fast \ - --no-capture \ --cargo-metadata target/nextest/cargo-metadata.json \ --binaries-metadata target/nextest/binaries-metadata.json \ $TEST_PATTERN 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log" diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index e8a0fa32f101..5ee988110ca5 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -388,7 +388,7 @@ steps: # Ddl statements will randomly run with background_ddl. - label: "background_ddl, arrangement_backfill recovery test (madsim)" key: "background-ddl-arrangement-backfill-recovery-test-deterministic" - command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 65m ci/scripts/deterministic-recovery-test.sh" + command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 70m ci/scripts/deterministic-recovery-test.sh" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation" diff --git a/src/tests/simulation/tests/integration_tests/sink/basic.rs b/src/tests/simulation/tests/integration_tests/sink/basic.rs index 2ba6d5408421..4899b7d152d8 100644 --- a/src/tests/simulation/tests/integration_tests/sink/basic.rs +++ b/src/tests/simulation/tests/integration_tests/sink/basic.rs @@ -44,13 +44,7 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; let internal_tables = session.run("show internal tables").await?; diff --git a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs index af3cab0ad53f..0307fc671e02 100644 --- a/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs +++ b/src/tests/simulation/tests/integration_tests/sink/err_isolation.rs @@ -40,13 +40,7 @@ async fn test_sink_decouple_err_isolation() -> Result<()> { session.run("set sink_decouple = true").await?; session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; test_sink.set_err_rate(0.002); @@ -87,13 +81,7 @@ async fn test_sink_error_event_logs() -> Result<()> { session.run("set sink_decouple = true").await?; session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; test_sink.store.wait_for_err(1).await?; diff --git a/src/tests/simulation/tests/integration_tests/sink/recovery.rs b/src/tests/simulation/tests/integration_tests/sink/recovery.rs index 54b27de6eb2c..124f0b0d9fe5 100644 --- a/src/tests/simulation/tests/integration_tests/sink/recovery.rs +++ b/src/tests/simulation/tests/integration_tests/sink/recovery.rs @@ -71,12 +71,7 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; let count = test_source.id_list.len(); diff --git a/src/tests/simulation/tests/integration_tests/sink/scale.rs b/src/tests/simulation/tests/integration_tests/sink/scale.rs index 71bc7a998783..9ecff238fb03 100644 --- a/src/tests/simulation/tests/integration_tests/sink/scale.rs +++ b/src/tests/simulation/tests/integration_tests/sink/scale.rs @@ -73,13 +73,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> { } session.run(CREATE_SOURCE).await?; session.run(CREATE_SINK).await?; - - if test_sink.parallelism_counter.load(Relaxed) != 6 { - return Err(anyhow::anyhow!( - "incorrect initial parallelism: {} ", - test_sink.parallelism_counter.load(Relaxed) - )); - } + test_sink.wait_initial_parallelism(6).await?; let mut sink_fragments = cluster .locate_fragments([identity_contains("Sink")]) diff --git a/src/tests/simulation/tests/integration_tests/sink/utils.rs b/src/tests/simulation/tests/integration_tests/sink/utils.rs index bef5bdfa35d0..9c93b388dca5 100644 --- a/src/tests/simulation/tests/integration_tests/sink/utils.rs +++ b/src/tests/simulation/tests/integration_tests/sink/utils.rs @@ -39,6 +39,7 @@ use risingwave_connector::source::test_source::{ registry_test_source, BoxSource, TestSourceRegistryGuard, TestSourceSplit, }; use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration}; +use tokio::task::yield_now; use tokio::time::sleep; use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert}; @@ -244,6 +245,14 @@ impl SimulationTestSink { let err_rate = u32::MAX as f64 * err_rate; self.err_rate.store(err_rate as _, Relaxed); } + + pub async fn wait_initial_parallelism(&self, parallelism: usize) -> Result<()> { + while self.parallelism_counter.load(Relaxed) < parallelism { + yield_now().await; + } + assert_eq!(self.parallelism_counter.load(Relaxed), parallelism); + Ok(()) + } } pub fn build_stream_chunk(