diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh
index f281eaa467bfd..1c43bd3faae97 100755
--- a/ci/scripts/deterministic-it-test.sh
+++ b/ci/scripts/deterministic-it-test.sh
@@ -5,6 +5,9 @@ set -euo pipefail

 source ci/scripts/common.sh

+export LOGDIR=.risingwave/log
+mkdir -p $LOGDIR
+
 echo "--- Download artifacts"
 buildkite-agent artifact download simulation-it-test.tar.zst .

@@ -13,10 +16,12 @@ tar -xvf simulation-it-test.tar.zst
 mkdir target/sim
 mv target/ci-sim target/sim

+TEST_PATTERN="$@"
+
 echo "--- Run integration tests in deterministic simulation mode"
-seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
+seq $TEST_NUM | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
 cargo nextest run \
 --no-fail-fast \
 --cargo-metadata target/nextest/cargo-metadata.json \
 --binaries-metadata target/nextest/binaries-metadata.json \
-"$@"
+$TEST_PATTERN 1>$LOGDIR/deterministic-it-test-{}.log 2>&1 && rm $LOGDIR/deterministic-it-test-{}.log"
diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh
index d0d5eafb3c917..a5bac061e8831 100755
--- a/ci/scripts/run-backfill-tests.sh
+++ b/ci/scripts/run-backfill-tests.sh
@@ -117,214 +117,6 @@ test_snapshot_and_upstream_read() {
   cargo make wait-processes-exit
 }

-# Test background ddl recovery
-test_background_ddl_recovery() {
-  echo "--- e2e, $CLUSTER_PROFILE, test_background_ddl_recovery"
-  cargo make ci-start $CLUSTER_PROFILE
-
-  # Test before recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt"
-  sleep 1
-  OLD_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}")
-
-  # Restart
-  restart_cluster
-
-  # Test after recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-  # Recover the mview progress
-  sleep 5
-
-  NEW_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}")
-
-  if [[ ${OLD_PROGRESS%.*} -le ${NEW_PROGRESS%.*} ]]; then
-    echo "OK: $OLD_PROGRESS smaller or equal to $NEW_PROGRESS"
-  else
-    echo "FAILED: $OLD_PROGRESS larger than $NEW_PROGRESS"
-    exit 1
-  fi
-
-  sleep 60
-
-  # Test after backfill finished
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt"
-
-  # After cluster restart(s), backfilled mv should still be present.
-  restart_cluster
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt"
-  restart_cluster
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_mv.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt"
-
-  kill_cluster
-}
-
-test_background_ddl_cancel() {
-  echo "--- e2e, $CLUSTER_PROFILE, test background ddl"
-  cargo make ci-start $CLUSTER_PROFILE
-
-  # Test before recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt"
-  sleep 1
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-  cancel_stream_jobs
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt"
-
-  # Restart
-  restart_cluster
-
-  # Recover
-  sleep 3
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-  cancel_stream_jobs
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt"
-  sleep 1
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-  cancel_stream_jobs
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-
-  # After cancel should be able to create MV
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt"
-  sleep 1
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-  cancel_stream_jobs
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt"
-
-  kill_cluster
-}
-
-# Test foreground ddl should not recover
-test_foreground_ddl_cancel() {
-  echo "--- e2e, $CLUSTER_PROFILE, test_foreground_ddl_cancel"
-  cargo make ci-start $CLUSTER_PROFILE
-
-  # Test before recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt"
-  run_sql "CREATE MATERIALIZED VIEW m1 as select * FROM t;" &
-  sleep 1
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-  cancel_stream_jobs
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_fg_mv.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_mv.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt"
-
-  kill_cluster
-}
-
-# Test foreground ddl should not recover
-test_foreground_ddl_no_recover() {
-  echo "--- e2e, $CLUSTER_PROFILE, test_foreground_ddl_no_recover"
-  cargo make ci-start $CLUSTER_PROFILE
-
-  # Test before recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt"
-  run_sql "CREATE MATERIALIZED VIEW m1 as select * FROM t;" &
-  sleep 3
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-
-  # Restart
-  restart_cluster
-
-  # Leave sometime for recovery
-  sleep 5
-
-  # Test after recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-  sleep 30
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt"
-
-  kill_cluster
-}
-
-test_foreground_index_cancel() {
-  echo "--- e2e, $CLUSTER_PROFILE, test_foreground_index_cancel"
-  cargo make ci-start $CLUSTER_PROFILE
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt"
-
-  # Test cancel
-  run_sql "CREATE INDEX i ON t (v1);" &
-  sleep 3
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-  cancel_stream_jobs
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-
-  # Test index over recovery
-  run_sql "CREATE INDEX i ON t (v1);" &
-  sleep 3
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-
-  # Restart
-  restart_cluster
-
-  # Leave sometime for recovery
-  sleep 5
-
-  # Test after recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_index.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_index.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt"
-
-  kill_cluster
-}
-
-test_foreground_sink_cancel() {
-  echo "--- e2e, $CLUSTER_PROFILE, test_foreground_sink_ddl_cancel"
-  cargo make ci-start $CLUSTER_PROFILE
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt"
-
-  # Test cancel
-  run_sql "CREATE SINK i FROM t WITH (connector='blackhole');" &
-  sleep 3
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-  cancel_stream_jobs
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-
-  # Test sink over recovery
-  run_sql "CREATE SINK i FROM t WITH (connector='blackhole');" &
-  sleep 3
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-
-  # Restart
-  restart_cluster
-
-  # Leave sometime for recovery
-  sleep 5
-
-  # Test after recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_no_jobs.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_sink.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_sink.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt"
-
-  kill_cluster
-}
-
 # Lots of upstream tombstone, backfill should still proceed.
 test_backfill_tombstone() {
   echo "--- e2e, test_backfill_tombstone"
@@ -358,85 +150,10 @@ test_backfill_tombstone() {
   wait
 }

-test_backfill_restart_cn_recovery() {
-  echo "--- e2e, $CLUSTER_PROFILE, test_background_restart_cn_recovery"
-  cargo make ci-start $CLUSTER_PROFILE
-
-  # Test before recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_table.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/create_bg_mv.slt"
-  sleep 1
-  OLD_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}")
-
-  # Restart 1 CN
-  restart_cn
-
-  # Give some time to recover.
-  sleep 3
-
-  # Test after recovery
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_one_job.slt"
-
-  # Recover the mview progress
-  sleep 5
-
-  NEW_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}")
-
-  if [[ ${OLD_PROGRESS%.*} -le ${NEW_PROGRESS%.*} ]]; then
-    echo "OK: $OLD_PROGRESS smaller or equal to $NEW_PROGRESS"
-  else
-    echo "FAILED: $OLD_PROGRESS larger than $NEW_PROGRESS"
-    exit 1
-  fi
-
-  # Trigger a bootstrap recovery
-  pkill compute-node
-  kill_cluster
-  rename_logs_with_prefix "before-restart"
-  sleep 10
-  cargo make dev $CLUSTER_PROFILE
-
-  # Recover mview progress
-  sleep 5
-
-  OLD_PROGRESS=$NEW_PROGRESS
-  NEW_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}")
-
-  if [[ ${OLD_PROGRESS%.*} -le ${NEW_PROGRESS%.*} ]]; then
-    echo "OK: $OLD_PROGRESS smaller or equal to $NEW_PROGRESS"
-  else
-    echo "FAILED: $OLD_PROGRESS larger than $NEW_PROGRESS"
-    exit 1
-  fi
-
-  sleep 60
-
-  # Test after backfill finished
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt"
-
-  # After cluster restart(s), backfilled mv should still be present.
-  restart_cluster
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt"
-  restart_cluster
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/validate_backfilled_mv.slt"
-
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_mv.slt"
-  sqllogictest -d dev -h localhost -p 4566 "$COMMON_DIR/drop_table.slt"
-
-  kill_cluster
-}
-
 main() {
   set -euo pipefail
   test_snapshot_and_upstream_read
   test_backfill_tombstone
-  test_background_ddl_recovery
-  test_background_ddl_cancel
-  test_foreground_ddl_no_recover
-  test_foreground_ddl_cancel
-  test_foreground_index_cancel
-  test_foreground_sink_cancel
-  test_backfill_restart_cn_recovery
 }

 main
diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 6f58222424e18..891cb69ecb98f 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -193,6 +193,7 @@ steps:
           run: rw-build-env
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
+      - ./ci/plugins/upload-failure-logs
     timeout_in_minutes: 70
     retry: *auto-retry

@@ -205,6 +206,7 @@ steps:
           run: rw-build-env
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
+      - ./ci/plugins/upload-failure-logs
     timeout_in_minutes: 70
     retry: *auto-retry

@@ -217,6 +219,7 @@ steps:
          run: rw-build-env
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
+      - ./ci/plugins/upload-failure-logs
     timeout_in_minutes: 70
     retry: *auto-retry

diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml
index 3aaa09f0d7716..bd71c7f7044ef 100644
--- a/ci/workflows/pull-request.yml
+++ b/ci/workflows/pull-request.yml
@@ -344,6 +344,7 @@ steps:
           run: rw-build-env
           config: ci/docker-compose.yml
           mount-buildkite-agent: true
+      - ./ci/plugins/upload-failure-logs
     timeout_in_minutes: 20
     retry: *auto-retry

diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index d39dde51399d8..654c6359ca1bf 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -206,7 +206,7 @@ struct CheckpointControl {
     metrics: Arc,

     /// Get notified when we finished Create MV and collect a barrier(checkpoint = true)
-    finished_commands: Vec<TrackingJob>,
+    finished_jobs: Vec<TrackingJob>,
 }

 impl CheckpointControl {
@@ -218,30 +218,30 @@ impl CheckpointControl {
             adding_actors: Default::default(),
             removing_actors: Default::default(),
             metrics,
-            finished_commands: Default::default(),
+            finished_jobs: Default::default(),
         }
     }

     /// Stash a command to finish later.
     fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
-        self.finished_commands.push(finished_job);
+        self.finished_jobs.push(finished_job);
     }

-    /// Finish stashed commands. If the current barrier is not a `checkpoint`, we will not finish
-    /// the commands that requires a checkpoint, else we will finish all the commands.
+    /// Finish stashed jobs.
+    /// If `checkpoint` is true, all stashed jobs can be finished.
+    /// Otherwise, only jobs which do not require a checkpoint can be finished.
     ///
-    /// Returns whether there are still remaining stashed commands to finish.
-    async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult<bool> {
-        for command in self
-            .finished_commands
-            .extract_if(|c| checkpoint || c.is_barrier())
+    /// Returns whether there are still remaining stashed jobs to finish.
+    async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult<bool> {
+        for job in self
+            .finished_jobs
+            .extract_if(|job| checkpoint || !job.is_checkpoint_required())
         {
             // The command is ready to finish. We can now call `pre_finish`.
-            command.pre_finish().await?;
-            command.notify_finished();
+            job.pre_finish().await?;
+            job.notify_finished();
         }
-
-        Ok(!self.finished_commands.is_empty())
+        Ok(!self.finished_jobs.is_empty())
     }

     fn cancel_command(&mut self, cancelled_job: TrackingJob) {
@@ -258,7 +258,7 @@ impl CheckpointControl {
     }

     fn cancel_stashed_command(&mut self, id: TableId) {
-        self.finished_commands
+        self.finished_jobs
             .retain(|x| x.table_to_create() != Some(id));
     }

@@ -476,7 +476,7 @@ impl CheckpointControl {
             tracing::warn!("there are some changes in dropping_tables");
             self.dropping_tables.clear();
         }
-        self.finished_commands.clear();
+        self.finished_jobs.clear();
     }
 }

@@ -1092,9 +1092,7 @@ impl GlobalBarrierManager {
             checkpoint_control.cancel_stashed_command(table_id);
         }

-        let remaining = checkpoint_control
-            .finish_commands(kind.is_checkpoint())
-            .await?;
+        let remaining = checkpoint_control.finish_jobs(kind.is_checkpoint()).await?;
         // If there are remaining commands (that requires checkpoint to finish), we force
         // the next barrier to be a checkpoint.
         if remaining {
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index 22cd6f8d9e200..2d5a745eaef5e 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -159,10 +159,15 @@ impl TrackingJob {
         }
     }

-    pub(crate) fn is_barrier(&self) -> bool {
+    /// Returns whether the `TrackingJob` requires a checkpoint to complete.
+    pub(crate) fn is_checkpoint_required(&self) -> bool {
         match self {
+            // A recovered tracking job is always a streaming job;
+            // it requires a checkpoint to complete.
             TrackingJob::Recovered(_) => true,
-            TrackingJob::New(command) => command.context.kind.is_barrier(),
+            TrackingJob::New(command) => {
+                command.context.kind.is_initial() || command.context.kind.is_checkpoint()
+            }
         }
     }
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 780f96552115a..9866061a14628 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1128,7 +1128,8 @@ impl DdlController {
     }

     pub async fn wait(&self) -> MetaResult<()> {
-        for _ in 0..30 * 60 {
+        let timeout_secs = 30 * 60;
+        for _ in 0..timeout_secs {
             if self
                 .catalog_manager
                 .list_creating_background_mvs()
@@ -1139,7 +1140,9 @@ impl DdlController {
             }
             sleep(Duration::from_secs(1)).await;
         }
-        Err(MetaError::cancelled("timeout".into()))
+        Err(MetaError::cancelled(format!(
+            "timeout after {timeout_secs}s"
+        )))
     }

     async fn comment_on(&self, comment: Comment) -> MetaResult {
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 97a9da0ff6a99..550fab11e9e9e 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -164,6 +164,7 @@ where
             mut old_state,
         } = Self::recover_backfill_state(self.state_table.as_ref(), pk_in_output_indices.len())
             .await?;
+        tracing::trace!(is_finished, row_count, "backfill state recovered");

         let mut builder =
             DataChunkBuilder::new(self.upstream_table.schema().data_types(), self.chunk_size);
diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs
index c48fdc8a4392f..59a46356101cd 100644
--- a/src/stream/src/executor/flow_control.rs
+++ b/src/stream/src/executor/flow_control.rs
@@ -54,15 +54,16 @@ impl FlowControlExecutor {
             let msg = msg?;
             match msg {
                 Message::Chunk(chunk) => {
-                    let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else {
+                    let chunk_cardinality = chunk.cardinality();
+                    let Some(n) = NonZeroU32::new(chunk_cardinality as u32) else {
                         // Handle case where chunk is empty
                         continue;
                     };
                     if let Some(rate_limiter) = &rate_limiter {
                         let result = rate_limiter.until_n_ready(n).await;
-                        if let Err(InsufficientCapacity(n)) = result {
+                        if let Err(InsufficientCapacity(_max_cells)) = result {
                             tracing::error!(
-                                "Rate Limit {:?} smaller than chunk cardinality {n}",
+                                "Rate Limit {:?} smaller than chunk cardinality {chunk_cardinality}",
                                 self.rate_limit,
                             );
                         }
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index b4849c877f8ec..0f8fceb46fd58 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -17,33 +17,24 @@ use std::time::Duration;
 use anyhow::Result;
 use itertools::Itertools;
 use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session};
-use risingwave_simulation::utils::AssertResult;
 use tokio::time::sleep;

 const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
-const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
+const DROP_TABLE: &str = "DROP TABLE t;";
+const SEED_TABLE_500: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
+const SEED_TABLE_100: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100);";
 const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
 const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
 const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
 const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;";
 const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
+const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
 const WAIT: &str = "WAIT;";

 async fn kill_cn_and_wait_recover(cluster: &Cluster) {
-    // Kill it again
-    for _ in 0..5 {
-        cluster
-            .kill_node(&KillOpts {
-                kill_rate: 1.0,
-                kill_meta: false,
-                kill_frontend: false,
-                kill_compute: true,
-                kill_compactor: false,
-                restart_delay_secs: 1,
-            })
-            .await;
-        sleep(Duration::from_secs(2)).await;
-    }
+    cluster
+        .kill_nodes(["compute-1", "compute-2", "compute-3"], 0)
+        .await;
     sleep(Duration::from_secs(10)).await;
 }

@@ -80,80 +71,72 @@ fn init_logger() {
         .try_init();
 }

+async fn create_mv(session: &mut Session) -> Result<()> {
+    session.run(CREATE_MV1).await?;
+    sleep(Duration::from_secs(2)).await;
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_background_mv_barrier_recovery() -> Result<()> {
-    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    init_logger();
+    let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
     let mut session = cluster.start_session();

-    session.run("CREATE TABLE t1 (v1 int);").await?;
-    session
-        .run("INSERT INTO t1 select * from generate_series(1, 400000)")
-        .await?;
-    session.run("flush").await?;
-    session.run("SET BACKGROUND_DDL=true;").await?;
+    session.run(CREATE_TABLE).await?;
     session
-        .run("create materialized view m1 as select * from t1;")
+        .run("INSERT INTO t SELECT generate_series FROM generate_series(1, 200);")
         .await?;
+    session.flush().await?;
+    session.run(SET_RATE_LIMIT_2).await?;
+    session.run(SET_BACKGROUND_DDL).await?;
+    create_mv(&mut session).await?;

     // If the CN is killed before first barrier pass for the MV, the MV will be dropped.
     // This is because it's table fragments will NOT be committed until first barrier pass.
-    sleep(Duration::from_secs(5)).await;
-    kill_cn_and_wait_recover(&cluster).await;
-
-    // Send some upstream updates.
-    cluster
-        .run("INSERT INTO t1 select * from generate_series(1, 100000);")
-        .await?;
-    cluster.run("flush;").await?;
-
     kill_cn_and_wait_recover(&cluster).await;
     kill_and_wait_recover(&cluster).await;

     // Send some upstream updates.
-    cluster
-        .run("INSERT INTO t1 select * from generate_series(1, 100000);")
+    session
+        .run("INSERT INTO t SELECT generate_series FROM generate_series(201, 400);")
         .await?;
-    cluster.run("flush;").await?;
+    session.flush().await?;

     kill_and_wait_recover(&cluster).await;
-    kill_cn_and_wait_recover(&cluster).await;
-
-    // Send some upstream updates.
-    cluster
-        .run("INSERT INTO t1 select * from generate_series(1, 100000);")
-        .await?;
-    cluster.run("flush;").await?;

     // Now just wait for it to complete.
+    session.run(WAIT).await?;

-    sleep(Duration::from_secs(10)).await;
-
-    session
-        .run("SELECT COUNT(v1) FROM m1")
-        .await?
-        .assert_result_eq("700000");
+    let t_count = session.run("SELECT COUNT(v1) FROM t").await?;
+    let mv1_count = session.run("SELECT COUNT(v1) FROM mv1").await?;
+    if t_count != mv1_count {
+        let missing_rows = session
+            .run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)")
+            .await?;
+        tracing::debug!(missing_rows);
+        let missing_rows_with_row_id = session
+            .run("SELECT _row_id, v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)")
+            .await?;
+        tracing::debug!(missing_rows_with_row_id);
+    }
+    assert_eq!(t_count, mv1_count);

     // Make sure that if MV killed and restarted
     // it will not be dropped.
-
-    session.run("DROP MATERIALIZED VIEW m1").await?;
-    session.run("DROP TABLE t1").await?;
+    session.run(DROP_MV1).await?;
+    session.run(DROP_TABLE).await?;

     Ok(())
 }

 #[tokio::test]
 async fn test_background_ddl_cancel() -> Result<()> {
-    async fn create_mv(session: &mut Session) -> Result<()> {
-        session.run(CREATE_MV1).await?;
-        sleep(Duration::from_secs(2)).await;
-        Ok(())
-    }
     init_logger();
     let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
     let mut session = cluster.start_session();
     session.run(CREATE_TABLE).await?;
-    session.run(SEED_TABLE).await?;
+    session.run(SEED_TABLE_500).await?;
+    session.flush().await?;
     session.run(SET_RATE_LIMIT_2).await?;
     session.run(SET_BACKGROUND_DDL).await?;

@@ -194,3 +177,104 @@ async fn test_background_ddl_cancel() -> Result<()> {

     Ok(())
 }
+
+// When the cluster stops, foreground DDL jobs must be cancelled.
+#[tokio::test]
+async fn test_foreground_ddl_no_recovery() -> Result<()> {
+    init_logger();
+    let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
+    let mut session = cluster.start_session();
+    session.run(CREATE_TABLE).await?;
+    session.run(SEED_TABLE_100).await?;
+    session.flush().await?;
+
+    let mut session2 = cluster.start_session();
+    tokio::spawn(async move {
+        session2.run(SET_RATE_LIMIT_2).await.unwrap();
+        let result = create_mv(&mut session2).await;
+        assert!(result.is_err());
+    });
+
+    // Wait for job to start
+    sleep(Duration::from_secs(2)).await;
+
+    // Kill CN should stop the job
+    kill_cn_and_wait_recover(&cluster).await;
+
+    // Create MV should succeed, since the previous foreground job should be cancelled.
+    session.run(SET_RATE_LIMIT_2).await?;
+    create_mv(&mut session).await?;
+
+    session.run(DROP_MV1).await?;
+    session.run(DROP_TABLE).await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_foreground_index_cancel() -> Result<()> {
+    init_logger();
+    let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
+    let mut session = cluster.start_session();
+    session.run(CREATE_TABLE).await?;
+    session.run(SEED_TABLE_100).await?;
+    session.flush().await?;
+
+    let mut session2 = cluster.start_session();
+    tokio::spawn(async move {
+        session2.run(SET_RATE_LIMIT_2).await.unwrap();
+        let result = session2.run("CREATE INDEX i ON t (v1);").await;
+        assert!(result.is_err());
+    });
+
+    // Wait for job to start
+    sleep(Duration::from_secs(2)).await;
+
+    // Cancel the stream job.
+    cancel_stream_jobs(&mut session).await?;
+
+    // Creating the index should succeed, since the previous foreground job was cancelled.
+    session.run(SET_RATE_LIMIT_2).await?;
+    session.run("CREATE INDEX i ON t (v1);").await?;
+
+    session.run("DROP INDEX i;").await?;
+    session.run(DROP_TABLE).await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_foreground_sink_cancel() -> Result<()> {
+    init_logger();
+    let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
+    let mut session = cluster.start_session();
+    session.run(CREATE_TABLE).await?;
+    session.run(SEED_TABLE_100).await?;
+    session.flush().await?;
+
+    let mut session2 = cluster.start_session();
+    tokio::spawn(async move {
+        session2.run(SET_RATE_LIMIT_2).await.unwrap();
+        let result = session2
+            .run("CREATE SINK s FROM t WITH (connector='blackhole');")
+            .await;
+        assert!(result.is_err());
+    });
+
+    // Wait for job to start
+    sleep(Duration::from_secs(2)).await;
+
+    // Cancel the stream job.
+    cancel_stream_jobs(&mut session).await?;
+
+    // Creating the sink should succeed, since the previous foreground job was cancelled.
+    session.run(SET_RATE_LIMIT_2).await?;
+    session
+        .run("CREATE SINK s FROM t WITH (connector='blackhole');")
+        .await?;
+
+    session.run("DROP SINK s;").await?;
+    session.run(DROP_TABLE).await?;
+
+    Ok(())
+}
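The checkpoint-gating rule above (`CheckpointControl::finish_jobs` together with `TrackingJob::is_checkpoint_required`) can be summarized with a small, self-contained sketch. The types below are illustrative stand-ins, not the actual RisingWave code (the real implementation uses `extract_if` and async `pre_finish`/`notify_finished` calls); the sketch only shows when a stashed job is allowed to finish.

// Illustrative sketch only -- simplified stand-ins for the real `TrackingJob`.
#[derive(Debug)]
enum SketchJob {
    // Recovered background jobs always need a checkpoint to be durably committed.
    Recovered,
    // Newly created jobs record whether their barrier kind requires a checkpoint.
    New { checkpoint_required: bool },
}

impl SketchJob {
    fn is_checkpoint_required(&self) -> bool {
        match self {
            SketchJob::Recovered => true,
            SketchJob::New { checkpoint_required } => *checkpoint_required,
        }
    }
}

// Finish every job that may finish on this barrier; return whether any job is
// still stashed (which forces the next barrier to be a checkpoint).
fn finish_jobs(stashed: &mut Vec<SketchJob>, checkpoint: bool) -> bool {
    stashed.retain(|job| {
        let finish_now = checkpoint || !job.is_checkpoint_required();
        if finish_now {
            // The real code calls `pre_finish` / `notify_finished` here.
            println!("finished {job:?}");
        }
        !finish_now
    });
    !stashed.is_empty()
}

fn main() {
    let mut stashed = vec![
        SketchJob::Recovered,
        SketchJob::New { checkpoint_required: false },
    ];
    // Non-checkpoint barrier: only the job that does not need a checkpoint finishes.
    assert!(finish_jobs(&mut stashed, false));
    // Checkpoint barrier: the remaining recovered job finishes as well.
    assert!(!finish_jobs(&mut stashed, true));
}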