diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index bbe60c010b94b..d428bbed31d8e 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -668,6 +668,7 @@ impl CommandContext {
}
Command::CancelStreamingJob(table_fragments) => {
+ tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
let node_actors = table_fragments.worker_actor_ids();
self.clean_up(node_actors).await?;
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 781dd244c1c57..116ae756bcb9d 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -893,6 +893,7 @@ impl CatalogManager {
}
commit_meta!(self, tables)?;
+ tracing::debug!(id = ?table.id, "notifying frontend");
let version = self
.notify_frontend(
Operation::Add,
@@ -932,6 +933,18 @@ impl CatalogManager {
);
return Ok(());
};
+ // `Unspecified` maps to Created state, due to backwards compatibility.
+ // `Created` states should not be cancelled.
+ if table
+ .get_stream_job_status()
+ .unwrap_or(StreamJobStatus::Created)
+ != StreamJobStatus::Creating
+ {
+ return Err(MetaError::invalid_parameter(format!(
+ "table is not in creating state id={:#?}",
+ table_id
+ )));
+ }
tracing::trace!("cleanup tables for {}", table.id);
let mut table_ids = vec![table.id];
@@ -2518,6 +2531,15 @@ impl CatalogManager {
.await
}
+ pub async fn table_is_created(&self, table_id: TableId) -> bool {
+ let guard = self.core.lock().await;
+ return if let Some(table) = guard.database.tables.get(&table_id) {
+ table.get_stream_job_status() != Ok(StreamJobStatus::Creating)
+ } else {
+ false
+ };
+ }
+
pub async fn get_tables(&self, table_ids: &[TableId]) -> Vec
{
let mut tables = vec![];
let guard = self.core.lock().await;
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 003252582d6c5..184ca096734e1 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -133,7 +133,7 @@ impl CreatingStreamingJobInfo {
{
receivers.insert(job_id, rx);
} else {
- tracing::warn!("failed to send canceling state");
+ tracing::warn!(id=?job_id, "failed to send canceling state");
}
} else {
// If these job ids do not exist in streaming_jobs,
@@ -271,9 +271,12 @@ impl GlobalStreamManager {
while let Some(state) = receiver.recv().await {
match state {
CreatingState::Failed { reason } => {
+ tracing::debug!(id=?table_id, "stream job failed");
+ self.creating_job_info.delete_job(table_id).await;
return Err(reason);
}
CreatingState::Canceling { finish_tx } => {
+ tracing::debug!(id=?table_id, "cancelling streaming job");
if let Ok(table_fragments) = self
.fragment_manager
.select_table_fragments_by_table_id(&table_id)
@@ -331,10 +334,14 @@ impl GlobalStreamManager {
let _ = finish_tx.send(()).inspect_err(|_| {
tracing::warn!("failed to notify cancelled: {table_id}")
});
+ self.creating_job_info.delete_job(table_id).await;
return Err(MetaError::cancelled("create".into()));
}
}
- CreatingState::Created => return Ok(()),
+ CreatingState::Created => {
+ self.creating_job_info.delete_job(table_id).await;
+ return Ok(());
+ }
}
}
};
@@ -600,22 +607,29 @@ impl GlobalStreamManager {
// NOTE(kwannoel): For recovered stream jobs, we can directly cancel them by running the barrier command,
// since Barrier manager manages the recovered stream jobs.
let futures = recovered_job_ids.into_iter().map(|id| async move {
+ tracing::debug!(?id, "cancelling recovered streaming job");
let result: MetaResult<()> = try {
let fragment = self
.fragment_manager
.select_table_fragments_by_table_id(&id)
.await?;
+ if fragment.is_created() {
+ Err(MetaError::invalid_parameter(format!(
+ "streaming job {} is already created",
+ id
+ )))?;
+ }
self.barrier_scheduler
.run_command(Command::CancelStreamingJob(fragment))
.await?;
};
match result {
Ok(_) => {
- tracing::info!("cancelled recovered streaming job {id}");
+ tracing::info!(?id, "cancelled recovered streaming job");
Some(id)
},
Err(_) => {
- tracing::error!("failed to cancel recovered streaming job {id}, does {id} correspond to any jobs in `SHOW JOBS`?");
+ tracing::error!(?id, "failed to cancel recovered streaming job, does it correspond to any jobs in `SHOW JOBS`?");
None
},
}
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 97a9da0ff6a99..45a0d81b968ab 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -146,6 +146,14 @@ where
let pk_order = self.upstream_table.pk_serializer().get_order_types();
+ #[cfg(madsim)]
+ let snapshot_read_delay = if let Ok(v) = std::env::var("RW_BACKFILL_SNAPSHOT_READ_DELAY")
+ && let Ok(v) = v.parse::() {
+ v
+ } else {
+ 0
+ };
+
let upstream_table_id = self.upstream_table.table_id().table_id;
let mut upstream = self.upstream.execute();
@@ -295,6 +303,14 @@ where
break 'backfill_loop;
}
Some(chunk) => {
+ #[cfg(madsim)]
+ {
+ tokio::time::sleep(std::time::Duration::from_millis(
+ snapshot_read_delay as u64,
+ ))
+ .await;
+ }
+
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
diff --git a/src/tests/simulation/src/background_ddl.toml b/src/tests/simulation/src/background_ddl.toml
new file mode 100644
index 0000000000000..f0bf41f804739
--- /dev/null
+++ b/src/tests/simulation/src/background_ddl.toml
@@ -0,0 +1,10 @@
+[server]
+telemetry_enabled = false
+metrics_level = "Disabled"
+
+#[streaming.developer]
+#stream_chunk_size = 1
+
+[system]
+barrier_interval_ms = 1000
+max_concurrent_creating_streaming_jobs = 4
diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs
index 6cc6168513cd4..a9ba7a657bf67 100644
--- a/src/tests/simulation/src/cluster.rs
+++ b/src/tests/simulation/src/cluster.rs
@@ -137,6 +137,29 @@ impl Configuration {
etcd_data_path: None,
}
}
+
+ pub fn for_background_ddl() -> Self {
+ // Embed the config file and create a temporary file at runtime. The file will be deleted
+ // automatically when it's dropped.
+ let config_path = {
+ let mut file =
+ tempfile::NamedTempFile::new().expect("failed to create temp config file");
+ file.write_all(include_bytes!("background_ddl.toml"))
+ .expect("failed to write config file");
+ file.into_temp_path()
+ };
+
+ Configuration {
+ config_path: ConfigPath::Temp(config_path.into()),
+ frontend_nodes: 1,
+ compute_nodes: 1,
+ meta_nodes: 3,
+ compactor_nodes: 1,
+ compute_node_cores: 2,
+ etcd_timeout_rate: 0.0,
+ etcd_data_path: None,
+ }
+ }
}
/// A risingwave cluster.
@@ -684,4 +707,12 @@ impl KillOpts {
kill_compactor: true,
restart_delay_secs: 20,
};
+ pub const ALL_FAST: Self = KillOpts {
+ kill_rate: 1.0,
+ kill_meta: true,
+ kill_frontend: true,
+ kill_compute: true,
+ kill_compactor: true,
+ restart_delay_secs: 2,
+ };
}
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 89df82d4c21a0..67b447aba9df8 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -15,10 +15,16 @@
use std::time::Duration;
use anyhow::Result;
-use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts};
+use itertools::Itertools;
+use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session};
use risingwave_simulation::utils::AssertResult;
use tokio::time::sleep;
+const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
+const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
+const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
+const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
+
async fn kill_cn_and_wait_recover(cluster: &Cluster) {
// Kill it again
for _ in 0..5 {
@@ -39,16 +45,33 @@ async fn kill_cn_and_wait_recover(cluster: &Cluster) {
async fn kill_and_wait_recover(cluster: &Cluster) {
// Kill it again
- for _ in 0..5 {
+ for _ in 0..3 {
sleep(Duration::from_secs(2)).await;
- cluster.kill_node(&KillOpts::ALL).await;
+ cluster.kill_node(&KillOpts::ALL_FAST).await;
}
- sleep(Duration::from_secs(20)).await;
+ sleep(Duration::from_secs(10)).await;
+}
+
+async fn cancel_stream_jobs(session: &mut Session) -> Result> {
+ tracing::info!("finding streaming jobs to cancel");
+ let ids = session
+ .run("select ddl_id from rw_catalog.rw_ddl_progress;")
+ .await?;
+ tracing::info!("selected streaming jobs to cancel {:?}", ids);
+ tracing::info!("cancelling streaming jobs");
+ let ids = ids.split('\n').collect::>().join(",");
+ let result = session.run(&format!("cancel jobs {};", ids)).await?;
+ tracing::info!("cancelled streaming jobs, {:#?}", result);
+ let ids = result
+ .split('\n')
+ .map(|s| s.parse::().unwrap())
+ .collect_vec();
+ Ok(ids)
}
#[tokio::test]
async fn test_background_mv_barrier_recovery() -> Result<()> {
- let mut cluster = Cluster::start(Configuration::for_backfill()).await?;
+ let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();
session.run("CREATE TABLE t1 (v1 int);").await?;
@@ -73,8 +96,16 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
cluster.run("flush;").await?;
kill_cn_and_wait_recover(&cluster).await;
+ kill_and_wait_recover(&cluster).await;
+
+ // Send some upstream updates.
+ cluster
+ .run("INSERT INTO t1 select * from generate_series(1, 100000);")
+ .await?;
+ cluster.run("flush;").await?;
kill_and_wait_recover(&cluster).await;
+ kill_cn_and_wait_recover(&cluster).await;
// Send some upstream updates.
cluster
@@ -86,14 +117,75 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
sleep(Duration::from_secs(10)).await;
- // Make sure after finished, we should have 5000_000 rows.
session
.run("SELECT COUNT(v1) FROM m1")
.await?
- .assert_result_eq("600000");
+ .assert_result_eq("700000");
+
+ // Make sure that if MV killed and restarted
+ // it will not be dropped.
session.run("DROP MATERIALIZED VIEW m1").await?;
session.run("DROP TABLE t1").await?;
Ok(())
}
+
+#[tokio::test]
+async fn test_background_ddl_cancel() -> Result<()> {
+ env::set_var("RW_BACKFILL_SNAPSHOT_READ_DELAY", "100");
+ async fn create_mv(session: &mut Session) -> Result<()> {
+ session.run(CREATE_MV1).await?;
+ sleep(Duration::from_secs(2)).await;
+ Ok(())
+ }
+ // FIXME: See if we can use rate limit instead.
+ use std::env;
+ tracing_subscriber::fmt()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .with_ansi(false)
+ .init();
+ let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+ let mut session = cluster.start_session();
+ session.run(CREATE_TABLE).await?;
+ session.run(SEED_TABLE).await?;
+ session.run(SET_BACKGROUND_DDL).await?;
+
+ for _ in 0..5 {
+ create_mv(&mut session).await?;
+ let ids = cancel_stream_jobs(&mut session).await?;
+ assert_eq!(ids.len(), 1);
+ }
+
+ create_mv(&mut session).await?;
+
+ // Test cancel after kill cn
+ kill_cn_and_wait_recover(&cluster).await;
+
+ let ids = cancel_stream_jobs(&mut session).await?;
+ assert_eq!(ids.len(), 1);
+
+ sleep(Duration::from_secs(2)).await;
+
+ create_mv(&mut session).await?;
+
+ // Test cancel after kill meta
+ kill_and_wait_recover(&cluster).await;
+
+ let ids = cancel_stream_jobs(&mut session).await?;
+ assert_eq!(ids.len(), 1);
+
+ // Make sure MV can be created after all these cancels
+ create_mv(&mut session).await?;
+
+ kill_and_wait_recover(&cluster).await;
+
+ // Wait for job to finish
+ session.run("WAIT;").await?;
+
+ session.run("DROP MATERIALIZED VIEW mv1").await?;
+ session.run("DROP TABLE t").await?;
+
+ env::remove_var("RW_BACKFILL_SNAPSHOT_READ_DELAY");
+ Ok(())
+}