Skip to content

Commit

Permalink
use more vnode count struct
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 21, 2024
1 parent 225d31b commit c644a1b
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VnodeCountCompat;
use crate::hash::{VnodeCount, VnodeCountCompat};
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
Expand Down Expand Up @@ -117,7 +117,7 @@ impl TableDesc {
versioned: self.versioned,
stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
vnode_col_idx_in_pk,
maybe_vnode_count: Some(self.vnode_count as u32),
maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
})
}

Expand Down
16 changes: 9 additions & 7 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ pub enum VnodeCount {

impl VnodeCount {
/// Creates a `VnodeCount` set to the given value.
pub fn set(v: impl TryInto<NonZeroUsize>) -> Self {
VnodeCount::Set(
v.try_into()
.unwrap_or_else(|_| panic!("vnode count must be non-zero")),
)
pub fn set(v: impl TryInto<usize> + Copy + std::fmt::Debug) -> Self {
let v = (v.try_into().ok())
.filter(|v| (1..=VirtualNode::MAX_COUNT).contains(v))
.unwrap_or_else(|| panic!("invalid vnode count {v:?}"));

VnodeCount::Set(NonZeroUsize::new(v).unwrap())
}

/// Converts to protobuf representation for `maybe_vnode_count`.
Expand All @@ -49,9 +50,9 @@ impl VnodeCount {
/// Converts from protobuf representation of `maybe_vnode_count`.
pub fn from_protobuf(v: Option<u32>) -> Self {
match v {
None => VnodeCount::Compat,
Some(0) => VnodeCount::Placeholder,
Some(v) => VnodeCount::set(v as usize),
None => VnodeCount::Compat,
}
}

Expand All @@ -66,7 +67,8 @@ impl VnodeCount {

/// Returns the value of the vnode count. Panics if it's a placeholder.
pub fn value(self) -> usize {
self.value_opt().expect("vnode count not set")
self.value_opt()
.expect("vnode count is a placeholder that must be filled by the meta service first")
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ mod tests {
initialized_at_cluster_version: None,
version_column_index: None,
cdc_table_id: None,
maybe_vnode_count: Some(233),
maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
}
.into();

Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::mem::swap;
use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::hash::{VnodeCountCompat, WorkerSlotId};
use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::util::worker_util::WorkerNodeId;
use risingwave_meta_model::actor::ActorStatus;
Expand Down Expand Up @@ -496,7 +496,7 @@ impl CatalogController {
actors: pb_actors,
state_table_ids: pb_state_table_ids,
upstream_fragment_ids: pb_upstream_fragment_ids,
maybe_vnode_count: Some(vnode_count as _),
maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
};

Ok((pb_fragment, pb_actor_status, pb_actor_splits))
Expand Down Expand Up @@ -1497,7 +1497,7 @@ mod tests {
use std::collections::{BTreeMap, HashMap};

use itertools::Itertools;
use risingwave_common::hash::{ActorMapping, VirtualNode};
use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model::actor::ActorStatus;
Expand Down Expand Up @@ -1627,7 +1627,7 @@ mod tests {
.values()
.flat_map(|m| m.keys().map(|x| *x as _))
.collect(),
maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as _),
maybe_vnode_count: VnodeCount::set(VirtualNode::COUNT_FOR_TEST).to_protobuf(),
};

let pb_actor_status = (0..actor_count)
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::BTreeMap;

use anyhow::anyhow;
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{
connection, database, function, index, object, schema, secret, sink, source, subscription,
Expand Down Expand Up @@ -164,7 +165,7 @@ impl From<ObjectModel<table::Model>> for PbTable {
created_at_cluster_version: value.1.created_at_cluster_version,
retention_seconds: value.0.retention_seconds.map(|id| id as u32),
cdc_table_id: value.0.cdc_table_id,
maybe_vnode_count: Some(value.0.vnode_count as _),
maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::{
generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
Expand Down Expand Up @@ -1126,7 +1127,7 @@ impl CompleteStreamFragmentGraph {
actors,
state_table_ids,
upstream_fragment_ids,
maybe_vnode_count: Some(vnode_count as _),
maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
}
}

Expand Down

0 comments on commit c644a1b

Please sign in to comment.