Skip to content

Commit

Permalink
fix: Refactor auto_parallelism.rs to initialize session after kil…
Browse files Browse the repository at this point in the history
…ling compute node (#17751)
  • Loading branch information
shanicky authored Jul 19, 2024
1 parent af97625 commit 2a52dd3
Showing 1 changed file with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ async fn test_active_online() -> Result<()> {
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();

// Keep one worker reserved for adding later.
cluster
Expand All @@ -229,6 +228,8 @@ async fn test_active_online() -> Result<()> {
))
.await;

let mut session = cluster.start_session();

session.run("create table t (v1 int);").await?;
session
.run("create materialized view m as select count(*) from t;")
Expand Down Expand Up @@ -303,7 +304,6 @@ async fn test_auto_parallelism_control_with_fixed_and_auto_helper(
enable_auto_parallelism_control,
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
Expand All @@ -316,6 +316,8 @@ async fn test_auto_parallelism_control_with_fixed_and_auto_helper(
))
.await;

let mut session = cluster.start_session();

session.run("create table t (v1 int);").await?;

session
Expand Down Expand Up @@ -490,10 +492,6 @@ async fn test_compatibility_with_low_level() -> Result<()> {
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();
session
.run("SET streaming_use_arrangement_backfill = false;")
.await?;

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
Expand All @@ -506,6 +504,11 @@ async fn test_compatibility_with_low_level() -> Result<()> {
))
.await;

let mut session = cluster.start_session();
session
.run("SET streaming_use_arrangement_backfill = false;")
.await?;

session.run("create table t(v int);").await?;

// single fragment downstream
Expand Down Expand Up @@ -631,7 +634,6 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<
true,
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
Expand All @@ -644,6 +646,8 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<
))
.await;

let mut session = cluster.start_session();

session.run("create table t(v int);").await?;

// Streaming arrangement backfill
Expand Down

0 comments on commit 2a52dd3

Please sign in to comment.