diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 4d3122c486d94..e5d4002a03d40 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -136,7 +136,6 @@ pub struct Args { } #[tokio::main] -#[cfg_or_panic(madsim)] async fn main() { use std::sync::Arc; @@ -186,7 +185,7 @@ async fn main() { cluster.create_kafka_producer(&datadir).await; } - let seed = madsim::runtime::Handle::current().seed(); + let seed = sqlsmith_seed(); if let Some(count) = args.sqlsmith { cluster .run_on_client(async move { @@ -270,3 +269,8 @@ async fn main() { cluster.graceful_shutdown().await; } + +#[cfg_or_panic(madsim)] +fn sqlsmith_seed() -> u64 { + madsim::runtime::Handle::current().seed() +} diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 7bf9d62d19649..e293ad7cd8b17 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -19,6 +19,7 @@ use std::time::Duration; use anyhow::{bail, Result}; use itertools::Itertools; +use rand::seq::IteratorRandom; use rand::{thread_rng, Rng, SeedableRng}; use rand_chacha::ChaChaRng; use sqllogictest::{Condition, ParallelTestError, QueryExpect, Record, StatementExpect}; @@ -85,6 +86,15 @@ impl SqlCmd { // are not transactional, we can't kill during `alter table add/drop columns` for now, will // remove it until transactional commit of table fragment and catalog is supported. } + + fn is_create(&self) -> bool { + matches!( + self, + SqlCmd::Create { .. } + | SqlCmd::CreateSink { .. } + | SqlCmd::CreateMaterializedView { .. } + ) + } } fn extract_sql_command(sql: &str) -> SqlCmd { @@ -229,7 +239,19 @@ pub async fn run_slt_task( // We can revert it back to false only if we encounter a record that sets background_ddl to false. let mut manual_background_ddl_enabled = false; - for record in sqllogictest::parse_file(path).expect("failed to parse file") { + let records = sqllogictest::parse_file(path).expect("failed to parse file"); + let can_use_random_vnode_count = records.iter().all(|record| { + if let Record::Statement { sql, .. } = record + && sql.to_lowercase().contains("max_parallelism") + { + tracing::debug!(path = %path.display(), "skip random vnode count"); + false + } else { + true + } + }); + + for record in records { // uncomment to print metrics for task counts // let metrics = madsim::runtime::Handle::current().metrics(); // println!("{:#?}", metrics); @@ -293,6 +315,32 @@ pub async fn run_slt_task( background_ddl_enabled = background_ddl_setting; }; + if let Record::Statement { + loc, + conditions, + connection, + .. + } = &record + && cmd.is_create() + && can_use_random_vnode_count + { + let sql = format!( + "SET STREAMING_MAX_PARALLELISM = {};", + ((1..=64).chain(224..=288).chain(992..=1056)) + .choose(&mut thread_rng()) + .unwrap() + ); + println!("setting vnode count: {sql}"); + let set_random_vnode_count = Record::Statement { + loc: loc.clone(), + conditions: conditions.clone(), + connection: connection.clone(), + sql, + expected: StatementExpect::Ok, + }; + tester.run_async(set_random_vnode_count).await.unwrap(); + } + if !cmd.allow_kill() { for i in 0usize.. { let delay = Duration::from_secs(1 << i);