Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: distinguish between placeholder and compat for vnode count (#18976) #19047

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,13 @@ message Table {

// Total vnode count of the table.
//
// Can be unset if...
// - The catalog is generated by the frontend and not yet persisted, this is
// because the vnode count of each fragment (then internal tables) is determined
// by the meta service.
// - The table is created in older versions where variable vnode count is not
// Use `VnodeCountCompat::vnode_count` to access it.
//
// - Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
// - Can be the placeholder value `Some(0)` if the catalog is generated by the frontend and the
// corresponding job is still in `Creating` status, in which case calling `vnode_count`
// will panic.
//
// Please note that this field is not intended to describe the expected vnode count
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VnodeCountCompat;
use crate::hash::{VnodeCount, VnodeCountCompat};
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
Expand Down Expand Up @@ -117,7 +117,7 @@ impl TableDesc {
versioned: self.versioned,
stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
vnode_col_idx_in_pk,
maybe_vnode_count: Some(self.vnode_count as u32),
maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(),
})
}

Expand Down
81 changes: 76 additions & 5 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,87 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroUsize;

use super::vnode::VirtualNode;

/// The different cases of `maybe_vnode_count` field in the protobuf message.
///
/// The protobuf field is an `Option<u32>`; this enum names the three states it can be in,
/// as implemented by [`VnodeCount::to_protobuf`] and [`VnodeCount::from_protobuf`]:
///
/// - `Some(0)` is [`VnodeCount::Placeholder`],
/// - `Some(n)` with `n != 0` is [`VnodeCount::Set`],
/// - `None` is [`VnodeCount::Compat`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum VnodeCount {
/// The field is a placeholder and has to be filled first before using it.
///
/// This is the `Default` variant. [`VnodeCount::value_opt`] returns `None` for it and
/// [`VnodeCount::value`] panics.
#[default]
Placeholder,
/// The field is set to a specific value.
///
/// The value is non-zero by construction. [`VnodeCount::set`] additionally checks that it
/// does not exceed `VirtualNode::MAX_COUNT`.
Set(NonZeroUsize),
/// The field is unset because it's persisted in an older version.
///
/// Resolves to `VirtualNode::COUNT_FOR_COMPAT` when the value is read through
/// [`VnodeCount::value_opt`] or [`VnodeCount::value`].
Compat,
}

impl VnodeCount {
    /// Builds a [`VnodeCount::Set`] from any integer-like value.
    ///
    /// # Panics
    ///
    /// Panics with `invalid vnode count {v:?}` if `v` cannot be converted to a `usize`, or if
    /// the converted value lies outside `1..=VirtualNode::MAX_COUNT`.
    pub fn set(v: impl TryInto<usize> + Copy + std::fmt::Debug) -> Self {
        let converted: Option<usize> = v.try_into().ok();

        // `NonZeroUsize::new` already rejects zero, so only the upper bound is left to check.
        let Some(count) = converted
            .and_then(NonZeroUsize::new)
            .filter(|n| n.get() <= VirtualNode::MAX_COUNT)
        else {
            panic!("invalid vnode count {v:?}")
        };

        Self::Set(count)
    }

    /// Builds the `VnodeCount` used in tests.
    ///
    /// Shorthand for `VnodeCount::set(VirtualNode::COUNT_FOR_TEST)`.
    pub fn for_test() -> Self {
        VnodeCount::set(VirtualNode::COUNT_FOR_TEST)
    }

    /// Encodes this value as the protobuf `maybe_vnode_count` field.
    ///
    /// `Placeholder` becomes `Some(0)`, `Set(n)` becomes `Some(n)`, and `Compat` becomes `None`.
    pub fn to_protobuf(self) -> Option<u32> {
        let raw = match self {
            Self::Compat => return None,
            Self::Placeholder => 0,
            Self::Set(count) => count.get() as u32,
        };
        Some(raw)
    }

    /// Decodes the protobuf `maybe_vnode_count` field.
    ///
    /// `None` becomes `Compat`, `Some(0)` becomes `Placeholder`, and any other value goes
    /// through [`VnodeCount::set`].
    ///
    /// # Panics
    ///
    /// Panics if the field holds a non-zero value that [`VnodeCount::set`] rejects.
    pub fn from_protobuf(v: Option<u32>) -> Self {
        let Some(raw) = v else {
            return Self::Compat;
        };

        if raw == 0 {
            Self::Placeholder
        } else {
            Self::set(raw as usize)
        }
    }

    /// Returns the effective vnode count, or `None` while it is still a placeholder.
    ///
    /// `Compat` resolves to `VirtualNode::COUNT_FOR_COMPAT`.
    pub fn value_opt(self) -> Option<usize> {
        let count = match self {
            Self::Placeholder => return None,
            Self::Compat => VirtualNode::COUNT_FOR_COMPAT,
            Self::Set(count) => count.get(),
        };
        Some(count)
    }

    /// Returns the effective vnode count.
    ///
    /// # Panics
    ///
    /// Panics if this is still a placeholder.
    pub fn value(self) -> usize {
        match self.value_opt() {
            Some(count) => count,
            None => {
                panic!("vnode count is a placeholder that must be filled by the meta service first")
            }
        }
    }
}

/// A trait for accessing the vnode count field with backward compatibility.
pub trait VnodeCountCompat {
/// Get the `maybe_vnode_count` field.
fn vnode_count_inner(&self) -> VnodeCount;

/// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set,
/// typically for backward compatibility.
/// typically for backward compatibility. Panics if the field is a placeholder.
///
/// Equivalent to `self.vnode_count_inner().value()`.
///
/// See the documentation on the field of the implementing type for more details.
fn vnode_count(&self) -> usize;
fn vnode_count(&self) -> usize {
self.vnode_count_inner().value()
}
}

/// Implement the trait for given types by delegating to the `maybe_vnode_count` field.
Expand All @@ -36,9 +108,8 @@ macro_rules! impl_maybe_vnode_count_compat {
($($ty:ty),* $(,)?) => {
$(
impl VnodeCountCompat for $ty {
fn vnode_count(&self) -> usize {
self.maybe_vnode_count
.map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _)
fn vnode_count_inner(&self) -> VnodeCount {
VnodeCount::from_protobuf(self.maybe_vnode_count)
}
}
)*
Expand Down
36 changes: 18 additions & 18 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};

use fixedbitset::FixedBitSet;
Expand All @@ -20,7 +21,7 @@ use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc,
TableId, TableVersionId,
};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::hash::{VnodeCount, VnodeCountCompat};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
Expand Down Expand Up @@ -174,19 +175,13 @@ pub struct TableCatalog {

/// Total vnode count of the table.
///
/// Can be unset if the catalog is generated by the frontend and not yet persisted. This is
/// because the vnode count of each fragment (then internal tables) is determined by the
/// meta service. See also [`StreamMaterialize::derive_table_catalog`] and
/// [`TableCatalogBuilder::build`].
///
/// On the contrary, if this comes from a [`PbTable`], the field must be `Some` no matter
/// whether the table is created before or after the version we introduced variable vnode
/// count support. This is because we've already handled backward compatibility during
/// conversion.
/// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the
/// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`]
/// will panic.
///
/// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog
/// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build
pub vnode_count: Option<usize>,
pub vnode_count: VnodeCount,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -408,8 +403,7 @@ impl TableCatalog {
///
/// Panics if it's called on an incomplete (and not yet persisted) table catalog.
pub fn vnode_count(&self) -> usize {
self.vnode_count
.expect("vnode count unset, called on an incomplete table catalog?")
self.vnode_count.value()
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbTable {
Expand Down Expand Up @@ -457,7 +451,7 @@ impl TableCatalog {
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
retention_seconds: self.retention_seconds,
cdc_table_id: self.cdc_table_id.clone(),
maybe_vnode_count: self.vnode_count.map(|v| v as _),
maybe_vnode_count: self.vnode_count.to_protobuf(),
}
}

Expand Down Expand Up @@ -563,7 +557,13 @@ impl From<PbTable> for TableCatalog {
OptionalAssociatedSourceId::AssociatedSourceId(id) => id,
});
let name = tb.name.clone();
let vnode_count = tb.vnode_count();

let vnode_count = tb.vnode_count_inner();
if let VnodeCount::Placeholder = vnode_count {
// Only allow placeholder vnode count for creating tables.
// After the table is created, an `Update` notification will be used to update the vnode count field.
assert_matches!(stream_job_status, PbStreamJobStatus::Creating);
}

let mut col_names = HashSet::new();
let mut col_index: HashMap<i32, usize> = HashMap::new();
Expand Down Expand Up @@ -634,7 +634,7 @@ impl From<PbTable> for TableCatalog {
.map(TableId::from)
.collect_vec(),
cdc_table_id: tb.cdc_table_id,
vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */
vnode_count,
}
}
}
Expand Down Expand Up @@ -725,7 +725,7 @@ mod tests {
initialized_at_cluster_version: None,
version_column_index: None,
cdc_table_id: None,
maybe_vnode_count: Some(233),
maybe_vnode_count: VnodeCount::set(233).to_protobuf(),
}
.into();

Expand Down Expand Up @@ -789,7 +789,7 @@ mod tests {
dependent_relations: vec![],
version_column_index: None,
cdc_table_id: None,
vnode_count: Some(233),
vnode_count: VnodeCount::set(233),
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use anyhow::{anyhow, Context};
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::hash::VnodeCount;
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::{bail, bail_not_implemented};
Expand Down Expand Up @@ -244,7 +245,7 @@ pub async fn get_replace_table_plan(
// Set some fields ourselves so that the meta service does not need to maintain them.
let mut table = table;
table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
table.maybe_vnode_count = Some(original_catalog.vnode_count() as _);
table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf();

Ok((source, table, graph, col_index_mapping, job_type))
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{
ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
Expand Down Expand Up @@ -283,7 +284,7 @@ impl StreamMaterialize {
created_at_cluster_version: None,
retention_seconds: retention_seconds.map(|i| i.into()),
cdc_table_id: None,
vnode_count: None, // will be filled in by the meta service later
vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
})
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_common::catalog::{
use risingwave_common::constants::log_store::v2::{
KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use crate::catalog::table_catalog::TableType;
Expand Down Expand Up @@ -179,7 +180,7 @@ impl TableCatalogBuilder {
created_at_cluster_version: None,
retention_seconds: None,
cdc_table_id: None,
vnode_count: None, // will be filled in by the meta service later
vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ pub(crate) mod tests {
ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus,
DEFAULT_SUPER_USER_ID,
};
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::hash::{VirtualNode, VnodeCount, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::types::DataType;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
Expand Down Expand Up @@ -589,7 +589,7 @@ pub(crate) mod tests {
initialized_at_cluster_version: None,
created_at_cluster_version: None,
cdc_table_id: None,
vnode_count: Some(vnode_count),
vnode_count: VnodeCount::set(vnode_count),
};
let batch_plan_node: PlanRef = LogicalScan::create(
"".to_string(),
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::catalog::{
FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER,
DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME,
};
use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
Expand Down Expand Up @@ -281,6 +282,7 @@ impl CatalogWriter for MockCatalogWriter {
) -> Result<()> {
table.id = self.gen_id();
table.stream_job_status = PbStreamJobStatus::Created as _;
table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
self.catalog.write().create_table(&table);
self.add_table_or_source_id(table.id, table.schema_id, table.database_id);
self.hummock_snapshot_manager
Expand Down Expand Up @@ -320,6 +322,7 @@ impl CatalogWriter for MockCatalogWriter {
_job_type: TableJobType,
) -> Result<()> {
table.stream_job_status = PbStreamJobStatus::Created as _;
assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST);
self.catalog.write().update_table(&table);
Ok(())
}
Expand Down Expand Up @@ -353,6 +356,7 @@ impl CatalogWriter for MockCatalogWriter {
) -> Result<()> {
index_table.id = self.gen_id();
index_table.stream_job_status = PbStreamJobStatus::Created as _;
index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf();
self.catalog.write().create_table(&index_table);
self.add_table_or_index_id(
index_table.id,
Expand Down
12 changes: 10 additions & 2 deletions src/meta/model_v2/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,15 @@ impl From<PbTable> for ActiveModel {
fn from(pb_table: PbTable) -> Self {
let table_type = pb_table.table_type();
let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
let vnode_count = pb_table.vnode_count();

// `PbTable` here should be sourced from the wire, not from persistence.
// A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
// the compatibility code.
let vnode_count = pb_table
.vnode_count_inner()
.value_opt()
.map(|v| v as _)
.map_or(NotSet, Set);

let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
NotSet
Expand Down Expand Up @@ -258,7 +266,7 @@ impl From<PbTable> for ActiveModel {
retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
incoming_sinks: Set(pb_table.incoming_sinks.into()),
cdc_table_id: Set(pb_table.cdc_table_id),
vnode_count: Set(vnode_count as _),
vnode_count,
}
}
}
Loading
Loading