fix: Correctly handle the Parallelism inference under the DefaultParallelism configuration. #15543

Merged
merged 4 commits into from
Mar 8, 2024
Update barrier recov. logic comments
shanicky committed Mar 8, 2024
commit 724651c3fb1db2269520b312064101466f73d64b
14 changes: 7 additions & 7 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
@@ -739,13 +739,13 @@ impl GlobalBarrierManagerContext {
Ok(())
}

- /// We infer the new parallelism strategy based on the prior level of parallelism of the table.
- /// If the parallelism strategy is Fixed or Auto, we won't make any modifications.
- /// For Custom, we'll assess the parallelism of the core fragment;
- /// if the parallelism is higher than the currently available parallelism, we'll set it to Auto.
- /// If it's lower, we'll set it to Fixed.
- /// If it was previously set to Adaptive, but the DefaultParallelism in the configuration isn’t Full,
- /// and it matches the actual fragment parallelism, in this case, it will be handled by downgrading to Fixed.
+ // We infer the new parallelism strategy based on the prior level of parallelism of the table.
+ // If the parallelism strategy is Fixed or Auto, we won't make any modifications.
+ // For Custom, we'll assess the parallelism of the core fragment;
+ // if the parallelism is higher than the currently available parallelism, we'll set it to Adaptive.
+ // If it's lower, we'll set it to Fixed.
+ // If it was previously set to Adaptive, but the default_parallelism in the configuration isn’t Full,
+ // and it matches the actual fragment parallelism, in this case, it will be handled by downgrading to Fixed.
fn derive_target_parallelism(
Contributor

@neverchanje neverchanje Mar 8, 2024

Is this function called only when an MV is being created? Alter mv and create mv are two different handling paths. Only create mv should evaluate default_parallelism; alter mv should not.

The logic is pretty simple from my understanding.

// Let's divide the handling into two phases:
fn create_mv() {
  // Handle `default_parallelism` first.
  let mut parallelism = match config.default_parallelism {
    Some(n) => Fixed(n),
    None => Adaptive,
  };
  // Evaluate the session variable "streaming_parallelism".
  if let Some(p) = session_variable.streaming_parallelism {
    parallelism = p;
  }
}

fn alter_mv(p: TableParallelism) {
  // `p` will be the target parallelism, no matter what value `default_parallelism` is.
}
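
For reference, the two-phase handling described in this comment can be written as a small compilable sketch. Everything here is an assumption made for illustration: the `TableParallelism` enum is a reduced stand-in for the real type, and `resolve_create_parallelism` and `resolve_alter_parallelism` are hypothetical names, not functions from the codebase. The config and session values are passed in as plain arguments instead of being read from `config` and `session_variable`.

```rust
/// Hypothetical, reduced stand-in for the real `TableParallelism` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableParallelism {
    Adaptive,
    Fixed(usize),
}

/// Create path (hypothetical helper): `default_parallelism` supplies the
/// baseline, and the `streaming_parallelism` session variable, when set,
/// overrides that baseline.
fn resolve_create_parallelism(
    default_parallelism: Option<usize>,
    streaming_parallelism: Option<TableParallelism>,
) -> TableParallelism {
    let baseline = match default_parallelism {
        Some(n) => TableParallelism::Fixed(n),
        None => TableParallelism::Adaptive,
    };
    streaming_parallelism.unwrap_or(baseline)
}

/// Alter path (hypothetical helper): the requested value is the target,
/// and `default_parallelism` is never consulted.
fn resolve_alter_parallelism(requested: TableParallelism) -> TableParallelism {
    requested
}
```

Under these assumptions, `resolve_create_parallelism(Some(4), None)` yields `Fixed(4)`, while the same call with a session value of `Fixed(2)` yields `Fixed(2)`. The alter path takes no config argument at all, which makes the "regardless of default_parallelism" property hold by construction.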

Contributor Author

This change handles offline scaling during startup recovery. It addresses the situation in version 1.7 where new creations using default parallelism with an adaptive policy might exist, and it aims to prevent a mass scale-out immediately after going live.

Contributor

@neverchanje neverchanje Mar 8, 2024

> new creations using default parallelism

It sounds good.

I am not familiar with the code, so I cannot tell if it's correct. Maybe you can manually test it and make sure that:

  1. alter mv will apply regardless of default_parallelism.
  2. Restarting the cluster will not change the parallelism of existing materialized views back to default_parallelism.

available_parallelism: usize,
assigned_parallelism: TableParallelism,
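
The rules stated in the updated comment of this hunk can be read as the following decision sketch. It is not the merged implementation. The enums are reduced stand-ins, `derive_target_parallelism_sketch` is a hypothetical name, and the `fragment_parallelism` and `default_parallelism` parameters are assumptions, since only `available_parallelism` and `assigned_parallelism` are visible in the diff. The comment does not say what happens when the fragment parallelism equals the available parallelism; this sketch assumes that case is treated like "higher".

```rust
/// Hypothetical, reduced stand-in for the real `TableParallelism` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableParallelism {
    Adaptive,
    Fixed(usize),
    Custom,
}

/// Hypothetical stand-in for the `default_parallelism` config value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DefaultParallelism {
    Full,
    Default(usize),
}

/// Sketch of the rules in the comment; not the actual function body.
fn derive_target_parallelism_sketch(
    available_parallelism: usize,
    assigned_parallelism: TableParallelism,
    fragment_parallelism: usize,             // assumed parameter
    default_parallelism: DefaultParallelism, // assumed parameter
) -> TableParallelism {
    match assigned_parallelism {
        // Custom: compare the core fragment against what is available.
        // Assumption: the equal case is handled like "higher".
        TableParallelism::Custom => {
            if fragment_parallelism >= available_parallelism {
                TableParallelism::Adaptive
            } else {
                TableParallelism::Fixed(fragment_parallelism)
            }
        }
        // Adaptive: downgrade to Fixed only when the config is not Full
        // and its value matches the actual fragment parallelism.
        TableParallelism::Adaptive => match default_parallelism {
            DefaultParallelism::Default(n) if n == fragment_parallelism => {
                TableParallelism::Fixed(fragment_parallelism)
            }
            _ => TableParallelism::Adaptive,
        },
        // Fixed: left untouched.
        fixed @ TableParallelism::Fixed(_) => fixed,
    }
}
```

In this sketch a job already marked `Fixed(2)` stays `Fixed(2)` whatever `default_parallelism` is, which corresponds to the second check requested in the review: a restart should not move existing jobs back to the configured default.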