diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index a49fc7a8a0690..cbdcd89350e8b 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -45,9 +45,7 @@ use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; -use risingwave_pb::meta::{ - FragmentParallelUnitMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup, -}; +use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::FragmentTypeFlag; use risingwave_pb::user::PbUserInfo; @@ -60,15 +58,14 @@ use sea_orm::{ }; use tokio::sync::{RwLock, RwLockReadGuard}; -use super::utils::check_subscription_name_duplicate; +use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs}; use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs}; use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, - ensure_user_id, get_fragment_mappings_by_jobs, get_referring_objects, - get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, - resolve_source_register_info_for_jobs, PartialObject, + ensure_user_id, get_referring_objects, get_referring_objects_cascade, get_user_privilege, + list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -280,15 +277,13 @@ impl CatalogController { .all(&txn) .await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()) + let fragment_mappings = get_fragment_ids_by_jobs(&txn, streaming_jobs.clone()) .await? .into_iter() - .map( - |FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping { - fragment_id, - mapping: None, - }, - ) + .map(|fragment_id| PbFragmentWorkerSlotMapping { + fragment_id: fragment_id as _, + mapping: None, + }) .collect(); // The schema and objects in the database will be delete cascade. @@ -2073,8 +2068,7 @@ impl CatalogController { let (source_fragments, removed_actors, removed_fragments) = resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; - let fragment_mappings = - get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?; + let fragment_ids = get_fragment_ids_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?; // Find affect users with privileges on all this objects. let to_update_user_ids: Vec = UserPrivilege::find() @@ -2174,14 +2168,12 @@ impl CatalogController { ) .await; - let fragment_mappings = fragment_mappings + let fragment_mappings = fragment_ids .into_iter() - .map( - |FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping { - fragment_id, - mapping: None, - }, - ) + .map(|fragment_id| PbFragmentWorkerSlotMapping { + fragment_id: fragment_id as _, + mapping: None, + }) .collect(); self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 8a8f1b2d71cda..8358057663312 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -884,6 +884,24 @@ where .collect()) } +pub async fn get_fragment_ids_by_jobs( + db: &C, + job_ids: Vec, +) -> MetaResult> +where + C: ConnectionTrait, +{ + let fragment_ids: Vec = Fragment::find() + .select_only() + .column(fragment::Column::FragmentId) + .filter(fragment::Column::JobId.is_in(job_ids)) + .into_tuple() + .all(db) + .await?; + + Ok(fragment_ids) +} + /// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments. pub async fn get_fragment_actor_ids( db: &C,