From 1a294f24d82420e89e98d537b3369d922ed2420f Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 12 Sep 2024 17:26:37 +0800 Subject: [PATCH] rename vnode count to count_for_compat Signed-off-by: Bugen Zhao --- src/common/src/config.rs | 4 ++-- src/common/src/hash/consistent_hash/bitmap.rs | 6 +++--- src/common/src/hash/consistent_hash/compat.rs | 4 ++-- .../src/hash/consistent_hash/mapping.rs | 4 ++-- src/common/src/hash/consistent_hash/vnode.rs | 19 +++++++++++-------- src/common/src/session_config/mod.rs | 2 +- src/common/src/util/row_id.rs | 14 ++++++++++---- src/config/docs.md | 4 ++-- src/expr/core/src/expr_context.rs | 6 ++++-- src/expr/impl/src/scalar/vnode.rs | 14 ++++++++++---- src/frontend/src/handler/alter_parallelism.rs | 2 +- src/jni_core/src/lib.rs | 4 ++-- src/meta/src/serving/mod.rs | 7 ++++++- src/meta/src/stream/stream_graph/schedule.rs | 4 ++-- .../src/hummock/sstable/multi_builder.rs | 2 +- src/stream/src/executor/actor.rs | 4 ++-- .../scale/streaming_parallelism.rs | 6 +++--- 17 files changed, 64 insertions(+), 42 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index e2b4dd7b0f97c..9ed8c185036bf 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -318,7 +318,7 @@ pub struct MetaConfig { pub do_not_config_object_storage_lifecycle: bool, /// Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. - /// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. #[serde(default = "default::meta::partition_vnode_count")] pub partition_vnode_count: u32, @@ -347,7 +347,7 @@ pub struct MetaConfig { /// Count of partitions of tables in default group and materialized view group. /// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. - /// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. + /// Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. /// Set it zero to disable this feature. #[serde(default = "default::meta::hybrid_partition_vnode_count")] pub hybrid_partition_vnode_count: u32, diff --git a/src/common/src/hash/consistent_hash/bitmap.rs b/src/common/src/hash/consistent_hash/bitmap.rs index 397335b2e7ea0..a40946273a0a7 100644 --- a/src/common/src/hash/consistent_hash/bitmap.rs +++ b/src/common/src/hash/consistent_hash/bitmap.rs @@ -45,16 +45,16 @@ impl Bitmap { } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. pub fn singleton() -> &'static Self { Self::singleton_arc() } /// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length - /// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1. + /// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1. pub fn singleton_arc() -> &'static Arc { static SINGLETON: LazyLock> = LazyLock::new(|| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT); builder.set(SINGLETON_VNODE.to_index(), true); builder.finish().into() }); diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 0885314b8f737..20451ebb31464 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -16,7 +16,7 @@ use super::vnode::VirtualNode; /// A trait for accessing the vnode count field with backward compatibility. pub trait VnodeCountCompat { - /// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set, + /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, /// typically for backward compatibility. /// /// See the documentation on the field of the implementing type for more details. @@ -29,7 +29,7 @@ macro_rules! impl_maybe_vnode_count_compat { impl VnodeCountCompat for $ty { fn vnode_count(&self) -> usize { self.maybe_vnode_count - .map_or(VirtualNode::COUNT, |v| v as _) + .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _) } } )* diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 644349b10d7ed..1e7bca125fc50 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -141,9 +141,9 @@ impl VnodeMapping { /// Create a vnode mapping with the single item. Should only be used for singletons. /// - /// For backwards compatibility, [`VirtualNode::COUNT`] is used as the vnode count. + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count. pub fn new_single(item: T::Item) -> Self { - Self::new_uniform(std::iter::once(item), VirtualNode::COUNT) + Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT) } /// The length (or count) of the vnode in this mapping. diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 4dcfaf8d4a105..ad8ac8b3bcd1d 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -51,19 +51,22 @@ impl Crc32HashCode { } impl VirtualNode { - /// The total count of virtual nodes. - // TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST` - pub const COUNT: usize = 1 << 8; - /// The maximum value of the virtual node. - // TODO(var-vnode): remove this and only keep `MAX_FOR_TEST` - pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1); + /// The total count of virtual nodes, for compatibility purposes **ONLY**. + /// + /// Typical use cases: + /// + /// - As the default value for the session configuration. + /// - As the vnode count for all streaming jobs, fragments, and tables that were created before + /// the variable vnode count support was introduced. + /// - As the vnode count for singletons. + pub const COUNT_FOR_COMPAT: usize = 1 << 8; } impl VirtualNode { /// The total count of virtual nodes, for testing purposes. - pub const COUNT_FOR_TEST: usize = Self::COUNT; + pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT; /// The maximum value of the virtual node, for testing purposes. - pub const MAX_FOR_TEST: VirtualNode = Self::MAX; + pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1); } impl VirtualNode { diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index ea85e0cb266e3..5509284e80dde 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -300,7 +300,7 @@ pub struct SessionConfig { #[parameter(default = false)] bypass_cluster_limits: bool, - #[parameter(default = VirtualNode::COUNT, check_hook = check_vnode_count)] + #[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)] vnode_count: usize, } diff --git a/src/common/src/util/row_id.rs b/src/common/src/util/row_id.rs index ef41f61b5f535..76bfc743035d5 100644 --- a/src/common/src/util/row_id.rs +++ b/src/common/src/util/row_id.rs @@ -15,7 +15,7 @@ use std::cmp::Ordering; use std::time::SystemTime; -use static_assertions::const_assert; +use itertools::Itertools; use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH; use crate::hash::VirtualNode; @@ -25,13 +25,13 @@ const VNODE_ID_SHIFT_BITS: u8 = 12; const SEQUENCE_UPPER_BOUND: u16 = 1 << 12; const VNODE_ID_UPPER_BOUND: u32 = 1 << 10; -const_assert!(VNODE_ID_UPPER_BOUND >= VirtualNode::COUNT as u32); - /// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format: /// /// | timestamp | vnode id | sequence | /// |-----------|----------|----------| /// | 41 bits | 10 bits | 12 bits | +// TODO(var-vnode): this limits vnode count to 1024, which can be insufficient for large clusters. +// Find a new representation that can support more vnodes. #[derive(Debug)] pub struct RowIdGenerator { /// Specific base timestamp using for generating row ids. @@ -72,10 +72,16 @@ impl RowIdGenerator { /// Create a new `RowIdGenerator` with given virtual nodes. pub fn new(vnodes: impl IntoIterator) -> Self { let base = *UNIX_RISINGWAVE_DATE_EPOCH; + let vnodes = vnodes.into_iter().collect_vec(); + + for vnode in &vnodes { + assert!(vnode.to_index() < VNODE_ID_UPPER_BOUND as usize); + } + Self { base, last_timestamp_ms: base.elapsed().unwrap().as_millis() as i64, - vnodes: vnodes.into_iter().collect(), + vnodes, vnodes_index: 0, sequence: 0, } diff --git a/src/config/docs.md b/src/config/docs.md index bcce61d8bb456..ee76d8891d325 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config` | full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 | | hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 | | hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 | -| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | +| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 | | max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 | | meta_leader_lease_secs | | 30 | | min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 | @@ -52,7 +52,7 @@ This page is automatically generated by `./risedev generate-example-config` | parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 | | parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 | | parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 | -| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | +| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 | | periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 | | periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 | | periodic_split_compact_group_interval_sec | | 10 | diff --git a/src/expr/core/src/expr_context.rs b/src/expr/core/src/expr_context.rs index 547aef01fef18..d6798a861315f 100644 --- a/src/expr/core/src/expr_context.rs +++ b/src/expr/core/src/expr_context.rs @@ -30,9 +30,11 @@ pub fn capture_expr_context() -> ExprResult { Ok(ExprContext { time_zone }) } -/// Get the vnode count from the context, or [`VirtualNode::COUNT`] if not set. +/// Get the vnode count from the context, or [`VirtualNode::COUNT_FOR_COMPAT`] if not set. +// TODO(var-vnode): the only case where this is not set is for batch queries, is it still +// necessary to support `rw_vnode` expression in batch queries? pub fn vnode_count() -> usize { - VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT) + VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT_FOR_COMPAT) } pub async fn expr_context_scope(expr_context: ExprContext, future: Fut) -> Fut::Output diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 960d71ca809d8..8fdecda841d85 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -67,9 +67,11 @@ mod tests { use risingwave_common::hash::VirtualNode; use risingwave_common::row::Row; use risingwave_expr::expr::build_from_pretty; + use risingwave_expr::expr_context::VNODE_COUNT; #[tokio::test] async fn test_vnode_expr_eval() { + let vnode_count = 32; let expr = build_from_pretty("(vnode:int2 $0:int4 $0:int8 $0:varchar)"); let input = DataChunk::from_pretty( "i I T @@ -79,17 +81,21 @@ mod tests { ); // test eval - let output = expr.eval(&input).await.unwrap(); + let output = VNODE_COUNT::scope(vnode_count, expr.eval(&input)) + .await + .unwrap(); for vnode in output.iter() { let vnode = vnode.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } // test eval_row for row in input.rows() { - let result = expr.eval_row(&row.to_owned_row()).await.unwrap(); + let result = VNODE_COUNT::scope(vnode_count, expr.eval_row(&row.to_owned_row())) + .await + .unwrap(); let vnode = result.unwrap().into_int16(); - assert!((0..VirtualNode::COUNT as i16).contains(&vnode)); + assert!((0..vnode_count as i16).contains(&vnode)); } } } diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index 8c3dbd6e419cd..23ccc6706fa0d 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -105,7 +105,7 @@ pub async fn handle_alter_parallelism( .sum::(); // TODO(var-vnode): get max parallelism from catalogs. // Although the meta service will clamp the value for us, we should still check it here for better UI. - let max_parallelism = VirtualNode::COUNT; + let max_parallelism = VirtualNode::COUNT_FOR_COMPAT; let mut builder = RwPgResponse::builder(stmt_type); diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index bb7dca9e02319..efcd8c0b8b593 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -320,8 +320,8 @@ impl<'a> Deref for JavaBindingIterator<'a> { #[no_mangle] extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - // TODO(var-vnode): vnode count can vary for different tables. - VirtualNode::COUNT as jint + // TODO(var-vnode): vnode count can vary for different tables, use real ones. + VirtualNode::COUNT_FOR_COMPAT as jint } #[cfg_or_panic(not(madsim))] diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 3d7ab9888362f..a7d719369d7c3 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -58,7 +58,12 @@ impl ServingVnodeMapping { None }; // TODO(var-vnode): also fetch vnode count for each fragment - place_vnode(old_mapping, workers, max_parallelism, VirtualNode::COUNT) + place_vnode( + old_mapping, + workers, + max_parallelism, + VirtualNode::COUNT_FOR_COMPAT, + ) }; match new_mapping { None => { diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index ef2691deb1807..55ae23171bdf2 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -153,10 +153,10 @@ impl Distribution { /// Get the vnode count of the distribution. /// - /// For backwards compatibility, [`VirtualNode::COUNT`] is used for singleton. + /// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton. pub fn vnode_count(&self) -> usize { match self { - Distribution::Singleton(_) => VirtualNode::COUNT, + Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT, Distribution::Hash(mapping) => mapping.len(), } } diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 25bdd54df720f..45870257bc04e 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -94,7 +94,7 @@ where concurrent_uploading_sst_count: Option, ) -> Self { // TODO(var-vnode): should use value from caller - let vnode_count = VirtualNode::COUNT; + let vnode_count = VirtualNode::COUNT_FOR_COMPAT; Self { builder_factory, diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index acb6bc291a8a4..a8a1ac8004f8d 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -104,8 +104,8 @@ impl ActorContext { mview_definition: stream_actor.mview_definition.clone(), vnode_count: (stream_actor.vnode_bitmap.as_ref()) // An unset `vnode_bitmap` means the actor is a singleton. - // For backwards compatibility, `VirtualNode::COUNT` is used for singleton. - .map_or(VirtualNode::COUNT, |b| Bitmap::from(b).len()), + // For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton. + .map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs index bef6b1ca1f1c2..7494b2bcc53b7 100644 --- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_auto_parallelism( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, true, @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100; @@ -202,7 +202,7 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> { #[tokio::test] async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> { - let vnode_max = VirtualNode::COUNT; + let vnode_max = VirtualNode::COUNT_FOR_COMPAT; let mut configuration = Configuration::for_scale(); configuration.compute_nodes = 1; configuration.compute_node_cores = vnode_max + 100;