Skip to content

Commit

Permalink
refactor: distinguish between placeholder and compat for vnode count (#…
Browse files Browse the repository at this point in the history
…18976)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Oct 21, 2024
1 parent 3e7fffd commit ff61c88
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 44 deletions.
12 changes: 6 additions & 6 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,13 @@ message Table {

// Total vnode count of the table.
//
// Can be unset if...
// - The catalog is generated by the frontend and not yet persisted, this is
// because the vnode count of each fragment (then internal tables) is determined
// by the meta service.
// - The table is created in older versions where variable vnode count is not
// Use `VnodeCountCompat::vnode_count` to access it.
//
// - Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
// - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the
// corresponding job is still in `Creating` status, in which case calling `vnode_count`
// will panic.
//
// Please note that this field is not intended to describe the expected vnode count
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VnodeCountCompat;
use crate::hash::{VnodeCount, VnodeCountCompat};
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
Expand Down Expand Up @@ -117,7 +117,7 @@ impl TableDesc {
versioned: self.versioned,
stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
vnode_col_idx_in_pk,
maybe_vnode_count: Some(self.vnode_count as u32),
maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
})
}

Expand Down
81 changes: 76 additions & 5 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,87 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroUsize;

use super::vnode::VirtualNode;

/// The different cases of `maybe_vnode_count` field in the protobuf message.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum VnodeCount {
/// The field is a placeholder and has to be filled first before using it.
#[default]
Placeholder,
/// The field is set to a specific value.
Set(NonZeroUsize),
/// The field is unset because it's persisted in an older version.
Compat,
}

impl VnodeCount {
/// Creates a `VnodeCount` set to the given value.
pub fn set(v: impl TryInto<usize> + Copy + std::fmt::Debug) -> Self {
let v = (v.try_into().ok())
.filter(|v| (1..=VirtualNode::MAX_COUNT).contains(v))
.unwrap_or_else(|| panic!("invalid vnode count {v:?}"));

VnodeCount::Set(NonZeroUsize::new(v).unwrap())
}

/// Creates a `VnodeCount` set to the value for testing.
///
/// Equivalent to `VnodeCount::set(VirtualNode::COUNT_FOR_TEST)`.
pub fn for_test() -> Self {
Self::set(VirtualNode::COUNT_FOR_TEST)
}

/// Converts to protobuf representation for `maybe_vnode_count`.
pub fn to_protobuf(self) -> Option<u32> {
match self {
VnodeCount::Placeholder => Some(0),
VnodeCount::Set(v) => Some(v.get() as _),
VnodeCount::Compat => None,
}
}

/// Converts from protobuf representation of `maybe_vnode_count`.
pub fn from_protobuf(v: Option<u32>) -> Self {
match v {
Some(0) => VnodeCount::Placeholder,
Some(v) => VnodeCount::set(v as usize),
None => VnodeCount::Compat,
}
}

/// Returns the value of the vnode count, or `None` if it's a placeholder.
pub fn value_opt(self) -> Option<usize> {
match self {
VnodeCount::Placeholder => None,
VnodeCount::Set(v) => Some(v.get()),
VnodeCount::Compat => Some(VirtualNode::COUNT_FOR_COMPAT),
}
}

/// Returns the value of the vnode count. Panics if it's a placeholder.
pub fn value(self) -> usize {
self.value_opt()
.expect("vnode count is a placeholder that must be filled by the meta service first")
}
}

/// A trait for accessing the vnode count field with backward compatibility.
pub trait VnodeCountCompat {
/// Get the `maybe_vnode_count` field.
fn vnode_count_inner(&self) -> VnodeCount;

/// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set,
/// typically for backward compatibility.
/// typically for backward compatibility. Panics if the field is a placeholder.
///
/// Equivalent to `self.vnode_count_inner().value()`.
///
/// See the documentation on the field of the implementing type for more details.
fn vnode_count(&self) -> usize;
fn vnode_count(&self) -> usize {
self.vnode_count_inner().value()
}
}

/// Implement the trait for given types by delegating to the `maybe_vnode_count` field.
Expand All @@ -36,9 +108,8 @@ macro_rules! impl_maybe_vnode_count_compat {
($($ty:ty),* $(,)?) => {
$(
impl VnodeCountCompat for $ty {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _)
fn vnode_count_inner(&self) -> VnodeCount {
VnodeCount::from_protobuf(self.maybe_vnode_count)
}
}
)*
Expand Down
36 changes: 18 additions & 18 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};

use fixedbitset::FixedBitSet;
Expand All @@ -20,7 +21,7 @@ use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc,
TableId, TableVersionId,
};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
Expand Down Expand Up @@ -174,19 +175,13 @@ pub struct TableCatalog {

/// Total vnode count of the table.
///
/// Can be unset if the catalog is generated by the frontend and not yet persisted. This is
/// because the vnode count of each fragment (then internal tables) is determined by the
/// meta service. See also [`StreamMaterialize::derive_table_catalog`] and
/// [`TableCatalogBuilder::build`].
///
/// On the contrary, if this comes from a [`PbTable`], the field must be `Some` no matter
/// whether the table is created before or after the version we introduced variable vnode
/// count support. This is because we've already handled backward compatibility during
/// conversion.
/// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the
/// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
/// will panic.
///
/// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
/// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
pub vnode_count: Option<usize>,
pub vnode_count: VnodeCount,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -408,8 +403,7 @@ impl TableCatalog {
///
/// Panics if it's called on an incomplete (and not yet persisted) table catalog.
pub fn vnode_count(&self) -> usize {
self.vnode_count
.expect("vnode count unset, called on an incomplete table catalog?")
self.vnode_count.value()
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbTable {
Expand Down Expand Up @@ -457,7 +451,7 @@ impl TableCatalog {
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
retention_seconds: self.retention_seconds,
cdc_table_id: self.cdc_table_id.clone(),
maybe_vnode_count: self.vnode_count.map(|v| v as _),
maybe_vnode_count: self.vnode_count.to_protobuf(),
}
}

Expand Down Expand Up @@ -563,7 +557,13 @@ impl From<PbTable> for TableCatalog {
OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
});
let name = tb.name.clone();
let vnode_count = tb.vnode_count();

let vnode_count = tb.vnode_count_inner();
if let VnodeCount::Placeholder = vnode_count {
// Only allow placeholder vnode count for creating tables.
// After the table is created, an `Update` notification will be used to update the vnode count field.
assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
}

let mut col_names = HashSet::new();
let mut col_index: HashMap<i32, usize> = HashMap::new();
Expand Down Expand Up @@ -634,7 +634,7 @@ impl From<PbTable> for TableCatalog {
.map(TableId::from)
.collect_vec(),
cdc_table_id: tb.cdc_table_id,
vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */
vnode_count,
}
}
}
Expand Down Expand Up @@ -725,7 +725,7 @@ mod tests {
initialized_at_cluster_version: None,
version_column_index: None,
cdc_table_id: None,
maybe_vnode_count: Some(233),
maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
}
.into();

Expand Down Expand Up @@ -789,7 +789,7 @@ mod tests {
dependent_relations: vec![],
version_column_index: None,
cdc_table_id: None,
vnode_count: Some(233),
vnode_count: VnodeCount::set(233),
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use anyhow::{anyhow, Context};
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::hash::VnodeCount;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -244,7 +245,7 @@ pub async fn get_replace_table_plan(
// Set some fields ourselves so that the meta service does not need to maintain them.
let mut table = table;
table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
table.maybe_vnode_count = Some(original_catalog.vnode_count() as _);
table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf();

Ok((source, table, graph, col_index_mapping, job_type))
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
Expand Down Expand Up @@ -283,7 +284,7 @@ impl StreamMaterialize {
created_at_cluster_version: None,
retention_seconds: retention_seconds.map(|i| i.into()),
cdc_table_id: None,
vnode_count: None, // will be filled in by the meta service later
vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
})
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_common::catalog::{
use risingwave_common::constants::log_store::v2::{
KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use crate::catalog::table_catalog::TableType;
Expand Down Expand Up @@ -179,7 +180,7 @@ impl TableCatalogBuilder {
created_at_cluster_version: None,
retention_seconds: None,
cdc_table_id: None,
vnode_count: None, // will be filled in by the meta service later
vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ pub(crate) mod tests {
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus,
DEFAULT_SUPER_USER_ID,
};
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{VirtualNode, VnodeCount, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::types::DataType;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
Expand Down Expand Up @@ -589,7 +589,7 @@ pub(crate) mod tests {
initialized_at_cluster_version: None,
created_at_cluster_version: None,
cdc_table_id: None,
vnode_count: Some(vnode_count),
vnode_count: VnodeCount::set(vnode_count),
};
let batch_plan_node: PlanRef = LogicalScan::create(
"".to_string(),
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::catalog::{
FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
Expand Down Expand Up @@ -281,6 +282,7 @@ impl CatalogWriter for MockCatalogWriter {
) -> Result<()> {
table.id = self.gen_id();
table.stream_job_status = PbStreamJobStatus::Created as _;
table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
self.catalog.write().create_table(&table);
self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
self.hummock_snapshot_manager
Expand Down Expand Up @@ -320,6 +322,7 @@ impl CatalogWriter for MockCatalogWriter {
_job_type: TableJobType,
) -> Result<()> {
table.stream_job_status = PbStreamJobStatus::Created as _;
assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
self.catalog.write().update_table(&table);
Ok(())
}
Expand Down Expand Up @@ -353,6 +356,7 @@ impl CatalogWriter for MockCatalogWriter {
) -> Result<()> {
index_table.id = self.gen_id();
index_table.stream_job_status = PbStreamJobStatus::Created as _;
index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
self.catalog.write().create_table(&index_table);
self.add_table_or_index_id(
index_table.id,
Expand Down
12 changes: 10 additions & 2 deletions src/meta/model/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,15 @@ impl From<PbTable> for ActiveModel {
fn from(pb_table: PbTable) -> Self {
let table_type = pb_table.table_type();
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
let vnode_count = pb_table.vnode_count();

// `PbTable` here should be sourced from the wire, not from persistence.
// A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
// the compatibility code.
let vnode_count = pb_table
.vnode_count_inner()
.value_opt()
.map(|v| v as _)
.map_or(NotSet, Set);

let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
NotSet
Expand Down Expand Up @@ -258,7 +266,7 @@ impl From<PbTable> for ActiveModel {
retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
incoming_sinks: Set(pb_table.incoming_sinks.into()),
cdc_table_id: Set(pb_table.cdc_table_id),
vnode_count: Set(vnode_count as _),
vnode_count,
}
}
}
Loading

0 comments on commit ff61c88

Please sign in to comment.