From 53b355601f043bfe329464b5e281e31f5282a630 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 11 Sep 2024 14:45:40 +0800 Subject: [PATCH 01/13] add back changes on session variable This reverts commit 683746cebb4aafe4ee5dde67454f20ed3ec4c6fc. --- src/common/src/session_config/mod.rs | 15 +++++++++++++++ src/frontend/src/stream_fragmenter/mod.rs | 4 +--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 163aa1879939..ea85e0cb266e 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -30,6 +30,7 @@ use serde_with::{serde_as, DisplayFromStr}; use thiserror::Error; use self::non_zero64::ConfigNonZeroU64; +use crate::hash::VirtualNode; use crate::session_config::sink_decouple::SinkDecouple; use crate::session_config::transaction_isolation_level::IsolationLevel; pub use crate::session_config::visibility_mode::VisibilityMode; @@ -298,6 +299,9 @@ pub struct SessionConfig { /// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit. #[parameter(default = false)] bypass_cluster_limits: bool, + + #[parameter(default = VirtualNode::COUNT, check_hook = check_vnode_count)] + vnode_count: usize, } fn check_timezone(val: &str) -> Result<(), String> { @@ -324,6 +328,17 @@ fn check_bytea_output(val: &str) -> Result<(), String> { } } +fn check_vnode_count(val: &usize) -> Result<(), String> { + match val { + 0 => Err("VNODE_COUNT must be greater than 0".to_owned()), + 1..=VirtualNode::MAX_COUNT => Ok(()), + _ => Err(format!( + "VNODE_COUNT must be less than or equal to {}", + VirtualNode::MAX_COUNT + )), + } +} + impl SessionConfig { pub fn set_force_two_phase_agg( &mut self, diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index ce08a8f0eb44..35faeb2c6a49 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -14,7 +14,6 @@ mod graph; use graph::*; -use risingwave_common::hash::VirtualNode; use risingwave_common::util::recursive::{self, Recurse as _}; use risingwave_connector::WithPropertiesExt; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; @@ -145,8 +144,7 @@ pub fn build_graph(plan_node: PlanRef) -> SchedulerResult Date: Thu, 12 Sep 2024 14:31:29 +0800 Subject: [PATCH 02/13] vnode expr context Signed-off-by: Bugen Zhao --- src/expr/core/src/expr_context.rs | 7 ++++++ src/expr/impl/src/scalar/vnode.rs | 7 +++--- src/stream/src/executor/actor.rs | 42 +++++++++++++++++++++---------- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index 27a888118e31..547aef01fef1 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -14,6 +14,7 @@ use std::future::Future; +use risingwave_common::hash::VirtualNode; use risingwave_expr::{define_context, Result as ExprResult}; use risingwave_pb::plan_common::ExprContext; @@ -21,6 +22,7 @@ use risingwave_pb::plan_common::ExprContext; define_context! { pub TIME_ZONE: String, pub FRAGMENT_ID: u32, + pub VNODE_COUNT: usize, } pub fn capture_expr_context() -> ExprResult { @@ -28,6 +30,11 @@ pub fn capture_expr_context() -> ExprResult { Ok(ExprContext { time_zone }) } +/// Get the vnode count from the context, or [`VirtualNode::COUNT`] if not set. +pub fn vnode_count() -> usize { + VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT) +} + pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output where Fut: Future, diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index edd4caa39970..960d71ca809d 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -19,6 +19,7 @@ use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; use risingwave_expr::expr::{BoxedExpression, Expression}; +use risingwave_expr::expr_context::vnode_count; use risingwave_expr::{build_function, Result}; #[derive(Debug)] @@ -43,8 +44,7 @@ impl Expression for VnodeExpression { } async fn eval(&self, input: &DataChunk) -> Result { - // TODO(var-vnode): get vnode count from context - let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, VirtualNode::COUNT); + let vnodes = VirtualNode::compute_chunk(input, &self.dist_key_indices, vnode_count()); let mut builder = I16ArrayBuilder::new(input.capacity()); vnodes .into_iter() @@ -53,9 +53,8 @@ impl Expression for VnodeExpression { } async fn eval_row(&self, input: &OwnedRow) -> Result { - // TODO(var-vnode): get vnode count from context Ok(Some( - VirtualNode::compute_row(input, &self.dist_key_indices, VirtualNode::COUNT) + VirtualNode::compute_row(input, &self.dist_key_indices, vnode_count()) .to_scalar() .into(), )) diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index b1b167567ed3..b6d98b63a8fa 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -19,13 +19,16 @@ use std::sync::{Arc, LazyLock}; use anyhow::anyhow; use await_tree::InstrumentAwait; use futures::future::join_all; +use futures::FutureExt; use hytra::TrAdder; +use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; use risingwave_common::config::StreamingConfig; +use risingwave_common::hash::VirtualNode; use risingwave_common::log::LogSuppresser; use risingwave_common::metrics::{IntGaugeExt, GLOBAL_ERROR_METRICS}; use risingwave_common::util::epoch::EpochPair; -use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID}; +use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID, VNODE_COUNT}; use risingwave_expr::ExprError; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::stream_plan::PbStreamActor; @@ -44,6 +47,7 @@ use crate::task::{ActorId, LocalBarrierManager}; pub struct ActorContext { pub id: ActorId, pub fragment_id: u32, + pub vnode_count: usize, pub mview_definition: String, // TODO(eric): these seem to be useless now? @@ -71,6 +75,7 @@ impl ActorContext { Arc::new(Self { id, fragment_id: 0, + vnode_count: VirtualNode::COUNT_FOR_TEST, mview_definition: "".to_string(), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), @@ -97,6 +102,9 @@ impl ActorContext { id: stream_actor.actor_id, fragment_id: stream_actor.fragment_id, mview_definition: stream_actor.mview_definition.clone(), + vnode_count: (stream_actor.vnode_bitmap.as_ref()) + // TODO(var-vnode): use 1 for singleton fragment + .map_or(VirtualNode::COUNT, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, @@ -177,18 +185,26 @@ where #[inline(always)] pub async fn run(mut self) -> StreamResult<()> { - FRAGMENT_ID::scope( - self.actor_context.fragment_id, - expr_context_scope(self.expr_context.clone(), async move { - tokio::join!( - // Drive the subtasks concurrently. - join_all(std::mem::take(&mut self.subtasks)), - self.run_consumer(), - ) - .1 - }), - ) - .await + let expr_context = self.expr_context.clone(); + let fragment_id = self.actor_context.fragment_id; + let vnode_count = self.actor_context.vnode_count; + + let run = async move { + tokio::join!( + // Drive the subtasks concurrently. + join_all(std::mem::take(&mut self.subtasks)), + self.run_consumer(), + ) + .1 + } + .boxed(); + + // Attach contexts to the future. + let run = expr_context_scope(expr_context, run); + let run = FRAGMENT_ID::scope(fragment_id, run); + let run = VNODE_COUNT::scope(vnode_count, run); + + run.await } async fn run_consumer(self) -> StreamResult<()> { From 7e80ecaf87480c9e73072ac9d216c7bfee0de960 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 12 Sep 2024 17:10:35 +0800 Subject: [PATCH 03/13] give up and use 256 for singleton vnode bitmap Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/bitmap.rs | 24 +++++++++++++------ .../src/hash/consistent_hash/mapping.rs | 3 ++- src/common/src/hash/table_distribution.rs | 9 +++---- src/meta/src/stream/stream_graph/schedule.rs | 4 ++-- .../log_store_impl/kv_log_store/serde.rs | 2 +- src/stream/src/executor/actor.rs | 3 ++- 6 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index eee6a64a2b42..397335b2e7ea 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -13,8 +13,9 @@ // limitations under the License. use std::ops::RangeInclusive; +use std::sync::{Arc, LazyLock}; -use crate::bitmap::Bitmap; +use crate::bitmap::{Bitmap, BitmapBuilder}; use crate::hash::table_distribution::SINGLETON_VNODE; use crate::hash::VirtualNode; @@ -39,15 +40,24 @@ impl Bitmap { } /// Returns whether only the [`SINGLETON_VNODE`] is set in the bitmap. - /// - /// Note that this method returning `true` does not imply that the bitmap was created by - /// [`VnodeBitmapExt::singleton`], or that the bitmap has length 1. pub fn is_singleton(&self) -> bool { self.count_ones() == 1 && self.iter_vnodes().next().unwrap() == SINGLETON_VNODE } - /// Creates a bitmap with length 1 and the single bit set. - pub fn singleton() -> Self { - Self::ones(1) + /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length + /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + pub fn singleton() -> &'static Self { + Self::singleton_arc() + } + + /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length + /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + pub fn singleton_arc() -> &'static Arc { + static SINGLETON: LazyLock> = LazyLock::new(|| { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + builder.set(SINGLETON_VNODE.to_index(), true); + builder.finish().into() + }); + &SINGLETON } } diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 80d5a56941cf..644349b10d7e 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -140,7 +140,8 @@ impl VnodeMapping { } /// Create a vnode mapping with the single item. Should only be used for singletons. - // TODO(var-vnode): make vnode count 1, also `Distribution::vnode_count` + /// + /// For backwards compatibility, [`VirtualNode::COUNT`] is used as the vnode count. pub fn new_single(item: T::Item) -> Self { Self::new_uniform(std::iter::once(item), VirtualNode::COUNT) } diff --git a/src/common/src/hash/table_distribution.rs b/src/common/src/hash/table_distribution.rs index 5275aca04adb..822db591c157 100644 --- a/src/common/src/hash/table_distribution.rs +++ b/src/common/src/hash/table_distribution.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::mem::replace; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use itertools::Itertools; use risingwave_pb::plan_common::StorageTableDesc; @@ -74,7 +74,7 @@ impl TableDistribution { ) -> Self { let compute_vnode = if let Some(vnode_col_idx_in_pk) = vnode_col_idx_in_pk { ComputeVnode::VnodeColumnIndex { - vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton().into()), + vnodes: vnodes.unwrap_or_else(|| Bitmap::singleton_arc().clone()), vnode_col_idx_in_pk, } } else if !dist_key_in_pk_indices.is_empty() { @@ -132,13 +132,10 @@ impl TableDistribution { /// Get vnode bitmap if distributed, or a dummy [`Bitmap::singleton()`] if singleton. pub fn vnodes(&self) -> &Arc { - static SINGLETON_VNODES: LazyLock> = - LazyLock::new(|| Bitmap::singleton().into()); - match &self.compute_vnode { ComputeVnode::DistKeyIndices { vnodes, .. } => vnodes, ComputeVnode::VnodeColumnIndex { vnodes, .. } => vnodes, - ComputeVnode::Singleton => &SINGLETON_VNODES, + ComputeVnode::Singleton => Bitmap::singleton_arc(), } } diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index f67d8547e28a..ef2691deb180 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -152,8 +152,8 @@ impl Distribution { } /// Get the vnode count of the distribution. - // TODO(var-vnode): after `ServingVnodeMapping::upsert` is made vnode-count-aware, - // we may return 1 for singleton. + /// + /// For backwards compatibility, [`VirtualNode::COUNT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { Distribution::Singleton(_) => VirtualNode::COUNT, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index ec7cc62f2d49..6e020536cfb9 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -201,7 +201,7 @@ impl LogStoreRowSerde { let vnodes = match vnodes { Some(vnodes) => vnodes, - None => Bitmap::singleton().into(), + None => Bitmap::singleton_arc().clone(), }; // epoch and seq_id. The seq_id of barrier is set null, and therefore the second order type diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index b6d98b63a8fa..b59abd6dff57 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -103,7 +103,8 @@ impl ActorContext { fragment_id: stream_actor.fragment_id, mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) - // TODO(var-vnode): use 1 for singleton fragment + // An unset `vnode_bitmap` means the actor is a singleton. + // For backwards compatibility, `VirtualNode::COUNT` is used for singleton. .map_or(VirtualNode::COUNT, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), From a003192918a37757dbe5ccf044c683886fb4bc4c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 12 Sep 2024 17:26:37 +0800 Subject: [PATCH 04/13] rename vnode count to count_for_compat Signed-off-by: Bugen Zhao --- src/common/src/config.rs | 4 ++-- src/common/src/hash/consistent_hash/bitmap.rs | 6 +++--- src/common/src/hash/consistent_hash/compat.rs | 4 ++-- .../src/hash/consistent_hash/mapping.rs | 4 ++-- src/common/src/hash/consistent_hash/vnode.rs | 19 +++++++++++-------- src/common/src/session_config/mod.rs | 2 +- src/config/docs.md | 4 ++-- src/expr/core/src/expr_context.rs | 6 ++++-- src/expr/impl/src/scalar/vnode.rs | 14 ++++++++++---- src/frontend/src/handler/alter_parallelism.rs | 2 +- src/jni_core/src/lib.rs | 4 ++-- src/meta/src/serving/mod.rs | 7 ++++++- src/meta/src/stream/stream_graph/schedule.rs | 4 ++-- .../src/hummock/sstable/multi_builder.rs | 2 +- src/stream/src/executor/actor.rs | 4 ++-- .../scale/streaming_parallelism.rs | 6 +++--- 16 files changed, 54 insertions(+), 38 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index b312304e8079..7ab8b5d84c69 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -322,7 +322,7 @@ pub struct MetaConfig { pub do_not_config_object_storage_lifecycle: bool, /// Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. - /// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. #[serde(default = "default::meta::partition_vnode_count")] pub partition_vnode_count: u32, @@ -351,7 +351,7 @@ pub struct MetaConfig { /// Count of partitions of tables in default group and materialized view group. /// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. - /// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. /// Set it zero to disable this feature. #[serde(default = "default::meta::hybrid_partition_vnode_count")] pub hybrid_partition_vnode_count: u32, diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index 397335b2e7ea..a40946273a0a 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -45,16 +45,16 @@ impl Bitmap { } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. pub fn singleton() -> &'static Self { Self::singleton_arc() } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. pub fn singleton_arc() -> &'static Arc { static SINGLETON: LazyLock> = LazyLock::new(|| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT); builder.set(SINGLETON_VNODE.to_index(), true); builder.finish().into() }); diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 58ff07a1514f..0c86fbb12bcd 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -16,7 +16,7 @@ use super::vnode::VirtualNode; /// A trait for accessing the vnode count field with backward compatibility. pub trait VnodeCountCompat { - /// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set, + /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, /// typically for backward compatibility. /// /// See the documentation on the field of the implementing type for more details. @@ -38,7 +38,7 @@ macro_rules! impl_maybe_vnode_count_compat { impl VnodeCountCompat for $ty { fn vnode_count(&self) -> usize { self.maybe_vnode_count - .map_or(VirtualNode::COUNT, |v| v as _) + .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _) } } )* diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 644349b10d7e..1e7bca125fc5 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -141,9 +141,9 @@ impl VnodeMapping { /// Create a vnode mapping with the single item. Should only be used for singletons. /// - /// For backwards compatibility, [`VirtualNode::COUNT`] is used as the vnode count. + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT) + Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) } /// The length (or count) of the vnode in this mapping. diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 388083b81bff..2a7390cfbbc5 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -51,19 +51,22 @@ impl Crc32HashCode { } impl VirtualNode { - /// The total count of virtual nodes. - // TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST` - pub const COUNT: usize = 1 << 8; - /// The maximum value of the virtual node. - // TODO(var-vnode): remove this and only keep `MAX_FOR_TEST` - pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); + /// The total count of virtual nodes, for compatibility purposes **ONLY**. + /// + /// Typical use cases: + /// + /// - As the default value for the session configuration. + /// - As the vnode count for all streaming jobs, fragments, and tables that were created before + /// the variable vnode count support was introduced. + /// - As the vnode count for singletons. + pub const COUNT_FOR_COMPAT: usize = 1 << 8; } impl VirtualNode { /// The total count of virtual nodes, for testing purposes. - pub const COUNT_FOR_TEST: usize = Self::COUNT; + pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT; /// The maximum value of the virtual node, for testing purposes. - pub const MAX_FOR_TEST: VirtualNode = Self::MAX; + pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1); } impl VirtualNode { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index ea85e0cb266e..5509284e80dd 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -300,7 +300,7 @@ pub struct SessionConfig { #[parameter(default = false)] bypass_cluster_limits: bool, - #[parameter(default = VirtualNode::COUNT, check_hook = check_vnode_count)] + #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)] vnode_count: usize, } diff --git a/src/config/docs.md b/src/config/docs.md index c9d008af2bb3..edf32965c492 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config` | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | -| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | +| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | | meta_leader_lease_secs | | 30 | | min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 | @@ -52,7 +52,7 @@ This page is automatically generated by `./risedev generate-example-config` | parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 | | parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 | | parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 | -| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | +| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | | periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 | | periodic_scheduling_compaction_group_interval_sec | | 10 | | periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 | diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index 547aef01fef1..d6798a861315 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -30,9 +30,11 @@ pub fn capture_expr_context() -> ExprResult { Ok(ExprContext { time_zone }) } -/// Get the vnode count from the context, or [`VirtualNode::COUNT`] if not set. +/// Get the vnode count from the context, or [`VirtualNode::COUNT_FOR_COMPAT`] if not set. +// TODO(var-vnode): the only case where this is not set is for batch queries, is it still +// necessary to support `rw_vnode` expression in batch queries? pub fn vnode_count() -> usize { - VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT) + VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT_FOR_COMPAT) } pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 960d71ca809d..8fdecda841d8 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -67,9 +67,11 @@ mod tests { use risingwave_common::hash::VirtualNode; use risingwave_common::row::Row; use risingwave_expr::expr::build_from_pretty; + use risingwave_expr::expr_context::VNODE_COUNT; #[tokio::test] async fn test_vnode_expr_eval() { + let vnode_count = 32; let expr = build_from_pretty("(vnode:int2 $0:int4 $0:int8 $0:varchar)"); let input = DataChunk::from_pretty( "i I T @@ -79,17 +81,21 @@ mod tests { ); // test eval - let output = expr.eval(&input).await.unwrap(); + let output = VNODE_COUNT::scope(vnode_count, expr.eval(&input)) + .await + .unwrap(); for vnode in output.iter() { let vnode = vnode.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } // test eval_row for row in input.rows() { - let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); + let result = VNODE_COUNT::scope(vnode_count, expr.eval_row(&row.to_owned_row())) + .await + .unwrap(); let vnode = result.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 8c3dbd6e419c..23ccc6706fa0 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -105,7 +105,7 @@ pub async fn handle_alter_parallelism( .sum::(); // TODO(var-vnode): get max parallelism from catalogs. // Although the meta service will clamp the value for us, we should still check it here for better UI. - let max_parallelism = VirtualNode::COUNT; + let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; let mut builder = RwPgResponse::builder(stmt_type); diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index bb7dca9e0231..efcd8c0b8b59 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -320,8 +320,8 @@ impl<'a> Deref for JavaBindingIterator<'a> { #[no_mangle] extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - // TODO(var-vnode): vnode count can vary for different tables. - VirtualNode::COUNT as jint + // TODO(var-vnode): vnode count can vary for different tables, use real ones. + VirtualNode::COUNT_FOR_COMPAT as jint } #[cfg_or_panic(not(madsim))] diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 3d7ab9888362..a7d719369d7c 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -58,7 +58,12 @@ impl ServingVnodeMapping { None }; // TODO(var-vnode): also fetch vnode count for each fragment - place_vnode(old_mapping, workers, max_parallelism, VirtualNode::COUNT) + place_vnode( + old_mapping, + workers, + max_parallelism, + VirtualNode::COUNT_FOR_COMPAT, + ) }; match new_mapping { None => { diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index ef2691deb180..55ae23171bdf 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -153,10 +153,10 @@ impl Distribution { /// Get the vnode count of the distribution. /// - /// For backwards compatibility, [`VirtualNode::COUNT`] is used for singleton. + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT, + Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 25bdd54df720..45870257bc04 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -94,7 +94,7 @@ where concurrent_uploading_sst_count: Option, ) -> Self { // TODO(var-vnode): should use value from caller - let vnode_count = VirtualNode::COUNT; + let vnode_count = VirtualNode::COUNT_FOR_COMPAT; Self { builder_factory, diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index b59abd6dff57..0aa5ebca9007 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -104,8 +104,8 @@ impl ActorContext { mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) // An unset `vnode_bitmap` means the actor is a singleton. - // For backwards compatibility, `VirtualNode::COUNT` is used for singleton. - .map_or(VirtualNode::COUNT, |b| Bitmap::from(b).len()), + // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. + .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index bef6b1ca1f1c..7494b2bcc53b 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_auto_parallelism( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, true, @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; @@ -202,7 +202,7 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; From 5a802b9f1363a6b93bb9692f47b8eddf9113e84f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 12 Sep 2024 17:49:04 +0800 Subject: [PATCH 05/13] serving info be aware of vnode count Signed-off-by: Bugen Zhao --- src/expr/impl/src/scalar/vnode.rs | 1 - src/meta/src/controller/fragment.rs | 25 +++++++++++++--- src/meta/src/manager/catalog/fragment.rs | 28 ++++++++++------- src/meta/src/manager/metadata.rs | 8 ++--- src/meta/src/serving/mod.rs | 38 ++++++++++++------------ 5 files changed, 61 insertions(+), 39 deletions(-) diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 8fdecda841d8..7d44dfb0e03b 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -64,7 +64,6 @@ impl Expression for VnodeExpression { #[cfg(test)] mod tests { use risingwave_common::array::{DataChunk, DataChunkTestExt}; - use risingwave_common::hash::VirtualNode; use risingwave_common::row::Row; use risingwave_expr::expr::build_from_pretty; use risingwave_expr::expr_context::VNODE_COUNT; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index b29c4cc207eb..721e6da5fc7f 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -56,7 +56,9 @@ use crate::controller::utils::{ get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors, FragmentDesc, PartialActorLocation, PartialFragmentStateTables, }; -use crate::manager::{ActorInfos, InflightFragmentInfo, LocalNotification}; +use crate::manager::{ + ActorInfos, FragmentParallelismInfo, InflightFragmentInfo, LocalNotification, +}; use crate::model::{TableFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -475,8 +477,9 @@ impl CatalogController { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> MetaResult> { + ) -> MetaResult> { let inner = self.inner.read().await; + let mut select = Actor::find() .select_only() .column(actor::Column::FragmentId) @@ -485,11 +488,25 @@ impl CatalogController { if let Some(id_filter) = id_filter { select = select.having(actor::Column::FragmentId.is_in(id_filter)); } - let fragment_parallelisms: Vec<(FragmentId, i64)> = + select = select + .join(JoinType::InnerJoin, actor::Relation::Fragment.def()) + .column(fragment::Column::DistributionType) + .column(fragment::Column::VnodeCount); + + let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> = select.into_tuple().all(&inner.db).await?; Ok(fragment_parallelisms .into_iter() - .map(|(fragment_id, count)| (fragment_id, count as usize)) + .map(|(fragment_id, count, distribution_type, vnode_count)| { + ( + fragment_id, + FragmentParallelismInfo { + distribution_type: distribution_type.into(), + actor_count: count as usize, + vnode_count: vnode_count as usize, + }, + ) + }) .collect()) } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 6ec70a4b8d28..07f73fbc0afe 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, VnodeCountCompat, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::stream_graph_visitor::{ visit_stream_node, visit_stream_node_cont, visit_stream_node_cont_mut, }; @@ -136,7 +136,7 @@ impl FragmentManagerCore { fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> HashMap { + ) -> HashMap { self.table_fragments .values() .filter(|tf| tf.state() != State::Initial) @@ -148,13 +148,14 @@ impl FragmentManagerCore { return None; } - let parallelism = match fragment.get_distribution_type().unwrap() { - FragmentDistributionType::Unspecified => unreachable!(), - FragmentDistributionType::Single => 1, - FragmentDistributionType::Hash => fragment.get_actors().len(), - }; - - Some((fragment.fragment_id, parallelism)) + Some(( + fragment.fragment_id, + FragmentParallelismInfo { + distribution_type: fragment.get_distribution_type().unwrap(), + actor_count: fragment.actors.len(), + vnode_count: fragment.vnode_count(), + }, + )) }) }) .collect() @@ -189,6 +190,13 @@ impl ActorInfos { } } +#[derive(Clone, Debug)] +pub struct FragmentParallelismInfo { + pub distribution_type: FragmentDistributionType, + pub actor_count: usize, + pub vnode_count: usize, +} + pub type FragmentManagerRef = Arc; impl FragmentManager { @@ -1678,7 +1686,7 @@ impl FragmentManager { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> HashMap { + ) -> HashMap { self.core .read() .await diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 4e0ca9e77f4b..78df862e1d8e 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -31,6 +31,7 @@ use tokio::sync::oneshot; use tokio::time::{sleep, Instant}; use tracing::warn; +use super::FragmentParallelismInfo; use crate::barrier::Reschedule; use crate::controller::catalog::CatalogControllerRef; use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo}; @@ -436,15 +437,12 @@ impl MetadataManager { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> MetaResult> { + ) -> MetaResult> { match self { MetadataManager::V1(mgr) => Ok(mgr .fragment_manager .running_fragment_parallelisms(id_filter) - .await - .into_iter() - .map(|(k, v)| (k as FragmentId, v)) - .collect()), + .await), MetadataManager::V2(mgr) => { let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect()); Ok(mgr diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index a7d719369d7c..c7e39ca8a0a8 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -16,15 +16,18 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_common::hash::{VirtualNode, WorkerSlotMapping}; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef}; +use crate::manager::{ + FragmentParallelismInfo, LocalNotification, MetadataManager, NotificationManagerRef, +}; use crate::model::FragmentId; pub type ServingVnodeMappingRef = Arc; @@ -43,27 +46,21 @@ impl ServingVnodeMapping { /// Returns (successful updates, failed updates). pub fn upsert( &self, - streaming_parallelisms: HashMap, + streaming_parallelisms: HashMap, workers: &[WorkerNode], ) -> (HashMap, Vec) { let mut serving_vnode_mappings = self.serving_vnode_mappings.write(); let mut upserted: HashMap = HashMap::default(); let mut failed: Vec = vec![]; - for (fragment_id, streaming_parallelism) in streaming_parallelisms { + for (fragment_id, info) in streaming_parallelisms { let new_mapping = { let old_mapping = serving_vnode_mappings.get(&fragment_id); - let max_parallelism = if streaming_parallelism == 1 { - Some(1) - } else { - None + let max_parallelism = match info.distribution_type { + FragmentDistributionType::Unspecified => unreachable!(), + FragmentDistributionType::Single => Some(1), + FragmentDistributionType::Hash => None, }; - // TODO(var-vnode): also fetch vnode count for each fragment - place_vnode( - old_mapping, - workers, - max_parallelism, - VirtualNode::COUNT_FOR_COMPAT, - ) + place_vnode(old_mapping, workers, max_parallelism, info.vnode_count) }; match new_mapping { None => { @@ -134,7 +131,10 @@ pub async fn on_meta_start( async fn fetch_serving_infos( metadata_manager: &MetadataManager, -) -> (Vec, HashMap) { +) -> ( + Vec, + HashMap, +) { match metadata_manager { MetadataManager::V1(mgr) => ( mgr.cluster_manager @@ -160,7 +160,7 @@ async fn fetch_serving_infos( serving_compute_nodes, parallelisms .into_iter() - .map(|(fragment_id, cnt)| (fragment_id as FragmentId, cnt)) + .map(|(fragment_id, info)| (fragment_id as FragmentId, info)) .collect(), ) } @@ -198,9 +198,9 @@ pub async fn start_serving_vnode_mapping_worker( continue; } let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; - let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id|{ + let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| { match streaming_parallelisms.get(frag_id) { - Some(parallelism) => Some((*frag_id, *parallelism)), + Some(info) => Some((*frag_id, info.clone())), None => { tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found"); None From 6067ec56a61f6a723c15306a0da5f58458c66717 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 16 Sep 2024 14:02:58 +0800 Subject: [PATCH 06/13] fix java binding vnode count Signed-off-by: Bugen Zhao --- java/com_risingwave_java_binding_Binding.h | 4 ++-- .../com/risingwave/java/binding/HummockReadDemo.java | 5 ++++- .../main/java/com/risingwave/java/binding/Binding.java | 6 +++++- src/jni_core/src/lib.rs | 5 +++-- src/jni_core/src/macros.rs | 10 +++++----- 5 files changed, 19 insertions(+), 11 deletions(-) diff --git a/java/com_risingwave_java_binding_Binding.h b/java/com_risingwave_java_binding_Binding.h index 606110c40528..9de9f1d2f4fd 100644 --- a/java/com_risingwave_java_binding_Binding.h +++ b/java/com_risingwave_java_binding_Binding.h @@ -9,10 +9,10 @@ extern "C" { #endif /* * Class: com_risingwave_java_binding_Binding - * Method: vnodeCount + * Method: defaultVnodeCount * Signature: ()I */ -JNIEXPORT jint JNICALL Java_com_risingwave_java_binding_Binding_vnodeCount +JNIEXPORT jint JNICALL Java_com_risingwave_java_binding_Binding_defaultVnodeCount (JNIEnv *, jclass); /* diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java index 6a3598b9c97d..53ad64cc6a10 100644 --- a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java @@ -50,7 +50,10 @@ public static void main(String[] args) { HummockVersion version = metaClient.pinVersion(); Table tableCatalog = metaClient.getTable(dbName, tableName); - int vnodeCount = Binding.vnodeCount(); + int vnodeCount = Binding.defaultVnodeCount(); + if (tableCatalog.hasMaybeVnodeCount()) { + vnodeCount = tableCatalog.getMaybeVnodeCount(); + } List vnodeList = new ArrayList<>(); for (int i = 0; i < vnodeCount; i++) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index db832566fdfa..5d1a555968af 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -33,7 +33,11 @@ public static native void tracingSlf4jEvent( public static native boolean tracingSlf4jEventEnabled(int level); - public static native int vnodeCount(); + /** + * Used to get the default number of vnodes for a table, if its `maybeVnodeCount` field is not + * set. + */ + public static native int defaultVnodeCount(); static native long iteratorNewStreamChunk(long pointer); diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index efcd8c0b8b59..3f776c3a2e98 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -319,8 +319,9 @@ impl<'a> Deref for JavaBindingIterator<'a> { } #[no_mangle] -extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - // TODO(var-vnode): vnode count can vary for different tables, use real ones. +extern "system" fn Java_com_risingwave_java_binding_Binding_defaultVnodeCount( + _env: EnvParam<'_>, +) -> jint { VirtualNode::COUNT_FOR_COMPAT as jint } diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs index 2e7d095e0bd4..e89023285513 100644 --- a/src/jni_core/src/macros.rs +++ b/src/jni_core/src/macros.rs @@ -385,7 +385,7 @@ macro_rules! to_jvalue { /// gen_jni_sig!(boolean f(int, java.lang.String)), /// "(ILjava/lang/String;)Z" /// ); -/// assert_eq!(gen_jni_sig!(public static native int vnodeCount()), "()I"); +/// assert_eq!(gen_jni_sig!(public static native int defaultVnodeCount()), "()I"); /// assert_eq!( /// gen_jni_sig!(long hummockIteratorNew(byte[] readPlan)), /// "([B)J" @@ -445,7 +445,7 @@ macro_rules! for_all_plain_native_methods { public static native boolean tracingSlf4jEventEnabled(int level); - public static native int vnodeCount(); + public static native int defaultVnodeCount(); static native long iteratorNewStreamChunk(long pointer); @@ -861,7 +861,7 @@ mod tests { (test) => {{ for_all_native_methods! { { - public static native int vnodeCount(); + public static native int defaultVnodeCount(); static native long hummockIteratorNew(byte[] readPlan); public static native byte[] rowGetKey(long pointer); }, @@ -885,7 +885,7 @@ mod tests { assert_eq!( sig, [ - ("vnodeCount", "()I"), + ("defaultVnodeCount", "()I"), ("hummockIteratorNew", "([B)J"), ("rowGetKey", "(J)[B") ] @@ -902,7 +902,7 @@ mod tests { [ tracingSlf4jEvent (Ljava/lang/String;Ljava/lang/String;ILjava/lang/String;)V, tracingSlf4jEventEnabled (I)Z, - vnodeCount ()I, + defaultVnodeCount ()I, iteratorNewStreamChunk (J)J, iteratorNext (J)Z, iteratorClose (J)V, From 417a9a2e77c0b645f1d7c1ba7f99e09b6c4bf9e3 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 16 Sep 2024 14:49:38 +0800 Subject: [PATCH 07/13] do not rewrite alter parallelism in frontend Signed-off-by: Bugen Zhao --- src/frontend/src/handler/alter_parallelism.rs | 20 +++++++++---------- src/meta/src/stream/scale.rs | 2 -- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 23ccc6706fa0..3a823330e974 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -93,7 +93,7 @@ pub async fn handle_alter_parallelism( } }; - let mut target_parallelism = extract_table_parallelism(parallelism)?; + let target_parallelism = extract_table_parallelism(parallelism)?; let available_parallelism = session .env() @@ -103,26 +103,24 @@ pub async fn handle_alter_parallelism( .filter(|w| w.is_streaming_schedulable()) .map(|w| w.parallelism) .sum::(); - // TODO(var-vnode): get max parallelism from catalogs. - // Although the meta service will clamp the value for us, we should still check it here for better UI. - let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; let mut builder = RwPgResponse::builder(stmt_type); + // TODO(var-vnode): get correct max parallelism from catalogs. + // Although the meta service will clamp the value for us and print warnings there, + // we may still check it here for better UI. + let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; + match &target_parallelism.parallelism { Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { if available_parallelism > max_parallelism as u32 { - builder = builder.notice(format!("Available parallelism exceeds the maximum parallelism limit, the actual parallelism will be limited to {max_parallelism}")); + builder = builder.notice("Available parallelism may exceed the maximum parallelism limit, the actual parallelism will be limited"); } } Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { if *parallelism > max_parallelism as u32 { - builder = builder.notice(format!("Provided parallelism exceeds the maximum parallelism limit, resetting to FIXED({max_parallelism})")); - target_parallelism = PbTableParallelism { - parallelism: Some(PbParallelism::Fixed(FixedParallelism { - parallelism: max_parallelism as u32, - })), - }; + builder = builder.notice("Provided parallelism may exceed the maximum parallelism limit, will be reset to FIXED(max_parallelism)"); + // Rewriting will be done in meta service. } } _ => {} diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index c2891d81b81a..58e731b83bc0 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -2244,8 +2244,6 @@ impl ScaleController { } TableParallelism::Fixed(mut n) => { if n > max_parallelism { - // This should be unreachable as it was already checked and rewritten in the frontend. - // We still intercept it to prevent accidental modifications. tracing::warn!("specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"); n = max_parallelism } From 4cdd87484782a559061994ec72dda058c9d6a7fd Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 23 Sep 2024 18:07:07 +0800 Subject: [PATCH 08/13] do not expose `vnode_count` term to user, use `max_parallelism` instead Signed-off-by: Bugen Zhao --- src/common/src/session_config/mod.rs | 17 ++++++++++++++--- src/frontend/src/stream_fragmenter/mod.rs | 2 +- src/meta/src/rpc/ddl_controller.rs | 6 +++++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 5509284e80dd..813172ac20a6 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -140,8 +140,11 @@ pub struct SessionConfig { #[parameter(default = "UTC", check_hook = check_timezone)] timezone: String, - /// If `STREAMING_PARALLELISM` is non-zero, CREATE MATERIALIZED VIEW/TABLE/INDEX will use it as - /// streaming parallelism. + /// The execution parallelism for streaming queries, including tables, materialized views, indexes, + /// and sinks. Defaults to 0, which means they will be scheduled adaptively based on the cluster size. + /// + /// If a non-zero value is set, streaming queries will be scheduled to use a fixed number of parallelism. + /// Note that the value will be bounded at `STREAMING_MAX_PARALLELISM`. #[serde_as(as = "DisplayFromStr")] #[parameter(default = ConfigNonZeroU64::default())] streaming_parallelism: ConfigNonZeroU64, @@ -300,8 +303,16 @@ pub struct SessionConfig { #[parameter(default = false)] bypass_cluster_limits: bool, + /// The maximum number of parallelism a streaming query can use. Defaults to 256. + /// + /// Compared to `STREAMING_PARALLELISM`, which configures the initial parallelism, this configures + /// the maximum parallelism a streaming query can use in the future, if the cluster size changes or + /// users manually change the parallelism with `ALTER .. SET PARALLELISM`. + /// + /// It's not always a good idea to set this to a very large number, as it may cause performance + /// degradation when performing range scans on the table or the materialized view. #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)] - vnode_count: usize, + streaming_max_parallelism: usize, } fn check_timezone(val: &str) -> Result<(), String> { diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 35faeb2c6a49..b671d0792e07 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -144,7 +144,7 @@ pub fn build_graph(plan_node: PlanRef) -> SchedulerResult max_parallelism; if parallelism_limited { - tracing::warn!("Too many parallelism, use {} instead", max_parallelism); + // TODO(var-vnode): may return error here? + tracing::warn!( + "Too many parallelism, use max parallelism {} instead", + max_parallelism + ); } let parallelism = parallelism.min(max_parallelism); From df51fa2646f854dfb1bfa331b06df5321b51161f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 24 Sep 2024 15:26:10 +0800 Subject: [PATCH 09/13] fix e2e Signed-off-by: Bugen Zhao --- e2e_test/batch/catalog/pg_settings.slt.part | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e_test/batch/catalog/pg_settings.slt.part b/e2e_test/batch/catalog/pg_settings.slt.part index e05d466c3a4d..641ce8ac65fd 100644 --- a/e2e_test/batch/catalog/pg_settings.slt.part +++ b/e2e_test/batch/catalog/pg_settings.slt.part @@ -58,6 +58,7 @@ user sink_decouple user source_rate_limit user standard_conforming_strings user statement_timeout +user streaming_max_parallelism user streaming_parallelism user streaming_use_arrangement_backfill user streaming_use_snapshot_backfill From 961d2bf786d8ff60343191de0ce9663b395ab93d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 24 Sep 2024 15:44:37 +0800 Subject: [PATCH 10/13] add non-null constrait to new column (https://github.com/risingwavelabs/risingwave/pull/18444#discussion_r1772740660) Signed-off-by: Bugen Zhao --- .../migration/src/m20240911_083152_variable_vnode_count.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs b/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs index 0f93c9e3dc3d..4a30b0828f9b 100644 --- a/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs +++ b/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs @@ -15,6 +15,7 @@ impl MigrationTrait for Migration { .add_column( ColumnDef::new(Table::VnodeCount) .integer() + .not_null() .default(VNODE_COUNT), ) .to_owned(), @@ -28,6 +29,7 @@ impl MigrationTrait for Migration { .add_column( ColumnDef::new(Fragment::VnodeCount) .integer() + .not_null() .default(VNODE_COUNT), ) .to_owned(), From a3a5472363191a7081f027beee695a4dc0ade6f8 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 24 Sep 2024 17:20:32 +0800 Subject: [PATCH 11/13] update vnode count check message Signed-off-by: Bugen Zhao --- src/common/src/session_config/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 813172ac20a6..ec3f18a8e420 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -311,6 +311,7 @@ pub struct SessionConfig { /// /// It's not always a good idea to set this to a very large number, as it may cause performance /// degradation when performing range scans on the table or the materialized view. + // a.k.a. vnode count #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)] streaming_max_parallelism: usize, } @@ -339,12 +340,14 @@ fn check_bytea_output(val: &str) -> Result<(), String> { } } +/// Check if the provided value is a valid vnode count. +/// Note that we use term `max_parallelism` when it's user-facing. fn check_vnode_count(val: &usize) -> Result<(), String> { match val { - 0 => Err("VNODE_COUNT must be greater than 0".to_owned()), + 0 => Err("STREAMING_MAX_PARALLELISM must be greater than 0".to_owned()), 1..=VirtualNode::MAX_COUNT => Ok(()), _ => Err(format!( - "VNODE_COUNT must be less than or equal to {}", + "STREAMING_MAX_PARALLELISM must be less than or equal to {}", VirtualNode::MAX_COUNT )), } From e8bee177d06c2b7c51c0834c294b69065b6c081a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 26 Sep 2024 14:56:03 +0800 Subject: [PATCH 12/13] bail out if specified parallelism cannot be satisfied Signed-off-by: Bugen Zhao --- src/meta/src/error.rs | 12 +++-- src/meta/src/rpc/ddl_controller.rs | 84 ++++++++++++++++++------------ 2 files changed, 58 insertions(+), 38 deletions(-) diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a..2807980236c6 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -27,9 +27,13 @@ use crate::storage::MetaStoreError; pub type MetaResult = std::result::Result; #[derive( - thiserror::Error, thiserror_ext::ReportDebug, thiserror_ext::Arc, thiserror_ext::Construct, + thiserror::Error, + thiserror_ext::ReportDebug, + thiserror_ext::Arc, + thiserror_ext::Construct, + thiserror_ext::Macro, )] -#[thiserror_ext(newtype(name = MetaError, backtrace))] +#[thiserror_ext(newtype(name = MetaError, backtrace), macro(path = "crate::error"))] pub enum MetaErrorInner { #[error("MetaStore transaction error: {0}")] TransactionError( @@ -66,7 +70,7 @@ pub enum MetaErrorInner { InvalidWorker(WorkerId, String), #[error("Invalid parameter: {0}")] - InvalidParameter(String), + InvalidParameter(#[message] String), // Used for catalog errors. #[error("{0} id not found: {1}")] @@ -80,7 +84,7 @@ pub enum MetaErrorInner { Duplicated(&'static str, String), #[error("Service unavailable: {0}")] - Unavailable(String), + Unavailable(#[message] String), #[error("Election failed: {0}")] Election(#[source] BoxedError), diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 70908a880acb..0d8ea1c2ae38 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -68,6 +68,7 @@ use tracing::log::warn; use tracing::Instrument; use crate::barrier::BarrierManagerRef; +use crate::error::{bail_invalid_parameter, bail_unavailable}; use crate::manager::{ CatalogManagerRef, ConnectionId, DatabaseId, DdlType, FragmentManagerRef, FunctionId, IdCategory, IdCategoryType, IndexId, LocalNotification, MetaSrvEnv, MetadataManager, @@ -1540,33 +1541,60 @@ impl DdlController { Ok(version) } + /// Resolve the parallelism of the stream job based on the given information. + /// + /// Returns error if user specifies a parallelism that cannot be satisfied. fn resolve_stream_parallelism( &self, - specified_parallelism: Option, + specified: Option, + max: NonZeroUsize, cluster_info: &StreamingClusterInfo, ) -> MetaResult { - let available_parallelism = cluster_info.parallelism(); - if available_parallelism == 0 { - return Err(MetaError::unavailable("No available slots to schedule")); - } + let available = cluster_info.parallelism(); + let Some(available) = NonZeroUsize::new(available) else { + bail_unavailable!("no available slots to schedule"); + }; - let available_parallelism = NonZeroUsize::new(available_parallelism).unwrap(); + if let Some(specified) = specified { + if specified > max { + bail_invalid_parameter!( + "specified parallelism {} should not exceed max parallelism {}", + specified, + max, + ); + } + if specified > available { + bail_unavailable!( + "not enough parallelism to schedule, required: {}, available: {}", + specified, + available, + ); + } + Ok(specified) + } else { + // Use configured parallelism if no default parallelism is specified. + let default_parallelism = match self.env.opts.default_parallelism { + DefaultParallelism::Full => available, + DefaultParallelism::Default(num) => { + if num > available { + bail_unavailable!( + "not enough parallelism to schedule, required: {}, available: {}", + num, + available, + ); + } + num + } + }; - // Use configured parallelism if no default parallelism is specified. - let parallelism = - specified_parallelism.unwrap_or_else(|| match &self.env.opts.default_parallelism { - DefaultParallelism::Full => available_parallelism, - DefaultParallelism::Default(num) => *num, - }); - - if parallelism > available_parallelism { - return Err(MetaError::unavailable(format!( - "Not enough parallelism to schedule, required: {}, available: {}", - parallelism, available_parallelism - ))); + if default_parallelism > max { + tracing::warn!( + "too many parallelism available, use max parallelism {} instead", + max + ); + } + Ok(default_parallelism.min(max)) } - - Ok(parallelism) } /// Builds the actor graph: @@ -1631,16 +1659,8 @@ impl DdlController { // 2. Build the actor graph. let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; - let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; - let parallelism_limited = parallelism > max_parallelism; - if parallelism_limited { - // TODO(var-vnode): may return error here? - tracing::warn!( - "Too many parallelism, use max parallelism {} instead", - max_parallelism - ); - } - let parallelism = parallelism.min(max_parallelism); + let parallelism = + self.resolve_stream_parallelism(specified_parallelism, max_parallelism, &cluster_info)?; let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; @@ -1663,10 +1683,6 @@ impl DdlController { // If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE. // Otherwise, it defaults to FIXED based on deduction. let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) { - (None, DefaultParallelism::Full) if parallelism_limited => { - tracing::warn!("Parallelism limited to {max_parallelism} in ADAPTIVE mode"); - TableParallelism::Adaptive - } (None, DefaultParallelism::Full) => TableParallelism::Adaptive, _ => TableParallelism::Fixed(parallelism.get()), }; From f27da15633b62df9256e29881da2ff9747c206b0 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 26 Sep 2024 16:01:31 +0800 Subject: [PATCH 13/13] check parallelism for alter on meta, instead of frontend Signed-off-by: Bugen Zhao --- src/frontend/src/handler/alter_parallelism.rs | 32 +-------------- src/meta/src/stream/stream_manager.rs | 39 ++++++++++++++++++- .../scale/streaming_parallelism.rs | 13 ++++--- 3 files changed, 45 insertions(+), 39 deletions(-) diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 3a823330e974..57aedc0e1490 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -14,9 +14,8 @@ use pgwire::pg_response::StatementType; use risingwave_common::bail; -use risingwave_common::hash::VirtualNode; use risingwave_pb::meta::table_parallelism::{ - AdaptiveParallelism, FixedParallelism, Parallelism, PbParallelism, + AdaptiveParallelism, FixedParallelism, PbParallelism, }; use risingwave_pb::meta::{PbTableParallelism, TableParallelism}; use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value}; @@ -95,37 +94,8 @@ pub async fn handle_alter_parallelism( let target_parallelism = extract_table_parallelism(parallelism)?; - let available_parallelism = session - .env() - .worker_node_manager() - .list_worker_nodes() - .iter() - .filter(|w| w.is_streaming_schedulable()) - .map(|w| w.parallelism) - .sum::(); - let mut builder = RwPgResponse::builder(stmt_type); - // TODO(var-vnode): get correct max parallelism from catalogs. - // Although the meta service will clamp the value for us and print warnings there, - // we may still check it here for better UI. - let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; - - match &target_parallelism.parallelism { - Some(Parallelism::Adaptive(_)) | Some(Parallelism::Auto(_)) => { - if available_parallelism > max_parallelism as u32 { - builder = builder.notice("Available parallelism may exceed the maximum parallelism limit, the actual parallelism will be limited"); - } - } - Some(Parallelism::Fixed(FixedParallelism { parallelism })) => { - if *parallelism > max_parallelism as u32 { - builder = builder.notice("Provided parallelism may exceed the maximum parallelism limit, will be reset to FIXED(max_parallelism)"); - // Rewriting will be done in meta service. - } - } - _ => {} - }; - let catalog_writer = session.catalog_writer()?; catalog_writer .alter_parallelism(table_id, target_parallelism, deferred) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 118252038dbb..f12de49cccbb 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -19,6 +19,7 @@ use futures::future::join_all; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::catalog::TableId; +use risingwave_common::hash::VirtualNode; use risingwave_meta_model_v2::ObjectId; use risingwave_pb::catalog::{CreateType, Subscription, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; @@ -33,6 +34,7 @@ use crate::barrier::{ BarrierScheduler, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType, ReplaceTablePlan, SnapshotBackfillInfo, }; +use crate::error::bail_invalid_parameter; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; use crate::stream::SourceManagerRef; @@ -666,14 +668,47 @@ impl GlobalStreamManager { let worker_nodes = self .metadata_manager .list_active_streaming_compute_nodes() - .await?; + .await? + .into_iter() + .filter(|w| w.is_streaming_schedulable()) + .collect_vec(); let worker_ids = worker_nodes .iter() - .filter(|w| w.property.as_ref().map_or(true, |p| !p.is_unschedulable)) .map(|node| node.id) .collect::>(); + // Check if the provided parallelism is valid. + let available_parallelism = worker_nodes + .iter() + .map(|w| w.parallelism as usize) + .sum::(); + // TODO(var-vnode): get correct max parallelism from catalogs. + let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; + + match parallelism { + TableParallelism::Adaptive => { + if available_parallelism > max_parallelism { + tracing::warn!( + "too many parallelism available, use max parallelism {} will be limited", + max_parallelism + ); + } + } + TableParallelism::Fixed(parallelism) => { + if parallelism > max_parallelism { + bail_invalid_parameter!( + "specified parallelism {} should not exceed max parallelism {}", + parallelism, + max_parallelism + ); + } + } + TableParallelism::Custom => { + bail_invalid_parameter!("should not alter parallelism to custom") + } + } + let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]); if deferred { diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index 7494b2bcc53b..1604de034b6d 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -190,13 +190,14 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { .await? .assert_result_eq("FIXED(1)"); - session + let result = session .run(format!("alter table t set parallelism = {}", vnode_max + 1)) - .await?; - session - .run("select parallelism from rw_streaming_parallelism where name = 't'") - .await? - .assert_result_eq(format!("FIXED({})", vnode_max)); + .await; + + // This should be rejected. + // TODO(var-vnode): showing that it's rejected for different vnode counts. + assert!(result.is_err(), "{result:?}"); + Ok(()) }