refactor(meta): clarify the completeness of internal table catalogs #18944

Merged (5 commits, Oct 21, 2024)

1 change: 1 addition & 0 deletions src/frontend/src/observer/observer_manager.rs
@@ -245,6 +245,7 @@ impl FrontendObserverNode {
let Some(info) = resp.info.as_ref() else {
return;
};
tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");

let mut catalog_guard = self.catalog.write();
match info {
13 changes: 7 additions & 6 deletions src/meta/src/controller/catalog.rs
@@ -66,10 +66,11 @@ use tracing::info;
use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs};
use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs};
use crate::controller::utils::{
build_relation_group, check_connection_name_duplicate, check_database_name_duplicate,
check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty,
ensure_user_id, extract_external_table_name_from_definition, get_referring_objects,
build_relation_group_for_delete, check_connection_name_duplicate,
check_database_name_duplicate, check_function_signature_duplicate,
check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate,
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
extract_external_table_name_from_definition, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
@@ -891,7 +892,7 @@ impl CatalogController {

txn.commit().await?;

let relation_group = build_relation_group(
let relation_group = build_relation_group_for_delete(
dirty_mview_objs
.into_iter()
.chain(dirty_mview_internal_table_objs.into_iter())
@@ -2305,7 +2306,7 @@ impl CatalogController {

// notify about them.
self.notify_users_update(user_infos).await;
let relation_group = build_relation_group(to_drop_objects);
let relation_group = build_relation_group_for_delete(to_drop_objects);

let version = self
.notify_frontend(NotificationOperation::Delete, relation_group)
35 changes: 26 additions & 9 deletions src/meta/src/controller/streaming_job.rs
@@ -63,7 +63,7 @@ use crate::barrier::{ReplaceTablePlan, Reschedule};
use crate::controller::catalog::CatalogController;
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
build_relation_group, check_relation_name_duplicate, check_sink_into_table_cycle,
build_relation_group_for_delete, check_relation_name_duplicate, check_sink_into_table_cycle,
ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings,
rebuild_fragment_mapping_from_actors, PartialObject,
};
@@ -99,6 +99,11 @@ impl CatalogController {
Ok(obj.oid)
}

/// Create catalogs for the streaming job, then notify the frontend about them if the job is a
/// materialized view.
///
/// Some of the fields in the given streaming job are placeholders, which will
/// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
pub async fn create_job_catalog(
&self,
streaming_job: &mut StreamingJob,
@@ -333,16 +338,23 @@ impl CatalogController {
Ok(())
}

/// Create catalogs for internal tables, then notify the frontend about them if the job is a
/// materialized view.
///
/// Some of the fields in the given "incomplete" internal tables are placeholders, which will
/// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
///
/// Returns a mapping from the temporary table id to the actual global table id.
pub async fn create_internal_table_catalog(
&self,
job: &StreamingJob,
mut internal_tables: Vec<PbTable>,
mut incomplete_internal_tables: Vec<PbTable>,
) -> MetaResult<HashMap<u32, u32>> {
let job_id = job.id() as ObjectId;
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let mut table_id_map = HashMap::new();
for table in &mut internal_tables {
for table in &mut incomplete_internal_tables {
let table_id = Self::create_object(
&txn,
ObjectType::Table,
@@ -354,10 +366,13 @@
.oid;
table_id_map.insert(table.id, table_id as u32);
table.id = table_id as _;
let mut table_model: table::ActiveModel = table.clone().into();
table_model.table_id = Set(table_id as _);
table_model.belongs_to_job_id = Set(Some(job_id));
table_model.fragment_id = NotSet;

let table_model = table::ActiveModel {
table_id: Set(table_id as _),
belongs_to_job_id: Set(Some(job_id)),
fragment_id: NotSet,
..table.clone().into()
};
Table::insert(table_model).exec(&txn).await?;
}
txn.commit().await?;
@@ -366,7 +381,7 @@
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: internal_tables
relations: incomplete_internal_tables
.iter()
.map(|table| Relation {
relation_info: Some(RelationInfo::Table(table.clone())),
@@ -565,7 +580,9 @@ impl CatalogController {
txn.commit().await?;

if !objs.is_empty() {
self.notify_frontend(Operation::Delete, build_relation_group(objs))
// We have also notified the frontend about these objects,
// so we need to notify it to delete them here.
self.notify_frontend(Operation::Delete, build_relation_group_for_delete(objs))
BugenZhao (Member, Author) commented on Oct 21, 2024:

As the notification is now sent after the job has been successfully scheduled and built, a failure here does not always mean that the notification was sent. That's why we changed all DROP handlers in the frontend to be DROP .. IF EXISTS.

An alternative could be introducing a new operation like Cleanup for this purpose, ensuring that Delete remains strictly applied. But that seems too ad-hoc to me, as there is not much other use for it.
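
A minimal sketch of the idempotent "DROP .. IF EXISTS" handling described above; `CatalogCache` and `drop_table_if_exists` are hypothetical names for illustration, not the actual RisingWave frontend API.

```rust
use std::collections::HashMap;

/// A toy stand-in for the frontend's catalog cache.
struct CatalogCache {
    /// Maps table id to table name.
    tables: HashMap<u32, String>,
}

impl CatalogCache {
    /// Removes the table if present and reports whether it existed. A Delete
    /// notification for an id that was never added (because the corresponding
    /// Add was never issued before the job was aborted) is a no-op instead of
    /// an error.
    fn drop_table_if_exists(&mut self, table_id: u32) -> bool {
        self.tables.remove(&table_id).is_some()
    }
}

fn main() {
    let mut cache = CatalogCache {
        tables: HashMap::from([(1, "mv_internal_state".to_owned())]),
    };
    assert!(cache.drop_table_if_exists(1)); // normal drop
    assert!(!cache.drop_table_if_exists(42)); // never added: silently ignored
}
```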

A contributor commented:

Can there be a race condition, whereby we have a Delete of some catalog followed by an Add of that catalog?

BugenZhao (Member, Author) replied:

I think not, because try_abort_creating_streaming_job is called only after create_streaming_job_inner has returned with an error, by which point the Add has either been pushed into the notification queue or not issued at all. After entering this try_abort function, no further Add operations will occur.
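
A synchronous, stubbed-out sketch of the ordering argument above (the real code is async and the function bodies here are placeholders): the abort path can only start after the inner creation has returned, so any Add is either already enqueued before the Delete or never issued at all.

```rust
#[derive(Debug)]
struct MetaError;

fn create_streaming_job_inner() -> Result<u64, MetaError> {
    // Builds the job; may enqueue the Add notification before failing.
    Err(MetaError)
}

fn try_abort_creating_streaming_job() {
    // Issues the Delete notification for any catalogs created so far.
}

fn create_streaming_job() -> Result<u64, MetaError> {
    create_streaming_job_inner().map_err(|e| {
        // `create_streaming_job_inner` has fully returned at this point, so no
        // further Add can be emitted once the abort path starts.
        try_abort_creating_streaming_job();
        e
    })
}

fn main() {
    // The aborted creation surfaces the original error after cleanup.
    assert!(create_streaming_job().is_err());
}
```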

A member commented:

🤔 We ensured that CREATE and DROP will be properly paired in #18476. Even if the CREATE notification fails to be sent due to a FE or meta node reboot, or a broken connection between them, the creation of the mview and its internal table catalogs will still be synchronized through an initial snapshot.

.await;
}
Ok(true)
8 changes: 7 additions & 1 deletion src/meta/src/controller/utils.rs
@@ -1082,7 +1082,13 @@
))
}

pub(crate) fn build_relation_group(relation_objects: Vec<PartialObject>) -> NotificationInfo {
/// Build a relation group for notifying the deletion of the given objects.
///
/// Note that only id fields are filled in the relation info, as the arguments are partial objects.
/// As a result, the returned notification info should only be used for deletion.
pub(crate) fn build_relation_group_for_delete(
relation_objects: Vec<PartialObject>,
) -> NotificationInfo {
let mut relations = vec![];
for obj in relation_objects {
match obj.obj_type {
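
To make the "delete-only" constraint concrete, here is a hypothetical illustration with simplified stand-in types (not the actual protobuf catalogs): a notification built from a partial object carries only the id, which is enough for the frontend to remove an entry but never enough to add or update one.

```rust
/// A simplified stand-in for a full table catalog.
#[derive(Clone, Default)]
struct TableCatalog {
    id: u32,
    name: String,
    columns: Vec<String>,
    // ... many more fields in the real catalog
}

/// Builds a "tombstone" catalog from a partial object: only the id survives,
/// so the result is only meaningful for a Delete notification.
fn table_for_delete(id: u32) -> TableCatalog {
    TableCatalog {
        id,
        ..Default::default()
    }
}

fn main() {
    let tombstone = table_for_delete(42);
    assert_eq!(tombstone.id, 42);
    assert!(tombstone.name.is_empty()); // every non-id field is left empty
    assert!(tombstone.columns.is_empty());
}
```
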
22 changes: 22 additions & 0 deletions src/meta/src/model/stream.rs
@@ -18,8 +18,10 @@ use std::ops::AddAssign;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{VirtualNode, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{SourceId, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -564,6 +566,26 @@ impl TableFragments {
}
}

/// Retrieve the **complete** internal tables map of the whole graph.
///
/// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
/// the table catalogs returned here are complete, with all fields filled.
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
stream_graph_visitor::visit_stream_node_internal_tables(
&mut fragment.actors[0].nodes.clone().unwrap(),
|table, _| {
let table_id = table.id;
tables
.try_insert(table_id, table.clone())
.unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
},
);
}
tables
}

/// Returns the internal table ids without the mview table.
pub fn internal_table_ids(&self) -> Vec<u32> {
self.fragments
9 changes: 6 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1040,11 +1040,14 @@ impl DdlController {
streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());

// create internal table catalogs and refill table id.
let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
let incomplete_internal_tables = fragment_graph
.incomplete_internal_tables()
.into_values()
.collect_vec();
let table_id_map = self
.metadata_manager
.catalog_controller
.create_internal_table_catalog(&streaming_job, internal_tables)
.create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
.await?;
fragment_graph.refill_internal_table_ids(table_id_map);

@@ -1560,7 +1563,6 @@ impl DdlController {
) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
let id = stream_job.id();
let specified_parallelism = fragment_graph.specified_parallelism();
let internal_tables = fragment_graph.internal_tables();
let expr_context = stream_ctx.to_expr_context();
let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

@@ -1643,6 +1645,7 @@
table_parallelism,
max_parallelism.get(),
);
let internal_tables = table_fragments.internal_tables();
BugenZhao (Member, Author) commented:

The `internal_tables` field in `CreateStreamingJobContext` will now be complete.


if let Some(mview_fragment) = table_fragments.mview_fragment() {
stream_job.set_table_vnode_count(mview_fragment.vnode_count());
15 changes: 13 additions & 2 deletions src/meta/src/stream/stream_graph/fragment.rs
@@ -349,6 +349,9 @@ impl StreamFragmentGraph {
) -> MetaResult<Self> {
let fragment_id_gen =
GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
// Note: in the SQL backend, the ids generated here are fake and will be overwritten
// with `refill_internal_table_ids` later.
// TODO: refactor the code to remove this step.
let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

// Create nodes.
Expand Down Expand Up @@ -417,8 +420,14 @@ impl StreamFragmentGraph {
})
}

/// Retrieve the internal tables map of the whole graph.
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
/// Retrieve the **incomplete** internal tables map of the whole graph.
///
/// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
/// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
/// Be careful when using the returned values.
///
/// See also [`crate::model::TableFragments::internal_tables`].
pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
for table in fragment.extract_internal_tables() {
@@ -431,6 +440,8 @@
tables
}

/// Refill the internal tables' `table_id`s according to the given map, typically obtained from
/// `create_internal_table_catalog`.
pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
for fragment in self.fragments.values_mut() {
stream_graph_visitor::visit_internal_tables(
2 changes: 1 addition & 1 deletion src/meta/src/stream/test_fragmenter.rs
@@ -448,7 +448,7 @@ async fn test_graph_builder() -> MetaResult<()> {
time_zone: graph.ctx.as_ref().unwrap().timezone.clone(),
};
let fragment_graph = StreamFragmentGraph::new(&env, graph, &job)?;
let internal_tables = fragment_graph.internal_tables();
let internal_tables = fragment_graph.incomplete_internal_tables();

let actor_graph_builder = ActorGraphBuilder::new(
job.id(),