diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index cf00dfe3dd0a9..9fe0e88c3874c 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -65,7 +65,7 @@ test_background_ddl_recovery() { OLD_PROGRESS=$(run_sql "SHOW JOBS;" | grep -E -o "[0-9]{1,2}\.[0-9]{1,2}") # Restart - cargo make ci-kill + cargo make kill cargo make dev ci-1cn-1fe-with-recovery # Test after recovery @@ -89,7 +89,7 @@ test_background_ddl_recovery() { sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/recovery/drop.slt" - cargo make ci-kill + cargo make kill } # Test foreground ddl should not recover @@ -104,7 +104,7 @@ test_foreground_ddl_no_recover() { sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/validate.slt" # Restart - cargo make ci-kill + cargo make kill cargo make dev ci-1cn-1fe-with-recovery # Leave sometime for recovery @@ -115,13 +115,13 @@ test_foreground_ddl_no_recover() { sqllogictest -d dev -h localhost -p 4566 "$TEST_DIR/background_ddl/foreground/drop.slt" - cargo make ci-kill + cargo make kill } main() { set -euo pipefail # test_snapshot_and_upstream_read - # test_background_ddl_recovery + test_background_ddl_recovery test_foreground_ddl_no_recover } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 2e86fa5ed5077..19c454b5734ec 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -978,6 +978,7 @@ impl GlobalBarrierManager { drop(tracker); for (table, internal_tables, finished) in receivers { let catalog_manager = self.catalog_manager.clone(); + let fragment_manager = self.fragment_manager.clone(); tokio::spawn(async move { let res: MetaResult<()> = try { finished @@ -988,11 +989,15 @@ impl GlobalBarrierManager { // and mark catalog as created and commit to meta. // both of these are done by catalog manager. catalog_manager - .finish_create_table_procedure(internal_tables, table) + .finish_create_table_procedure(internal_tables, table.clone()) .await?; }; if let Err(e) = res.as_ref() { tracing::error!("Failed to finish stream job: {e:?}"); + catalog_manager + .cancel_create_table_procedure(&table, fragment_manager) + .await + .unwrap(); } // FIXME: Should copy the functionality from DDLController, // and call cancel_stream_job here if any part of this failed.