diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index c8e1f42656e47..12df0e6c66252 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -45,3 +45,6 @@ c583e2c6c054764249acf484438c7bf7197765f4 # chore: cleanup v2 naming for sql metastore (#18941) 9a6a7f9052d5679165ff57cc01417c742c95351c + +# refactor: split catalog to smaller files (#19870) +d6341b74be3f1913cc93993a95c147999df1ff74 diff --git a/src/meta/src/controller/catalog/drop_op.rs b/src/meta/src/controller/catalog/drop_op.rs index a3e894a05f370..a2105e4ef1927 100644 --- a/src/meta/src/controller/catalog/drop_op.rs +++ b/src/meta/src/controller/catalog/drop_op.rs @@ -207,10 +207,8 @@ impl CatalogController { } }); - let (source_fragments, removed_actors, removed_fragments) = - resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; - - let fragment_ids = get_fragment_ids_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?; + let (removed_source_fragments, removed_actors, removed_fragments) = + get_fragments_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; // Find affect users with privileges on all this objects. let to_update_user_ids: Vec = UserPrivilege::find() @@ -245,10 +243,10 @@ impl CatalogController { .notify_frontend(NotificationOperation::Delete, relation_group) .await; - let fragment_mappings = fragment_ids - .into_iter() + let fragment_mappings = removed_fragments + .iter() .map(|fragment_id| PbFragmentWorkerSlotMapping { - fragment_id: fragment_id as _, + fragment_id: *fragment_id as _, mapping: None, }) .collect(); @@ -259,11 +257,10 @@ impl CatalogController { Ok(( ReleaseContext { database_id, - streaming_job_ids: to_drop_streaming_jobs, - state_table_ids: to_drop_state_table_ids, - source_ids: to_drop_source_ids, - connections: vec![], - source_fragments, + removed_streaming_job_ids: to_drop_streaming_jobs, + removed_state_table_ids: to_drop_state_table_ids, + removed_source_ids: to_drop_source_ids, + removed_source_fragments, removed_actors, removed_fragments, }, @@ -418,8 +415,8 @@ impl CatalogController { .all(&txn) .await?; - let (source_fragments, removed_actors, removed_fragments) = - resolve_source_register_info_for_jobs(&txn, streaming_jobs.clone()).await?; + let (removed_source_fragments, removed_actors, removed_fragments) = + get_fragments_for_jobs(&txn, streaming_jobs.clone()).await?; let state_table_ids: Vec = Table::find() .select_only() @@ -445,15 +442,6 @@ impl CatalogController { .all(&txn) .await?; - let connections = Connection::find() - .inner_join(Object) - .filter(object::Column::DatabaseId.eq(Some(database_id))) - .all(&txn) - .await? - .into_iter() - .map(|conn| conn.connection_id) - .collect_vec(); - // Find affect users with privileges on the database and the objects in the database. let to_update_user_ids: Vec = UserPrivilege::find() .select_only() @@ -503,11 +491,10 @@ impl CatalogController { Ok(( ReleaseContext { database_id, - streaming_job_ids: streaming_jobs, - state_table_ids, - source_ids, - connections, - source_fragments, + removed_streaming_job_ids: streaming_jobs, + removed_state_table_ids: state_table_ids, + removed_source_ids: source_ids, + removed_source_fragments, removed_actors, removed_fragments, }, diff --git a/src/meta/src/controller/catalog/mod.rs b/src/meta/src/controller/catalog/mod.rs index 30acf8ad551cd..b41a210d3e6e9 100644 --- a/src/meta/src/controller/catalog/mod.rs +++ b/src/meta/src/controller/catalog/mod.rs @@ -78,15 +78,7 @@ use super::utils::{ rename_relation, rename_relation_refer, }; use crate::controller::catalog::util::update_internal_tables; -use crate::controller::utils::{ - build_relation_group_for_delete, check_connection_name_duplicate, - check_database_name_duplicate, check_function_signature_duplicate, - check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate, - ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, - extract_external_table_name_from_definition, get_referring_objects, - get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, - resolve_source_register_info_for_jobs, PartialObject, -}; +use crate::controller::utils::*; use crate::controller::ObjectModel; use crate::manager::{ get_referred_connection_ids_from_source, get_referred_secret_ids_from_source, MetaSrvEnv, @@ -121,20 +113,17 @@ pub struct CatalogController { #[derive(Clone, Default)] pub struct ReleaseContext { pub(crate) database_id: DatabaseId, - pub(crate) streaming_job_ids: Vec, + pub(crate) removed_streaming_job_ids: Vec, /// Dropped state table list, need to unregister from hummock. - pub(crate) state_table_ids: Vec, - /// Dropped source list, need to unregister from source manager. - pub(crate) source_ids: Vec, - /// Dropped connection list, need to delete from vpc endpoints. - #[allow(dead_code)] - pub(crate) connections: Vec, - - /// Dropped fragments that are fetching data from the target source. - pub(crate) source_fragments: HashMap>, - /// Dropped actors. - pub(crate) removed_actors: HashSet, + pub(crate) removed_state_table_ids: Vec, + /// Dropped sources (when `DROP SOURCE`), need to unregister from source manager. + pub(crate) removed_source_ids: Vec, + /// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources), + /// need to unregister from source manager. + pub(crate) removed_source_fragments: HashMap>, + + pub(crate) removed_actors: HashSet, pub(crate) removed_fragments: HashSet, } diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 84d3d4da6736c..6db9fe1ab3ca4 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -39,8 +39,7 @@ use risingwave_pb::meta::subscribe_response::Info as NotificationInfo; use risingwave_pb::meta::{ FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, }; -use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource}; +use risingwave_pb::stream_plan::PbFragmentTypeFlag; use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo}; use risingwave_sqlparser::ast::Statement as SqlStatement; @@ -1059,25 +1058,11 @@ where Ok(fragment_actors.into_iter().into_group_map()) } -/// Find the external stream source info inside the stream node, if any. -pub fn find_stream_source(stream_node: &PbStreamNode) -> Option<&StreamSource> { - if let Some(NodeBody::Source(source)) = &stream_node.node_body { - if let Some(inner) = &source.source_inner { - return Some(inner); - } - } - - for child in &stream_node.input { - if let Some(source) = find_stream_source(child) { - return Some(source); - } - } - - None -} - -/// Resolve fragment list that are subscribing to sources and actor lists. -pub async fn resolve_source_register_info_for_jobs( +/// For the given streaming jobs, returns +/// - All source fragments +/// - All actors +/// - All fragments +pub async fn get_fragments_for_jobs( db: &C, streaming_jobs: Vec, ) -> MetaResult<( @@ -1113,20 +1098,20 @@ where .all(db) .await?; - let removed_fragments = fragments + let fragment_ids = fragments .iter() .map(|(fragment_id, _, _)| *fragment_id) .collect(); - let mut source_fragment_ids = HashMap::new(); + let mut source_fragment_ids: HashMap> = HashMap::new(); for (fragment_id, mask, stream_node) in fragments { if mask & PbFragmentTypeFlag::Source as i32 == 0 { continue; } - if let Some(source) = find_stream_source(&stream_node.to_protobuf()) { + if let Some(source_id) = stream_node.to_protobuf().find_stream_source() { source_fragment_ids - .entry(source.source_id as SourceId) - .or_insert_with(BTreeSet::new) + .entry(source_id as _) + .or_default() .insert(fragment_id); } } @@ -1134,7 +1119,7 @@ where Ok(( source_fragment_ids, actors.into_iter().collect(), - removed_fragments, + fragment_ids, )) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 89200b4b63d6c..b0d72f0b6478d 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1252,10 +1252,10 @@ impl DdlController { let ReleaseContext { database_id, - streaming_job_ids, - state_table_ids, - source_ids, - source_fragments, + removed_streaming_job_ids, + removed_state_table_ids, + removed_source_ids, + removed_source_fragments, removed_actors, removed_fragments, .. @@ -1263,13 +1263,13 @@ impl DdlController { // unregister sources. self.source_manager - .unregister_sources(source_ids.into_iter().map(|id| id as _).collect()) + .unregister_sources(removed_source_ids.into_iter().map(|id| id as _).collect()) .await; // unregister fragments and actors from source manager. self.source_manager .drop_source_fragments( - source_fragments + removed_source_fragments .into_iter() .map(|(source_id, fragments)| { ( @@ -1287,8 +1287,8 @@ impl DdlController { .drop_streaming_jobs( risingwave_common::catalog::DatabaseId::new(database_id as _), removed_actors.into_iter().map(|id| id as _).collect(), - streaming_job_ids, - state_table_ids, + removed_streaming_job_ids, + removed_state_table_ids, removed_fragments.iter().map(|id| *id as _).collect(), ) .await;