Skip to content

Commit

Permalink
set vnode count to 1 for singleton tables inside distributed fragments
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 22, 2024
1 parent 72a8c8c commit 62d1d31
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 21 deletions.
6 changes: 5 additions & 1 deletion src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1232,7 +1232,11 @@ fn derive_partitions(
// The vnode count mismatch occurs only in special cases where a hash-distributed fragment
// contains singleton internal tables. e.g., the state table of `Source` executors.
// In this case, we reduce the vnode mapping to a single vnode as only `SINGLETON_VNODE` is used.
assert_eq!(table_desc.vnode_count, 1);
assert!(
table_desc.vnode_count == 1,
"fragment vnode count {} does not match table vnode count {}",
vnode_mapping.len(), table_desc.vnode_count,
);
&WorkerSlotMapping::new_single(vnode_mapping.iter().next().unwrap())
} else {
vnode_mapping
Expand Down
35 changes: 23 additions & 12 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::num::NonZeroUsize;

use itertools::Itertools;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_common::{bail, current_cluster_version};
Expand All @@ -41,8 +42,7 @@ use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{
PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, PbTableFragments, Relation,
RelationGroup,
PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, Relation, RelationGroup,
};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
Expand All @@ -69,7 +69,7 @@ use crate::controller::utils::{
};
use crate::controller::ObjectModel;
use crate::manager::{NotificationVersion, StreamingJob};
use crate::model::{StreamContext, TableParallelism};
use crate::model::{StreamContext, TableFragments, TableParallelism};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -395,14 +395,18 @@ impl CatalogController {
Ok(table_id_map)
}

// TODO: In this function, we also update the `Table` model in the meta store.
// Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
// making them the source of truth and performing a full replacement for those in the meta store?
pub async fn prepare_streaming_job(
&self,
table_fragment: PbTableFragments,
table_fragments: &TableFragments,
streaming_job: &StreamingJob,
for_replace: bool,
) -> MetaResult<()> {
let fragment_actors =
Self::extract_fragment_and_actors_from_table_fragments(table_fragment)?;
Self::extract_fragment_and_actors_from_table_fragments(table_fragments.to_protobuf())?;
let all_tables = table_fragments.all_tables();
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

Expand All @@ -414,7 +418,6 @@ impl CatalogController {
for fragment in fragments {
let fragment_id = fragment.fragment_id;
let state_table_ids = fragment.state_table_ids.inner_ref().clone();
let vnode_count = fragment.vnode_count;

let fragment = fragment.into_active_model();
Fragment::insert(fragment).exec(&txn).await?;
Expand All @@ -423,10 +426,19 @@ impl CatalogController {
// After table fragments are created, update them for all internal tables.
if !for_replace {
for state_table_id in state_table_ids {
// Table's vnode count is not always the fragment's vnode count, so we have to
// look up the table from `TableFragments`.
// See `ActorGraphBuilder::new`.
let table = all_tables
.get(&(state_table_id as u32))
.unwrap_or_else(|| panic!("table {} not found", state_table_id));
assert_eq!(table.fragment_id, fragment_id as u32);
let vnode_count = table.vnode_count();

table::ActiveModel {
table_id: Set(state_table_id as _),
fragment_id: Set(Some(fragment_id)),
vnode_count: Set(vnode_count),
vnode_count: Set(vnode_count as _),
..Default::default()
}
.update(&txn)
Expand Down Expand Up @@ -1024,25 +1036,24 @@ impl CatalogController {
table.incoming_sinks = Set(incoming_sinks.into());
let table = table.update(txn).await?;

// Fields including `fragment_id` and `vnode_count` were placeholder values before.
// Fields including `fragment_id` were placeholder values before.
// After table fragments are created, update them for all internal tables.
let fragment_info: Vec<(FragmentId, I32Array, i32)> = Fragment::find()
let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
.select_only()
.columns([
fragment::Column::FragmentId,
fragment::Column::StateTableIds,
fragment::Column::VnodeCount,
])
.filter(fragment::Column::JobId.eq(dummy_id))
.into_tuple()
.all(txn)
.await?;
for (fragment_id, state_table_ids, vnode_count) in fragment_info {
for (fragment_id, state_table_ids) in fragment_info {
for state_table_id in state_table_ids.into_inner() {
table::ActiveModel {
table_id: Set(state_table_id as _),
fragment_id: Set(Some(fragment_id)),
vnode_count: Set(vnode_count),
// No need to update `vnode_count` because it must remain the same.
..Default::default()
}
.update(txn)
Expand Down
13 changes: 12 additions & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,10 +571,21 @@ impl TableFragments {
/// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
/// the table catalogs returned here are complete, with all fields filled.
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
self.collect_tables_inner(true)
}

/// `internal_tables()` with additional table in `Materialize` node.
pub fn all_tables(&self) -> BTreeMap<u32, Table> {
self.collect_tables_inner(false)
}

fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
stream_graph_visitor::visit_stream_node_internal_tables(
stream_graph_visitor::visit_stream_node_tables_inner(
&mut fragment.actors[0].nodes.clone().unwrap(),
internal_tables_only,
true,
|table, _| {
let table_id = table.id;
tables
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ impl DdlController {

self.metadata_manager
.catalog_controller
.prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false)
.prepare_streaming_job(&table_fragments, streaming_job, false)
.await?;

// create streaming jobs.
Expand Down Expand Up @@ -1237,7 +1237,7 @@ impl DdlController {

self.metadata_manager
.catalog_controller
.prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
.prepare_streaming_job(&table_fragments, &streaming_job, true)
.await?;

self.stream_manager
Expand Down Expand Up @@ -1435,7 +1435,7 @@ impl DdlController {

self.metadata_manager
.catalog_controller
.prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
.prepare_streaming_job(&table_fragments, &streaming_job, true)
.await?;

self.stream_manager
Expand Down
19 changes: 16 additions & 3 deletions src/meta/src/stream/stream_graph/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use assert_matches::assert_matches;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::{ActorId, ActorMapping, WorkerSlotId};
use risingwave_common::hash::{ActorId, ActorMapping, IsSingleton, VnodeCount, WorkerSlotId};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::visit_tables;
use risingwave_meta_model::WorkerId;
Expand Down Expand Up @@ -683,9 +683,22 @@ impl ActorGraphBuilder {
// Fill the vnode count for each internal table, based on schedule result.
let mut fragment_graph = fragment_graph;
for (id, fragment) in fragment_graph.building_fragments_mut() {
let vnode_count = distributions[id].vnode_count();
let fragment_vnode_count = distributions[id].vnode_count();
visit_tables(fragment, |table, _| {
table.maybe_vnode_count = Some(vnode_count as _);
// There are special cases where a hash-distributed fragment contains singleton
// internal tables, e.g., the state table of `Source` executors.
let vnode_count = if table.is_singleton() {
if fragment_vnode_count > 1 {
tracing::info!(
table.name,
"found singleton table in hash-distributed fragment"
);
}
1
} else {
fragment_vnode_count
};
table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf();
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl GlobalStreamManager {
if let Some((streaming_job, context, table_fragments)) = replace_table_job_info {
self.metadata_manager
.catalog_controller
.prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
.prepare_streaming_job(&table_fragments, &streaming_job, true)
.await?;

let dummy_table_id = table_fragments.table_id();
Expand Down

0 comments on commit 62d1d31

Please sign in to comment.