Skip to content

Commit

Permalink
chore: Remove redundant import, refactor get_fragment_mappings in cat…
Browse files Browse the repository at this point in the history
…alog (#17607)
  • Loading branch information
shanicky authored Jul 8, 2024
1 parent 0e873a0 commit dd98abf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
38 changes: 15 additions & 23 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{
FragmentParallelUnitMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup,
};
use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::FragmentTypeFlag;
use risingwave_pb::user::PbUserInfo;
Expand All @@ -60,15 +58,14 @@ use sea_orm::{
};
use tokio::sync::{RwLock, RwLockReadGuard};

use super::utils::check_subscription_name_duplicate;
use super::utils::{check_subscription_name_duplicate, get_fragment_ids_by_jobs};
use crate::controller::rename::{alter_relation_rename, alter_relation_rename_refs};
use crate::controller::utils::{
check_connection_name_duplicate, check_database_name_duplicate,
check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty,
ensure_user_id, get_fragment_mappings_by_jobs, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
ensure_user_id, get_referring_objects, get_referring_objects_cascade, get_user_privilege,
list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
Expand Down Expand Up @@ -280,15 +277,13 @@ impl CatalogController {
.all(&txn)
.await?;

let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone())
let fragment_mappings = get_fragment_ids_by_jobs(&txn, streaming_jobs.clone())
.await?
.into_iter()
.map(
|FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping {
fragment_id,
mapping: None,
},
)
.map(|fragment_id| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: None,
})
.collect();

// The schema and objects in the database will be delete cascade.
Expand Down Expand Up @@ -2073,8 +2068,7 @@ impl CatalogController {
let (source_fragments, removed_actors, removed_fragments) =
resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;

let fragment_mappings =
get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?;
let fragment_ids = get_fragment_ids_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?;

// Find affect users with privileges on all this objects.
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
Expand Down Expand Up @@ -2174,14 +2168,12 @@ impl CatalogController {
)
.await;

let fragment_mappings = fragment_mappings
let fragment_mappings = fragment_ids
.into_iter()
.map(
|FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping {
fragment_id,
mapping: None,
},
)
.map(|fragment_id| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: None,
})
.collect();

self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
Expand Down
18 changes: 18 additions & 0 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,24 @@ where
.collect())
}

pub async fn get_fragment_ids_by_jobs<C>(
db: &C,
job_ids: Vec<ObjectId>,
) -> MetaResult<Vec<FragmentId>>
where
C: ConnectionTrait,
{
let fragment_ids: Vec<FragmentId> = Fragment::find()
.select_only()
.column(fragment::Column::FragmentId)
.filter(fragment::Column::JobId.is_in(job_ids))
.into_tuple()
.all(db)
.await?;

Ok(fragment_ids)
}

/// `get_fragment_actor_ids` returns the fragment actor ids of the given fragments.
pub async fn get_fragment_actor_ids<C>(
db: &C,
Expand Down

0 comments on commit dd98abf

Please sign in to comment.