Skip to content

Commit

Permalink
Update ScaleController logic, improve streaming parallelism tests, re…
Browse files Browse the repository at this point in the history
…move redundant test
  • Loading branch information
shanicky committed Aug 16, 2024
1 parent fd0cbea commit 7e1df0b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2060,7 +2060,7 @@ impl ScaleController {
}
FragmentDistributionType::Hash => match parallelism {
TableParallelism::Adaptive => {
if schedulable_worker_slots.len() > VirtualNode::COUNT {
if all_available_slots > VirtualNode::COUNT {
tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT");
// force limit to VirtualNode::COUNT
let target_worker_slots = schedule_units_for_slots(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,20 @@ async fn test_streaming_parallelism_index() -> Result<()> {
#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let mut configuration = Configuration::for_scale();
configuration.compute_node_cores = vnode_max + 100;
let mut configuration = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
true,
);

configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 1;
let mut cluster = Cluster::start(configuration).await?;

sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

let mut session = cluster.start_session();
session.run("create table t(v int)").await?;
session
Expand All @@ -161,7 +172,9 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> {
#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_alter() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let configuration = Configuration::for_scale();
let mut configuration = Configuration::for_scale();
configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 100;
let mut cluster = Cluster::start(configuration).await?;
let mut session = cluster.start_session();
session.run("set streaming_parallelism = 1").await?;
Expand All @@ -172,62 +185,11 @@ async fn test_parallelism_exceed_virtual_node_max_alter() -> Result<()> {
.assert_result_eq("FIXED(1)");

session
.run(format!(
"alter table t set parallelism = {}",
vnode_max + 100
))
.run(format!("alter table t set parallelism = {}", vnode_max + 1))
.await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq(format!("FIXED({})", vnode_max));
Ok(())
}

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_auto_scale() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let mut configuration = Configuration::for_scale();
let worker_parallelism = (vnode_max as f32 / 2.5).floor() as usize;
configuration.compute_node_cores = worker_parallelism;
configuration.compute_nodes = 3;
let mut cluster = Cluster::start(configuration).await?;

// Keep one worker reserved for adding later.
let select_worker = "compute-2";
cluster
.simple_kill_nodes(vec![select_worker.to_string()])
.await;

sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

let mut session = cluster.start_session();
session.run("create table t(v int)").await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq("ADAPTIVE");

session
.run("select distinct parallelism from rw_fragment_parallelism where name = 't'")
.await?
.assert_result_eq(format!("{}", worker_parallelism * 2));

cluster.simple_restart_nodes(vec![select_worker]).await;

// wait for a while
sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

session
.run("select distinct parallelism from rw_fragment_parallelism where name = 't'")
.await?
.assert_result_eq(format!("{}", vnode_max));

Ok(())
}

0 comments on commit 7e1df0b

Please sign in to comment.