From 679413a58a4418db2a3bee0bcf74ec5b40e4ca0b Mon Sep 17 00:00:00 2001
From: Noel Kwan <47273164+kwannoel@users.noreply.github.com>
Date: Tue, 28 Nov 2023 13:40:24 +0800
Subject: [PATCH] test: support `background_ddl_rate` in recovery test (#13552)

---
 Cargo.lock                                |   2 +
 ci/scripts/deterministic-recovery-test.sh |  11 +-
 ci/workflows/main-cron.yml                |  19 +-
 ci/workflows/pull-request.yml             |   2 +-
 e2e_test/background_ddl/sim/basic.slt     |  12 +-
 src/frontend/src/handler/create_mv.rs     |  21 +-
 src/meta/src/manager/catalog/fragment.rs  |  12 +-
 src/tests/simulation/Cargo.toml           |   2 +
 src/tests/simulation/src/lib.rs           |   2 +
 src/tests/simulation/src/main.rs          |   7 +-
 src/tests/simulation/src/slt.rs           | 317 +++++++++++++++++++---
 11 files changed, 346 insertions(+), 61 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6fd7d800b10f1..624b4beb9f42f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8383,6 +8383,7 @@ dependencies = [
  "cfg-or-panic",
  "clap",
  "console",
+ "expect-test",
  "fail",
  "futures",
  "glob",
@@ -8398,6 +8399,7 @@ dependencies = [
  "pretty_assertions",
  "prometheus",
  "rand",
+ "rand_chacha",
  "risingwave_common",
  "risingwave_compactor",
  "risingwave_compute",
diff --git a/ci/scripts/deterministic-recovery-test.sh b/ci/scripts/deterministic-recovery-test.sh
index c5f89a2bbc7e0..b1c8937ad8e8c 100755
--- a/ci/scripts/deterministic-recovery-test.sh
+++ b/ci/scripts/deterministic-recovery-test.sh
@@ -19,18 +19,17 @@ export LOGDIR=.risingwave/log
 
 mkdir -p $LOGDIR
 
-# FIXME(kwannoel): Why is this failing?
-# echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl"
-# seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
+echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl"
+seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/background_ddl/sim/basic.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
 
 echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl"
-seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
+seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'
 
 echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, streaming"
-seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'
+seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'
 
 echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, batch"
-seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
+seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --background-ddl-rate=${BACKGROUND_DDL_RATE} ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
 
 echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, kafka source,sink"
 seq $TEST_NUM | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill --kill-rate=${KILL_RATE} --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/recovery-source-{}.log && rm $LOGDIR/recovery-source-{}.log'
diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index ee92ccb90a7fd..e1501e908f124 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -293,7 +293,24 @@ steps:
     retry: *auto-retry
 
   - label: "recovery test (deterministic simulation)"
-    command: "TEST_NUM=12 KILL_RATE=1.0 timeout 55m ci/scripts/deterministic-recovery-test.sh"
+    command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 55m ci/scripts/deterministic-recovery-test.sh"
+    if: |
+      !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
+      || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
+      || build.env("CI_STEPS") =~ /(^|,)recovery-tests?-deterministic-simulation(,|$$)/
+    depends_on: "build-simulation"
+    plugins:
+      - docker-compose#v4.9.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+      - ./ci/plugins/upload-failure-logs
+    timeout_in_minutes: 60
+    retry: *auto-retry
+
+  # Ddl statements will randomly run with background_ddl.
+  - label: "background_ddl recovery test (deterministic simulation)"
+    command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 timeout 55m ci/scripts/deterministic-recovery-test.sh"
     if: |
       !(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
       || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml
index df4a12ba8ac8c..7b43ccb77a2b1 100644
--- a/ci/workflows/pull-request.yml
+++ b/ci/workflows/pull-request.yml
@@ -410,7 +410,7 @@ steps:
     retry: *auto-retry
 
   - label: "recovery test (deterministic simulation)"
-    command: "TEST_NUM=8 KILL_RATE=0.5 ci/scripts/deterministic-recovery-test.sh"
+    command: "TEST_NUM=8 KILL_RATE=0.5 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
     if: |
       !(build.pull_request.labels includes "ci/skip-ci") && build.env("CI_STEPS") == null
       || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
diff --git a/e2e_test/background_ddl/sim/basic.slt b/e2e_test/background_ddl/sim/basic.slt
index b58dd36604c06..35f5814fe8b4f 100644
--- a/e2e_test/background_ddl/sim/basic.slt
+++ b/e2e_test/background_ddl/sim/basic.slt
@@ -13,6 +13,9 @@ INSERT INTO t select * from generate_series(1, 200000);
 statement ok
 FLUSH;
 
+statement ok
+SET STREAMING_RATE_LIMIT=4000;
+
 statement ok
 CREATE MATERIALIZED VIEW m1 as SELECT * FROM t;
 
@@ -26,23 +29,20 @@ CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;
 statement error
 CREATE MATERIALIZED VIEW m3 as SELECT * FROM t;
 
-# Wait for background ddl to finish
-sleep 30s
-
 query I
 select count(*) from m1;
 ----
-10000000
+200000
 
 query I
 select count(*) from m2;
 ----
-10000000
+200000
 
 query I
 select count(*) from m3;
 ----
-10000000
+200000
 
 statement ok
 DROP MATERIALIZED VIEW m1;
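The CI changes above only thread `BACKGROUND_DDL_RATE` through the environment; the simulator's new `--background-ddl-rate` flag is what consumes it. A rate is only meaningful in `[0.0, 1.0]` (rand's `Rng::gen_bool` panics on out-of-range probabilities), so a caller could reject bad values at parse time. A minimal clap sketch of such validation; the `parse_rate` helper is hypothetical, and the PR itself accepts any `f64`:

```rust
use clap::Parser;

/// Hypothetical argument struct mirroring the simulator's new flag.
#[derive(Parser, Debug)]
struct Args {
    /// Probability that a DDL statement runs as background DDL.
    #[arg(long, default_value_t = 0.0, value_parser = parse_rate)]
    background_ddl_rate: f64,
}

/// Reject rates outside [0.0, 1.0] early, instead of panicking
/// later inside `Rng::gen_bool`.
fn parse_rate(s: &str) -> Result<f64, String> {
    let rate: f64 = s.parse().map_err(|e| format!("not a float: {e}"))?;
    if (0.0..=1.0).contains(&rate) {
        Ok(rate)
    } else {
        Err(format!("rate {rate} must be within [0.0, 1.0]"))
    }
}

fn main() {
    let args = Args::parse();
    println!("background_ddl_rate = {}", args.background_ddl_rate);
}
```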
diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs
index f42bb1d9bb284..bea68d1bf4d8d 100644
--- a/src/frontend/src/handler/create_mv.rs
+++ b/src/frontend/src/handler/create_mv.rs
@@ -20,6 +20,7 @@ use risingwave_common::error::ErrorCode::ProtocolError;
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_pb::catalog::{CreateType, PbTable};
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
+use risingwave_pb::stream_plan::StreamScanType;
 use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
 
 use super::privilege::resolve_relation_privileges;
@@ -164,7 +165,7 @@ pub async fn handle_create_mv(
         return Ok(resp);
     }
 
-    let (mut table, graph) = {
+    let (mut table, graph, can_run_in_background) = {
         let context = OptimizerContext::from_handler_args(handler_args);
         if !context.with_options().is_empty() {
             // get other useful fields by `remove`, the logic here is to reject unknown options.
@@ -183,6 +184,20 @@ It only indicates the physical clustering of the data, which may improve the per
         let (plan, table) =
             gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?;
 
+        // All leaf nodes must be stream table scan, no other scan operators support recovery.
+        fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
+            if plan.inputs().is_empty() {
+                if let Some(scan) = plan.as_stream_table_scan() {
+                    scan.stream_scan_type() == StreamScanType::Backfill
+                } else {
+                    false
+                }
+            } else {
+                assert!(!plan.inputs().is_empty());
+                plan.inputs().iter().all(plan_has_backfill_leaf_nodes)
+            }
+        }
+        let can_run_in_background = plan_has_backfill_leaf_nodes(&plan);
         let context = plan.plan_base().ctx().clone();
         let mut graph = build_graph(plan);
         graph.parallelism =
@@ -196,7 +211,7 @@ It only indicates the physical clustering of the data, which may improve the per
         let env = graph.env.as_mut().unwrap();
         env.timezone = context.get_session_timezone();
 
-        (table, graph)
+        (table, graph, can_run_in_background)
     };
 
     // Ensure writes to `StreamJobTracker` are atomic.
@@ -212,7 +227,7 @@ It only indicates the physical clustering of the data, which may improve the per
     ));
 
     let run_in_background = session.config().background_ddl();
-    let create_type = if run_in_background {
+    let create_type = if run_in_background && can_run_in_background {
        CreateType::Background
     } else {
        CreateType::Foreground
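`plan_has_backfill_leaf_nodes` encodes the invariant that a job may only be downgraded to background DDL when every leaf of its stream plan is a `Backfill`-type table scan, since backfill is the only scan operator that can resume after recovery. The shape is the usual "all leaves satisfy a predicate" recursion; a self-contained sketch on a toy tree (the `Plan` type here is hypothetical, not the optimizer's `PlanRef`):

```rust
/// Toy stand-in for a stream plan node.
enum Plan {
    /// Leaf scan; `true` marks a backfill-capable scan.
    Scan { backfill: bool },
    /// Any other operator, with its inputs.
    Op(Vec<Plan>),
}

/// True iff every leaf is a backfill scan, mirroring the recursion
/// in `plan_has_backfill_leaf_nodes` above.
fn all_leaves_backfill(plan: &Plan) -> bool {
    match plan {
        Plan::Scan { backfill } => *backfill,
        // A non-leaf qualifies only if all of its inputs do.
        Plan::Op(inputs) => inputs.iter().all(all_leaves_backfill),
    }
}

fn main() {
    let plan = Plan::Op(vec![
        Plan::Scan { backfill: true },
        Plan::Op(vec![Plan::Scan { backfill: false }]),
    ]);
    // One leaf is not a backfill scan, so this job must stay foreground.
    assert!(!all_leaves_backfill(&plan));
}
```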
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 570813a7ab53a..22983b3a89824 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -33,6 +33,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode,
+    StreamScanType,
 };
 use tokio::sync::{RwLock, RwLockReadGuard};
 
@@ -177,13 +178,14 @@ impl FragmentManager {
         let map = &self.core.read().await.table_fragments;
         let mut table_map = HashMap::new();
         // TODO(kwannoel): Can this be unified with `PlanVisitor`?
-        fn has_stream_scan(stream_node: &StreamNode) -> bool {
-            let is_node_scan = if let Some(node) = &stream_node.node_body {
-                node.is_stream_scan()
+        fn has_backfill(stream_node: &StreamNode) -> bool {
+            let is_backfill = if let Some(node) = &stream_node.node_body
+                && let Some(node) = node.as_stream_scan() {
+                node.stream_scan_type == StreamScanType::Backfill as i32
             } else {
                 false
             };
-            is_node_scan || stream_node.get_input().iter().any(has_stream_scan)
+            is_backfill || stream_node.get_input().iter().any(has_backfill)
         }
         for table_id in table_ids {
             if let Some(table_fragment) = map.get(table_id) {
@@ -191,7 +193,7 @@ impl FragmentManager {
                 for fragment in table_fragment.fragments.values() {
                     for actor in &fragment.actors {
                         if let Some(node) = &actor.nodes
-                            && has_stream_scan(node)
+                            && has_backfill(node)
                         {
                             actors.insert(actor.actor_id);
                         } else {
diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml
index 72eeca9837bf0..7783cfbcf0aba 100644
--- a/src/tests/simulation/Cargo.toml
+++ b/src/tests/simulation/Cargo.toml
@@ -18,6 +18,7 @@ cfg-or-panic = "0.2"
 clap = { version = "4", features = ["derive"] }
 console = "0.15"
 etcd-client = { workspace = true }
+expect-test = "1"
 fail = { version = "0.5" }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 glob = "0.3"
@@ -29,6 +30,7 @@ pin-project = "1.1"
 pretty_assertions = "1"
 prometheus = { version = "0.13" }
 rand = "0.8"
+rand_chacha = { version = "0.3.1" }
 rdkafka = { workspace = true }
 risingwave_common = { workspace = true }
 risingwave_compactor = { workspace = true }
diff --git a/src/tests/simulation/src/lib.rs b/src/tests/simulation/src/lib.rs
index 6cf880d7d66fb..5afd68c4f97c8 100644
--- a/src/tests/simulation/src/lib.rs
+++ b/src/tests/simulation/src/lib.rs
@@ -15,6 +15,8 @@
 #![feature(trait_alias)]
 #![feature(lint_reasons)]
 #![feature(lazy_cell)]
+#![feature(let_chains)]
+#![feature(try_blocks)]
 
 pub mod client;
 pub mod cluster;
diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs
index a3aa4ca056415..c0c996dc05e5d 100644
--- a/src/tests/simulation/src/main.rs
+++ b/src/tests/simulation/src/main.rs
@@ -139,6 +139,11 @@ pub struct Args {
     #[arg(short, long)]
     e2e_extended_test: bool,
+
+    /// Background ddl
+    /// The probability of background ddl for a ddl query.
+    #[clap(long, default_value = "0.0")]
+    background_ddl_rate: f64,
 }
 
 #[tokio::main]
@@ -245,7 +250,7 @@ async fn main() {
         if let Some(jobs) = args.jobs {
             run_parallel_slt_task(glob, jobs).await.unwrap();
         } else {
-            run_slt_task(cluster0, glob, &kill_opts).await;
+            run_slt_task(cluster0, glob, &kill_opts, args.background_ddl_rate).await;
         }
     })
     .await;
diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs
index 9df28c1ee24ff..b38b382cb47a0 100644
--- a/src/tests/simulation/src/slt.rs
+++ b/src/tests/simulation/src/slt.rs
@@ -16,8 +16,11 @@ use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
 
-use rand::{thread_rng, Rng};
-use sqllogictest::ParallelTestError;
+use anyhow::{bail, Result};
+use itertools::Itertools;
+use rand::{thread_rng, Rng, SeedableRng};
+use rand_chacha::ChaChaRng;
+use sqllogictest::{ParallelTestError, Record};
 
 use crate::client::RisingWave;
 use crate::cluster::{Cluster, KillOpts};
@@ -29,9 +32,20 @@ fn is_create_table_as(sql: &str) -> bool {
     parts.len() >= 4 && parts[0] == "create" && parts[1] == "table" && parts[3] == "as"
 }
 
-#[derive(PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq)]
 enum SqlCmd {
-    Create { is_create_table_as: bool },
+    /// Other create statements.
+    Create {
+        is_create_table_as: bool,
+    },
+    /// Create materialized views.
+    CreateMaterializedView {
+        name: String,
+    },
+    /// Set background ddl.
+    SetBackgroundDdl {
+        enable: bool,
+    },
     Drop,
     Dml,
     Flush,
@@ -59,16 +73,54 @@ impl SqlCmd {
 }
 
 fn extract_sql_command(sql: &str) -> SqlCmd {
-    let cmd = sql
-        .trim_start()
-        .split_once(' ')
-        .unwrap_or_default()
-        .0
-        .to_lowercase();
-    match cmd.as_str() {
-        "create" => SqlCmd::Create {
-            is_create_table_as: is_create_table_as(sql),
-        },
+    let sql = sql.to_lowercase();
+    let tokens = sql.split_whitespace();
+    let mut tokens = tokens.multipeek();
+    let first_token = tokens.next().unwrap_or("");
+
+    match first_token {
+        // NOTE(kwannoel):
+        // It's entirely possible for a malformed command to be parsed as `SqlCmd::Create`.
+        // BUT an error should be expected for such a test.
+        // So we don't need to handle this case.
+        // Eventually if there are too many edge cases, we can opt to use our parser.
+        "create" => {
+            let result: Option<SqlCmd> = try {
+                match tokens.next()? {
+                    "materialized" => {
+                        // view
+                        tokens.next()?;
+
+                        // if not exists | name
+                        let next = *tokens.peek()?;
+                        if "if" == next
+                            && let Some("not") = tokens.peek().cloned()
+                            && let Some("exists") = tokens.peek().cloned()
+                        {
+                            tokens.next();
+                            tokens.next();
+                            tokens.next();
+                            let name = tokens.next()?.to_string();
+                            SqlCmd::CreateMaterializedView { name }
+                        } else {
+                            let name = next.to_string();
+                            SqlCmd::CreateMaterializedView { name }
+                        }
+                    }
+                    _ => SqlCmd::Create {
+                        is_create_table_as: is_create_table_as(&sql),
+                    },
+                }
+            };
+            result.unwrap_or(SqlCmd::Others)
+        }
+        "set" => {
+            if sql.contains("background_ddl") {
+                let enable = sql.contains("true");
+                SqlCmd::SetBackgroundDdl { enable }
+            } else {
+                SqlCmd::Others
+            }
+        }
         "drop" => SqlCmd::Drop,
         "insert" | "update" | "delete" => SqlCmd::Dml,
         "flush" => SqlCmd::Flush,
@@ -90,8 +142,46 @@ const KILL_IGNORE_FILES: &[&str] = &[
     "transaction/tolerance.slt",
 ];
 
+/// Wait for background mv to finish creating
+async fn wait_background_mv_finished(mview_name: &str) -> Result<()> {
+    let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
+        bail!("failed to connect to frontend for {mview_name}");
+    };
+    let client = rw.pg_client();
+    if client.simple_query("WAIT;").await.is_err() {
+        bail!("failed to wait for background mv to finish creating for {mview_name}");
+    }
+
+    let Ok(result) = client
+        .query(
+            "select count(*) from pg_matviews where matviewname=$1;",
+            &[&mview_name],
+        )
+        .await
+    else {
+        bail!("failed to query pg_matviews for {mview_name}");
+    };
+
+    match result[0].try_get::<_, i64>(0) {
+        Ok(1) => Ok(()),
+        r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
+    }
+}
+
 /// Run the sqllogictest files in `glob`.
-pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
+pub async fn run_slt_task(
+    cluster: Arc<Cluster>,
+    glob: &str,
+    opts: &KillOpts,
+    // Probability of background_ddl being set to true per ddl record.
+    background_ddl_rate: f64,
+) {
+    tracing::info!("background_ddl_rate: {}", background_ddl_rate);
+    let seed = std::env::var("MADSIM_TEST_SEED")
+        .unwrap_or("0".to_string())
+        .parse::<u64>()
+        .unwrap();
+    let mut rng = ChaChaRng::seed_from_u64(seed);
     let kill = opts.kill_compute || opts.kill_meta || opts.kill_frontend || opts.kill_compactor;
     let files = glob::glob(glob).expect("failed to read glob pattern");
     for file in files {
@@ -109,6 +199,10 @@
         let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
             .then(|| hack_kafka_test(path));
         let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);
+
+        // NOTE(kwannoel): For background ddl
+        let mut background_ddl_enabled = false;
+
         for record in sqllogictest::parse_file(path).expect("failed to parse file") {
             // uncomment to print metrics for task counts
             // let metrics = madsim::runtime::Handle::current().metrics();
@@ -138,6 +232,32 @@
                 | sqllogictest::Record::Query { sql, .. } => extract_sql_command(sql),
                 _ => SqlCmd::Others,
             };
+            tracing::debug!(?cmd, "Running");
+
+            if matches!(cmd, SqlCmd::SetBackgroundDdl { .. }) && background_ddl_rate > 0.0 {
+                panic!("We cannot run background_ddl statement with background_ddl_rate > 0.0, since it could be reset");
+            }
+
+            // For each background ddl compatible statement, provide a chance for background_ddl=true.
+            if let Record::Statement {
+                loc,
+                conditions,
+                connection,
+                ..
+            } = &record
+                && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
+            {
+                let background_ddl_setting = rng.gen_bool(background_ddl_rate);
+                let set_background_ddl = Record::Statement {
+                    loc: loc.clone(),
+                    conditions: conditions.clone(),
+                    connection: connection.clone(),
+                    expected_error: None,
+                    sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
+                    expected_count: None,
+                };
+                tester.run_async(set_background_ddl).await.unwrap();
+                background_ddl_enabled = background_ddl_setting;
+            };
 
             if cmd.ignore_kill() {
                 for i in 0usize.. {
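One subtlety in `extract_sql_command` above: `MultiPeek::peek` advances an internal cursor on each call, so the three consecutive `peek()` calls really do inspect `if`, `not`, and `exists` without consuming anything, and a subsequent `next()` resets the peek position. A toy demonstration of that behavior (illustrative input, not harness code):

```rust
use itertools::Itertools;

fn main() {
    let sql = "create materialized view if not exists m_1 as select 1;";
    let mut tokens = sql.split_whitespace().multipeek();

    assert_eq!(tokens.next(), Some("create"));

    // Each peek() looks one token further ahead without consuming.
    assert_eq!(tokens.peek(), Some(&"materialized"));
    assert_eq!(tokens.peek(), Some(&"view"));
    assert_eq!(tokens.peek(), Some(&"if"));

    // next() consumes "materialized" and resets the peek cursor.
    assert_eq!(tokens.next(), Some("materialized"));
    assert_eq!(tokens.peek(), Some(&"view"));
}
```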
@@ -179,9 +299,15 @@
             } else {
                 None
             };
+            // retry up to 5 times until it succeeds
+            let max_retry = 5;
             for i in 0usize.. {
+                tracing::debug!(iteration = i, "retry count");
                 let delay = Duration::from_secs(1 << i);
+                if i > 0 {
+                    tokio::time::sleep(delay).await;
+                }
                 match tester
                     .run_async(record.clone())
                     .timed(|_res, elapsed| {
@@ -189,34 +315,93 @@
                     })
                     .await
                 {
-                    Ok(_) => break,
-                    // allow 'table exists' error when retry CREATE statement
-                    Err(e)
-                        if matches!(
-                            cmd,
-                            SqlCmd::Create {
-                                is_create_table_as: false
+                    Ok(_) => {
+                        // For background ddl
+                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
+                            && background_ddl_enabled
+                            && matches!(record, Record::Statement { expected_error: None, .. } | Record::Query { expected_error: None, .. })
+                        {
+                            tracing::debug!(iteration = i, "Retry for background ddl");
+                            match wait_background_mv_finished(name).await {
+                                Ok(_) => {
+                                    tracing::debug!(iteration = i, "Record with background_ddl {:?} finished", record);
+                                    break;
+                                }
+                                Err(err) => {
+                                    tracing::error!(iteration = i, ?err, "failed to wait for background mv to finish creating");
+                                    if i >= max_retry {
+                                        panic!("failed to run test after retry {i} times, error={err:#?}");
+                                    }
+                                    continue;
+                                }
                             }
-                        ) && i != 0
-                            && e.to_string().contains("exists")
-                            && e.to_string().contains("Catalog error") =>
-                    {
-                        break
+                        }
+                        break;
                     }
-                    // allow 'not found' error when retry DROP statement
-                    Err(e)
-                        if cmd == SqlCmd::Drop
-                            && i != 0
-                            && e.to_string().contains("not found")
-                            && e.to_string().contains("Catalog error") =>
-                    {
-                        break
+                    Err(e) => {
+                        match cmd {
+                            // allow 'table exists' error when retry CREATE statement
+                            SqlCmd::Create {
+                                is_create_table_as: false,
+                            }
+                            | SqlCmd::CreateMaterializedView { .. }
+                                if i != 0
+                                    && e.to_string().contains("exists")
+                                    && e.to_string().contains("Catalog error") =>
+                            {
+                                break
+                            }
+                            // allow 'not found' error when retry DROP statement
+                            SqlCmd::Drop
+                                if i != 0
+                                    && e.to_string().contains("not found")
+                                    && e.to_string().contains("Catalog error") =>
+                            {
+                                break
+                            }
+
+                            // For other errors, give up once `i >= max_retry`; the arm below
+                            // retries errors which just mean the MV is not yet created.
+                            _ if i >= max_retry => {
+                                panic!("failed to run test after retry {i} times: {e}")
+                            }
+                            SqlCmd::CreateMaterializedView { ref name }
+                                if i != 0
+                                    && e.to_string().contains("table is in creating procedure")
+                                    && background_ddl_enabled =>
+                            {
+                                tracing::debug!(iteration = i, name, "Retry for background ddl");
+                                match wait_background_mv_finished(name).await {
+                                    Ok(_) => {
+                                        tracing::debug!(
+                                            iteration = i,
+                                            "Record with background_ddl {:?} finished",
+                                            record
+                                        );
+                                        break;
+                                    }
+                                    Err(err) => {
+                                        tracing::error!(
+                                            iteration = i,
+                                            ?err,
+                                            "failed to wait for background mv to finish creating"
+                                        );
+                                        if i >= max_retry {
+                                            panic!("failed to run test after retry {i} times, error={err:#?}");
+                                        }
+                                        continue;
+                                    }
+                                }
+                            }
+                            _ => tracing::error!(
+                                iteration = i,
+                                "failed to run test: {e}\nretry after {delay:?}"
+                            ),
+                        }
                     }
-                    Err(e) if i >= 5 => panic!("failed to run test after retry {i} times: {e}"),
-                    Err(e) => tracing::error!("failed to run test: {e}\nretry after {delay:?}"),
                 }
-                tokio::time::sleep(delay).await;
             }
+            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
+                background_ddl_enabled = enable;
+            };
             if let Some(handle) = handle {
                 handle.await.unwrap();
             }
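The retry loop above is a capped exponential backoff: attempt `i` first sleeps `1 << i` seconds (no sleep before the very first attempt) and gives up after `max_retry` failures. The same shape, isolated as a generic helper; this is a sketch assuming a tokio runtime, with a hypothetical `retry_with_backoff` helper rather than harness code:

```rust
use std::time::Duration;

/// Retry `op` until success or until `max_retry` failed retries,
/// mirroring the backoff in `run_slt_task` above.
async fn retry_with_backoff<T, E, F, Fut>(max_retry: usize, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    for i in 0usize.. {
        // 2s, 4s, 8s, ... before each retry; never before the first attempt.
        if i > 0 {
            tokio::time::sleep(Duration::from_secs(1 << i)).await;
        }
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if i >= max_retry => return Err(e),
            Err(e) => eprintln!("attempt {i} failed: {e}, retrying"),
        }
    }
    unreachable!()
}

#[tokio::main]
async fn main() {
    let mut calls = 0;
    let result = retry_with_backoff(5, || {
        calls += 1;
        let n = calls;
        // Fail transiently twice, then succeed.
        async move { if n < 3 { Err("transient") } else { Ok(n) } }
    })
    .await;
    assert_eq!(result, Ok(3));
}
```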
@@ -278,7 +463,17 @@ fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
 
 #[cfg(test)]
 mod tests {
+    use std::fmt::Debug;
+
+    use expect_test::{expect, Expect};
+
     use super::*;
+
+    fn check(actual: impl Debug, expect: Expect) {
+        let actual = format!("{:#?}", actual);
+        expect.assert_eq(&actual);
+    }
+
     #[test]
     fn test_is_create_table_as() {
         assert!(is_create_table_as(" create table xx as select 1;"));
@@ -287,4 +482,50 @@ mod tests {
         ));
         assert!(!is_create_table_as(" create view xx as select 1;"));
     }
+
+    #[test]
+    fn test_extract_sql_command() {
+        check(
+            extract_sql_command("create table t as select 1;"),
+            expect![[r#"
+                Create {
+                    is_create_table_as: true,
+                }"#]],
+        );
+        check(
+            extract_sql_command(" create table t (a int);"),
+            expect![[r#"
+                Create {
+                    is_create_table_as: false,
+                }"#]],
+        );
+        check(
+            extract_sql_command(" create materialized view m_1 as select 1;"),
+            expect![[r#"
+                CreateMaterializedView {
+                    name: "m_1",
+                }"#]],
+        );
+        check(
+            extract_sql_command("set background_ddl= true;"),
+            expect![[r#"
+                SetBackgroundDdl {
+                    enable: true,
+                }"#]],
+        );
+        check(
+            extract_sql_command("SET BACKGROUND_DDL=true;"),
+            expect![[r#"
+                SetBackgroundDdl {
+                    enable: true,
+                }"#]],
+        );
+        check(
+            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
+            expect![[r#"
+                CreateMaterializedView {
+                    name: "m_1",
+                }"#]],
+        )
+    }
 }
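The new unit tests use `expect_test` snapshots: the expected `Debug` output lives inline in the `expect![[...]]` literal, and running the tests with `UPDATE_EXPECT=1` rewrites stale literals in place instead of failing. A self-contained example of the same pattern as the `check` helper above:

```rust
use expect_test::expect;

#[derive(Debug)]
struct Point {
    x: i32,
    y: i32,
}

#[test]
fn snapshot_debug_output() {
    let actual = format!("{:#?}", Point { x: 1, y: 2 });
    // On mismatch, rerunning with UPDATE_EXPECT=1 updates the literal below.
    expect![[r#"
        Point {
            x: 1,
            y: 2,
        }"#]]
    .assert_eq(&actual);
}
```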