Skip to content

Commit

Permalink
do not rewrite alter parallelism in frontend
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 16, 2024
1 parent 9eb8365 commit 43f4a88
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 13 deletions.
20 changes: 9 additions & 11 deletions src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub async fn handle_alter_parallelism(
}
};

let mut target_parallelism = extract_table_parallelism(parallelism)?;
let target_parallelism = extract_table_parallelism(parallelism)?;

let available_parallelism = session
.env()
Expand All @@ -103,26 +103,24 @@ pub async fn handle_alter_parallelism(
.filter(|w| w.is_streaming_schedulable())
.map(|w| w.parallelism)
.sum::<u32>();
// TODO(var-vnode): get max parallelism from catalogs.
// Although the meta service will clamp the value for us, we should still check it here for better UI.
let max_parallelism = VirtualNode::COUNT_FOR_COMPAT;

let mut builder = RwPgResponse::builder(stmt_type);

// TODO(var-vnode): get correct max parallelism from catalogs.
// Although the meta service will clamp the value for us and print warnings there,
// we may still check it here for better UI.
let max_parallelism = VirtualNode::COUNT_FOR_COMPAT;

match &target_parallelism.parallelism {
Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => {
if available_parallelism > max_parallelism as u32 {
builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {max_parallelism}"));
builder = builder.notice("Available parallelism may exceed the maximum parallelism limit, the actual parallelism will be limited");
}
}
Some(Parallelism::Fixed(FixedParallelism { parallelism })) => {
if *parallelism > max_parallelism as u32 {
builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({max_parallelism})"));
target_parallelism = PbTableParallelism {
parallelism: Some(PbParallelism::Fixed(FixedParallelism {
parallelism: max_parallelism as u32,
})),
};
builder = builder.notice("Provided parallelism may exceed the maximum parallelism limit, will be reset to FIXED(max_parallelism)");
// Rewriting will be done in meta service.
}
}
_ => {}
Expand Down
2 changes: 0 additions & 2 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2226,8 +2226,6 @@ impl ScaleController {
}
TableParallelism::Fixed(mut n) => {
if n > max_parallelism {
// This should be unreachable as it was already checked and rewritten in the frontend.
// We still intercept it to prevent accidental modifications.
tracing::warn!("specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}");
n = max_parallelism
}
Expand Down

0 comments on commit 43f4a88

Please sign in to comment.