refactor(meta): clarify the completeness of internal table catalogs #18944

Merged on Oct 21, 2024 (5 commits)
62 changes: 44 additions & 18 deletions src/frontend/src/catalog/schema_catalog.rs
@@ -160,7 +160,11 @@ impl SchemaCatalog {
}

pub fn drop_table(&mut self, id: TableId) {
let table_ref = self.table_by_id.remove(&id).unwrap();
let Some(table_ref) = self.table_by_id.remove(&id) else {
tracing::warn!(?id, "table to drop not found, cleaning up?");
return;
};

self.table_by_name.remove(&table_ref.name).unwrap();
self.indexes_by_table_id.remove(&table_ref.id);
}
@@ -190,7 +194,11 @@ impl SchemaCatalog {
}

pub fn drop_index(&mut self, id: IndexId) {
let index_ref = self.index_by_id.remove(&id).unwrap();
let Some(index_ref) = self.index_by_id.remove(&id) else {
tracing::warn!(?id, "index to drop not found, cleaning up?");
return;
};

self.index_by_name.remove(&index_ref.name).unwrap();
match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
Occupied(mut entry) => {
@@ -225,7 +233,11 @@ impl SchemaCatalog {
}

pub fn drop_source(&mut self, id: SourceId) {
let source_ref = self.source_by_id.remove(&id).unwrap();
let Some(source_ref) = self.source_by_id.remove(&id) else {
tracing::warn!(?id, "source to drop not found, cleaning up?");
return;
};

self.source_by_name.remove(&source_ref.name).unwrap();
if let Some(connection_id) = source_ref.connection_id {
if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) {
@@ -274,7 +286,11 @@ impl SchemaCatalog {
}

pub fn drop_sink(&mut self, id: SinkId) {
let sink_ref = self.sink_by_id.remove(&id).unwrap();
let Some(sink_ref) = self.sink_by_id.remove(&id) else {
tracing::warn!(?id, "sink to drop not found, cleaning up?");
return;
};

self.sink_by_name.remove(&sink_ref.name).unwrap();
if let Some(connection_id) = sink_ref.connection_id {
if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) {
@@ -318,7 +334,11 @@ impl SchemaCatalog {
}

pub fn drop_subscription(&mut self, id: SubscriptionId) {
let subscription_ref = self.subscription_by_id.remove(&id).unwrap();
let Some(subscription_ref) = self.subscription_by_id.remove(&id) else {
tracing::warn!(?id, "subscription to drop not found, cleaning up?");
return;
};

self.subscription_by_name
.remove(&subscription_ref.name)
.unwrap();
@@ -354,7 +374,11 @@ impl SchemaCatalog {
}

pub fn drop_view(&mut self, id: ViewId) {
let view_ref = self.view_by_id.remove(&id).unwrap();
let Some(view_ref) = self.view_by_id.remove(&id) else {
tracing::warn!(?id, "view to drop not found, cleaning up?");
return;
};

self.view_by_name.remove(&view_ref.name).unwrap();
}

@@ -411,10 +435,10 @@ impl SchemaCatalog {
}

pub fn drop_function(&mut self, id: FunctionId) {
let function_ref = self
.function_by_id
.remove(&id)
.expect("function not found by id");
let Some(function_ref) = self.function_by_id.remove(&id) else {
tracing::warn!(?id, "function to drop not found, cleaning up?");
return;
};

self.function_registry
.remove(Self::get_func_sign(&function_ref))
@@ -483,10 +507,11 @@ impl SchemaCatalog {
}

pub fn drop_connection(&mut self, connection_id: ConnectionId) {
let connection_ref = self
.connection_by_id
.remove(&connection_id)
.expect("connection not found by id");
let Some(connection_ref) = self.connection_by_id.remove(&connection_id) else {
tracing::warn!(?connection_id, "connection to drop not found, cleaning up?");
return;
};

self.connection_by_name
.remove(&connection_ref.name)
.expect("connection not found by name");
@@ -523,10 +548,11 @@ impl SchemaCatalog {
}

pub fn drop_secret(&mut self, secret_id: SecretId) {
let secret_ref = self
.secret_by_id
.remove(&secret_id)
.expect("secret not found by id");
let Some(secret_ref) = self.secret_by_id.remove(&secret_id) else {
tracing::warn!(?secret_id, "secret to drop not found, cleaning up?");
return;
};

self.secret_by_name
.remove(&secret_ref.name)
.expect("secret not found by name");
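
All of these drop handlers converge on the same tolerant pattern: look the object up, warn and return if it is already gone, and only then clean up the secondary maps. A minimal self-contained sketch of that pattern, using a plain HashMap and eprintln! in place of the actual SchemaCatalog maps and tracing::warn!:

```rust
use std::collections::HashMap;

// Pared-down stand-in for the id/name maps inside `SchemaCatalog`.
#[derive(Default)]
struct MiniSchemaCatalog {
    table_by_id: HashMap<u32, String>,
    table_by_name: HashMap<String, u32>,
}

impl MiniSchemaCatalog {
    fn create_table(&mut self, id: u32, name: &str) {
        self.table_by_id.insert(id, name.to_owned());
        self.table_by_name.insert(name.to_owned(), id);
    }

    // Tolerant drop: a missing entry is logged and ignored instead of panicking,
    // so a Delete notification for an object whose Add was never observed (or a
    // duplicated Delete) no longer crashes the frontend.
    fn drop_table(&mut self, id: u32) {
        let Some(name) = self.table_by_id.remove(&id) else {
            eprintln!("table {id} to drop not found, cleaning up?");
            return;
        };
        self.table_by_name.remove(&name);
    }
}

fn main() {
    let mut catalog = MiniSchemaCatalog::default();
    catalog.create_table(1, "t1");
    catalog.drop_table(1);
    catalog.drop_table(1); // second drop: a warning, not a panic
}
```

This is what makes the frontend's drop handling behave like DROP .. IF EXISTS, which the review thread further below relies on.
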
1 change: 1 addition & 0 deletions src/frontend/src/observer/observer_manager.rs
@@ -245,6 +245,7 @@ impl FrontendObserverNode {
let Some(info) = resp.info.as_ref() else {
return;
};
tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");

let mut catalog_guard = self.catalog.write();
match info {
13 changes: 7 additions & 6 deletions src/meta/src/controller/catalog.rs
@@ -66,10 +66,11 @@ use tracing::info;
use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs};
use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs};
use crate::controller::utils::{
build_relation_group, check_connection_name_duplicate, check_database_name_duplicate,
check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty,
ensure_user_id, extract_external_table_name_from_definition, get_referring_objects,
build_relation_group_for_delete, check_connection_name_duplicate,
check_database_name_duplicate, check_function_signature_duplicate,
check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate,
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
extract_external_table_name_from_definition, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
@@ -891,7 +892,7 @@ impl CatalogController {

txn.commit().await?;

let relation_group = build_relation_group(
let relation_group = build_relation_group_for_delete(
dirty_mview_objs
.into_iter()
.chain(dirty_mview_internal_table_objs.into_iter())
@@ -2305,7 +2306,7 @@ impl CatalogController {

// notify about them.
self.notify_users_update(user_infos).await;
let relation_group = build_relation_group(to_drop_objects);
let relation_group = build_relation_group_for_delete(to_drop_objects);

let version = self
.notify_frontend(NotificationOperation::Delete, relation_group)
62 changes: 38 additions & 24 deletions src/meta/src/controller/streaming_job.rs
@@ -63,7 +63,7 @@ use crate::barrier::{ReplaceTablePlan, Reschedule};
use crate::controller::catalog::CatalogController;
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
build_relation_group, check_relation_name_duplicate, check_sink_into_table_cycle,
build_relation_group_for_delete, check_relation_name_duplicate, check_sink_into_table_cycle,
ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings,
rebuild_fragment_mapping_from_actors, PartialObject,
};
@@ -333,16 +333,20 @@ impl CatalogController {
Ok(())
}

/// Create catalogs for internal tables. Some of the fields in the given arguments are
/// placeholders that will be updated later in `prepare_streaming_job`.
///
/// Returns a mapping from the temporary table id to the actual global table id.
pub async fn create_internal_table_catalog(
&self,
job: &StreamingJob,
mut internal_tables: Vec<PbTable>,
incomplete_internal_tables: Vec<PbTable>,
) -> MetaResult<HashMap<u32, u32>> {
let job_id = job.id() as ObjectId;
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let mut table_id_map = HashMap::new();
for table in &mut internal_tables {
for table in incomplete_internal_tables {
let table_id = Self::create_object(
&txn,
ObjectType::Table,
@@ -353,33 +357,39 @@
.await?
.oid;
table_id_map.insert(table.id, table_id as u32);
table.id = table_id as _;
let mut table_model: table::ActiveModel = table.clone().into();
table_model.table_id = Set(table_id as _);
table_model.belongs_to_job_id = Set(Some(job_id));
table_model.fragment_id = NotSet;

let table_model = table::ActiveModel {
table_id: Set(table_id as _),
belongs_to_job_id: Set(Some(job_id)),
fragment_id: NotSet,
..table.into()
};
Table::insert(table_model).exec(&txn).await?;
}
txn.commit().await?;

if job.is_materialized_view() {
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: internal_tables
.iter()
.map(|table| Relation {
relation_info: Some(RelationInfo::Table(table.clone())),
})
.collect(),
}),
)
.await;
}

Ok(table_id_map)
}

/// Notify frontend about the given internal tables before the streaming job finishes creating.
/// Should only be called for materialized views.
pub async fn pre_notify_internal_tables(&self, internal_tables: &[PbTable]) -> MetaResult<()> {
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: internal_tables
.iter()
.map(|table| Relation {
relation_info: Some(RelationInfo::Table(table.clone())),
})
.collect(),
}),
)
.await;

Ok(())
}

pub async fn prepare_streaming_job(
&self,
table_fragment: PbTableFragments,
@@ -565,7 +575,11 @@ impl CatalogController {
txn.commit().await?;

if !objs.is_empty() {
self.notify_frontend(Operation::Delete, build_relation_group(objs))
// We **may** also have notified the frontend about these objects,
// so we need to notify the frontend to delete them here.
// The frontend will ignore the request if the object does not exist,
// so it's safe to always notify.
self.notify_frontend(Operation::Delete, build_relation_group_for_delete(objs))

Comment from @BugenZhao (Member, Author), Oct 21, 2024:

As the Add notification now goes out only after the job has been successfully scheduled and built, a failure here does not always mean that the notification was already sent. That's why we changed all DROP handlers in the frontend to behave like DROP .. IF EXISTS.

An alternative could be introducing a new operation like Cleanup for this purpose, ensuring Delete remains strictly applied. But it seems too ad-hoc to me, as there's not much other use for it.

Reply from a contributor:

Can there be a race condition whereby we have a Delete of some catalog followed by an Add of that same catalog?

Reply from @BugenZhao (Member, Author):

I think not, because try_abort_creating_streaming_job is called only after create_streaming_job_inner has returned with an error, at which point the Add has either already been pushed into the notification queue or not been issued at all. After entering this try_abort function, no further Add operations will occur.

Reply from a member:

🤔 We ensured that CREATE and DROP will be properly paired in #18476. Even if the CREATE notification fails to send due to either a FE or meta node reboot or a broken connection between them, the creation of the mview and its internal table catalogs will still be synchronized through an initial snapshot.

.await;
}
Ok(true)
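
A condensed, synchronous stand-in for the abort path discussed in the thread above (the real counterparts are async methods on the meta-side controllers; the names are kept only to mirror the discussion):

```rust
struct Job {
    id: u32,
}

fn create_streaming_job_inner(job: &Job) -> Result<(), String> {
    // ...persists (incomplete) catalogs, builds and schedules fragments; the Add
    // notification for a materialized view is only pushed somewhere inside here...
    Err(format!("scheduling job {} failed", job.id))
}

fn try_abort_creating_streaming_job(job: &Job) {
    // Reached only after the inner call has returned with an error, so no further
    // Add can be issued for this job and the Delete cannot race with a later Add.
    // The Delete may still refer to objects the frontend never saw, which the
    // tolerant drop handlers in schema_catalog.rs absorb.
    println!("notify frontend: Delete all objects of job {}", job.id);
}

fn main() {
    let job = Job { id: 1001 };
    if let Err(err) = create_streaming_job_inner(&job) {
        eprintln!("{err}, aborting");
        try_abort_creating_streaming_job(&job);
    }
}
```
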
8 changes: 7 additions & 1 deletion src/meta/src/controller/utils.rs
@@ -1082,7 +1082,13 @@
))
}

pub(crate) fn build_relation_group(relation_objects: Vec<PartialObject>) -> NotificationInfo {
/// Build a relation group for notifying the deletion of the given objects.
///
/// Note that only id fields are filled in the relation info, as the arguments are partial objects.
/// As a result, the returned notification info should only be used for deletion.
pub(crate) fn build_relation_group_for_delete(
relation_objects: Vec<PartialObject>,
) -> NotificationInfo {
let mut relations = vec![];
for obj in relation_objects {
match obj.obj_type {
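
To see why the result is only safe for deletion, consider a stripped-down version with stand-in structs (not the actual RisingWave protobuf types): a PartialObject carries ids only, so every other field of the relation built from it stays at its default.

```rust
#[derive(Debug)]
struct PartialObject {
    oid: u32,
    schema_id: u32,
    database_id: u32,
}

#[derive(Debug, Default)]
struct TableForNotification {
    id: u32,
    schema_id: u32,
    database_id: u32,
    name: String,       // unknown from a PartialObject, stays empty
    definition: String, // likewise
}

fn relation_for_delete(obj: &PartialObject) -> TableForNotification {
    TableForNotification {
        id: obj.oid,
        schema_id: obj.schema_id,
        database_id: obj.database_id,
        // Ids are enough for the frontend to find and drop the catalog entry,
        // but far too little to describe a newly added relation.
        ..Default::default()
    }
}

fn main() {
    let obj = PartialObject { oid: 42, schema_id: 2, database_id: 1 };
    println!("{:?}", relation_for_delete(&obj));
}
```
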
21 changes: 21 additions & 0 deletions src/meta/src/model/stream.rs
@@ -18,8 +18,10 @@ use std::ops::AddAssign;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{VirtualNode, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{SourceId, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -564,6 +566,25 @@ impl TableFragments {
}
}

/// Retrieve the internal tables map of the whole graph.
///
/// See also [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`].
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
stream_graph_visitor::visit_stream_node_internal_tables(
&mut fragment.actors[0].nodes.clone().unwrap(),
|table, _| {
let table_id = table.id;
tables
.try_insert(table_id, table.clone())
.unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
},
);
}
tables
}

/// Returns the internal table ids without the mview table.
pub fn internal_table_ids(&self) -> Vec<u32> {
self.fragments
19 changes: 16 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1040,11 +1040,14 @@ impl DdlController {
streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());

// create internal table catalogs and refill table id.
let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
let incomplete_internal_tables = fragment_graph
.incomplete_internal_tables()
.into_values()
.collect_vec();
let table_id_map = self
.metadata_manager
.catalog_controller
.create_internal_table_catalog(&streaming_job, internal_tables)
.create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
.await?;
fragment_graph.refill_internal_table_ids(table_id_map);

@@ -1079,6 +1082,16 @@
.await?;

let streaming_job = &ctx.streaming_job;
let internal_tables = ctx.internal_tables();

// Now that all fields in `internal_tables` are initialized,
// we can notify frontend for these relations.
if streaming_job.is_materialized_view() {
self.metadata_manager
.catalog_controller
.pre_notify_internal_tables(&internal_tables)
.await?;
}

match streaming_job {
StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
@@ -1560,7 +1573,6 @@
) -> MetaResult<(CreateStreamingJobContext, TableFragments)> {
let id = stream_job.id();
let specified_parallelism = fragment_graph.specified_parallelism();
let internal_tables = fragment_graph.internal_tables();
let expr_context = stream_ctx.to_expr_context();
let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

@@ -1643,6 +1655,7 @@
table_parallelism,
max_parallelism.get(),
);
let internal_tables = table_fragments.internal_tables();

Comment from @BugenZhao (Member, Author):

The internal_tables field in CreateStreamingJobContext will now be complete.


if let Some(mview_fragment) = table_fragments.mview_fragment() {
stream_job.set_table_vnode_count(mview_fragment.vnode_count());
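
Putting the meta-side pieces together, the creation path for a materialized view now follows the ordering sketched below. Every name is a stand-in for the async DdlController / CatalogController methods touched above; the bodies only trace the ordering.

```rust
struct InternalTable {
    id: u32,
    complete: bool, // placeholder fields filled?
}

/// Phase 1: persist catalogs whose non-id fields are still placeholders and
/// assign real, global table ids.
fn create_internal_table_catalog(incomplete: Vec<InternalTable>) -> Vec<InternalTable> {
    incomplete
        .into_iter()
        .enumerate()
        .map(|(i, t)| InternalTable { id: 1000 + i as u32, ..t })
        .collect()
}

/// Phase 2: building the table fragments fills in the remaining fields.
fn build_fragments(mut tables: Vec<InternalTable>) -> Vec<InternalTable> {
    for t in &mut tables {
        t.complete = true;
    }
    tables
}

/// Phase 3: only now is the frontend told about the internal tables, so it
/// never observes a placeholder catalog.
fn pre_notify_internal_tables(tables: &[InternalTable]) {
    for t in tables {
        assert!(t.complete);
        println!("notify frontend: Add internal table {}", t.id);
    }
}

fn main() {
    let incomplete = vec![InternalTable { id: 0, complete: false }];
    let with_ids = create_internal_table_catalog(incomplete);
    let complete = build_fragments(with_ids);
    pre_notify_internal_tables(&complete);
}
```
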