Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
wenym1 committed Nov 18, 2024
1 parent 1411059 commit e417e80
Showing 7 changed files with 15 additions and 36 deletions.
1 change: 0 additions & 1 deletion ci/scripts/deterministic-it-test.sh
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ echo "--- Run integration tests in deterministic simulation mode"
seq "$TEST_NUM" | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
cargo nextest run \
--no-fail-fast \
--no-capture \
--cargo-metadata target/nextest/cargo-metadata.json \
--binaries-metadata target/nextest/binaries-metadata.json \
$TEST_PATTERN 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log"
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
@@ -388,7 +388,7 @@ steps:
# Ddl statements will randomly run with background_ddl.
- label: "background_ddl, arrangement_backfill recovery test (madsim)"
key: "background-ddl-arrangement-backfill-recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 65m ci/scripts/deterministic-recovery-test.sh"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 70m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
8 changes: 1 addition & 7 deletions src/tests/simulation/tests/integration_tests/sink/basic.rs
Original file line number Diff line number Diff line change
@@ -44,13 +44,7 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> {
}
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;

if test_sink.parallelism_counter.load(Relaxed) != 6 {
return Err(anyhow::anyhow!(
"incorrect initial parallelism: {} ",
test_sink.parallelism_counter.load(Relaxed)
));
}
test_sink.wait_initial_parallelism(6).await?;

let internal_tables = session.run("show internal tables").await?;

16 changes: 2 additions & 14 deletions src/tests/simulation/tests/integration_tests/sink/err_isolation.rs
Original file line number Diff line number Diff line change
@@ -40,13 +40,7 @@ async fn test_sink_decouple_err_isolation() -> Result<()> {
session.run("set sink_decouple = true").await?;
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;

if test_sink.parallelism_counter.load(Relaxed) != 6 {
return Err(anyhow::anyhow!(
"incorrect initial parallelism: {} ",
test_sink.parallelism_counter.load(Relaxed)
));
}
test_sink.wait_initial_parallelism(6).await?;

test_sink.set_err_rate(0.002);

@@ -87,13 +81,7 @@ async fn test_sink_error_event_logs() -> Result<()> {
session.run("set sink_decouple = true").await?;
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;

if test_sink.parallelism_counter.load(Relaxed) != 6 {
return Err(anyhow::anyhow!(
"incorrect initial parallelism: {} ",
test_sink.parallelism_counter.load(Relaxed)
));
}
test_sink.wait_initial_parallelism(6).await?;

test_sink.store.wait_for_err(1).await?;

Original file line number Diff line number Diff line change
@@ -71,12 +71,7 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> {
}
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
if test_sink.parallelism_counter.load(Relaxed) != 6 {
return Err(anyhow::anyhow!(
"incorrect initial parallelism: {} ",
test_sink.parallelism_counter.load(Relaxed)
));
}
test_sink.wait_initial_parallelism(6).await?;

let count = test_source.id_list.len();

8 changes: 1 addition & 7 deletions src/tests/simulation/tests/integration_tests/sink/scale.rs
Original file line number Diff line number Diff line change
@@ -73,13 +73,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> {
}
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;

if test_sink.parallelism_counter.load(Relaxed) != 6 {
return Err(anyhow::anyhow!(
"incorrect initial parallelism: {} ",
test_sink.parallelism_counter.load(Relaxed)
));
}
test_sink.wait_initial_parallelism(6).await?;

let mut sink_fragments = cluster
.locate_fragments([identity_contains("Sink")])
9 changes: 9 additions & 0 deletions src/tests/simulation/tests/integration_tests/sink/utils.rs
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ use risingwave_connector::source::test_source::{
registry_test_source, BoxSource, TestSourceRegistryGuard, TestSourceSplit,
};
use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration};
use tokio::task::yield_now;
use tokio::time::sleep;

use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};
@@ -244,6 +245,14 @@ impl SimulationTestSink {
let err_rate = u32::MAX as f64 * err_rate;
self.err_rate.store(err_rate as _, Relaxed);
}

pub async fn wait_initial_parallelism(&self, parallelism: usize) -> Result<()> {
while self.parallelism_counter.load(Relaxed) < parallelism {
yield_now().await;
}
assert_eq!(self.parallelism_counter.load(Relaxed), parallelism);
Ok(())
}
}

pub fn build_stream_chunk(

0 comments on commit e417e80

Please sign in to comment.