Skip to content

Commit

Permalink
reduce workload of background_mv_barrier_recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 2, 2023
1 parent 73dcbc2 commit e12dcd9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 39 deletions.
1 change: 1 addition & 0 deletions src/stream/src/executor/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl FlowControlExecutor {
match msg {
Message::Chunk(chunk) => {
let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else {
tracing::trace!(?chunk, "empty chunk");
// Handle case where chunk is empty
continue;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ use risingwave_simulation::utils::AssertResult;
use tokio::time::sleep;

const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
const DROP_TABLE: &str = "DROP TABLE t;";
const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);";
const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
const WAIT: &str = "WAIT;";

async fn kill_cn_and_wait_recover(cluster: &Cluster) {
Expand Down Expand Up @@ -80,80 +82,70 @@ fn init_logger() {
.try_init();
}

async fn create_mv(session: &mut Session) -> Result<()> {
session.run(CREATE_MV1).await?;
sleep(Duration::from_secs(2)).await;
Ok(())
}

#[tokio::test]
async fn test_background_mv_barrier_recovery() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
init_logger();
let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
let mut session = cluster.start_session();

session.run("CREATE TABLE t1 (v1 int);").await?;
session
.run("INSERT INTO t1 select * from generate_series(1, 400000)")
.await?;
session.run("flush").await?;
session.run("SET BACKGROUND_DDL=true;").await?;
session
.run("create materialized view m1 as select * from t1;")
.await?;
session.run(CREATE_TABLE).await?;
session.run(SEED_TABLE).await?;
session.flush().await?;
session.run(SET_RATE_LIMIT_1).await?;
session.run(SET_BACKGROUND_DDL).await?;
create_mv(&mut session).await?;

// If the CN is killed before first barrier pass for the MV, the MV will be dropped.
// This is because it's table fragments will NOT be committed until first barrier pass.
sleep(Duration::from_secs(5)).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;

kill_cn_and_wait_recover(&cluster).await;
kill_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;
session.run(SEED_TABLE).await?;
session.run(SEED_TABLE).await?;
session.flush().await?;

kill_and_wait_recover(&cluster).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;
session.run(SEED_TABLE).await?;
session.run(SEED_TABLE).await?;
session.flush().await?;

// Now just wait for it to complete.
kill_cn_and_wait_recover(&cluster).await;
kill_and_wait_recover(&cluster).await;

sleep(Duration::from_secs(10)).await;
// Now just wait for it to complete.
session.run(WAIT).await?;

session
.run("SELECT COUNT(v1) FROM m1")
.run("SELECT COUNT(v1) FROM mv1")
.await?
.assert_result_eq("700000");
.assert_result_eq("3500");

// Make sure that if MV killed and restarted
// it will not be dropped.

session.run("DROP MATERIALIZED VIEW m1").await?;
session.run("DROP TABLE t1").await?;
session.run(DROP_MV1).await?;
session.run(DROP_TABLE).await?;

Ok(())
}

#[tokio::test]
async fn test_background_ddl_cancel() -> Result<()> {
async fn create_mv(session: &mut Session) -> Result<()> {
session.run(CREATE_MV1).await?;
sleep(Duration::from_secs(2)).await;
Ok(())
}
init_logger();
let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
let mut session = cluster.start_session();
session.run(CREATE_TABLE).await?;
session.run(SEED_TABLE).await?;
session.flush().await?;
session.run(SET_RATE_LIMIT_2).await?;
session.run(SET_BACKGROUND_DDL).await?;

Expand Down

0 comments on commit e12dcd9

Please sign in to comment.