Skip to content

Commit

Permalink
revert logical changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 21, 2024
1 parent 1ab43fd commit 5adb7fe
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 84 deletions.
62 changes: 18 additions & 44 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ impl SchemaCatalog {
}

pub fn drop_table(&mut self, id: TableId) {
let Some(table_ref) = self.table_by_id.remove(&id) else {
tracing::warn!(?id, "table to drop not found, cleaning up?");
return;
};

let table_ref = self.table_by_id.remove(&id).unwrap();
self.table_by_name.remove(&table_ref.name).unwrap();
self.indexes_by_table_id.remove(&table_ref.id);
}
Expand Down Expand Up @@ -194,11 +190,7 @@ impl SchemaCatalog {
}

pub fn drop_index(&mut self, id: IndexId) {
let Some(index_ref) = self.index_by_id.remove(&id) else {
tracing::warn!(?id, "index to drop not found, cleaning up?");
return;
};

let index_ref = self.index_by_id.remove(&id).unwrap();
self.index_by_name.remove(&index_ref.name).unwrap();
match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
Occupied(mut entry) => {
Expand Down Expand Up @@ -233,11 +225,7 @@ impl SchemaCatalog {
}

pub fn drop_source(&mut self, id: SourceId) {
let Some(source_ref) = self.source_by_id.remove(&id) else {
tracing::warn!(?id, "source to drop not found, cleaning up?");
return;
};

let source_ref = self.source_by_id.remove(&id).unwrap();
self.source_by_name.remove(&source_ref.name).unwrap();
if let Some(connection_id) = source_ref.connection_id {
if let Occupied(mut e) = self.connection_source_ref.entry(connection_id) {
Expand Down Expand Up @@ -286,11 +274,7 @@ impl SchemaCatalog {
}

pub fn drop_sink(&mut self, id: SinkId) {
let Some(sink_ref) = self.sink_by_id.remove(&id) else {
tracing::warn!(?id, "sink to drop not found, cleaning up?");
return;
};

let sink_ref = self.sink_by_id.remove(&id).unwrap();
self.sink_by_name.remove(&sink_ref.name).unwrap();
if let Some(connection_id) = sink_ref.connection_id {
if let Occupied(mut e) = self.connection_sink_ref.entry(connection_id.0) {
Expand Down Expand Up @@ -334,11 +318,7 @@ impl SchemaCatalog {
}

pub fn drop_subscription(&mut self, id: SubscriptionId) {
let Some(subscription_ref) = self.subscription_by_id.remove(&id) else {
tracing::warn!(?id, "subscription to drop not found, cleaning up?");
return;
};

let subscription_ref = self.subscription_by_id.remove(&id).unwrap();
self.subscription_by_name
.remove(&subscription_ref.name)
.unwrap();
Expand Down Expand Up @@ -374,11 +354,7 @@ impl SchemaCatalog {
}

pub fn drop_view(&mut self, id: ViewId) {
let Some(view_ref) = self.view_by_id.remove(&id) else {
tracing::warn!(?id, "view to drop not found, cleaning up?");
return;
};

let view_ref = self.view_by_id.remove(&id).unwrap();
self.view_by_name.remove(&view_ref.name).unwrap();
}

Expand Down Expand Up @@ -435,10 +411,10 @@ impl SchemaCatalog {
}

pub fn drop_function(&mut self, id: FunctionId) {
let Some(function_ref) = self.function_by_id.remove(&id) else {
tracing::warn!(?id, "function to drop not found, cleaning up?");
return;
};
let function_ref = self
.function_by_id
.remove(&id)
.expect("function not found by id");

self.function_registry
.remove(Self::get_func_sign(&function_ref))
Expand Down Expand Up @@ -507,11 +483,10 @@ impl SchemaCatalog {
}

pub fn drop_connection(&mut self, connection_id: ConnectionId) {
let Some(connection_ref) = self.connection_by_id.remove(&connection_id) else {
tracing::warn!(?connection_id, "connection to drop not found, cleaning up?");
return;
};

let connection_ref = self
.connection_by_id
.remove(&connection_id)
.expect("connection not found by id");
self.connection_by_name
.remove(&connection_ref.name)
.expect("connection not found by name");
Expand Down Expand Up @@ -548,11 +523,10 @@ impl SchemaCatalog {
}

pub fn drop_secret(&mut self, secret_id: SecretId) {
let Some(secret_ref) = self.secret_by_id.remove(&secret_id) else {
tracing::warn!(?secret_id, "secret to drop not found, cleaning up?");
return;
};

let secret_ref = self
.secret_by_id
.remove(&secret_id)
.expect("secret not found by id");
self.secret_by_name
.remove(&secret_ref.name)
.expect("secret not found by name");
Expand Down
57 changes: 30 additions & 27 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl CatalogController {
Ok(obj.oid)
}

/// Create catalogs for the streaming job, then notify frontend about them if the job is a
/// materialized view.
///
/// Some of the fields in the given streaming job are placeholders, which will
/// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
pub async fn create_job_catalog(
&self,
streaming_job: &mut StreamingJob,
Expand Down Expand Up @@ -333,20 +338,23 @@ impl CatalogController {
Ok(())
}

/// Create catalogs for internal tables. Some of the fields in the given arguments are
/// placeholders, which will be updated later in `prepare_streaming_job`.
/// Create catalogs for internal tables, then notify frontend about them if the job is a
/// materialized view.
///
/// Some of the fields in the given "incomplete" internal tables are placeholders, which will
/// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
///
/// Returns a mapping from the temporary table id to the actual global table id.
pub async fn create_internal_table_catalog(
&self,
job: &StreamingJob,
incomplete_internal_tables: Vec<PbTable>,
mut incomplete_internal_tables: Vec<PbTable>,
) -> MetaResult<HashMap<u32, u32>> {
let job_id = job.id() as ObjectId;
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let mut table_id_map = HashMap::new();
for table in incomplete_internal_tables {
for table in &mut incomplete_internal_tables {
let table_id = Self::create_object(
&txn,
ObjectType::Table,
Expand All @@ -357,37 +365,34 @@ impl CatalogController {
.await?
.oid;
table_id_map.insert(table.id, table_id as u32);
table.id = table_id as _;

let table_model = table::ActiveModel {
table_id: Set(table_id as _),
belongs_to_job_id: Set(Some(job_id)),
fragment_id: NotSet,
..table.into()
..table.clone().into()
};
Table::insert(table_model).exec(&txn).await?;
}
txn.commit().await?;

Ok(table_id_map)
}

/// Notify frontend about the given internal tables before the streaming job finishes creating.
/// Should only be called for materialized views.
pub async fn pre_notify_internal_tables(&self, internal_tables: &[PbTable]) -> MetaResult<()> {
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: internal_tables
.iter()
.map(|table| Relation {
relation_info: Some(RelationInfo::Table(table.clone())),
})
.collect(),
}),
)
.await;
if job.is_materialized_view() {
self.notify_frontend(
Operation::Add,
Info::RelationGroup(RelationGroup {
relations: incomplete_internal_tables
.iter()
.map(|table| Relation {
relation_info: Some(RelationInfo::Table(table.clone())),
})
.collect(),
}),
)
.await;
}

Ok(())
Ok(table_id_map)
}

pub async fn prepare_streaming_job(
Expand Down Expand Up @@ -575,10 +580,8 @@ impl CatalogController {
txn.commit().await?;

if !objs.is_empty() {
// We **may** also have notified the frontend about these objects,
// We also have notified the frontend about these objects,
// so we need to notify the frontend to delete them here.
// The frontend will ignore the request if the object does not exist,
// so it's safe to always notify.
self.notify_frontend(Operation::Delete, build_relation_group_for_delete(objs))
.await;
}
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,10 @@ impl TableFragments {
}
}

/// Retrieve the internal tables map of the whole graph.
/// Retrieve the **complete** internal tables map of the whole graph.
///
/// See also [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`].
/// Compared to [`crate::stream::StreamFragmentGraph::incomplete_internal_tables`],
/// the table catalogs returned here are complete, with all fields filled.
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
Expand Down
10 changes: 0 additions & 10 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,16 +1082,6 @@ impl DdlController {
.await?;

let streaming_job = &ctx.streaming_job;
let internal_tables = ctx.internal_tables();

// Now that all fields in `internal_tables` are initialized,
// we can notify frontend for these relations.
if streaming_job.is_materialized_view() {
self.metadata_manager
.catalog_controller
.pre_notify_internal_tables(&internal_tables)
.await?;
}

match streaming_job {
StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl StreamFragmentGraph {
})
}

/// Retrieve the internal tables map of the whole graph.
/// Retrieve the **incomplete** internal tables map of the whole graph.
///
/// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
/// `fragment_id`, `vnode_count`. They will be all filled after a `TableFragments` is built.
Expand Down

0 comments on commit 5adb7fe

Please sign in to comment.