Skip to content

Commit

Permalink
make it configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 12, 2024
1 parent 483e27a commit c96200a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 36 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ steps:
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "TEST_NUM=4 ci/scripts/deterministic-e2e-test.sh"
command: "TEST_NUM=4 RW_SIM_RANDOM_VNODE_COUNT=true ci/scripts/deterministic-e2e-test.sh"
if: |
!(build.pull_request.labels includes "ci/pr/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation"
Expand Down
11 changes: 10 additions & 1 deletion src/tests/simulation/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ pub struct Args {
/// Use arrangement backfill
#[clap(long, default_value = "false")]
use_arrangement_backfill: bool,

/// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
#[clap(long, env = "RW_SIM_RANDOM_VNODE_COUNT")]
random_vnode_count: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -246,7 +250,12 @@ async fn main() {
if let Some(jobs) = args.jobs {
run_parallel_slt_task(glob, jobs).await.unwrap();
} else {
run_slt_task(cluster0, glob, &kill_opts, args.background_ddl_rate).await;
let opts = Opts {
kill_opts,
background_ddl_rate: args.background_ddl_rate,
random_vnode_count: args.random_vnode_count,
};
run_slt_task(cluster0, glob, opts).await;
}
})
.await;
Expand Down
81 changes: 47 additions & 34 deletions src/tests/simulation/src/slt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,34 @@ async fn wait_background_mv_finished(mview_name: &str) -> Result<()> {
}
}

pub struct Opts {
pub kill_opts: KillOpts,
/// Probability of `background_ddl` being set to true per ddl record.
pub background_ddl_rate: f64,
/// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
pub random_vnode_count: bool,
}

/// Run the sqllogictest files in `glob`.
pub async fn run_slt_task(
cluster: Arc<Cluster>,
glob: &str,
opts: &KillOpts,
// Probability of background_ddl being set to true per ddl record.
background_ddl_rate: f64,
Opts {
kill_opts,
background_ddl_rate,
random_vnode_count,
}: Opts,
) {
tracing::info!("background_ddl_rate: {}", background_ddl_rate);
let seed = std::env::var("MADSIM_TEST_SEED")
.unwrap_or("0".to_string())
.parse::<u64>()
.unwrap();
let mut rng = ChaChaRng::seed_from_u64(seed);
let kill = opts.kill_compute || opts.kill_meta || opts.kill_frontend || opts.kill_compactor;
let kill = kill_opts.kill_compute
|| kill_opts.kill_meta
|| kill_opts.kill_frontend
|| kill_opts.kill_compactor;
let files = glob::glob(glob).expect("failed to read glob pattern");
for file in files {
// use a session per file
Expand All @@ -240,16 +253,19 @@ pub async fn run_slt_task(
let mut manual_background_ddl_enabled = false;

let records = sqllogictest::parse_file(path).expect("failed to parse file");
let can_use_random_vnode_count = records.iter().all(|record| {
if let Record::Statement { sql, .. } = record
&& sql.to_lowercase().contains("parallelism")
{
tracing::debug!(path = %path.display(), "skip random vnode count");
false
} else {
true
}
});
let random_vnode_count = random_vnode_count
// Skip using random vnode count if the test case cares about parallelism, including
// setting parallelism manually or checking the parallelism with system tables.
&& records.iter().all(|record| {
if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
&& sql.to_lowercase().contains("parallelism")
{
println!("[RANDOM VNODE COUNT] skip: {}", path.display());
false
} else {
true
}
});

for record in records {
// uncomment to print metrics for task counts
Expand All @@ -268,25 +284,23 @@ pub async fn run_slt_task(

// For normal records.
if !kill {
if let Record::Statement {
loc,
conditions,
connection,
..
} = &record
// Set random vnode count if needed.
if random_vnode_count
&& cmd.is_create()
&& can_use_random_vnode_count
&& let Record::Statement {
loc,
conditions,
connection,
..
} = &record
{
let sql = format!(
"SET STREAMING_MAX_PARALLELISM = {};",
((2..=64)
.chain(224..=288)
.chain(992..=1056)
.chain(std::iter::once(32768)))
let vnode_count = (2..=64) // small
.chain(224..=288) // normal
.chain(992..=1056) // 1024 affects row id gen behavior
.choose(&mut thread_rng())
.unwrap()
);
println!("[BUGEN] setting vnode count: {sql}");
.unwrap();
let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
println!("[RANDOM VNODE COUNT] set: {vnode_count}");
let set_random_vnode_count = Record::Statement {
loc: loc.clone(),
conditions: conditions.clone(),
Expand All @@ -295,8 +309,7 @@ pub async fn run_slt_task(
expected: StatementExpect::Ok,
};
tester.run_async(set_random_vnode_count).await.unwrap();

println!("[BUGEN] running record: {record}");
println!("[RANDOM VNODE COUNT] run: {record}");
}

match tester
Expand Down Expand Up @@ -383,11 +396,11 @@ pub async fn run_slt_task(
continue;
}

let should_kill = thread_rng().gen_bool(opts.kill_rate as f64);
let should_kill = thread_rng().gen_bool(kill_opts.kill_rate as f64);
// spawn a background task to kill nodes
let handle = if should_kill {
let cluster = cluster.clone();
let opts = *opts;
let opts = kill_opts;
Some(tokio::spawn(async move {
let t = thread_rng().gen_range(Duration::default()..Duration::from_secs(1));
tokio::time::sleep(t).await;
Expand Down

0 comments on commit c96200a

Please sign in to comment.