Skip to content

Commit

Permalink
fix: Correctly handle the Parallelism inference under the DefaultPara…
Browse files Browse the repository at this point in the history
…llelism configuration. (#15543)

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Mar 8, 2024
1 parent 2ef0ff2 commit e0f305e
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ pub struct MetaConfig {
pub enable_dropped_column_reclaim: bool,
}

#[derive(Clone, Debug, Default)]
#[derive(Copy, Clone, Debug, Default)]
pub enum DefaultParallelism {
#[default]
Full,
Expand Down
168 changes: 138 additions & 30 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::{Duration, Instant};
use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::config::DefaultParallelism;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, WorkerNode};
Expand Down Expand Up @@ -747,6 +748,45 @@ impl GlobalBarrierManagerContext {
Ok(())
}

// We infer the new parallelism strategy based on the prior level of parallelism of the table.
// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
// For Custom, we'll assess the parallelism of the core fragment;
// if the parallelism is higher than the currently available parallelism, we'll set it to Adaptive.
// If it's lower, we'll set it to Fixed.
// If it was previously set to Adaptive, but the default_parallelism in the configuration isn’t Full,
// and it matches the actual fragment parallelism, in this case, it will be handled by downgrading to Fixed.
fn derive_target_parallelism(
available_parallelism: usize,
assigned_parallelism: TableParallelism,
actual_fragment_parallelism: Option<usize>,
default_parallelism: DefaultParallelism,
) -> TableParallelism {
match assigned_parallelism {
TableParallelism::Custom => {
if let Some(fragment_parallelism) = actual_fragment_parallelism {
if fragment_parallelism >= available_parallelism {
TableParallelism::Adaptive
} else {
TableParallelism::Fixed(fragment_parallelism)
}
} else {
TableParallelism::Adaptive
}
}
TableParallelism::Adaptive => {
match (default_parallelism, actual_fragment_parallelism) {
(DefaultParallelism::Default(n), Some(fragment_parallelism))
if fragment_parallelism == n.get() =>
{
TableParallelism::Fixed(fragment_parallelism)
}
_ => TableParallelism::Adaptive,
}
}
_ => assigned_parallelism,
}
}

async fn scale_actors_v1(&self, workers: Vec<WorkerNode>) -> MetaResult<()> {
let info = self.resolve_actor_info(workers.clone()).await?;

Expand All @@ -758,43 +798,16 @@ impl GlobalBarrierManagerContext {
return Ok(());
}

let current_parallelism = info
let available_parallelism = info
.node_map
.values()
.flat_map(|worker_node| worker_node.parallel_units.iter())
.count();

if current_parallelism == 0 {
if available_parallelism == 0 {
return Err(anyhow!("no available parallel units for auto scaling").into());
}

/// We infer the new parallelism strategy based on the prior level of parallelism of the table.
/// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
/// For Custom, we'll assess the parallelism of the core fragment;
/// if the parallelism is higher than the currently available parallelism, we'll set it to Auto.
/// If it's lower, we'll set it to Fixed.
fn derive_target_parallelism_for_custom(
current_parallelism: usize,
table: &TableFragments,
) -> TableParallelism {
let derive_from_fragment = table.mview_fragment().or_else(|| table.sink_fragment());

if let TableParallelism::Custom = &table.assigned_parallelism {
if let Some(fragment) = derive_from_fragment {
let fragment_parallelism = fragment.get_actors().len();
if fragment_parallelism >= current_parallelism {
TableParallelism::Adaptive
} else {
TableParallelism::Fixed(fragment_parallelism)
}
} else {
TableParallelism::Adaptive
}
} else {
table.assigned_parallelism
}
}

let all_table_parallelisms: HashMap<_, _> = {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

Expand All @@ -803,9 +816,18 @@ impl GlobalBarrierManagerContext {
.iter()
.filter(|&(_, table)| matches!(table.state(), State::Created))
.map(|(table_id, table)| {
let actual_fragment_parallelism = table
.mview_fragment()
.or_else(|| table.sink_fragment())
.map(|fragment| fragment.get_actors().len());
(
*table_id,
derive_target_parallelism_for_custom(current_parallelism, table),
Self::derive_target_parallelism(
available_parallelism,
table.assigned_parallelism,
actual_fragment_parallelism,
self.env.opts.default_parallelism,
),
)
})
.collect()
Expand Down Expand Up @@ -1089,3 +1111,89 @@ impl GlobalBarrierManager {
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;

use super::*;
#[test]
fn test_derive_target_parallelism() {
// total 10, assigned custom, actual 5, default full -> fixed(5)
assert_eq!(
TableParallelism::Fixed(5),
GlobalBarrierManagerContext::derive_target_parallelism(
10,
TableParallelism::Custom,
Some(5),
DefaultParallelism::Full,
)
);

// total 10, assigned custom, actual 10, default full -> adaptive
assert_eq!(
TableParallelism::Adaptive,
GlobalBarrierManagerContext::derive_target_parallelism(
10,
TableParallelism::Custom,
Some(10),
DefaultParallelism::Full,
)
);

// total 10, assigned custom, actual 11, default full -> adaptive
assert_eq!(
TableParallelism::Adaptive,
GlobalBarrierManagerContext::derive_target_parallelism(
10,
TableParallelism::Custom,
Some(11),
DefaultParallelism::Full,
)
);

// total 10, assigned fixed(5), actual _, default full -> fixed(5)
assert_eq!(
TableParallelism::Adaptive,
GlobalBarrierManagerContext::derive_target_parallelism(
10,
TableParallelism::Custom,
None,
DefaultParallelism::Full,
)
);

// total 10, assigned adaptive, actual _, default full -> adaptive
assert_eq!(
TableParallelism::Adaptive,
GlobalBarrierManagerContext::derive_target_parallelism(
10,
TableParallelism::Adaptive,
None,
DefaultParallelism::Full,
)
);

// total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
assert_eq!(
TableParallelism::Fixed(5),
GlobalBarrierManagerContext::derive_target_parallelism(
10,
TableParallelism::Adaptive,
Some(5),
DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
)
);

// total 10, assigned adaptive, actual 6, default 5 -> adaptive
assert_eq!(
TableParallelism::Adaptive,
GlobalBarrierManagerContext::derive_target_parallelism(
10,
TableParallelism::Adaptive,
Some(6),
DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
)
);
}
}
8 changes: 5 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1399,9 +1399,11 @@ impl DdlController {
// and the context that contains all information needed for building the
// actors on the compute nodes.

let table_parallelism = match default_parallelism {
None => TableParallelism::Adaptive,
Some(parallelism) => TableParallelism::Fixed(parallelism.get()),
// If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE.
// Otherwise, it defaults to FIXED based on deduction.
let table_parallelism = match (default_parallelism, &self.env.opts.default_parallelism) {
(None, DefaultParallelism::Full) => TableParallelism::Adaptive,
_ => TableParallelism::Fixed(parallelism.get()),
};

let table_fragments = TableFragments::new(
Expand Down

0 comments on commit e0f305e

Please sign in to comment.