Skip to content

Commit

Permalink
Improve parallelism handling: Check limits in alter_parallelism.rs,…
Browse files Browse the repository at this point in the history
… log warnings for adaptive mode in `ddl_controller.rs`.
  • Loading branch information
shanicky committed Aug 16, 2024
1 parent 7e1df0b commit 3a27704
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 20 deletions.
33 changes: 17 additions & 16 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,25 @@ pub async fn handle_alter_parallelism(

let mut builder = RwPgResponse::builder(stmt_type);

let parallelism = match &target_parallelism.parallelism {
Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => Some(available_parallelism),
Some(Parallelism::Fixed(FixedParallelism { parallelism })) => Some(*parallelism),
_ => None,
match &target_parallelism.parallelism {
Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => {
if available_parallelism > VirtualNode::COUNT as u32 {
builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::COUNT));
}
}
Some(Parallelism::Fixed(FixedParallelism { parallelism })) => {
if *parallelism > VirtualNode::COUNT as u32 {
builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::COUNT));
target_parallelism = PbTableParallelism {
parallelism: Some(PbParallelism::Fixed(FixedParallelism {
parallelism: VirtualNode::COUNT as u32,
})),
};
}
}
_ => {}
};

if let Some(parallelism) = parallelism
&& parallelism > VirtualNode::COUNT as u32
{
target_parallelism = PbTableParallelism {
parallelism: Some(PbParallelism::Fixed(FixedParallelism {
parallelism: VirtualNode::COUNT as u32,
})),
};

builder = builder.notice("Available or provided parallelism exceeds the maximum parallelism limit, resetting to FIXED(256).".to_string());
}

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_parallelism(table_id, target_parallelism, deferred)
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1640,9 +1640,9 @@ impl DdlController {
// If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE.
// Otherwise, it defaults to FIXED based on deduction.
let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
// If the parallelism is limited, the parallelism is set to FIXED.
(None, DefaultParallelism::Full) if parallelism_limited => {
TableParallelism::Fixed(parallelism.get())
tracing::warn!("Parallelism limited to 256 in ADAPTIVE mode");
TableParallelism::Adaptive
}
(None, DefaultParallelism::Full) => TableParallelism::Adaptive,
_ => TableParallelism::Fixed(parallelism.get()),
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,7 @@ impl ScaleController {
}
TableParallelism::Fixed(mut n) => {
if n > VirtualNode::COUNT {
// This should be unreachable, but we still intercept it to prevent accidental modifications.
tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT");
n = VirtualNode::COUNT
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,18 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> {
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq(format!("FIXED({})", vnode_max));
.assert_result_eq("ADAPTIVE");

session
.run("select distinct parallelism from rw_fragment_parallelism where name = 't'")
.await?
.assert_result_eq(format!("{}", vnode_max));

Ok(())
}

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_alter() -> Result<()> {
async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let mut configuration = Configuration::for_scale();
configuration.compute_nodes = 1;
Expand All @@ -193,3 +199,34 @@ async fn test_parallelism_exceed_virtual_node_max_alter() -> Result<()> {
.assert_result_eq(format!("FIXED({})", vnode_max));
Ok(())
}

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let mut configuration = Configuration::for_scale();
configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 100;
let mut cluster = Cluster::start(configuration).await?;
let mut session = cluster.start_session();
session.run("set streaming_parallelism = 1").await?;
session.run("create table t(v int)").await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq("FIXED(1)");

session
.run("alter table t set parallelism = adaptive")
.await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq("ADAPTIVE");

session
.run("select distinct parallelism from rw_fragment_parallelism where name = 't'")
.await?
.assert_result_eq(format!("{}", vnode_max));

Ok(())
}

0 comments on commit 3a27704

Please sign in to comment.