Skip to content

Commit

Permalink
reduce workload of background barrier recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 31, 2023
1 parent a7a9aab commit 5173a45
Showing 1 changed file with 11 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const SET_RATE_LIMIT_2: &str = "SET STREAMING_RATE_LIMIT=2;";
const SET_RATE_LIMIT_1: &str = "SET STREAMING_RATE_LIMIT=1;";
const RESET_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=0;";
const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
const WAIT: &str = "WAIT;";

async fn kill_cn_and_wait_recover(cluster: &Cluster) {
// Kill it again
Expand Down Expand Up @@ -85,41 +86,34 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {

// If the CN is killed before first barrier pass for the MV, the MV will be dropped.
// This is because it's table fragments will NOT be committed until first barrier pass.
sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(3)).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;
session.run(SEED_TABLE).await?;
session.flush().await?;

kill_cn_and_wait_recover(&cluster).await;
kill_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;
session.run(SEED_TABLE).await?;
session.flush().await?;

kill_and_wait_recover(&cluster).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;
session.run(SEED_TABLE).await?;
session.flush().await?;

// Now just wait for it to complete.

sleep(Duration::from_secs(10)).await;
session.run(WAIT).await?;

session
.run("SELECT COUNT(v1) FROM m1")
.await?
.assert_result_eq("700000");
.assert_result_eq("2000");

// Make sure that if MV killed and restarted
// it will not be dropped.
Expand Down Expand Up @@ -180,7 +174,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
kill_and_wait_recover(&cluster).await;

// Wait for job to finish
session.run("WAIT;").await?;
session.run(WAIT).await?;

session.run("DROP MATERIALIZED VIEW mv1").await?;
session.run("DROP TABLE t").await?;
Expand Down

0 comments on commit 5173a45

Please sign in to comment.