diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 45a0d81b968ab..97a9da0ff6a99 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -146,14 +146,6 @@ where
 
         let pk_order = self.upstream_table.pk_serializer().get_order_types();
 
-        #[cfg(madsim)]
-        let snapshot_read_delay = if let Ok(v) = std::env::var("RW_BACKFILL_SNAPSHOT_READ_DELAY")
-            && let Ok(v) = v.parse::<u32>() {
-            v
-        } else {
-            0
-        };
-
         let upstream_table_id = self.upstream_table.table_id().table_id;
 
         let mut upstream = self.upstream.execute();
@@ -303,14 +295,6 @@ where
                                     break 'backfill_loop;
                                 }
                                 Some(chunk) => {
-                                    #[cfg(madsim)]
-                                    {
-                                        tokio::time::sleep(std::time::Duration::from_millis(
-                                            snapshot_read_delay as u64,
-                                        ))
-                                        .await;
-                                    }
-
                                     // Raise the current position.
                                     // As snapshot read streams are ordered by pk, so we can
                                     // just use the last row to update `current_pos`.
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 67b447aba9df8..773c739816ca2 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -23,6 +23,7 @@ use tokio::time::sleep;
 const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
 const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
 const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
+const SET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=5000;";
 const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
 
 async fn kill_cn_and_wait_recover(cluster: &Cluster) {
@@ -133,13 +134,11 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
 
 #[tokio::test]
 async fn test_background_ddl_cancel() -> Result<()> {
-    env::set_var("RW_BACKFILL_SNAPSHOT_READ_DELAY", "100");
     async fn create_mv(session: &mut Session) -> Result<()> {
         session.run(CREATE_MV1).await?;
         sleep(Duration::from_secs(2)).await;
         Ok(())
     }
-    // FIXME: See if we can use rate limit instead.
     use std::env;
     tracing_subscriber::fmt()
         .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
@@ -149,6 +148,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
     let mut session = cluster.start_session();
     session.run(CREATE_TABLE).await?;
    session.run(SEED_TABLE).await?;
+    session.run(SET_RATE_LIMIT).await?;
     session.run(SET_BACKGROUND_DDL).await?;
 
     for _ in 0..5 {
@@ -175,6 +175,9 @@ async fn test_background_ddl_cancel() -> Result<()> {
     let ids = cancel_stream_jobs(&mut session).await?;
     assert_eq!(ids.len(), 1);
 
+    session.run(SEED_TABLE).await?;
+    session.flush().await?;
+
     // Make sure MV can be created after all these cancels
     create_mv(&mut session).await?;