Skip to content

Commit

Permalink
use rate limit instead
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 31, 2023
1 parent c86b821 commit 526bec0
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 18 deletions.
16 changes: 0 additions & 16 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,6 @@ where

let pk_order = self.upstream_table.pk_serializer().get_order_types();

#[cfg(madsim)]
let snapshot_read_delay = if let Ok(v) = std::env::var("RW_BACKFILL_SNAPSHOT_READ_DELAY")
&& let Ok(v) = v.parse::<u64>() {
v
} else {
0
};

let upstream_table_id = self.upstream_table.table_id().table_id;

let mut upstream = self.upstream.execute();
Expand Down Expand Up @@ -303,14 +295,6 @@ where
break 'backfill_loop;
}
Some(chunk) => {
#[cfg(madsim)]
{
tokio::time::sleep(std::time::Duration::from_millis(
snapshot_read_delay as u64,
))
.await;
}

// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::time::sleep;
const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
const SET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=5000;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";

async fn kill_cn_and_wait_recover(cluster: &Cluster) {
Expand Down Expand Up @@ -133,13 +134,11 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {

#[tokio::test]
async fn test_background_ddl_cancel() -> Result<()> {
env::set_var("RW_BACKFILL_SNAPSHOT_READ_DELAY", "100");
async fn create_mv(session: &mut Session) -> Result<()> {
session.run(CREATE_MV1).await?;
sleep(Duration::from_secs(2)).await;
Ok(())
}
// FIXME: See if we can use rate limit instead.
use std::env;
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
Expand All @@ -149,6 +148,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
let mut session = cluster.start_session();
session.run(CREATE_TABLE).await?;
session.run(SEED_TABLE).await?;
session.run(SET_RATE_LIMIT).await?;
session.run(SET_BACKGROUND_DDL).await?;

for _ in 0..5 {
Expand All @@ -175,6 +175,9 @@ async fn test_background_ddl_cancel() -> Result<()> {
let ids = cancel_stream_jobs(&mut session).await?;
assert_eq!(ids.len(), 1);

session.run(SEED_TABLE).await?;
session.flush().await?;

// Make sure MV can be created after all these cancels
create_mv(&mut session).await?;

Expand Down

0 comments on commit 526bec0

Please sign in to comment.