From 23cffc5558bf38cb990f399fbc1c2b0a4f1df120 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 7 Nov 2024 15:28:58 +0800 Subject: [PATCH] make it configurable Signed-off-by: Bugen Zhao --- ci/workflows/pull-request.yml | 2 +- src/tests/simulation/src/main.rs | 11 ++++- src/tests/simulation/src/slt.rs | 81 ++++++++++++++++++-------------- 3 files changed, 58 insertions(+), 36 deletions(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 2991397dfb1c5..c878d3f95bfe0 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -569,7 +569,7 @@ steps: retry: *auto-retry - label: "end-to-end test (deterministic simulation)" - command: "TEST_NUM=4 ci/scripts/deterministic-e2e-test.sh" + command: "TEST_NUM=4 RW_SIM_RANDOM_VNODE_COUNT=true ci/scripts/deterministic-e2e-test.sh" if: | !(build.pull_request.labels includes "ci/pr/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation" diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 2c94f1b741886..3b8e6d7b24afa 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -133,6 +133,10 @@ pub struct Args { /// Use arrangement backfill #[clap(long, default_value = "false")] use_arrangement_backfill: bool, + + /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL. + #[clap(long, env = "RW_SIM_RANDOM_VNODE_COUNT")] + random_vnode_count: bool, } #[tokio::main] @@ -246,7 +250,12 @@ async fn main() { if let Some(jobs) = args.jobs { run_parallel_slt_task(glob, jobs).await.unwrap(); } else { - run_slt_task(cluster0, glob, &kill_opts, args.background_ddl_rate).await; + let opts = Opts { + kill_opts, + background_ddl_rate: args.background_ddl_rate, + random_vnode_count: args.random_vnode_count, + }; + run_slt_task(cluster0, glob, opts).await; } }) .await; diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 0ee9f3dcc9dfc..ede789792d1af 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -199,13 +199,23 @@ async fn wait_background_mv_finished(mview_name: &str) -> Result<()> { } } +pub struct Opts { + pub kill_opts: KillOpts, + /// Probability of `background_ddl` being set to true per ddl record. + pub background_ddl_rate: f64, + /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL. + pub random_vnode_count: bool, +} + /// Run the sqllogictest files in `glob`. pub async fn run_slt_task( cluster: Arc, glob: &str, - opts: &KillOpts, - // Probability of background_ddl being set to true per ddl record. - background_ddl_rate: f64, + Opts { + kill_opts, + background_ddl_rate, + random_vnode_count, + }: Opts, ) { tracing::info!("background_ddl_rate: {}", background_ddl_rate); let seed = std::env::var("MADSIM_TEST_SEED") @@ -213,7 +223,10 @@ pub async fn run_slt_task( .parse::() .unwrap(); let mut rng = ChaChaRng::seed_from_u64(seed); - let kill = opts.kill_compute || opts.kill_meta || opts.kill_frontend || opts.kill_compactor; + let kill = kill_opts.kill_compute + || kill_opts.kill_meta + || kill_opts.kill_frontend + || kill_opts.kill_compactor; let files = glob::glob(glob).expect("failed to read glob pattern"); for file in files { // use a session per file @@ -240,16 +253,19 @@ pub async fn run_slt_task( let mut manual_background_ddl_enabled = false; let records = sqllogictest::parse_file(path).expect("failed to parse file"); - let can_use_random_vnode_count = records.iter().all(|record| { - if let Record::Statement { sql, .. } = record - && sql.to_lowercase().contains("parallelism") - { - tracing::debug!(path = %path.display(), "skip random vnode count"); - false - } else { - true - } - }); + let random_vnode_count = random_vnode_count + // Skip using random vnode count if the test case cares about parallelism, including + // setting parallelism manually or checking the parallelism with system tables. + && records.iter().all(|record| { + if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record + && sql.to_lowercase().contains("parallelism") + { + println!("[RANDOM VNODE COUNT] skip: {}", path.display()); + false + } else { + true + } + }); for record in records { // uncomment to print metrics for task counts @@ -268,25 +284,23 @@ pub async fn run_slt_task( // For normal records. if !kill { - if let Record::Statement { - loc, - conditions, - connection, - .. - } = &record + // Set random vnode count if needed. + if random_vnode_count && cmd.is_create() - && can_use_random_vnode_count + && let Record::Statement { + loc, + conditions, + connection, + .. + } = &record { - let sql = format!( - "SET STREAMING_MAX_PARALLELISM = {};", - ((2..=64) - .chain(224..=288) - .chain(992..=1056) - .chain(std::iter::once(32768))) + let vnode_count = (2..=64) // small + .chain(224..=288) // normal + .chain(992..=1056) // 1024 affects row id gen behavior .choose(&mut thread_rng()) - .unwrap() - ); - println!("[BUGEN] setting vnode count: {sql}"); + .unwrap(); + let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};"); + println!("[RANDOM VNODE COUNT] set: {vnode_count}"); let set_random_vnode_count = Record::Statement { loc: loc.clone(), conditions: conditions.clone(), @@ -295,8 +309,7 @@ pub async fn run_slt_task( expected: StatementExpect::Ok, }; tester.run_async(set_random_vnode_count).await.unwrap(); - - println!("[BUGEN] running record: {record}"); + println!("[RANDOM VNODE COUNT] run: {record}"); } match tester @@ -383,11 +396,11 @@ pub async fn run_slt_task( continue; } - let should_kill = thread_rng().gen_bool(opts.kill_rate as f64); + let should_kill = thread_rng().gen_bool(kill_opts.kill_rate as f64); // spawn a background task to kill nodes let handle = if should_kill { let cluster = cluster.clone(); - let opts = *opts; + let opts = kill_opts; Some(tokio::spawn(async move { let t = thread_rng().gen_range(Duration::default()..Duration::from_secs(1)); tokio::time::sleep(t).await;