fix: Correctly handle the Parallelism inference under the DefaultParallelism configuration. #15543

Merged: 4 commits, Mar 8, 2024
2 changes: 1 addition & 1 deletion src/common/src/config.rs
@@ -350,7 +350,7 @@ pub struct MetaConfig {
    pub enable_dropped_column_reclaim: bool,
}

-#[derive(Clone, Debug, Default)]
+#[derive(Copy, Clone, Debug, Default)]
pub enum DefaultParallelism {
    #[default]
    Full,
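For context, the hunk elides the rest of this enum. The following is a rough reconstruction from how the type is used elsewhere in this PR (the Default(NonZeroUsize) arm is matched in derive_target_parallelism and constructed in the tests below); attributes beyond the derives shown in the diff may exist in the real file. The new Copy derive is what lets self.env.opts.default_parallelism be passed by value into derive_target_parallelism without an explicit clone:

use std::num::NonZeroUsize;

// Rough sketch of the full enum, reconstructed from its uses in this PR.
#[derive(Copy, Clone, Debug, Default)]
pub enum DefaultParallelism {
    // Use all parallel units currently available in the cluster (the default).
    #[default]
    Full,
    // Pin new jobs to a fixed, user-configured degree of parallelism.
    Default(NonZeroUsize),
}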
168 changes: 138 additions & 30 deletions src/meta/src/barrier/recovery.rs
@@ -19,6 +19,7 @@ use std::time::{Duration, Instant};
use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
+use risingwave_common::config::DefaultParallelism;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, WorkerNode};
@@ -738,6 +739,45 @@ impl GlobalBarrierManagerContext {
        Ok(())
    }

    // We infer the new parallelism strategy from the table's prior parallelism.
    // For Fixed, we make no modification.
    // For Custom, we assess the parallelism of the core (mview or sink) fragment:
    // if it is greater than or equal to the currently available parallelism, we set it to Adaptive;
    // if it is lower, we set it to Fixed(fragment_parallelism).
    // For Adaptive, one exception applies: if default_parallelism in the configuration is not Full
    // and matches the actual fragment parallelism, we downgrade to Fixed to avoid a mass scale-out on recovery.
    fn derive_target_parallelism(
        available_parallelism: usize,
        assigned_parallelism: TableParallelism,
        actual_fragment_parallelism: Option<usize>,
        default_parallelism: DefaultParallelism,
    ) -> TableParallelism {
        match assigned_parallelism {
            TableParallelism::Custom => {
                if let Some(fragment_parallelism) = actual_fragment_parallelism {
                    if fragment_parallelism >= available_parallelism {
                        TableParallelism::Adaptive
                    } else {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                } else {
                    TableParallelism::Adaptive
                }
            }
            TableParallelism::Adaptive => {
                match (default_parallelism, actual_fragment_parallelism) {
                    (DefaultParallelism::Default(n), Some(fragment_parallelism))
                        if fragment_parallelism == n.get() =>
                    {
                        TableParallelism::Fixed(fragment_parallelism)
                    }
                    _ => TableParallelism::Adaptive,
                }
            }
            _ => assigned_parallelism,
        }
    }

Review thread on derive_target_parallelism:

@neverchanje (Contributor) commented on Mar 8, 2024:

Is this function only called when an MV is being created? alter mv and create mv are two different handling paths. Only create mv should evaluate default_parallelism; alter mv should not.

The logic is pretty simple from my understanding.

// Let's divide the handling into two phases:
fn create_mv() {
    // Handle `default_parallelism` first.
    let mut parallelism = match config.default_parallelism {
        Some(n) => Fixed(n),
        None => Adaptive,
    };
    // Evaluate the session variable "streaming_parallelism".
    if let Some(p) = session_variable.streaming_parallelism {
        parallelism = p;
    }
}

fn alter_mv(p: TableParallelism) {
    // `p` will be the target parallelism, no matter what value `default_parallelism` is.
}

The PR author (Contributor) replied:

This modification handles offline scaling during startup recovery, addressing the situation in version 1.7 where new creations using default parallelism with an adaptive policy might exist. It aims to prevent a mass scale-out immediately after going live.

@neverchanje (Contributor) replied on Mar 8, 2024:

> new creations using default parallelism

It sounds good.

I am not familiar with the code, so I cannot tell whether it is correct. Maybe you can manually test it and make sure:

  1. alter mv applies regardless of default_parallelism.
  2. Restarting the cluster does not change the parallelism of existing materialized views back to default_parallelism.

    async fn scale_actors_v1(&self, workers: Vec<WorkerNode>) -> MetaResult<()> {
        let info = self.resolve_actor_info(workers.clone()).await?;

@@ -749,43 +789,16 @@
            return Ok(());
        }

-        let current_parallelism = info
+        let available_parallelism = info
            .node_map
            .values()
            .flat_map(|worker_node| worker_node.parallel_units.iter())
            .count();

-        if current_parallelism == 0 {
+        if available_parallelism == 0 {
            return Err(anyhow!("no available parallel units for auto scaling").into());
        }

-        /// We infer the new parallelism strategy based on the prior level of parallelism of the table.
-        /// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
-        /// For Custom, we'll assess the parallelism of the core fragment;
-        /// if the parallelism is higher than the currently available parallelism, we'll set it to Auto.
-        /// If it's lower, we'll set it to Fixed.
-        fn derive_target_parallelism_for_custom(
-            current_parallelism: usize,
-            table: &TableFragments,
-        ) -> TableParallelism {
-            let derive_from_fragment = table.mview_fragment().or_else(|| table.sink_fragment());
-
-            if let TableParallelism::Custom = &table.assigned_parallelism {
-                if let Some(fragment) = derive_from_fragment {
-                    let fragment_parallelism = fragment.get_actors().len();
-                    if fragment_parallelism >= current_parallelism {
-                        TableParallelism::Adaptive
-                    } else {
-                        TableParallelism::Fixed(fragment_parallelism)
-                    }
-                } else {
-                    TableParallelism::Adaptive
-                }
-            } else {
-                table.assigned_parallelism
-            }
-        }

        let all_table_parallelisms: HashMap<_, _> = {
            let guard = mgr.fragment_manager.get_fragment_read_guard().await;

@@ -794,9 +807,18 @@
                .iter()
                .filter(|&(_, table)| matches!(table.state(), State::Created))
                .map(|(table_id, table)| {
+                    let actual_fragment_parallelism = table
+                        .mview_fragment()
+                        .or_else(|| table.sink_fragment())
+                        .map(|fragment| fragment.get_actors().len());
                    (
                        *table_id,
-                        derive_target_parallelism_for_custom(current_parallelism, table),
+                        Self::derive_target_parallelism(
+                            available_parallelism,
+                            table.assigned_parallelism,
+                            actual_fragment_parallelism,
+                            self.env.opts.default_parallelism,
+                        ),
                    )
                })
                .collect()
Expand Down Expand Up @@ -1080,3 +1102,89 @@ impl GlobalBarrierManager {
Ok(())
}
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use super::*;

    #[test]
    fn test_derive_target_parallelism() {
        // total 10, assigned custom, actual 5, default full -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierManagerContext::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(5),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 10, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierManagerContext::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(10),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual 11, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierManagerContext::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                Some(11),
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned custom, actual unknown, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierManagerContext::derive_target_parallelism(
                10,
                TableParallelism::Custom,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual unknown, default full -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierManagerContext::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                None,
                DefaultParallelism::Full,
            )
        );

        // total 10, assigned adaptive, actual 5, default 5 -> fixed(5)
        assert_eq!(
            TableParallelism::Fixed(5),
            GlobalBarrierManagerContext::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(5),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );

        // total 10, assigned adaptive, actual 6, default 5 -> adaptive
        assert_eq!(
            TableParallelism::Adaptive,
            GlobalBarrierManagerContext::derive_target_parallelism(
                10,
                TableParallelism::Adaptive,
                Some(6),
                DefaultParallelism::Default(NonZeroUsize::new(5).unwrap()),
            )
        );
    }
}
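One arm the suite does not exercise is the Fixed passthrough (the `_ => assigned_parallelism` arm). A sketch of what such a case would assert, following the same pattern as the tests above (not part of the PR):

// Not part of the PR: illustrates the `_ => assigned_parallelism` arm.
// total 10, assigned fixed(3), actual 5, default full -> fixed(3)
assert_eq!(
    TableParallelism::Fixed(3),
    GlobalBarrierManagerContext::derive_target_parallelism(
        10,
        TableParallelism::Fixed(3),
        Some(5),
        DefaultParallelism::Full,
    )
);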
8 changes: 5 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1389,9 +1389,11 @@ impl DdlController {
        // and the context that contains all information needed for building the
        // actors on the compute nodes.

-        let table_parallelism = match default_parallelism {
-            None => TableParallelism::Adaptive,
-            Some(parallelism) => TableParallelism::Fixed(parallelism.get()),
+        // If the frontend does not specify a degree of parallelism and default_parallelism in
+        // the meta config is Full, use ADAPTIVE; otherwise fall back to FIXED with the deduced parallelism.
+        let table_parallelism = match (default_parallelism, &self.env.opts.default_parallelism) {
+            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
+            _ => TableParallelism::Fixed(parallelism.get()),
        };

        let table_fragments = TableFragments::new(
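Below is a minimal sketch of the selection above. Here `specified` stands for the parallelism passed in from the frontend and `deduced` for the value the surrounding function has already resolved; both names are assumptions, since the hunk elides those definitions:

use std::num::NonZeroUsize;

// Sketch only: mirrors the match above using assumed local names.
fn pick_table_parallelism(
    specified: Option<NonZeroUsize>,
    config_default: DefaultParallelism,
    deduced: NonZeroUsize,
) -> TableParallelism {
    match (specified, config_default) {
        // Nothing specified by the frontend and the meta config says Full:
        // let the job adapt to however many parallel units the cluster has.
        (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
        // Otherwise pin the job to the deduced degree of parallelism.
        _ => TableParallelism::Fixed(deduced.get()),
    }
}

For example, with a meta config of Full and no frontend setting the job becomes Adaptive, while a meta config of Default(4) yields Fixed with the deduced value.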