From e12dcd9af4b81ea852ccd92490dab0523086ce3d Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 2 Nov 2023 16:32:29 +0800 Subject: [PATCH] reduce workload of background_mv_barrier_recovery --- src/stream/src/executor/flow_control.rs | 1 + .../recovery/background_ddl.rs | 70 ++++++++----------- 2 files changed, 32 insertions(+), 39 deletions(-) diff --git a/src/stream/src/executor/flow_control.rs b/src/stream/src/executor/flow_control.rs index c48fdc8a4392f..fcc9fd2b6f056 100644 --- a/src/stream/src/executor/flow_control.rs +++ b/src/stream/src/executor/flow_control.rs @@ -55,6 +55,7 @@ impl FlowControlExecutor { match msg { Message::Chunk(chunk) => { let Some(n) = NonZeroU32::new(chunk.cardinality() as u32) else { + tracing::trace!(?chunk, "empty chunk"); // Handle case where chunk is empty continue; }; diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index b4849c877f8ec..6cd7a0b5d903e 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -21,12 +21,14 @@ use risingwave_simulation::utils::AssertResult; use tokio::time::sleep; const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);"; +const DROP_TABLE: &str = "DROP TABLE t;"; const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 500);"; const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;"; const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;"; const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;"; const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;"; const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;"; +const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;"; const WAIT: &str = "WAIT;"; async fn kill_cn_and_wait_recover(cluster: &Cluster) { @@ -80,80 +82,70 @@ fn init_logger() { .try_init(); } +async fn create_mv(session: &mut Session) -> Result<()> { + session.run(CREATE_MV1).await?; + sleep(Duration::from_secs(2)).await; + Ok(()) +} + #[tokio::test] async fn test_background_mv_barrier_recovery() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + init_logger(); + let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; let mut session = cluster.start_session(); - session.run("CREATE TABLE t1 (v1 int);").await?; - session - .run("INSERT INTO t1 select * from generate_series(1, 400000)") - .await?; - session.run("flush").await?; - session.run("SET BACKGROUND_DDL=true;").await?; - session - .run("create materialized view m1 as select * from t1;") - .await?; + session.run(CREATE_TABLE).await?; + session.run(SEED_TABLE).await?; + session.flush().await?; + session.run(SET_RATE_LIMIT_1).await?; + session.run(SET_BACKGROUND_DDL).await?; + create_mv(&mut session).await?; // If the CN is killed before first barrier pass for the MV, the MV will be dropped. // This is because it's table fragments will NOT be committed until first barrier pass. - sleep(Duration::from_secs(5)).await; - kill_cn_and_wait_recover(&cluster).await; - - // Send some upstream updates. - cluster - .run("INSERT INTO t1 select * from generate_series(1, 100000);") - .await?; - cluster.run("flush;").await?; - kill_cn_and_wait_recover(&cluster).await; kill_and_wait_recover(&cluster).await; // Send some upstream updates. - cluster - .run("INSERT INTO t1 select * from generate_series(1, 100000);") - .await?; - cluster.run("flush;").await?; + session.run(SEED_TABLE).await?; + session.run(SEED_TABLE).await?; + session.flush().await?; kill_and_wait_recover(&cluster).await; kill_cn_and_wait_recover(&cluster).await; // Send some upstream updates. - cluster - .run("INSERT INTO t1 select * from generate_series(1, 100000);") - .await?; - cluster.run("flush;").await?; + session.run(SEED_TABLE).await?; + session.run(SEED_TABLE).await?; + session.flush().await?; - // Now just wait for it to complete. + kill_cn_and_wait_recover(&cluster).await; + kill_and_wait_recover(&cluster).await; - sleep(Duration::from_secs(10)).await; + // Now just wait for it to complete. + session.run(WAIT).await?; session - .run("SELECT COUNT(v1) FROM m1") + .run("SELECT COUNT(v1) FROM mv1") .await? - .assert_result_eq("700000"); + .assert_result_eq("3500"); // Make sure that if MV killed and restarted // it will not be dropped. - - session.run("DROP MATERIALIZED VIEW m1").await?; - session.run("DROP TABLE t1").await?; + session.run(DROP_MV1).await?; + session.run(DROP_TABLE).await?; Ok(()) } #[tokio::test] async fn test_background_ddl_cancel() -> Result<()> { - async fn create_mv(session: &mut Session) -> Result<()> { - session.run(CREATE_MV1).await?; - sleep(Duration::from_secs(2)).await; - Ok(()) - } init_logger(); let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; let mut session = cluster.start_session(); session.run(CREATE_TABLE).await?; session.run(SEED_TABLE).await?; + session.flush().await?; session.run(SET_RATE_LIMIT_2).await?; session.run(SET_BACKGROUND_DDL).await?;