Skip to content

Commit

Permalink
minor refactor
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Dec 30, 2024
1 parent 2659bf2 commit dc46eac
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 84 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ c583e2c6c054764249acf484438c7bf7197765f4

# chore: cleanup v2 naming for sql metastore (#18941)
9a6a7f9052d5679165ff57cc01417c742c95351c

# refactor: split catalog to smaller files (#19870)
d6341b74be3f1913cc93993a95c147999df1ff74
43 changes: 15 additions & 28 deletions src/meta/src/controller/catalog/drop_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,8 @@ impl CatalogController {
}
});

let (source_fragments, removed_actors, removed_fragments) =
resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;

let fragment_ids = get_fragment_ids_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?;
let (removed_source_fragments, removed_actors, removed_fragments) =
get_fragments_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;

// Find affect users with privileges on all this objects.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
Expand Down Expand Up @@ -245,10 +243,10 @@ impl CatalogController {
.notify_frontend(NotificationOperation::Delete, relation_group)
.await;

let fragment_mappings = fragment_ids
.into_iter()
let fragment_mappings = removed_fragments
.iter()
.map(|fragment_id| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
fragment_id: *fragment_id as _,
mapping: None,
})
.collect();
Expand All @@ -259,11 +257,10 @@ impl CatalogController {
Ok((
ReleaseContext {
database_id,
streaming_job_ids: to_drop_streaming_jobs,
state_table_ids: to_drop_state_table_ids,
source_ids: to_drop_source_ids,
connections: vec![],
source_fragments,
removed_streaming_job_ids: to_drop_streaming_jobs,
removed_state_table_ids: to_drop_state_table_ids,
removed_source_ids: to_drop_source_ids,
removed_source_fragments,
removed_actors,
removed_fragments,
},
Expand Down Expand Up @@ -418,8 +415,8 @@ impl CatalogController {
.all(&txn)
.await?;

let (source_fragments, removed_actors, removed_fragments) =
resolve_source_register_info_for_jobs(&txn, streaming_jobs.clone()).await?;
let (removed_source_fragments, removed_actors, removed_fragments) =
get_fragments_for_jobs(&txn, streaming_jobs.clone()).await?;

let state_table_ids: Vec<TableId> = Table::find()
.select_only()
Expand All @@ -445,15 +442,6 @@ impl CatalogController {
.all(&txn)
.await?;

let connections = Connection::find()
.inner_join(Object)
.filter(object::Column::DatabaseId.eq(Some(database_id)))
.all(&txn)
.await?
.into_iter()
.map(|conn| conn.connection_id)
.collect_vec();

// Find affect users with privileges on the database and the objects in the database.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
.select_only()
Expand Down Expand Up @@ -503,11 +491,10 @@ impl CatalogController {
Ok((
ReleaseContext {
database_id,
streaming_job_ids: streaming_jobs,
state_table_ids,
source_ids,
connections,
source_fragments,
removed_streaming_job_ids: streaming_jobs,
removed_state_table_ids: state_table_ids,
removed_source_ids: source_ids,
removed_source_fragments,
removed_actors,
removed_fragments,
},
Expand Down
31 changes: 10 additions & 21 deletions src/meta/src/controller/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,7 @@ use super::utils::{
rename_relation, rename_relation_refer,
};
use crate::controller::catalog::util::update_internal_tables;
use crate::controller::utils::{
build_relation_group_for_delete, check_connection_name_duplicate,
check_database_name_duplicate, check_function_signature_duplicate,
check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate,
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
extract_external_table_name_from_definition, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::utils::*;
use crate::controller::ObjectModel;
use crate::manager::{
get_referred_connection_ids_from_source, get_referred_secret_ids_from_source, MetaSrvEnv,
Expand Down Expand Up @@ -121,20 +113,17 @@ pub struct CatalogController {
#[derive(Clone, Default)]
pub struct ReleaseContext {
pub(crate) database_id: DatabaseId,
pub(crate) streaming_job_ids: Vec<ObjectId>,
pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
/// Dropped state table list, need to unregister from hummock.
pub(crate) state_table_ids: Vec<TableId>,
/// Dropped source list, need to unregister from source manager.
pub(crate) source_ids: Vec<SourceId>,
/// Dropped connection list, need to delete from vpc endpoints.
#[allow(dead_code)]
pub(crate) connections: Vec<ConnectionId>,

/// Dropped fragments that are fetching data from the target source.
pub(crate) source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
/// Dropped actors.
pub(crate) removed_actors: HashSet<ActorId>,
pub(crate) removed_state_table_ids: Vec<TableId>,

/// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
pub(crate) removed_source_ids: Vec<SourceId>,
/// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
/// need to unregister from source manager.
pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,

pub(crate) removed_actors: HashSet<ActorId>,
pub(crate) removed_fragments: HashSet<FragmentId>,
}

Expand Down
39 changes: 12 additions & 27 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
use risingwave_pb::meta::{
FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource};
use risingwave_pb::stream_plan::PbFragmentTypeFlag;
use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject};
use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
use risingwave_sqlparser::ast::Statement as SqlStatement;
Expand Down Expand Up @@ -1059,25 +1058,11 @@ where
Ok(fragment_actors.into_iter().into_group_map())
}

/// Find the external stream source info inside the stream node, if any.
pub fn find_stream_source(stream_node: &PbStreamNode) -> Option<&StreamSource> {
if let Some(NodeBody::Source(source)) = &stream_node.node_body {
if let Some(inner) = &source.source_inner {
return Some(inner);
}
}

for child in &stream_node.input {
if let Some(source) = find_stream_source(child) {
return Some(source);
}
}

None
}

/// Resolve fragment list that are subscribing to sources and actor lists.
pub async fn resolve_source_register_info_for_jobs<C>(
/// For the given streaming jobs, returns
/// - All source fragments
/// - All actors
/// - All fragments
pub async fn get_fragments_for_jobs<C>(
db: &C,
streaming_jobs: Vec<ObjectId>,
) -> MetaResult<(
Expand Down Expand Up @@ -1113,28 +1098,28 @@ where
.all(db)
.await?;

let removed_fragments = fragments
let fragment_ids = fragments
.iter()
.map(|(fragment_id, _, _)| *fragment_id)
.collect();

let mut source_fragment_ids = HashMap::new();
let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
for (fragment_id, mask, stream_node) in fragments {
if mask & PbFragmentTypeFlag::Source as i32 == 0 {
continue;
}
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
source_fragment_ids
.entry(source.source_id as SourceId)
.or_insert_with(BTreeSet::new)
.entry(source_id as _)
.or_default()
.insert(fragment_id);
}
}

Ok((
source_fragment_ids,
actors.into_iter().collect(),
removed_fragments,
fragment_ids,
))
}

Expand Down
16 changes: 8 additions & 8 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1252,24 +1252,24 @@ impl DdlController {

let ReleaseContext {
database_id,
streaming_job_ids,
state_table_ids,
source_ids,
source_fragments,
removed_streaming_job_ids,
removed_state_table_ids,
removed_source_ids,
removed_source_fragments,
removed_actors,
removed_fragments,
..
} = release_ctx;

// unregister sources.
self.source_manager
.unregister_sources(source_ids.into_iter().map(|id| id as _).collect())
.unregister_sources(removed_source_ids.into_iter().map(|id| id as _).collect())
.await;

// unregister fragments and actors from source manager.
self.source_manager
.drop_source_fragments(
source_fragments
removed_source_fragments
.into_iter()
.map(|(source_id, fragments)| {
(
Expand All @@ -1287,8 +1287,8 @@ impl DdlController {
.drop_streaming_jobs(
risingwave_common::catalog::DatabaseId::new(database_id as _),
removed_actors.into_iter().map(|id| id as _).collect(),
streaming_job_ids,
state_table_ids,
removed_streaming_job_ids,
removed_state_table_ids,
removed_fragments.iter().map(|id| *id as _).collect(),
)
.await;
Expand Down

0 comments on commit dc46eac

Please sign in to comment.