fix(schedule): Restrict parallelism that exceeds the virtual node limit during automatic scaling. (#18006) (#18091)

Signed-off-by: Shanicky Chen <[email protected]>
Co-authored-by: Shanicky Chen <[email protected]>
github-actions[bot] and shanicky authored Aug 19, 2024
1 parent 37f1ef0 commit a4f2803
Showing 6 changed files with 190 additions and 23 deletions.
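
All of the changes below enforce a single rule: a streaming job's parallelism must not exceed `VirtualNode::COUNT` (256 in the default build, per the warning added in `ddl_controller.rs`). A minimal sketch of that clamping rule, with a hypothetical helper name and the limit hardcoded purely for illustration:

```rust
/// Illustrative sketch of the clamping rule this commit applies in the frontend
/// handler, the DDL controller, and the scale controller. `MAX_PARALLELISM`
/// stands in for `VirtualNode::COUNT`; the helper name is not from the repository.
const MAX_PARALLELISM: u32 = 256;

fn clamp_parallelism(requested: u32) -> (u32, bool) {
    // Return the effective parallelism plus whether the request was limited,
    // so callers can emit a notice or warning when clamping happens.
    if requested > MAX_PARALLELISM {
        (MAX_PARALLELISM, true)
    } else {
        (requested, false)
    }
}

fn main() {
    assert_eq!(clamp_parallelism(300), (256, true));
    assert_eq!(clamp_parallelism(8), (8, false));
}
```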
37 changes: 33 additions & 4 deletions src/frontend/src/handler/alter_parallelism.rs
@@ -14,8 +14,9 @@

use pgwire::pg_response::StatementType;
use risingwave_common::bail;
use risingwave_common::hash::VirtualNode;
use risingwave_pb::meta::table_parallelism::{
AdaptiveParallelism, FixedParallelism, PbParallelism,
AdaptiveParallelism, FixedParallelism, Parallelism, PbParallelism,
};
use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
@@ -92,15 +93,43 @@ pub async fn handle_alter_parallelism(
}
};

let target_parallelism = extract_table_parallelism(parallelism)?;
let mut target_parallelism = extract_table_parallelism(parallelism)?;

let available_parallelism = session
.env()
.worker_node_manager()
.list_worker_nodes()
.iter()
.filter(|w| w.is_streaming_schedulable())
.map(|w| w.parallelism)
.sum::<u32>();

let mut builder = RwPgResponse::builder(stmt_type);

match &target_parallelism.parallelism {
Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => {
if available_parallelism > VirtualNode::COUNT as u32 {
builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {}", VirtualNode::COUNT));
}
}
Some(Parallelism::Fixed(FixedParallelism { parallelism })) => {
if *parallelism > VirtualNode::COUNT as u32 {
builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({})", VirtualNode::COUNT));
target_parallelism = PbTableParallelism {
parallelism: Some(PbParallelism::Fixed(FixedParallelism {
parallelism: VirtualNode::COUNT as u32,
})),
};
}
}
_ => {}
};

let catalog_writer = session.catalog_writer()?;
catalog_writer
.alter_parallelism(table_id, target_parallelism, deferred)
.await?;

let mut builder = RwPgResponse::builder(stmt_type);

if deferred {
builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_string());
}
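In the frontend handler above, the target parallelism is inspected before it reaches the catalog writer: an ADAPTIVE/AUTO target only produces a notice when the schedulable workers offer more slots than `VirtualNode::COUNT`, while a FIXED target above the limit is rewritten to `FIXED(VirtualNode::COUNT)`. A reduced sketch of that decision; the enum and notice strings here are simplified stand-ins for `PbTableParallelism` and `RwPgResponse` notices, not the real protobuf types:

```rust
// Reduced sketch of the check in handle_alter_parallelism.
const VNODE_COUNT: u32 = 256; // stands in for VirtualNode::COUNT

#[derive(Debug, PartialEq)]
enum Target {
    Adaptive,
    Fixed(u32),
}

fn adjust(target: Target, available_parallelism: u32) -> (Target, Vec<String>) {
    let mut notices = Vec::new();
    match target {
        Target::Adaptive if available_parallelism > VNODE_COUNT => {
            // ADAPTIVE itself is kept; only a notice is emitted.
            notices.push(format!(
                "Available parallelism exceeds the maximum, actual parallelism will be limited to {}",
                VNODE_COUNT
            ));
            (Target::Adaptive, notices)
        }
        Target::Fixed(n) if n > VNODE_COUNT => {
            // FIXED targets are rewritten before being sent to the catalog writer.
            notices.push(format!(
                "Provided parallelism exceeds the maximum, resetting to FIXED({})",
                VNODE_COUNT
            ));
            (Target::Fixed(VNODE_COUNT), notices)
        }
        other => (other, notices),
    }
}

fn main() {
    assert_eq!(adjust(Target::Fixed(300), 512).0, Target::Fixed(256));
    assert_eq!(adjust(Target::Adaptive, 512).0, Target::Adaptive);
    assert!(adjust(Target::Fixed(4), 512).1.is_empty());
}
```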
22 changes: 14 additions & 8 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1523,8 +1523,6 @@ impl DdlController {
specified_parallelism: Option<NonZeroUsize>,
cluster_info: &StreamingClusterInfo,
) -> MetaResult<NonZeroUsize> {
const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap();

let available_parallelism = cluster_info.parallelism();
if available_parallelism == 0 {
return Err(MetaError::unavailable("No available slots to schedule"));
@@ -1546,12 +1544,7 @@
)));
}

if available_parallelism > MAX_PARALLELISM {
tracing::warn!("Too many parallelism, use {} instead", MAX_PARALLELISM);
Ok(MAX_PARALLELISM)
} else {
Ok(parallelism)
}
Ok(parallelism)
}

/// Builds the actor graph:
@@ -1617,6 +1610,15 @@

let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?;

const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap();

let parallelism_limited = parallelism > MAX_PARALLELISM;
if parallelism_limited {
tracing::warn!("Too many parallelism, use {} instead", MAX_PARALLELISM);
}

let parallelism = parallelism.min(MAX_PARALLELISM);

let actor_graph_builder =
ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?;

@@ -1638,6 +1640,10 @@
// If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE.
// Otherwise, it defaults to FIXED based on deduction.
let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
(None, DefaultParallelism::Full) if parallelism_limited => {
tracing::warn!("Parallelism limited to 256 in ADAPTIVE mode");
TableParallelism::Adaptive
}
(None, DefaultParallelism::Full) => TableParallelism::Adaptive,
_ => TableParallelism::Fixed(parallelism.get()),
};
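On the meta side, `resolve_stream_parallelism` no longer clamps internally; the DDL controller clamps the resolved value afterwards and still records ADAPTIVE when the user specified no parallelism and `default_parallelism` is Full, even if the value was limited. A simplified sketch of that deduction, with reduced stand-ins for `DefaultParallelism` / `TableParallelism`:

```rust
use std::num::NonZeroUsize;

#[derive(Debug, PartialEq)]
enum TableParallelism {
    Adaptive,
    Fixed(usize),
}

// `specified` is the user-provided parallelism (None = not specified),
// `resolved` is what resolve_stream_parallelism returned, and
// `default_is_full` mirrors DefaultParallelism::Full.
fn deduce_table_parallelism(
    specified: Option<NonZeroUsize>,
    resolved: NonZeroUsize,
    default_is_full: bool,
) -> TableParallelism {
    let max = NonZeroUsize::new(256).unwrap(); // stands in for VirtualNode::COUNT
    let limited = resolved > max;
    let effective = resolved.min(max); // the actor graph is always built with the clamped value
    if limited {
        eprintln!("Too many parallelism, use {} instead", max);
    }
    match (specified, default_is_full) {
        // Unspecified + FULL default stays ADAPTIVE even when the value was limited.
        (None, true) => TableParallelism::Adaptive,
        _ => TableParallelism::Fixed(effective.get()),
    }
}

fn main() {
    let p = NonZeroUsize::new(300).unwrap();
    assert_eq!(deduce_table_parallelism(None, p, true), TableParallelism::Adaptive);
    assert_eq!(deduce_table_parallelism(Some(p), p, false), TableParallelism::Fixed(256));
}
```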
48 changes: 38 additions & 10 deletions src/meta/src/stream/scale.rs
@@ -1983,7 +1983,7 @@ impl ScaleController {
.map(|worker| (worker.id, worker))
.collect();

let worker_slots = workers
let schedulable_worker_slots = workers
.values()
.map(|worker| (worker.id, worker.parallelism as usize))
.collect::<BTreeMap<_, _>>();
@@ -2191,7 +2191,7 @@
*fragment_slots.entry(*worker_id).or_default() += 1;
}

let all_available_slots: usize = worker_slots.values().cloned().sum();
let all_available_slots: usize = schedulable_worker_slots.values().cloned().sum();

if all_available_slots == 0 {
bail!(
@@ -2208,12 +2208,13 @@

assert_eq!(*should_be_one, 1);

if worker_slots.contains_key(single_worker_id) {
if schedulable_worker_slots.contains_key(single_worker_id) {
// NOTE: shall we continue?
continue;
}

let units = schedule_units_for_slots(&worker_slots, 1, table_id)?;
let units =
schedule_units_for_slots(&schedulable_worker_slots, 1, table_id)?;

let (chosen_target_worker_id, should_be_one) =
units.iter().exactly_one().ok().with_context(|| {
@@ -2237,14 +2238,41 @@
}
FragmentDistributionType::Hash => match parallelism {
TableParallelism::Adaptive => {
target_plan.insert(
fragment_id,
Self::diff_worker_slot_changes(&fragment_slots, &worker_slots),
);
if all_available_slots > VirtualNode::COUNT {
tracing::warn!("available parallelism for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT");
// force limit to VirtualNode::COUNT
let target_worker_slots = schedule_units_for_slots(
&schedulable_worker_slots,
VirtualNode::COUNT,
table_id,
)?;

target_plan.insert(
fragment_id,
Self::diff_worker_slot_changes(
&fragment_slots,
&target_worker_slots,
),
);
} else {
target_plan.insert(
fragment_id,
Self::diff_worker_slot_changes(
&fragment_slots,
&schedulable_worker_slots,
),
);
}
}
TableParallelism::Fixed(n) => {
TableParallelism::Fixed(mut n) => {
if n > VirtualNode::COUNT {
// This should be unreachable, but we still intercept it to prevent accidental modifications.
tracing::warn!("parallelism {n} for table {table_id} is larger than VirtualNode::COUNT, force limit to VirtualNode::COUNT");
n = VirtualNode::COUNT
}

let target_worker_slots =
schedule_units_for_slots(&worker_slots, n, table_id)?;
schedule_units_for_slots(&schedulable_worker_slots, n, table_id)?;

target_plan.insert(
fragment_id,
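In the scale controller above, an ADAPTIVE hash fragment that would otherwise land on more than `VirtualNode::COUNT` schedulable slots is now scheduled onto exactly `VirtualNode::COUNT` slots via `schedule_units_for_slots`, and a FIXED parallelism above the limit is clamped defensively. A toy stand-in for that scheduling step, assuming a simple round-robin distribution; the real `schedule_units_for_slots` may distribute slots differently:

```rust
use std::collections::BTreeMap;

const VNODE_COUNT: usize = 256; // stands in for VirtualNode::COUNT

// Hypothetical round-robin distribution of `target` slots over schedulable
// workers; only the capping behaviour mirrors the change above.
fn schedule_capped(worker_slots: &BTreeMap<u32, usize>, target: usize) -> BTreeMap<u32, usize> {
    let capacity: usize = worker_slots.values().sum();
    let target = target.min(VNODE_COUNT).min(capacity);
    let mut plan: BTreeMap<u32, usize> = worker_slots.keys().map(|&w| (w, 0)).collect();
    let mut remaining = target;
    while remaining > 0 {
        for (&worker, &slots) in worker_slots {
            if remaining == 0 {
                break;
            }
            if plan[&worker] < slots {
                *plan.get_mut(&worker).unwrap() += 1;
                remaining -= 1;
            }
        }
    }
    plan
}

fn main() {
    // A single 300-core worker ends up with only 256 scheduled slots.
    let workers = BTreeMap::from([(1u32, 300usize)]);
    assert_eq!(schedule_capped(&workers, 300)[&1], 256);
}
```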
9 changes: 9 additions & 0 deletions src/prost/src/lib.rs
@@ -23,6 +23,7 @@ pub use prost::Message;
use risingwave_error::tonic::ToTonicStatus;
use thiserror::Error;


#[rustfmt::skip]
#[cfg_attr(madsim, path = "sim/catalog.rs")]
pub mod catalog;
@@ -242,6 +243,14 @@ impl meta::table_fragments::ActorStatus {
}
}

impl common::WorkerNode {
pub fn is_streaming_schedulable(&self) -> bool {
let property = self.property.as_ref();
property.map_or(false, |p| p.is_streaming)
&& !property.map_or(false, |p| p.is_unschedulable)
}
}

impl common::ActorLocation {
pub fn from_worker(worker_node_id: u32) -> Option<Self> {
Some(Self { worker_node_id })
@@ -25,7 +25,7 @@ use risingwave_simulation::utils::AssertResult;
use tokio::time::sleep;

/// Please ensure that this value is the same as the one in the `risingwave-auto-scale.toml` file.
const MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE: u64 = 15;
pub const MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE: u64 = 15;

#[tokio::test]
async fn test_passive_online_and_offline() -> Result<()> {
@@ -12,9 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use anyhow::Result;
use madsim::time::sleep;
use risingwave_common::hash::VirtualNode;
use risingwave_simulation::cluster::{Cluster, Configuration};
use risingwave_simulation::ctl_ext::predicate::identity_contains;
use risingwave_simulation::utils::AssertResult;

use crate::scale::auto_parallelism::MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE;

#[tokio::test]
async fn test_streaming_parallelism_default() -> Result<()> {
@@ -135,3 +142,91 @@ async fn test_streaming_parallelism_index() -> Result<()> {
);
Ok(())
}

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let mut configuration = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
true,
);

configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 1;
let mut cluster = Cluster::start(configuration).await?;

sleep(Duration::from_secs(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2,
))
.await;

let mut session = cluster.start_session();
session.run("create table t(v int)").await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq("ADAPTIVE");

session
.run("select distinct parallelism from rw_fragment_parallelism where name = 't'")
.await?
.assert_result_eq(format!("{}", vnode_max));

Ok(())
}

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let mut configuration = Configuration::for_scale();
configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 100;
let mut cluster = Cluster::start(configuration).await?;
let mut session = cluster.start_session();
session.run("set streaming_parallelism = 1").await?;
session.run("create table t(v int)").await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq("FIXED(1)");

session
.run(format!("alter table t set parallelism = {}", vnode_max + 1))
.await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq(format!("FIXED({})", vnode_max));
Ok(())
}

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let mut configuration = Configuration::for_scale();
configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 100;
let mut cluster = Cluster::start(configuration).await?;
let mut session = cluster.start_session();
session.run("set streaming_parallelism = 1").await?;
session.run("create table t(v int)").await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq("FIXED(1)");

session
.run("alter table t set parallelism = adaptive")
.await?;
session
.run("select parallelism from rw_streaming_parallelism where name = 't'")
.await?
.assert_result_eq("ADAPTIVE");

session
.run("select distinct parallelism from rw_fragment_parallelism where name = 't'")
.await?
.assert_result_eq(format!("{}", vnode_max));

Ok(())
}
