Skip to content

Commit

Permalink
refactor(meta): only notify complete catalogs when creating mv
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 16, 2024
1 parent b089d1e commit 55de07d
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 27 deletions.
57 changes: 35 additions & 22 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,13 @@ impl CatalogController {
pub async fn create_internal_table_catalog(
&self,
job: &StreamingJob,
mut internal_tables: Vec<PbTable>,
incomplete_internal_tables: Vec<PbTable>,
) -> MetaResult<HashMap<u32, u32>> {
let job_id = job.id() as ObjectId;
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let mut table_id_map = HashMap::new();
for table in &mut internal_tables {
for table in incomplete_internal_tables {
let table_id = Self::create_object(
&txn,
ObjectType::Table,
Expand All @@ -353,33 +353,46 @@ impl CatalogController {
.await?
.oid;
table_id_map.insert(table.id, table_id as u32);
table.id = table_id as _;
let mut table_model: table::ActiveModel = table.clone().into();
table_model.table_id = Set(table_id as _);
table_model.belongs_to_job_id = Set(Some(job_id));
table_model.fragment_id = NotSet;

let table_model = table::ActiveModel {
table_id: Set(table_id as _),
belongs_to_job_id: Set(Some(job_id)),
fragment_id: NotSet,
..table.into()
};
Table::insert(table_model).exec(&txn).await?;
}
txn.commit().await?;

if job.is_materialized_view() {
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: internal_tables
.iter()
.map(|table| Relation {
relation_info: Some(RelationInfo::Table(table.clone())),
})
.collect(),
}),
)
.await;
}

Ok(table_id_map)
}

/// Notify the frontend of the materialized view's table catalog together with
/// its internal tables, once all of their fields have been fully filled in.
///
/// For any job that is not a materialized view this is a no-op and returns
/// `Ok(())` immediately.
pub async fn pre_notify_relations_for_mv(
    &self,
    job: &StreamingJob,
    internal_tables: &[PbTable],
) -> MetaResult<()> {
    // Only materialized views get this early notification; bail out otherwise.
    let mv_table = match job {
        StreamingJob::MaterializedView(table) => table,
        _ => return Ok(()),
    };

    // The MV's own table comes first, followed by all of its internal tables.
    let relations: Vec<_> = std::iter::once(mv_table)
        .chain(internal_tables.iter())
        .map(|table| Relation {
            relation_info: Some(RelationInfo::Table(table.clone())),
        })
        .collect();

    self.notify_frontend(
        Operation::Add,
        Info::RelationGroup(RelationGroup { relations }),
    )
    .await;

    Ok(())
}

pub async fn prepare_streaming_job(
&self,
table_fragment: PbTableFragments,
Expand Down
19 changes: 19 additions & 0 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use std::ops::AddAssign;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{VirtualNode, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::{SourceId, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
Expand Down Expand Up @@ -564,6 +566,23 @@ impl TableFragments {
}
}

/// Retrieve the internal tables map of the whole graph.
///
/// Walks every fragment's stream node tree and collects each internal (state)
/// table it declares, keyed by table id.
///
/// # Panics
/// Panics if two fragments declare an internal table with the same id, since
/// each state table is expected to belong to exactly one fragment.
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
    let mut tables = BTreeMap::new();
    for fragment in self.fragments.values() {
        stream_graph_visitor::visit_stream_node_internal_tables(
            // NOTE(review): only `actors[0]` is inspected — assumes every actor
            // in a fragment shares the same stream node topology, and that a
            // fragment always has at least one actor; confirm these invariants.
            // The clone is needed because the visitor takes `&mut` while we
            // only hold `&self`.
            &mut fragment.actors[0].nodes.clone().unwrap(),
            |table, _| {
                let table_id = table.id;
                // Reject duplicates loudly instead of silently overwriting.
                tables
                    .try_insert(table_id, table.clone())
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            },
        );
    }
    tables
}

/// Returns the internal table ids without the mview table.
pub fn internal_table_ids(&self) -> Vec<u32> {
self.fragments
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,6 @@ impl DdlController {
) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
let id = stream_job.id();
let specified_parallelism = fragment_graph.specified_parallelism();
let internal_tables = fragment_graph.internal_tables();
let expr_context = stream_ctx.to_expr_context();
let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

Expand Down Expand Up @@ -1096,6 +1095,7 @@ impl DdlController {
table_parallelism,
max_parallelism.get(),
);
let internal_tables = table_fragments.internal_tables();

if let Some(mview_fragment) = table_fragments.mview_fragment() {
stream_job.set_table_vnode_count(mview_fragment.vnode_count());
Expand Down
15 changes: 13 additions & 2 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,14 @@ impl DdlController {
streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());

// create internal table catalogs and refill table id.
let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
let incomplete_internal_tables = fragment_graph
.incomplete_internal_tables()
.into_values()
.collect_vec();
let table_id_map = self
.metadata_manager
.catalog_controller
.create_internal_table_catalog(&streaming_job, internal_tables)
.create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
.await?;
fragment_graph.refill_internal_table_ids(table_id_map);

Expand Down Expand Up @@ -185,6 +188,14 @@ impl DdlController {
.await?;

let streaming_job = &ctx.streaming_job;
let internal_tables = ctx.internal_tables();

// Now that all fields in `streaming_job` and `internal_tables` are initialized,
// we can notify frontend for these relations.
self.metadata_manager
.catalog_controller
.pre_notify_relations_for_mv(streaming_job, &internal_tables)
.await?;

match streaming_job {
StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ impl StreamFragmentGraph {
}

/// Retrieve the internal tables map of the whole graph.
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
for table in fragment.extract_internal_tables() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ async fn test_graph_builder() -> MetaResult<()> {
time_zone: graph.ctx.as_ref().unwrap().timezone.clone(),
};
let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?;
let internal_tables = fragment_graph.internal_tables();
let internal_tables = fragment_graph.incomplete_internal_tables();

let actor_graph_builder = ActorGraphBuilder::new(
job.id(),
Expand Down

0 comments on commit 55de07d

Please sign in to comment.