Skip to content

Commit

Permalink
fix(streaming): fix compatibility upgrading from v2.1.0 for singleton…
Browse files Browse the repository at this point in the history
… vnode count

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Dec 11, 2024
1 parent 3bc2ccb commit b1163d2
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions src/common/src/hash/consistent_hash/vnode_count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ pub enum VnodeCount {
/// The field is a placeholder and has to be filled first before using it.
#[default]
Placeholder,
/// The table/fragment is a singleton, thus the value should always be interpreted as `1`.
Singleton,
/// The field is set to a specific value.
Set(NonZeroUsize),
/// The field is unset because the table/fragment is persisted as hash-distributed
/// in an older version.
CompatHash,
/// The field is unset because the table/fragment is persisted as singleton
/// in an older version.
CompatSingleton,
}

impl VnodeCount {
Expand All @@ -51,18 +50,26 @@ impl VnodeCount {
Self::set(VirtualNode::COUNT_FOR_TEST)
}

/// Converts from protobuf representation of `maybe_vnode_count`. If the value is not set,
/// call `compat_is_singleton` to determine whether it should be treated as a singleton
/// when it comes to backward compatibility.
fn from_protobuf(v: Option<u32>, compat_is_singleton: impl FnOnce() -> bool) -> Self {
/// Converts from protobuf representation of `maybe_vnode_count`.
///
/// The value will be ignored if `is_singleton` returns `true`.
fn from_protobuf(v: Option<u32>, is_singleton: impl FnOnce() -> bool) -> Self {
match v {
Some(0) => VnodeCount::Placeholder,
Some(v) => VnodeCount::set(v as usize),
None => {
if compat_is_singleton() {
VnodeCount::CompatSingleton
_ => {
if is_singleton() {
if let Some(v) = v
&& v != 1
{
tracing::debug!(
vnode_count = v,
"singleton has vnode count set to non-1, \
ignoring as it could be due to backward compatibility"
);
}
VnodeCount::Singleton
} else {
VnodeCount::CompatHash
v.map_or(VnodeCount::CompatHash, VnodeCount::set)
}
}
}
Expand All @@ -79,9 +86,9 @@ impl VnodeCount {
pub fn value_opt(self) -> Option<usize> {
match self {
VnodeCount::Placeholder => None,
VnodeCount::Singleton => Some(1),
VnodeCount::Set(v) => Some(v.get()),
VnodeCount::CompatHash => Some(VirtualNode::COUNT_FOR_COMPAT),
VnodeCount::CompatSingleton => Some(1),
}
}

Expand Down

0 comments on commit b1163d2

Please sign in to comment.