test(stream): test cancel in deterministic testing mode (#13070)
kwannoel authored Oct 31, 2023
1 parent 25e8939 commit c86b821
Showing 7 changed files with 197 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/meta/src/barrier/command.rs
@@ -668,6 +668,7 @@ impl CommandContext {
}

Command::CancelStreamingJob(table_fragments) => {
tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
let node_actors = table_fragments.worker_actor_ids();
self.clean_up(node_actors).await?;

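The logging added across this commit uses `tracing`'s structured-field syntax: a `?` sigil (as in `id = ?table_fragments.table_id()`) records the value through its `Debug` implementation as a separate field instead of interpolating it into the message text. A minimal standalone sketch of the pattern (the function and field names here are illustrative, not part of the diff):

    use tracing::debug;

    fn log_cancel(job_id: u32) {
        // `id = ?job_id` attaches `job_id` as a structured field via its
        // `Debug` impl; the message string stays constant, which keeps the
        // logs easy to filter by field.
        debug!(id = ?job_id, "cancelling stream job");
    }
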
22 changes: 22 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
@@ -893,6 +893,7 @@ impl CatalogManager {
}
commit_meta!(self, tables)?;

tracing::debug!(id = ?table.id, "notifying frontend");
let version = self
.notify_frontend(
Operation::Add,
@@ -932,6 +933,18 @@
);
return Ok(());
};
// `Unspecified` maps to the `Created` state for backwards compatibility.
// Jobs already in the `Created` state must not be cancelled.
if table
.get_stream_job_status()
.unwrap_or(StreamJobStatus::Created)
!= StreamJobStatus::Creating
{
return Err(MetaError::invalid_parameter(format!(
"table is not in creating state id={:#?}",
table_id
)));
}

tracing::trace!("cleanup tables for {}", table.id);
let mut table_ids = vec![table.id];
@@ -2518,6 +2531,15 @@ impl CatalogManager {
.await
}

pub async fn table_is_created(&self, table_id: TableId) -> bool {
let guard = self.core.lock().await;
return if let Some(table) = guard.database.tables.get(&table_id) {
table.get_stream_job_status() != Ok(StreamJobStatus::Creating)
} else {
false
};
}

pub async fn get_tables(&self, table_ids: &[TableId]) -> Vec<Table> {
let mut tables = vec![];
let guard = self.core.lock().await;
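The guard added above only lets cancellation proceed for jobs still in the `Creating` state. A standalone sketch of the same check, assuming a simplified status enum (the variants follow the diff, but this helper is illustrative rather than the actual meta-service code):

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum StreamJobStatus {
        Unspecified,
        Creating,
        Created,
    }

    /// A stream job may be cancelled only while it is still being created.
    /// `Unspecified` comes from catalogs written before the status field
    /// existed, so it is read as `Created` for backwards compatibility.
    fn can_cancel(status: StreamJobStatus) -> bool {
        let effective = match status {
            StreamJobStatus::Unspecified => StreamJobStatus::Created,
            other => other,
        };
        effective == StreamJobStatus::Creating
    }

The new `table_is_created` helper applies the same rule from the other direction: any status other than `Creating` is reported as created.
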
22 changes: 18 additions & 4 deletions src/meta/src/stream/stream_manager.rs
@@ -133,7 +133,7 @@ impl CreatingStreamingJobInfo {
{
receivers.insert(job_id, rx);
} else {
tracing::warn!("failed to send canceling state");
tracing::warn!(id=?job_id, "failed to send canceling state");
}
} else {
// If these job ids do not exist in streaming_jobs,
@@ -271,9 +271,12 @@ impl GlobalStreamManager {
while let Some(state) = receiver.recv().await {
match state {
CreatingState::Failed { reason } => {
tracing::debug!(id=?table_id, "stream job failed");
self.creating_job_info.delete_job(table_id).await;
return Err(reason);
}
CreatingState::Canceling { finish_tx } => {
tracing::debug!(id=?table_id, "cancelling streaming job");
if let Ok(table_fragments) = self
.fragment_manager
.select_table_fragments_by_table_id(&table_id)
@@ -331,10 +334,14 @@ impl GlobalStreamManager {
let _ = finish_tx.send(()).inspect_err(|_| {
tracing::warn!("failed to notify cancelled: {table_id}")
});
self.creating_job_info.delete_job(table_id).await;
return Err(MetaError::cancelled("create".into()));
}
}
-CreatingState::Created => return Ok(()),
+CreatingState::Created => {
+self.creating_job_info.delete_job(table_id).await;
+return Ok(());
+}
}
}
};
@@ -600,22 +607,29 @@ impl GlobalStreamManager {
// NOTE(kwannoel): For recovered stream jobs, we can directly cancel them by running the barrier command,
// since Barrier manager manages the recovered stream jobs.
let futures = recovered_job_ids.into_iter().map(|id| async move {
tracing::debug!(?id, "cancelling recovered streaming job");
let result: MetaResult<()> = try {
let fragment = self
.fragment_manager
.select_table_fragments_by_table_id(&id)
.await?;
if fragment.is_created() {
Err(MetaError::invalid_parameter(format!(
"streaming job {} is already created",
id
)))?;
}
self.barrier_scheduler
.run_command(Command::CancelStreamingJob(fragment))
.await?;
};
match result {
Ok(_) => {
tracing::info!("cancelled recovered streaming job {id}");
tracing::info!(?id, "cancelled recovered streaming job");
Some(id)
},
Err(_) => {
tracing::error!("failed to cancel recovered streaming job {id}, does {id} correspond to any jobs in `SHOW JOBS`?");
tracing::error!(?id, "failed to cancel recovered streaming job, does it correspond to any jobs in `SHOW JOBS`?");
None
},
}
16 changes: 16 additions & 0 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -146,6 +146,14 @@ where

let pk_order = self.upstream_table.pk_serializer().get_order_types();

#[cfg(madsim)]
let snapshot_read_delay = if let Ok(v) = std::env::var("RW_BACKFILL_SNAPSHOT_READ_DELAY")
&& let Ok(v) = v.parse::<u64>() {
v
} else {
0
};

let upstream_table_id = self.upstream_table.table_id().table_id;

let mut upstream = self.upstream.execute();
@@ -295,6 +303,14 @@ where
break 'backfill_loop;
}
Some(chunk) => {
#[cfg(madsim)]
{
tokio::time::sleep(std::time::Duration::from_millis(
snapshot_read_delay as u64,
))
.await;
}

// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
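Under the deterministic simulator (`cfg(madsim)`), the backfill executor now sleeps before processing each snapshot chunk, with the delay taken from the `RW_BACKFILL_SNAPSHOT_READ_DELAY` environment variable; this slows backfill enough for the tests below to cancel jobs mid-stream. A condensed sketch of the parsing step (the diff uses nightly let-chains; this stable-Rust version with combinators is illustrative):

    /// Per-chunk snapshot read delay in milliseconds; defaults to 0
    /// (no delay) when the variable is unset or fails to parse.
    fn snapshot_read_delay_ms() -> u64 {
        std::env::var("RW_BACKFILL_SNAPSHOT_READ_DELAY")
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(0)
    }
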
10 changes: 10 additions & 0 deletions src/tests/simulation/src/background_ddl.toml
@@ -0,0 +1,10 @@
[server]
telemetry_enabled = false
metrics_level = "Disabled"

#[streaming.developer]
#stream_chunk_size = 1

[system]
barrier_interval_ms = 1000
max_concurrent_creating_streaming_jobs = 4
31 changes: 31 additions & 0 deletions src/tests/simulation/src/cluster.rs
@@ -137,6 +137,29 @@ impl Configuration {
etcd_data_path: None,
}
}

pub fn for_background_ddl() -> Self {
// Embed the config file and create a temporary file at runtime. The file will be deleted
// automatically when it's dropped.
let config_path = {
let mut file =
tempfile::NamedTempFile::new().expect("failed to create temp config file");
file.write_all(include_bytes!("background_ddl.toml"))
.expect("failed to write config file");
file.into_temp_path()
};

Configuration {
config_path: ConfigPath::Temp(config_path.into()),
frontend_nodes: 1,
compute_nodes: 1,
meta_nodes: 3,
compactor_nodes: 1,
compute_node_cores: 2,
etcd_timeout_rate: 0.0,
etcd_data_path: None,
}
}
}

/// A risingwave cluster.
@@ -684,4 +707,12 @@ impl KillOpts {
kill_compactor: true,
restart_delay_secs: 20,
};
pub const ALL_FAST: Self = KillOpts {
kill_rate: 1.0,
kill_meta: true,
kill_frontend: true,
kill_compute: true,
kill_compactor: true,
restart_delay_secs: 2,
};
}
@@ -15,10 +15,16 @@
use std::time::Duration;

use anyhow::Result;
-use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts};
+use itertools::Itertools;
+use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session};
use risingwave_simulation::utils::AssertResult;
use tokio::time::sleep;

const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";

async fn kill_cn_and_wait_recover(cluster: &Cluster) {
// Kill it again
for _ in 0..5 {
@@ -39,16 +45,33 @@ async fn kill_cn_and_wait_recover(cluster: &Cluster) {

async fn kill_and_wait_recover(cluster: &Cluster) {
// Kill it again
-for _ in 0..5 {
+for _ in 0..3 {
sleep(Duration::from_secs(2)).await;
-cluster.kill_node(&KillOpts::ALL).await;
+cluster.kill_node(&KillOpts::ALL_FAST).await;
}
-sleep(Duration::from_secs(20)).await;
+sleep(Duration::from_secs(10)).await;
}

async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
tracing::info!("finding streaming jobs to cancel");
let ids = session
.run("select ddl_id from rw_catalog.rw_ddl_progress;")
.await?;
tracing::info!("selected streaming jobs to cancel {:?}", ids);
tracing::info!("cancelling streaming jobs");
let ids = ids.split('\n').collect::<Vec<_>>().join(",");
let result = session.run(&format!("cancel jobs {};", ids)).await?;
tracing::info!("cancelled streaming jobs, {:#?}", result);
let ids = result
.split('\n')
.map(|s| s.parse::<u32>().unwrap())
.collect_vec();
Ok(ids)
}

#[tokio::test]
async fn test_background_mv_barrier_recovery() -> Result<()> {
-let mut cluster = Cluster::start(Configuration::for_backfill()).await?;
+let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();

session.run("CREATE TABLE t1 (v1 int);").await?;
@@ -73,8 +96,16 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
cluster.run("flush;").await?;

kill_cn_and_wait_recover(&cluster).await;
kill_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;

kill_and_wait_recover(&cluster).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
@@ -86,14 +117,75 @@

sleep(Duration::from_secs(10)).await;

// Make sure that after the job finishes, we have 700000 rows.
session
.run("SELECT COUNT(v1) FROM m1")
.await?
.assert_result_eq("600000");
.assert_result_eq("700000");

// Make sure that if the MV is killed and restarted,
// it will not be dropped.

session.run("DROP MATERIALIZED VIEW m1").await?;
session.run("DROP TABLE t1").await?;

Ok(())
}

#[tokio::test]
async fn test_background_ddl_cancel() -> Result<()> {
use std::env;
// FIXME: See if we can use rate limit instead.
env::set_var("RW_BACKFILL_SNAPSHOT_READ_DELAY", "100");
async fn create_mv(session: &mut Session) -> Result<()> {
session.run(CREATE_MV1).await?;
sleep(Duration::from_secs(2)).await;
Ok(())
}
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_ansi(false)
.init();
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();
session.run(CREATE_TABLE).await?;
session.run(SEED_TABLE).await?;
session.run(SET_BACKGROUND_DDL).await?;

for _ in 0..5 {
create_mv(&mut session).await?;
let ids = cancel_stream_jobs(&mut session).await?;
assert_eq!(ids.len(), 1);
}

create_mv(&mut session).await?;

// Test cancel after kill cn
kill_cn_and_wait_recover(&cluster).await;

let ids = cancel_stream_jobs(&mut session).await?;
assert_eq!(ids.len(), 1);

sleep(Duration::from_secs(2)).await;

create_mv(&mut session).await?;

// Test cancel after kill meta
kill_and_wait_recover(&cluster).await;

let ids = cancel_stream_jobs(&mut session).await?;
assert_eq!(ids.len(), 1);

// Make sure MV can be created after all these cancels
create_mv(&mut session).await?;

kill_and_wait_recover(&cluster).await;

// Wait for job to finish
session.run("WAIT;").await?;

session.run("DROP MATERIALIZED VIEW mv1").await?;
session.run("DROP TABLE t").await?;

env::remove_var("RW_BACKFILL_SNAPSHOT_READ_DELAY");
Ok(())
}
