Skip to content

Commit

Permalink
rename vnode count to count_for_compat
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 16, 2024
1 parent 65dfe36 commit 1a294f2
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 42 deletions.
4 changes: 2 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ pub struct MetaConfig {
pub do_not_config_object_storage_lifecycle: bool,

/// Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically.
/// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table.
/// Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table.
#[serde(default = "default::meta::partition_vnode_count")]
pub partition_vnode_count: u32,

Expand Down Expand Up @@ -347,7 +347,7 @@ pub struct MetaConfig {

/// Count of partitions of tables in default group and materialized view group.
/// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment.
/// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table.
/// Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table.
/// Set it zero to disable this feature.
#[serde(default = "default::meta::hybrid_partition_vnode_count")]
pub hybrid_partition_vnode_count: u32,
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ impl Bitmap {
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1.
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
pub fn singleton() -> &'static Self {
Self::singleton_arc()
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT`] and only the [`SINGLETON_VNODE`] set to 1.
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
pub fn singleton_arc() -> &'static Arc<Self> {
static SINGLETON: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT);
builder.set(SINGLETON_VNODE.to_index(), true);
builder.finish().into()
});
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::vnode::VirtualNode;

/// A trait for accessing the vnode count field with backward compatibility.
pub trait VnodeCountCompat {
/// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set,
/// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set,
/// typically for backward compatibility.
///
/// See the documentation on the field of the implementing type for more details.
Expand All @@ -29,7 +29,7 @@ macro_rules! impl_maybe_vnode_count_compat {
impl VnodeCountCompat for $ty {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT, |v| v as _)
.map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _)
}
}
)*
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {

/// Create a vnode mapping with the single item. Should only be used for singletons.
///
/// For backwards compatibility, [`VirtualNode::COUNT`] is used as the vnode count.
/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count.
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item), VirtualNode::COUNT)
Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT)
}

/// The length (or count) of the vnode in this mapping.
Expand Down
19 changes: 11 additions & 8 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,22 @@ impl Crc32HashCode {
}

impl VirtualNode {
/// The total count of virtual nodes.
// TODO(var-vnode): remove this and only keep `COUNT_FOR_TEST`
pub const COUNT: usize = 1 << 8;
/// The maximum value of the virtual node.
// TODO(var-vnode): remove this and only keep `MAX_FOR_TEST`
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
/// The total count of virtual nodes, for compatibility purposes **ONLY**.
///
/// Typical use cases:
///
/// - As the default value for the session configuration.
/// - As the vnode count for all streaming jobs, fragments, and tables that were created before
/// the variable vnode count support was introduced.
/// - As the vnode count for singletons.
pub const COUNT_FOR_COMPAT: usize = 1 << 8;
}

impl VirtualNode {
/// The total count of virtual nodes, for testing purposes.
pub const COUNT_FOR_TEST: usize = Self::COUNT;
pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT;
/// The maximum value of the virtual node, for testing purposes.
pub const MAX_FOR_TEST: VirtualNode = Self::MAX;
pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1);
}

impl VirtualNode {
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ pub struct SessionConfig {
#[parameter(default = false)]
bypass_cluster_limits: bool,

#[parameter(default = VirtualNode::COUNT, check_hook = check_vnode_count)]
#[parameter(default = VirtualNode::COUNT_FOR_COMPAT, check_hook = check_vnode_count)]
vnode_count: usize,
}

Expand Down
14 changes: 10 additions & 4 deletions src/common/src/util/row_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::cmp::Ordering;
use std::time::SystemTime;

use static_assertions::const_assert;
use itertools::Itertools;

use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH;
use crate::hash::VirtualNode;
Expand All @@ -25,13 +25,13 @@ const VNODE_ID_SHIFT_BITS: u8 = 12;
const SEQUENCE_UPPER_BOUND: u16 = 1 << 12;
const VNODE_ID_UPPER_BOUND: u32 = 1 << 10;

const_assert!(VNODE_ID_UPPER_BOUND >= VirtualNode::COUNT as u32);

/// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format:
///
/// | timestamp | vnode id | sequence |
/// |-----------|----------|----------|
/// | 41 bits | 10 bits | 12 bits |
// TODO(var-vnode): this limits vnode count to 1024, which can be insufficient for large clusters.
// Find a new representation that can support more vnodes.
#[derive(Debug)]
pub struct RowIdGenerator {
/// Specific base timestamp using for generating row ids.
Expand Down Expand Up @@ -72,10 +72,16 @@ impl RowIdGenerator {
/// Create a new `RowIdGenerator` with given virtual nodes.
pub fn new(vnodes: impl IntoIterator<Item = VirtualNode>) -> Self {
let base = *UNIX_RISINGWAVE_DATE_EPOCH;
let vnodes = vnodes.into_iter().collect_vec();

for vnode in &vnodes {
assert!(vnode.to_index() < VNODE_ID_UPPER_BOUND as usize);
}

Self {
base,
last_timestamp_ms: base.elapsed().unwrap().as_millis() as i64,
vnodes: vnodes.into_iter().collect(),
vnodes,
vnodes_index: 0,
sequence: 0,
}
Expand Down
4 changes: 2 additions & 2 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ This page is automatically generated by `./risedev generate-example-config`
| full_gc_interval_sec | Interval of automatic hummock full GC. | 86400 |
| hummock_time_travel_snapshot_interval | The interval at which a Hummock version snapshot is taken for time travel. Larger value indicates less storage overhead but worse query performance. | 100 |
| hummock_version_checkpoint_interval_sec | Interval of hummock version checkpoint. | 30 |
| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 |
| hybrid_partition_vnode_count | Count of partitions of tables in default group and materialized view group. The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment. Each partition contains aligned data of `vnode_count / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table. Set it zero to disable this feature. | 4 |
| max_heartbeat_interval_secs | Maximum allowed heartbeat interval in seconds. | 60 |
| meta_leader_lease_secs | | 30 |
| min_delta_log_num_for_hummock_version_checkpoint | The minimum delta log number a new checkpoint should compact, otherwise the checkpoint attempt is rejected. | 10 |
Expand All @@ -52,7 +52,7 @@ This page is automatically generated by `./risedev generate-example-config`
| parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 |
| parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 |
| parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 |
| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 |
| partition_vnode_count | Count of partition in split group. Meta will assign this value to every new group when it splits from default-group by automatically. Each partition contains aligned data of `vnode_count / partition_vnode_count` consecutive virtual-nodes of one state table. | 16 |
| periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 |
| periodic_space_reclaim_compaction_interval_sec | Schedule `space_reclaim` compaction for all compaction groups with this interval. | 3600 |
| periodic_split_compact_group_interval_sec | | 10 |
Expand Down
6 changes: 4 additions & 2 deletions src/expr/core/src/expr_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ pub fn capture_expr_context() -> ExprResult<ExprContext> {
Ok(ExprContext { time_zone })
}

/// Get the vnode count from the context, or [`VirtualNode::COUNT`] if not set.
/// Get the vnode count from the context, or [`VirtualNode::COUNT_FOR_COMPAT`] if not set.
// TODO(var-vnode): the only case where this is not set is for batch queries, is it still
// necessary to support `rw_vnode` expression in batch queries?
pub fn vnode_count() -> usize {
VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT)
VNODE_COUNT::try_with(|&x| x).unwrap_or(VirtualNode::COUNT_FOR_COMPAT)
}

pub async fn expr_context_scope<Fut>(expr_context: ExprContext, future: Fut) -> Fut::Output
Expand Down
14 changes: 10 additions & 4 deletions src/expr/impl/src/scalar/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ mod tests {
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::Row;
use risingwave_expr::expr::build_from_pretty;
use risingwave_expr::expr_context::VNODE_COUNT;

#[tokio::test]
async fn test_vnode_expr_eval() {
let vnode_count = 32;
let expr = build_from_pretty("(vnode:int2 $0:int4 $0:int8 $0:varchar)");
let input = DataChunk::from_pretty(
"i I T
Expand All @@ -79,17 +81,21 @@ mod tests {
);

// test eval
let output = expr.eval(&input).await.unwrap();
let output = VNODE_COUNT::scope(vnode_count, expr.eval(&input))
.await
.unwrap();
for vnode in output.iter() {
let vnode = vnode.unwrap().into_int16();
assert!((0..VirtualNode::COUNT as i16).contains(&vnode));
assert!((0..vnode_count as i16).contains(&vnode));
}

// test eval_row
for row in input.rows() {
let result = expr.eval_row(&row.to_owned_row()).await.unwrap();
let result = VNODE_COUNT::scope(vnode_count, expr.eval_row(&row.to_owned_row()))
.await
.unwrap();
let vnode = result.unwrap().into_int16();
assert!((0..VirtualNode::COUNT as i16).contains(&vnode));
assert!((0..vnode_count as i16).contains(&vnode));
}
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_parallelism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub async fn handle_alter_parallelism(
.sum::<u32>();
// TODO(var-vnode): get max parallelism from catalogs.
// Although the meta service will clamp the value for us, we should still check it here for better UI.
let max_parallelism = VirtualNode::COUNT;
let max_parallelism = VirtualNode::COUNT_FOR_COMPAT;

let mut builder = RwPgResponse::builder(stmt_type);

Expand Down
4 changes: 2 additions & 2 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ impl<'a> Deref for JavaBindingIterator<'a> {

#[no_mangle]
extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint {
// TODO(var-vnode): vnode count can vary for different tables.
VirtualNode::COUNT as jint
// TODO(var-vnode): vnode count can vary for different tables, use real ones.
VirtualNode::COUNT_FOR_COMPAT as jint
}

#[cfg_or_panic(not(madsim))]
Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/serving/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ impl ServingVnodeMapping {
None
};
// TODO(var-vnode): also fetch vnode count for each fragment
place_vnode(old_mapping, workers, max_parallelism, VirtualNode::COUNT)
place_vnode(
old_mapping,
workers,
max_parallelism,
VirtualNode::COUNT_FOR_COMPAT,
)
};
match new_mapping {
None => {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ impl Distribution {

/// Get the vnode count of the distribution.
///
/// For backwards compatibility, [`VirtualNode::COUNT`] is used for singleton.
/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton.
pub fn vnode_count(&self) -> usize {
match self {
Distribution::Singleton(_) => VirtualNode::COUNT,
Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT,
Distribution::Hash(mapping) => mapping.len(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/sstable/multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ where
concurrent_uploading_sst_count: Option<usize>,
) -> Self {
// TODO(var-vnode): should use value from caller
let vnode_count = VirtualNode::COUNT;
let vnode_count = VirtualNode::COUNT_FOR_COMPAT;

Self {
builder_factory,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ impl ActorContext {
mview_definition: stream_actor.mview_definition.clone(),
vnode_count: (stream_actor.vnode_bitmap.as_ref())
// An unset `vnode_bitmap` means the actor is a singleton.
// For backwards compatibility, `VirtualNode::COUNT` is used for singleton.
.map_or(VirtualNode::COUNT, |b| Bitmap::from(b).len()),
// For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton.
.map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
total_mem_val,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn test_streaming_parallelism_index() -> Result<()> {

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let vnode_max = VirtualNode::COUNT_FOR_COMPAT;
let mut configuration = Configuration::for_auto_parallelism(
MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE,
true,
Expand Down Expand Up @@ -177,7 +177,7 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> {

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let vnode_max = VirtualNode::COUNT_FOR_COMPAT;
let mut configuration = Configuration::for_scale();
configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 100;
Expand All @@ -202,7 +202,7 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> {

#[tokio::test]
async fn test_parallelism_exceed_virtual_node_max_alter_adaptive() -> Result<()> {
let vnode_max = VirtualNode::COUNT;
let vnode_max = VirtualNode::COUNT_FOR_COMPAT;
let mut configuration = Configuration::for_scale();
configuration.compute_nodes = 1;
configuration.compute_node_cores = vnode_max + 100;
Expand Down

0 comments on commit 1a294f2

Please sign in to comment.