diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 9022921fe4e33..c8ede99031ff9 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -538,7 +538,7 @@ impl CatalogController { streaming_job: &StreamingJob, ctx: &StreamContext, version: &PbTableVersion, - default_parallelism: &Option, + specified_parallelism: &Option, ) -> MetaResult { let id = streaming_job.id(); let inner = self.inner.write().await; @@ -557,7 +557,7 @@ impl CatalogController { return Err(MetaError::permission_denied("table version is stale")); } - let parallelism = match default_parallelism { + let parallelism = match specified_parallelism { None => StreamingParallelism::Adaptive, Some(n) => StreamingParallelism::Fixed(n.get() as _), }; diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index bce4f082db4c1..71c99a7e065b4 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -30,6 +30,7 @@ #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] #![feature(is_sorted)] #![feature(impl_trait_in_assoc_type)] +#![feature(const_option)] pub mod backup_restore; pub mod barrier; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index f816ef3bb1c98..d3ceeac6366e2 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1300,9 +1300,11 @@ impl DdlController { fn resolve_stream_parallelism( &self, - default_parallelism: Option, + specified_parallelism: Option, cluster_info: &StreamingClusterInfo, ) -> MetaResult { + const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap(); + if cluster_info.parallel_units.is_empty() { return Err(MetaError::unavailable( "No available parallel units to schedule", @@ -1311,21 +1313,13 @@ impl DdlController { let available_parallel_units = NonZeroUsize::new(cluster_info.parallel_units.len()).unwrap(); + // Use configured parallel units if no default parallelism is specified. - let parallelism = default_parallelism.unwrap_or(match &self.env.opts.default_parallelism { - DefaultParallelism::Full => { - if available_parallel_units.get() > VirtualNode::COUNT { - tracing::warn!( - "Too many parallel units, use {} instead", - VirtualNode::COUNT - ); - NonZeroUsize::new(VirtualNode::COUNT).unwrap() - } else { - available_parallel_units - } - } - DefaultParallelism::Default(num) => *num, - }); + let parallelism = + specified_parallelism.unwrap_or_else(|| match &self.env.opts.default_parallelism { + DefaultParallelism::Full => available_parallel_units, + DefaultParallelism::Default(num) => *num, + }); if parallelism > available_parallel_units { return Err(MetaError::unavailable(format!( @@ -1334,7 +1328,12 @@ impl DdlController { ))); } - Ok(parallelism) + if available_parallel_units > MAX_PARALLELISM { + tracing::warn!("Too many parallel units, use {} instead", MAX_PARALLELISM); + Ok(MAX_PARALLELISM) + } else { + Ok(parallelism) + } } /// Builds the actor graph: @@ -1349,7 +1348,7 @@ impl DdlController { affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>, ) -> MetaResult<(CreateStreamingJobContext, TableFragments)> { let id = stream_job.id(); - let default_parallelism = fragment_graph.default_parallelism(); + let specified_parallelism = fragment_graph.specified_parallelism(); let internal_tables = fragment_graph.internal_tables(); let expr_context = stream_ctx.to_expr_context(); @@ -1380,7 +1379,7 @@ impl DdlController { // 2. Build the actor graph. let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; - let parallelism = self.resolve_stream_parallelism(default_parallelism, &cluster_info)?; + let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; @@ -1402,7 +1401,7 @@ impl DdlController { // If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE. // Otherwise, it defaults to FIXED based on deduction. - let table_parallelism = match (default_parallelism, &self.env.opts.default_parallelism) { + let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) { (None, DefaultParallelism::Full) => TableParallelism::Adaptive, _ => TableParallelism::Fixed(parallelism.get()), }; @@ -1435,7 +1434,7 @@ impl DdlController { &streaming_job, &stream_ctx, table.get_version()?, - &fragment_graph.default_parallelism(), + &fragment_graph.specified_parallelism(), ) .await? as u32 } diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 46838e596fdef..47d82d1420460 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -328,7 +328,7 @@ impl DdlController { &streaming_job, &stream_ctx, table.get_version()?, - &fragment_graph.default_parallelism(), + &fragment_graph.specified_parallelism(), ) .await? as u32; @@ -469,7 +469,7 @@ impl DdlController { &streaming_job, &ctx, table.get_version()?, - &fragment_graph.default_parallelism(), + &fragment_graph.specified_parallelism(), ) .await?; diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 0e332054d7eaf..592c9c9e966d6 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -317,7 +317,7 @@ pub struct StreamFragmentGraph { /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session /// variable. If not specified, all active parallel units will be used. - default_parallelism: Option, + specified_parallelism: Option, } impl StreamFragmentGraph { @@ -390,7 +390,7 @@ impl StreamFragmentGraph { .map(TableId::from) .collect(); - let default_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism { + let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism { Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?) } else { None @@ -401,7 +401,7 @@ impl StreamFragmentGraph { downstreams, upstreams, dependent_table_ids, - default_parallelism, + specified_parallelism, }) } @@ -496,9 +496,9 @@ impl StreamFragmentGraph { &self.dependent_table_ids } - /// Get the default parallelism of the job. - pub fn default_parallelism(&self) -> Option { - self.default_parallelism + /// Get the parallelism of the job, if specified by the user. + pub fn specified_parallelism(&self) -> Option { + self.specified_parallelism } /// Get downstreams of a fragment.