diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index b008da4efe4b0..b55c3349b02c1 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -1232,7 +1232,11 @@ fn derive_partitions(
         // The vnode count mismatch occurs only in special cases where a hash-distributed fragment
         // contains singleton internal tables. e.g., the state table of `Source` executors.
         // In this case, we reduce the vnode mapping to a single vnode as only `SINGLETON_VNODE` is used.
-        assert_eq!(table_desc.vnode_count, 1);
+        assert!(
+            table_desc.vnode_count == 1,
+            "fragment vnode count {} does not match table vnode count {}",
+            vnode_mapping.len(), table_desc.vnode_count,
+        );
         &WorkerSlotMapping::new_single(vnode_mapping.iter().next().unwrap())
     } else {
         vnode_mapping
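Note on the hunk above: the new `assert!` carries a message, so a vnode-count mismatch now reports both values instead of failing with a bare `assertion failed`. Below is a minimal, self-contained sketch of the reduction being guarded, using hypothetical stand-in types (`Mapping`, `WorkerSlot`) rather than the real `WorkerSlotMapping` API:

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct WorkerSlot(u32);

    #[derive(Debug, Clone)]
    struct Mapping {
        // One owning worker slot per vnode.
        slots: Vec<WorkerSlot>,
    }

    impl Mapping {
        fn new_single(slot: WorkerSlot) -> Self {
            Mapping { slots: vec![slot] }
        }
    }

    fn mapping_for_table(fragment_mapping: &Mapping, table_vnode_count: usize) -> Mapping {
        if fragment_mapping.slots.len() != table_vnode_count {
            // A mismatch is legal only for singleton tables (vnode count 1), e.g. the
            // state table of `Source` executors inside a hash-distributed fragment.
            assert!(
                table_vnode_count == 1,
                "fragment vnode count {} does not match table vnode count {}",
                fragment_mapping.slots.len(),
                table_vnode_count,
            );
            // Collapse to the slot that owns the single vnode (the first entry here).
            Mapping::new_single(fragment_mapping.slots[0])
        } else {
            fragment_mapping.clone()
        }
    }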
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 516adbf8b6c90..2514b91f11cc2 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::num::NonZeroUsize;
 
 use itertools::Itertools;
+use risingwave_common::hash::VnodeCountCompat;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
 use risingwave_common::{bail, current_cluster_version};
@@ -41,8 +42,7 @@ use risingwave_pb::meta::subscribe_response::{
     Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
 };
 use risingwave_pb::meta::{
-    PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, PbTableFragments, Relation,
-    RelationGroup,
+    PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, Relation, RelationGroup,
 };
 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
@@ -69,7 +69,7 @@ use crate::controller::utils::{
 };
 use crate::controller::ObjectModel;
 use crate::manager::{NotificationVersion, StreamingJob};
-use crate::model::{StreamContext, TableParallelism};
+use crate::model::{StreamContext, TableFragments, TableParallelism};
 use crate::stream::SplitAssignment;
 use crate::{MetaError, MetaResult};
 
@@ -395,14 +395,18 @@ impl CatalogController {
         Ok(table_id_map)
     }
 
+    // TODO: In this function, we also update the `Table` model in the meta store.
+    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
+    // making them the source of truth and performing a full replacement for those in the meta store?
     pub async fn prepare_streaming_job(
         &self,
-        table_fragment: PbTableFragments,
+        table_fragments: &TableFragments,
         streaming_job: &StreamingJob,
        for_replace: bool,
     ) -> MetaResult<()> {
         let fragment_actors =
-            Self::extract_fragment_and_actors_from_table_fragments(table_fragment)?;
+            Self::extract_fragment_and_actors_from_table_fragments(table_fragments.to_protobuf())?;
+        let all_tables = table_fragments.all_tables();
 
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
@@ -414,7 +418,6 @@ impl CatalogController {
         for fragment in fragments {
             let fragment_id = fragment.fragment_id;
             let state_table_ids = fragment.state_table_ids.inner_ref().clone();
-            let vnode_count = fragment.vnode_count;
 
             let fragment = fragment.into_active_model();
             Fragment::insert(fragment).exec(&txn).await?;
@@ -423,10 +426,19 @@ impl CatalogController {
             // After table fragments are created, update them for all internal tables.
             if !for_replace {
                 for state_table_id in state_table_ids {
+                    // Table's vnode count is not always the fragment's vnode count, so we have to
+                    // look up the table from `TableFragments`.
+                    // See `ActorGraphBuilder::new`.
+                    let table = all_tables
+                        .get(&(state_table_id as u32))
+                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
+                    assert_eq!(table.fragment_id, fragment_id as u32);
+                    let vnode_count = table.vnode_count();
+
                     table::ActiveModel {
                         table_id: Set(state_table_id as _),
                         fragment_id: Set(Some(fragment_id)),
-                        vnode_count: Set(vnode_count),
+                        vnode_count: Set(vnode_count as _),
                         ..Default::default()
                     }
                     .update(&txn)
@@ -1024,25 +1036,24 @@ impl CatalogController {
         table.incoming_sinks = Set(incoming_sinks.into());
         let table = table.update(txn).await?;
 
-        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
+        // Fields including `fragment_id` were placeholder values before.
         // After table fragments are created, update them for all internal tables.
-        let fragment_info: Vec<(FragmentId, I32Array, i32)> = Fragment::find()
+        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
             .select_only()
             .columns([
                 fragment::Column::FragmentId,
                 fragment::Column::StateTableIds,
-                fragment::Column::VnodeCount,
             ])
             .filter(fragment::Column::JobId.eq(dummy_id))
             .into_tuple()
             .all(txn)
             .await?;
 
-        for (fragment_id, state_table_ids, vnode_count) in fragment_info {
+        for (fragment_id, state_table_ids) in fragment_info {
             for state_table_id in state_table_ids.into_inner() {
                 table::ActiveModel {
                     table_id: Set(state_table_id as _),
                     fragment_id: Set(Some(fragment_id)),
-                    vnode_count: Set(vnode_count),
+                    // No need to update `vnode_count` because it must remain the same.
                     ..Default::default()
                 }
                 .update(txn)
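The behavioral change in `prepare_streaming_job` is that the persisted vnode count now comes from each table's catalog entry (via `all_tables()`) instead of the owning fragment, since the two can differ for singleton tables. A simplified sketch of that lookup, with hypothetical stand-ins for the catalog types:

    use std::collections::BTreeMap;

    // Hypothetical, simplified stand-ins for the catalog types touched above.
    struct Table {
        fragment_id: u32,
        vnode_count: usize,
    }

    // Mirrors the per-table lookup in `prepare_streaming_job`: the vnode count
    // to persist comes from the table catalog, not from the owning fragment.
    fn vnode_counts_to_persist(
        all_tables: &BTreeMap<u32, Table>,
        fragment_id: u32,
        state_table_ids: &[u32],
    ) -> Vec<(u32, usize)> {
        state_table_ids
            .iter()
            .map(|&id| {
                let table = all_tables
                    .get(&id)
                    .unwrap_or_else(|| panic!("table {} not found", id));
                // Sanity check: the catalog must agree on which fragment owns the table.
                assert_eq!(table.fragment_id, fragment_id);
                (id, table.vnode_count)
            })
            .collect()
    }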
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index 2ad917eb8e050..97376cde2ffa7 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -571,10 +571,21 @@ impl TableFragments {
     /// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
     /// the table catalogs returned here are complete, with all fields filled.
     pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
+        self.collect_tables_inner(true)
+    }
+
+    /// `internal_tables()` with the additional table in the `Materialize` node.
+    pub fn all_tables(&self) -> BTreeMap<u32, Table> {
+        self.collect_tables_inner(false)
+    }
+
+    fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
         let mut tables = BTreeMap::new();
         for fragment in self.fragments.values() {
-            stream_graph_visitor::visit_stream_node_internal_tables(
+            stream_graph_visitor::visit_stream_node_tables_inner(
                 &mut fragment.actors[0].nodes.clone().unwrap(),
+                internal_tables_only,
+                true,
                 |table, _| {
                     let table_id = table.id;
                     tables
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index baffdbd72fba9..4289e864c4884 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1104,7 +1104,7 @@ impl DdlController {
 
         self.metadata_manager
             .catalog_controller
-            .prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false)
+            .prepare_streaming_job(&table_fragments, streaming_job, false)
             .await?;
 
         // create streaming jobs.
@@ -1237,7 +1237,7 @@ impl DdlController {
 
         self.metadata_manager
             .catalog_controller
-            .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
+            .prepare_streaming_job(&table_fragments, &streaming_job, true)
             .await?;
 
         self.stream_manager
@@ -1435,7 +1435,7 @@ impl DdlController {
 
         self.metadata_manager
             .catalog_controller
-            .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
+            .prepare_streaming_job(&table_fragments, &streaming_job, true)
             .await?;
 
         self.stream_manager
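In `stream.rs`, both accessors now delegate to a single visitor invocation toggled by `internal_tables_only`. A self-contained sketch of the same pattern, with hypothetical `Table` and `Fragment` structs standing in for the protobuf types and for `visit_stream_node_tables_inner`:

    use std::collections::BTreeMap;

    // Hypothetical, flattened model of the pattern: one shared collection
    // routine, parameterized by whether the `Materialize` state table is skipped.
    #[derive(Clone)]
    struct Table {
        id: u32,
        is_materialized: bool,
    }

    struct Fragment {
        tables: Vec<Table>,
    }

    fn collect_tables_inner(
        fragments: &[Fragment],
        internal_tables_only: bool,
    ) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in fragments {
            for table in &fragment.tables {
                if internal_tables_only && table.is_materialized {
                    continue; // `internal_tables()` excludes the `Materialize` table
                }
                tables.insert(table.id, table.clone());
            }
        }
        tables
    }

    fn internal_tables(fragments: &[Fragment]) -> BTreeMap<u32, Table> {
        collect_tables_inner(fragments, true)
    }

    fn all_tables(fragments: &[Fragment]) -> BTreeMap<u32, Table> {
        collect_tables_inner(fragments, false)
    }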
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index 3446a2661d962..d7e1b0b1b6df7 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -20,7 +20,7 @@ use assert_matches::assert_matches;
 use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::bitmap::Bitmap;
-use risingwave_common::hash::{ActorId, ActorMapping, WorkerSlotId};
+use risingwave_common::hash::{ActorId, ActorMapping, IsSingleton, VnodeCount, WorkerSlotId};
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::stream_graph_visitor::visit_tables;
 use risingwave_meta_model::WorkerId;
@@ -683,9 +683,22 @@ impl ActorGraphBuilder {
         // Fill the vnode count for each internal table, based on schedule result.
         let mut fragment_graph = fragment_graph;
         for (id, fragment) in fragment_graph.building_fragments_mut() {
-            let vnode_count = distributions[id].vnode_count();
+            let fragment_vnode_count = distributions[id].vnode_count();
             visit_tables(fragment, |table, _| {
-                table.maybe_vnode_count = Some(vnode_count as _);
+                // There are special cases where a hash-distributed fragment contains singleton
+                // internal tables, e.g., the state table of `Source` executors.
+                let vnode_count = if table.is_singleton() {
+                    if fragment_vnode_count > 1 {
+                        tracing::info!(
+                            table.name,
+                            "found singleton table in hash-distributed fragment"
+                        );
+                    }
+                    1
+                } else {
+                    fragment_vnode_count
+                };
+                table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf();
             })
         }
 
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 21a642f1be1c7..7bf4058c647cf 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -350,7 +350,7 @@ impl GlobalStreamManager {
         if let Some((streaming_job, context, table_fragments)) = replace_table_job_info {
             self.metadata_manager
                 .catalog_controller
-                .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
+                .prepare_streaming_job(&table_fragments, &streaming_job, true)
                 .await?;
 
             let dummy_table_id = table_fragments.table_id();
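The `actor.rs` hunk is the producer side of the invariant asserted in `plan_fragmenter.rs`: a singleton table is pinned to a vnode count of 1 even when its owning fragment is hash-distributed. A standalone sketch of just that decision (hypothetical helper; the real code logs via `tracing::info!` and wraps the result in `VnodeCount::set(..).to_protobuf()`):

    // Decide the vnode count to record for a table, given its distribution
    // and the vnode count of the fragment it lives in.
    fn table_vnode_count(table_is_singleton: bool, fragment_vnode_count: usize) -> usize {
        if table_is_singleton {
            if fragment_vnode_count > 1 {
                // The special case: a singleton internal table inside a
                // hash-distributed fragment, e.g. a `Source` state table.
                eprintln!("found singleton table in hash-distributed fragment");
            }
            1
        } else {
            fragment_vnode_count
        }
    }

    fn main() {
        assert_eq!(table_vnode_count(true, 256), 1);    // singleton table: always 1
        assert_eq!(table_vnode_count(false, 256), 256); // distributed table: fragment's count
    }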