Skip to content

Commit

Permalink
fix sink test + deterministically kill cn
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 2, 2023
1 parent 3ac170e commit 20cf09a
Showing 1 changed file with 6 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,9 @@ const DROP_MV1: &str = "DROP MATERIALIZED VIEW mv1;";
const WAIT: &str = "WAIT;";

async fn kill_cn_and_wait_recover(cluster: &Cluster) {
// Kill it again
for _ in 0..5 {
cluster
.kill_node(&KillOpts {
kill_rate: 1.0,
kill_meta: false,
kill_frontend: false,
kill_compute: true,
kill_compactor: false,
restart_delay_secs: 1,
})
.await;
sleep(Duration::from_secs(2)).await;
}
cluster
.kill_nodes(["compute-1", "compute-2", "compute-3"], 0)
.await;
sleep(Duration::from_secs(10)).await;
}

Expand Down Expand Up @@ -198,10 +187,7 @@ async fn test_foreground_ddl_no_recovery() -> Result<()> {
sleep(Duration::from_secs(2)).await;

// Kill CN should stop the job
cluster
.kill_nodes(["compute-1", "compute-2", "compute-3"], 0)
.await;
sleep(Duration::from_secs(10)).await;
kill_cn_and_wait_recover(&cluster).await;

// Create MV should succeed, since the previous foreground job should be cancelled.
session.run(SET_RATE_LIMIT_2).await?;
Expand Down Expand Up @@ -257,7 +243,7 @@ async fn test_foreground_sink_cancel() -> Result<()> {
let mut session2 = cluster.start_session();
tokio::spawn(async move {
session2.run(SET_RATE_LIMIT_2).await.unwrap();
let result = session2.run("CREATE SINK s ON t (v1);").await;
let result = session2.run("CREATE SINK s FROM t WITH (connector='blackhole');").await;
assert!(result.is_err());
});

Expand All @@ -269,7 +255,7 @@ async fn test_foreground_sink_cancel() -> Result<()> {

// Create MV should succeed, since the previous foreground job should be cancelled.
session.run(SET_RATE_LIMIT_2).await?;
session.run("CREATE SINK s ON t (v1);").await?;
session.run("CREATE SINK s FROM t WITH (connector='blackhole');").await?;

session.run("DROP SINK s;").await?;
session.run(DROP_TABLE).await?;
Expand Down

0 comments on commit 20cf09a

Please sign in to comment.