Skip to content

Commit

Permalink
refactor: use 1 for vnode count of singletons for backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
BugenZhao committed Oct 21, 2024
1 parent 932a1ca commit 5cb3e8f
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 25 deletions.
2 changes: 1 addition & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ message Table {
// Use `VnodeCountCompat::vnode_count` to access it.
//
// - Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the
// corresponding job is still in `Creating` status, in which case calling `vnode_count`
// will panic.
Expand Down
2 changes: 1 addition & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ message TableFragments {
// Duplicated from the length of the vnode bitmap in any actor of the fragment.
//
// Can be unset if the fragment is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
Expand Down
2 changes: 1 addition & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ message StorageTableDesc {
// Total vnode count of the table.
//
// Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// supported, in which case a default value of 256 (or 1 for singleton) should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 12;
}
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/hash/consistent_hash/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ impl Bitmap {
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
/// 1 and the only [`SINGLETON_VNODE`] set to true.
pub fn singleton() -> &'static Self {
Self::singleton_arc()
}

/// Get the reference to a vnode bitmap for singleton actor or table, i.e., with length
/// [`VirtualNode::COUNT_FOR_COMPAT`] and only the [`SINGLETON_VNODE`] set to 1.
/// 1 and the only [`SINGLETON_VNODE`] set to true.
pub fn singleton_arc() -> &'static Arc<Self> {
static SINGLETON: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_COMPAT);
let mut builder = BitmapBuilder::zeroed(1);
builder.set(SINGLETON_VNODE.to_index(), true);
builder.finish().into()
});
Expand Down
58 changes: 49 additions & 9 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,23 @@ impl VnodeCount {
}

/// A trait for accessing the vnode count field with backward compatibility.
///
/// # `maybe_`?
///
/// The reason why there's a `maybe_` prefix on the protobuf field is that, a getter
/// method with the same name as the field will be generated for `prost` structs.
/// Directly naming it `vnode_count` will lead to the method `vnode_count()` returning
/// `0` when the field is unset, which can be misleading sometimes.
///
/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count`
/// through this trait, ensuring that backward compatibility is handled properly.
pub trait VnodeCountCompat {
/// Get the `maybe_vnode_count` field.
fn vnode_count_inner(&self) -> VnodeCount;

/// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set,
/// typically for backward compatibility. Panics if the field is a placeholder.
///
/// Equivalent to `self.vnode_count_inner().value()`.
/// Returns the vnode count if it's set. Otherwise, returns [`VirtualNode::COUNT_FOR_COMPAT`]
/// for distributed tables/fragments, and `1` for singleton tables/fragments, for backward
/// compatibility. Panics if the field is a placeholder.
///
/// See the documentation on the field of the implementing type for more details.
fn vnode_count(&self) -> usize {
Expand Down Expand Up @@ -116,8 +125,39 @@ macro_rules! impl_maybe_vnode_count_compat {
};
}

impl_maybe_vnode_count_compat!(
risingwave_pb::plan_common::StorageTableDesc,
risingwave_pb::catalog::Table,
risingwave_pb::meta::table_fragments::Fragment,
);
// TODO!!!!!!!!!!!!!!!!!

// impl VnodeCountCompat for risingwave_pb::catalog::Table {
// fn vnode_count(&self) -> usize {
// if let Some(vnode_count) = self.maybe_vnode_count {
// return vnode_count as _;
// }

// // Compatibility: derive vnode count from distribution.
// if self.distribution_key.is_empty()
// && self.dist_key_in_pk.is_empty()
// && self.vnode_col_index.is_none()
// {
// // Singleton table.
// 1
// } else {
// VirtualNode::COUNT_FOR_COMPAT
// }
// }
// }

// impl VnodeCountCompat for risingwave_pb::plan_common::StorageTableDesc {
// fn vnode_count(&self) -> usize {
// if let Some(vnode_count) = self.maybe_vnode_count {
// return vnode_count as _;
// }

// // Compatibility: derive vnode count from distribution.
// if self.dist_key_in_pk_indices.is_empty() && self.vnode_col_idx_in_pk.is_none() {
// // Singleton table.
// 1
// } else {
// VirtualNode::COUNT_FOR_COMPAT
// }
// }
// }
7 changes: 4 additions & 3 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,12 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
}
}

/// Create a vnode mapping with the single item. Should only be used for singletons.
/// Create a vnode mapping with the single item and length of 1.
///
/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used as the vnode count.
/// Should only be used for singletons. If you want a different vnode count, call
/// [`VnodeMapping::new_uniform`] with `std::iter::once(item)` and desired length.
pub fn new_single(item: T::Item) -> Self {
Self::new_uniform(std::iter::once(item), VirtualNode::COUNT_FOR_COMPAT)
Self::new_uniform(std::iter::once(item), 1)
}

/// The length (or count) of the vnode in this mapping.
Expand Down
15 changes: 15 additions & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ impl TableCatalogBuilder {
Some(w) => w,
None => FixedBitSet::with_capacity(self.columns.len()),
};

// If `dist_key_in_pk` is set, check if it matches with `distribution_key`.
// Note that we cannot derive in the opposite direction, because there can be a column
// appearing multiple times in the PK.
if let Some(dist_key_in_pk) = &self.dist_key_in_pk {
let derived_dist_key = dist_key_in_pk
.iter()
.map(|idx| self.pk[*idx].column_index)
.collect_vec();
assert_eq!(
derived_dist_key, distribution_key,
"dist_key mismatch with dist_key_in_pk"
);
}

TableCatalog {
id: TableId::placeholder(),
associated_source_id: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ impl MigrationTrait for Migration {
)
.await?;

// Fill vnode count with 1 for singleton tables.
manager
.exec_stmt(
UpdateStatement::new()
.table(Table::Table)
.values([(Table::VnodeCount, Expr::value(1))])
.and_where(Expr::col(Table::DistributionKey).eq(Expr::value("[]")))
.and_where(Expr::col(Table::DistKeyInPk).eq(Expr::value("[]")))
.and_where(Expr::col(Table::VnodeColIndex).is_null())
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
Expand All @@ -30,6 +43,17 @@ impl MigrationTrait for Migration {
)
.await?;

// Fill vnode count with 1 for singleton fragments.
manager
.exec_stmt(
UpdateStatement::new()
.table(Fragment::Table)
.values([(Fragment::VnodeCount, Expr::value(1))])
.and_where(Expr::col(Fragment::DistributionType).eq(Expr::value("SINGLE")))
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
Expand Down Expand Up @@ -74,12 +98,16 @@ impl MigrationTrait for Migration {
enum Fragment {
Table,
VnodeCount,
DistributionType,
}

#[derive(DeriveIden)]
enum Table {
Table,
VnodeCount,
DistributionKey,
DistKeyInPk,
VnodeColIndex,
}

#[derive(DeriveIden)]
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/stream/stream_graph/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use either::Either;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{ActorMapping, VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::{bail, hash};
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{ActorInfo, WorkerNode};
Expand Down Expand Up @@ -152,11 +152,9 @@ impl Distribution {
}

/// Get the vnode count of the distribution.
///
/// For backwards compatibility, [`VirtualNode::COUNT_FOR_COMPAT`] is used for singleton.
pub fn vnode_count(&self) -> usize {
match self {
Distribution::Singleton(_) => VirtualNode::COUNT_FOR_COMPAT,
Distribution::Singleton(_) => 1, // only `SINGLETON_VNODE`
Distribution::Hash(mapping) => mapping.len(),
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ impl ActorContext {
fragment_id: stream_actor.fragment_id,
mview_definition: stream_actor.mview_definition.clone(),
vnode_count: (stream_actor.vnode_bitmap.as_ref())
// An unset `vnode_bitmap` means the actor is a singleton.
// For backwards compatibility, `VirtualNode::COUNT_FOR_COMPAT` is used for singleton.
.map_or(VirtualNode::COUNT_FOR_COMPAT, |b| Bitmap::from(b).len()),
// An unset `vnode_bitmap` means the actor is a singleton,
// where only `SINGLETON_VNODE` is set.
.map_or(1, |b| Bitmap::from(b).len()),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
total_mem_val,
Expand Down

0 comments on commit 5cb3e8f

Please sign in to comment.