Skip to content

Commit

Permalink
fix(meta): do not panic on specified parallelism greater than vnode count (#15941)
Browse files Browse the repository at this point in the history

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Apr 2, 2024
1 parent 2afbb6f commit 5897c22
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl CatalogController {
streaming_job: &StreamingJob,
ctx: &StreamContext,
version: &PbTableVersion,
default_parallelism: &Option<NonZeroUsize>,
specified_parallelism: &Option<NonZeroUsize>,
) -> MetaResult<ObjectId> {
let id = streaming_job.id();
let inner = self.inner.write().await;
Expand All @@ -557,7 +557,7 @@ impl CatalogController {
return Err(MetaError::permission_denied("table version is stale"));
}

let parallelism = match default_parallelism {
let parallelism = match specified_parallelism {
None => StreamingParallelism::Adaptive,
Some(n) => StreamingParallelism::Fixed(n.get() as _),
};
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(is_sorted)]
#![feature(impl_trait_in_assoc_type)]
#![feature(const_option)]

pub mod backup_restore;
pub mod barrier;
Expand Down
39 changes: 19 additions & 20 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1300,9 +1300,11 @@ impl DdlController {

fn resolve_stream_parallelism(
&self,
default_parallelism: Option<NonZeroUsize>,
specified_parallelism: Option<NonZeroUsize>,
cluster_info: &StreamingClusterInfo,
) -> MetaResult<NonZeroUsize> {
const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap();

if cluster_info.parallel_units.is_empty() {
return Err(MetaError::unavailable(
"No available parallel units to schedule",
Expand All @@ -1311,21 +1313,13 @@ impl DdlController {

let available_parallel_units =
NonZeroUsize::new(cluster_info.parallel_units.len()).unwrap();

// Fall back to the configured default parallelism if no parallelism is specified by the user.
let parallelism = default_parallelism.unwrap_or(match &self.env.opts.default_parallelism {
DefaultParallelism::Full => {
if available_parallel_units.get() > VirtualNode::COUNT {
tracing::warn!(
"Too many parallel units, use {} instead",
VirtualNode::COUNT
);
NonZeroUsize::new(VirtualNode::COUNT).unwrap()
} else {
available_parallel_units
}
}
DefaultParallelism::Default(num) => *num,
});
let parallelism =
specified_parallelism.unwrap_or_else(|| match &self.env.opts.default_parallelism {
DefaultParallelism::Full => available_parallel_units,
DefaultParallelism::Default(num) => *num,
});

if parallelism > available_parallel_units {
return Err(MetaError::unavailable(format!(
Expand All @@ -1334,7 +1328,12 @@ impl DdlController {
)));
}

Ok(parallelism)
if available_parallel_units > MAX_PARALLELISM {
tracing::warn!("Too many parallel units, use {} instead", MAX_PARALLELISM);
Ok(MAX_PARALLELISM)
} else {
Ok(parallelism)
}
}

/// Builds the actor graph:
Expand All @@ -1349,7 +1348,7 @@ impl DdlController {
affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
let id = stream_job.id();
let default_parallelism = fragment_graph.default_parallelism();
let specified_parallelism = fragment_graph.specified_parallelism();
let internal_tables = fragment_graph.internal_tables();
let expr_context = stream_ctx.to_expr_context();

Expand Down Expand Up @@ -1380,7 +1379,7 @@ impl DdlController {
// 2. Build the actor graph.
let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;

let parallelism = self.resolve_stream_parallelism(default_parallelism, &cluster_info)?;
let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?;

let actor_graph_builder =
ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?;
Expand All @@ -1402,7 +1401,7 @@ impl DdlController {

// If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE.
// Otherwise, it defaults to FIXED based on deduction.
let table_parallelism = match (default_parallelism, &self.env.opts.default_parallelism) {
let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
(None, DefaultParallelism::Full) => TableParallelism::Adaptive,
_ => TableParallelism::Fixed(parallelism.get()),
};
Expand Down Expand Up @@ -1435,7 +1434,7 @@ impl DdlController {
&streaming_job,
&stream_ctx,
table.get_version()?,
&fragment_graph.default_parallelism(),
&fragment_graph.specified_parallelism(),
)
.await? as u32
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl DdlController {
&streaming_job,
&stream_ctx,
table.get_version()?,
&fragment_graph.default_parallelism(),
&fragment_graph.specified_parallelism(),
)
.await? as u32;

Expand Down Expand Up @@ -469,7 +469,7 @@ impl DdlController {
&streaming_job,
&ctx,
table.get_version()?,
&fragment_graph.default_parallelism(),
&fragment_graph.specified_parallelism(),
)
.await?;

Expand Down
12 changes: 6 additions & 6 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ pub struct StreamFragmentGraph {

/// The parallelism of the job, if specified by the user through the `STREAMING_PARALLELISM`
/// session variable. If not specified, the configured default parallelism will be used.
default_parallelism: Option<NonZeroUsize>,
specified_parallelism: Option<NonZeroUsize>,
}

impl StreamFragmentGraph {
Expand Down Expand Up @@ -390,7 +390,7 @@ impl StreamFragmentGraph {
.map(TableId::from)
.collect();

let default_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
} else {
None
Expand All @@ -401,7 +401,7 @@ impl StreamFragmentGraph {
downstreams,
upstreams,
dependent_table_ids,
default_parallelism,
specified_parallelism,
})
}

Expand Down Expand Up @@ -496,9 +496,9 @@ impl StreamFragmentGraph {
&self.dependent_table_ids
}

/// Get the default parallelism of the job.
pub fn default_parallelism(&self) -> Option<NonZeroUsize> {
self.default_parallelism
/// Get the parallelism of the job, if specified by the user.
pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
self.specified_parallelism
}

/// Get downstreams of a fragment.
Expand Down

0 comments on commit 5897c22

Please sign in to comment.