Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jan 15, 2024
1 parent 26a29de commit 03ba6bf
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 15 deletions.
8 changes: 6 additions & 2 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl Configuration {
..Default::default()
}
}

pub fn for_scale_no_shuffle() -> Self {
// Embed the config file and create a temporary file at runtime. The file will be deleted
// automatically when it's dropped.
Expand All @@ -149,7 +150,8 @@ impl Configuration {
meta_nodes: 3,
compactor_nodes: 2,
compute_node_cores: 2,
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()].into(),
per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()]
.into(),
..Default::default()
}
}
Expand Down Expand Up @@ -844,7 +846,9 @@ impl Session {
}

pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
let result = self.run("show streaming_enable_arrangement_backfill").await?;
let result = self
.run("show streaming_enable_arrangement_backfill")
.await?;
Ok(result == "true")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ async fn test_compatibility_with_low_level() -> Result<()> {
);
let mut cluster = Cluster::start(config.clone()).await?;
let mut session = cluster.start_session();
session.run("SET streaming_enable_arrangement_backfill = false;").await?;
session
.run("SET streaming_enable_arrangement_backfill = false;")
.await?;

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
Expand Down Expand Up @@ -643,12 +645,14 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<
sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;
.await;

session.run("create table t(v int);").await?;

// Streaming arrangement backfill
session.run("SET streaming_enable_arrangement_backfill = true;").await?;
session
.run("SET streaming_enable_arrangement_backfill = true;")
.await?;
session
.run("create materialized view m_simple as select * from t;")
.await?;
Expand Down Expand Up @@ -725,7 +729,7 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<
sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;
.await;

let after_fragment_parallelism = session
.run("select fragment_id, parallelism from rw_fragments order by fragment_id;")
Expand All @@ -735,4 +739,3 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<

Ok(())
}

Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,9 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {

if arrangement_backfill_is_enabled {
// The chain fragment is in a different table fragment.
assert_eq!(
chain_fragment.inner.actors.len(),
6,
);
assert_eq!(chain_fragment.inner.actors.len(), 6,);
// The upstream materialized fragment should be scaled in
assert_eq!(
fragment.inner.actors.len(),
1,
);
assert_eq!(fragment.inner.actors.len(), 1,);
} else {
// No shuffle, so the fragment of upstream materialized node is the same
// as stream table scan.
Expand Down

0 comments on commit 03ba6bf

Please sign in to comment.