diff --git a/src/common/src/config.rs b/src/common/src/config.rs index e2b4dd7b0f97..9ed8c185036b 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -318,7 +318,7 @@ pub struct MetaConfig { pub do_not_config_object_storage_lifecycle: bool, /// Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. - /// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. #[serde(default = "default::meta::partition_vnode_count")] pub partition_vnode_count: u32, @@ -347,7 +347,7 @@ pub struct MetaConfig { /// Count of partitions of tables in default group and materialized view group. /// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. - /// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. /// Set it zero to disable this feature. #[serde(default = "default::meta::hybrid_partition_vnode_count")] pub hybrid_partition_vnode_count: u32, diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index 397335b2e7ea..a40946273a0a 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -45,16 +45,16 @@ impl Bitmap { } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. pub fn singleton() -> &'static Self { Self::singleton_arc() } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. pub fn singleton_arc() -> &'static Arc { static SINGLETON: LazyLock> = LazyLock::new(|| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT); builder.set(SINGLETON_VNODE.to_index(), true); builder.finish().into() }); diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 0885314b8f73..20451ebb3146 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -16,7 +16,7 @@ use super::vnode::VirtualNode; /// A trait for accessing the vnode count field with backward compatibility. pub trait VnodeCountCompat { - /// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set, + /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, /// typically for backward compatibility. /// /// See the documentation on the field of the implementing type for more details. @@ -29,7 +29,7 @@ macro_rules! impl_maybe_vnode_count_compat { impl VnodeCountCompat for $ty { fn vnode_count(&self) -> usize { self.maybe_vnode_count - .map_or(VirtualNode::COUNT, |v| v as _) + .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _) } } )* diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 644349b10d7e..1e7bca125fc5 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -141,9 +141,9 @@ impl VnodeMapping { /// Create a vnode mapping with the single item. Should only be used for singletons. /// - /// For backwards compatibility, [`VirtualNode::COUNT`] is used as the vnode count. + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT) + Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) } /// The length (or count) of the vnode in this mapping. diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 388083b81bff..2a7390cfbbc5 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -51,19 +51,22 @@ impl Crc32HashCode { } impl VirtualNode { - /// The total count of virtual nodes. - // TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST` - pub const COUNT: usize = 1 << 8; - /// The maximum value of the virtual node. - // TODO(var-vnode): remove this and only keep `MAX_FOR_TEST` - pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); + /// The total count of virtual nodes, for compatibility purposes **ONLY**. + /// + /// Typical use cases: + /// + /// - As the default value for the session configuration. + /// - As the vnode count for all streaming jobs, fragments, and tables that were created before + /// the variable vnode count support was introduced. + /// - As the vnode count for singletons. + pub const COUNT_FOR_COMPAT: usize = 1 << 8; } impl VirtualNode { /// The total count of virtual nodes, for testing purposes. - pub const COUNT_FOR_TEST: usize = Self::COUNT; + pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT; /// The maximum value of the virtual node, for testing purposes. - pub const MAX_FOR_TEST: VirtualNode = Self::MAX; + pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1); } impl VirtualNode { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index ea85e0cb266e..5509284e80dd 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -300,7 +300,7 @@ pub struct SessionConfig { #[parameter(default = false)] bypass_cluster_limits: bool, - #[parameter(default = VirtualNode::COUNT, check_hook = check_vnode_count)] + #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)] vnode_count: usize, } diff --git a/src/config/docs.md b/src/config/docs.md index bcce61d8bb45..ee76d8891d32 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config` | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | -| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | +| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | | meta_leader_lease_secs | | 30 | | min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 | @@ -52,7 +52,7 @@ This page is automatically generated by `./risedev generate-example-config` | parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 | | parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 | | parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 | -| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | +| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | | periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 | | periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 | | periodic_split_compact_group_interval_sec | | 10 | diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index 547aef01fef1..d6798a861315 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -30,9 +30,11 @@ pub fn capture_expr_context() -> ExprResult { Ok(ExprContext { time_zone }) } -/// Get the vnode count from the context, or [`VirtualNode::COUNT`] if not set. +/// Get the vnode count from the context, or [`VirtualNode::COUNT_FOR_COMPAT`] if not set. +// TODO(var-vnode): the only case where this is not set is for batch queries, is it still +// necessary to support `rw_vnode` expression in batch queries? pub fn vnode_count() -> usize { - VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT) + VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT_FOR_COMPAT) } pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 960d71ca809d..8fdecda841d8 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -67,9 +67,11 @@ mod tests { use risingwave_common::hash::VirtualNode; use risingwave_common::row::Row; use risingwave_expr::expr::build_from_pretty; + use risingwave_expr::expr_context::VNODE_COUNT; #[tokio::test] async fn test_vnode_expr_eval() { + let vnode_count = 32; let expr = build_from_pretty("(vnode:int2 $0:int4 $0:int8 $0:varchar)"); let input = DataChunk::from_pretty( "i I T @@ -79,17 +81,21 @@ mod tests { ); // test eval - let output = expr.eval(&input).await.unwrap(); + let output = VNODE_COUNT::scope(vnode_count, expr.eval(&input)) + .await + .unwrap(); for vnode in output.iter() { let vnode = vnode.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } // test eval_row for row in input.rows() { - let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); + let result = VNODE_COUNT::scope(vnode_count, expr.eval_row(&row.to_owned_row())) + .await + .unwrap(); let vnode = result.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 8c3dbd6e419c..23ccc6706fa0 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -105,7 +105,7 @@ pub async fn handle_alter_parallelism( .sum::(); // TODO(var-vnode): get max parallelism from catalogs. // Although the meta service will clamp the value for us, we should still check it here for better UI. - let max_parallelism = VirtualNode::COUNT; + let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; let mut builder = RwPgResponse::builder(stmt_type); diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index bb7dca9e0231..efcd8c0b8b59 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -320,8 +320,8 @@ impl<'a> Deref for JavaBindingIterator<'a> { #[no_mangle] extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - // TODO(var-vnode): vnode count can vary for different tables. - VirtualNode::COUNT as jint + // TODO(var-vnode): vnode count can vary for different tables, use real ones. + VirtualNode::COUNT_FOR_COMPAT as jint } #[cfg_or_panic(not(madsim))] diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 3d7ab9888362..a7d719369d7c 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -58,7 +58,12 @@ impl ServingVnodeMapping { None }; // TODO(var-vnode): also fetch vnode count for each fragment - place_vnode(old_mapping, workers, max_parallelism, VirtualNode::COUNT) + place_vnode( + old_mapping, + workers, + max_parallelism, + VirtualNode::COUNT_FOR_COMPAT, + ) }; match new_mapping { None => { diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index ef2691deb180..55ae23171bdf 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -153,10 +153,10 @@ impl Distribution { /// Get the vnode count of the distribution. /// - /// For backwards compatibility, [`VirtualNode::COUNT`] is used for singleton. + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT, + Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 25bdd54df720..45870257bc04 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -94,7 +94,7 @@ where concurrent_uploading_sst_count: Option, ) -> Self { // TODO(var-vnode): should use value from caller - let vnode_count = VirtualNode::COUNT; + let vnode_count = VirtualNode::COUNT_FOR_COMPAT; Self { builder_factory, diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index acb6bc291a8a..a8a1ac8004f8 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -104,8 +104,8 @@ impl ActorContext { mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) // An unset `vnode_bitmap` means the actor is a singleton. - // For backwards compatibility, `VirtualNode::COUNT` is used for singleton. - .map_or(VirtualNode::COUNT, |b| Bitmap::from(b).len()), + // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. + .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index bef6b1ca1f1c..7494b2bcc53b 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_auto_parallelism( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, true, @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; @@ -202,7 +202,7 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100;