Skip to content

Commit

Permalink
refactor: use 1 for vnode count of singletons for backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Oct 8, 2024
1 parent 23a1d96 commit 778ca89
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 45 deletions.
4 changes: 2 additions & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ message Table {
// because the vnode count of each fragment (then internal tables) is determined
// by the meta service.
// - The table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
// supported, in which case a default value of 256 (or 1 for singleton) should
// be used. Use `VnodeCountCompat::vnode_count` to access it.
//
// Please note that this field is not intended to describe the expected vnode count
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
Expand Down
2 changes: 1 addition & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ message TableFragments {
// Duplicated from the length of the vnode bitmap in any actor of the fragment.
//
// Can be unset if the fragment is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
Expand Down
2 changes: 1 addition & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ message StorageTableDesc {
// Total vnode count of the table.
//
// Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 12;
}
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ impl Bitmap {
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
/// 1 and the only [`SINGLETON_VNODE`] set to true.
pub fn singleton() -> &'static Self {
Self::singleton_arc()
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
/// 1 and the only [`SINGLETON_VNODE`] set to true.
pub fn singleton_arc() -> &'static Arc<Self> {
static SINGLETON: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT);
let mut builder = BitmapBuilder::zeroed(1);
builder.set(SINGLETON_VNODE.to_index(), true);
builder.finish().into()
});
Expand Down
101 changes: 74 additions & 27 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,85 @@
use super::vnode::VirtualNode;

/// A trait for accessing the vnode count field with backward compatibility.
///
/// # `maybe_`?
///
/// The reason why there's a `maybe_` prefix on the protobuf field is that, a getter
/// method with the same name as the field will be generated for `prost` structs.
/// Directly naming it `vnode_count` will lead to the method `vnode_count()` returning
/// `0` when the field is unset, which can be misleading sometimes.
///
/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count`
/// through this trait, ensuring that backward compatibility is handled properly.
pub trait VnodeCountCompat {
/// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set,
/// typically for backward compatibility.
/// Returns the vnode count if it's set. Otherwise, returns [`VirtualNode::COUNT_FOR_COMPAT`]
/// for distributed tables/fragments, and `1` for singleton tables/fragments, for backward
/// compatibility.
///
/// See the documentation on the field of the implementing type for more details.
fn vnode_count(&self) -> usize;
}

/// Implement the trait for given types by delegating to the `maybe_vnode_count` field.
///
/// The reason why there's a `maybe_` prefix is that, a getter method with the same name
/// as the field will be generated for `prost` structs. Directly naming it `vnode_count`
/// will lead to the method `vnode_count()` returning `0` when the field is unset, which
/// can be misleading sometimes.
///
/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count`
/// through this trait, ensuring that backward compatibility is handled properly.
macro_rules! impl_maybe_vnode_count_compat {
($($ty:ty),* $(,)?) => {
$(
impl VnodeCountCompat for $ty {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _)
}
}
)*
};
impl VnodeCountCompat for risingwave_pb::meta::table_fragments::Fragment {
fn vnode_count(&self) -> usize {
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;

if let Some(vnode_count) = self.maybe_vnode_count {
return vnode_count as _;
}

// Compatibility: derive vnode count from distribution.
match self.distribution_type() {
FragmentDistributionType::Unspecified => unreachable!(),
FragmentDistributionType::Single => 1,
FragmentDistributionType::Hash => VirtualNode::COUNT_FOR_COMPAT,
}
}
}

impl VnodeCountCompat for risingwave_pb::catalog::Table {
fn vnode_count(&self) -> usize {
if let Some(vnode_count) = self.maybe_vnode_count {
return vnode_count as _;
}

// Compatibility: derive vnode count from distribution.
if self.distribution_key.is_empty() {
// Singleton table.
assert!(
self.dist_key_in_pk.is_empty(),
"empty dist key, while dist key in pk is set: {:?}",
self.dist_key_in_pk
);
assert!(
self.vnode_col_index.is_none(),
"empty dist key, while vnode col index is set: {:?}",
self.vnode_col_index
);
1
} else {
VirtualNode::COUNT_FOR_COMPAT
}
}
}

impl_maybe_vnode_count_compat!(
risingwave_pb::plan_common::StorageTableDesc,
risingwave_pb::catalog::Table,
risingwave_pb::meta::table_fragments::Fragment,
);
impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc {
fn vnode_count(&self) -> usize {
if let Some(vnode_count) = self.maybe_vnode_count {
return vnode_count as _;
}

// Compatibility: derive vnode count from distribution.
if self.dist_key_in_pk_indices.is_empty() {
// Singleton table.
assert!(
self.vnode_col_idx_in_pk.is_none(),
"empty dist key in pk indices, while vnode col index is set: {:?}",
self.vnode_col_idx_in_pk
);
1
} else {
VirtualNode::COUNT_FOR_COMPAT
}
}
}
6 changes: 2 additions & 4 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,9 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
}
}

/// Create a vnode mapping with the single item. Should only be used for singletons.
///
/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count.
/// Create a vnode mapping with the single item and length of 1. Only for singletons.
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT)
Self::new_uniform(std::iter::once(item), 1)
}

/// The length (or count) of the vnode in this mapping.
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::{bail, hash};
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::meta::table_fragments::fragment::{
Expand Down Expand Up @@ -152,11 +152,9 @@ impl Distribution {
}

/// Get the vnode count of the distribution.
///
/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton.
pub fn vnode_count(&self) -> usize {
match self {
Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT,
Distribution::Singleton(_) => 1, // only `SINGLETON_VNODE`
Distribution::Hash(mapping) => mapping.len(),
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ impl ActorContext {
fragment_id: stream_actor.fragment_id,
mview_definition: stream_actor.mview_definition.clone(),
vnode_count: (stream_actor.vnode_bitmap.as_ref())
// An unset `vnode_bitmap` means the actor is a singleton.
// For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton.
.map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()),
// An unset `vnode_bitmap` means the actor is a singleton,
// where only `SINGLETON_VNODE` is set.
.map_or(1, |b| Bitmap::from(b).len()),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
total_mem_val,
Expand Down

0 comments on commit 778ca89

Please sign in to comment.