diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index c057cf847c7db..59460e7966220 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; -use std::ops::Index; +use std::ops::{Index, Sub}; use educe::Educe; use itertools::Itertools; @@ -341,6 +341,12 @@ impl ActorMapping { } } +#[derive(thiserror::Error, Debug)] +pub enum ParallelUnitError { + #[error("parallel units {0:?} are not covered by the worker slot mapping")] + NotCovered(HashSet), +} + impl WorkerSlotMapping { /// Create a uniform worker mapping from the given worker ids pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self { @@ -382,8 +388,11 @@ impl ParallelUnitMapping { self.transform(to_map) } - /// Transform this parallel unit mapping to an worker mapping, essentially `transform`. - pub fn to_worker_slot(&self, to_map: &HashMap) -> WorkerSlotMapping { + /// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`. + pub fn to_worker_slot( + &self, + to_map: &HashMap, + ) -> Result { let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new(); for (parallel_unit_id, worker_id) in to_map { worker_to_parallel_units @@ -401,7 +410,17 @@ impl ParallelUnitMapping { } } - self.transform(¶llel_unit_to_worker_slot) + let available_parallel_unit_ids: HashSet<_> = + parallel_unit_to_worker_slot.keys().copied().collect(); + + let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect(); + + let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids); + if sub_set.is_empty() { + Ok(self.transform(¶llel_unit_to_worker_slot)) + } else { + Err(ParallelUnitError::NotCovered(sub_set)) + } } /// Create a parallel unit mapping from the protobuf representation. diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 200736725cd07..a85d9077e1acd 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; -use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; @@ -67,9 +66,9 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, - ensure_user_id, get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, - get_referring_objects, get_referring_objects_cascade, get_user_privilege, - list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject, + ensure_user_id, get_fragment_mappings_by_jobs, get_referring_objects, + get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, + resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -279,25 +278,13 @@ impl CatalogController { .all(&txn) .await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; - - let fragment_mappings = fragment_mappings + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()) + .await? .into_iter() .map( - |FragmentParallelUnitMapping { - fragment_id, - mapping, - }| { - PbFragmentWorkerSlotMapping { - fragment_id, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker_slot(¶llel_unit_to_worker) - .to_protobuf(), - ), - } + |FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping { + fragment_id, + mapping: None, }, ) .collect(); @@ -2108,8 +2095,6 @@ impl CatalogController { } let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - txn.commit().await?; // notify about them. @@ -2188,18 +2173,9 @@ impl CatalogController { let fragment_mappings = fragment_mappings .into_iter() .map( - |FragmentParallelUnitMapping { - fragment_id, - mapping, - }| { - PbFragmentWorkerSlotMapping { - fragment_id, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker_slot(¶llel_unit_to_worker) - .to_protobuf(), - ), - } + |FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping { + fragment_id, + mapping: None, }, ) .collect(); diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index fa53773741484..bbdde48eb6a0a 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -78,19 +78,12 @@ impl CatalogControllerInner { let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - Ok(fragment_mappings - .into_iter() - .map(move |(fragment_id, mapping)| { - let worker_slot_mapping = - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(¶llel_unit_to_worker) - .to_protobuf(); + let mappings = CatalogController::convert_fragment_mappings( + fragment_mappings, + ¶llel_unit_to_worker, + )?; - FragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some(worker_slot_mapping), - } - })) + Ok(mappings.into_iter()) } } @@ -952,27 +945,37 @@ impl CatalogController { let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + let fragment_worker_slot_mapping = + Self::convert_fragment_mappings(fragment_mapping, ¶llel_unit_to_worker)?; + txn.commit().await?; - self.notify_fragment_mapping( - NotificationOperation::Update, - fragment_mapping - .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(¶llel_unit_to_worker) - .to_protobuf(), - ), - }) - .collect(), - ) - .await; + self.notify_fragment_mapping(NotificationOperation::Update, fragment_worker_slot_mapping) + .await; Ok(()) } + pub(crate) fn convert_fragment_mappings( + fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)>, + parallel_unit_to_worker: &HashMap, + ) -> MetaResult> { + let mut result = vec![]; + + for (fragment_id, mapping) in fragment_mappings { + result.push(PbFragmentWorkerSlotMapping { + fragment_id: fragment_id as _, + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker_slot(parallel_unit_to_worker)? + .to_protobuf(), + ), + }) + } + + Ok(result) + } + pub async fn all_inuse_parallel_units(&self) -> MetaResult> { let inner = self.inner.read().await; let parallel_units: Vec = Actor::find() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 3066ce223785e..ad2b4037cc45d 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1454,7 +1454,7 @@ impl CatalogController { fragment.update(&txn).await?; let worker_slot_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping) - .to_worker_slot(¶llel_unit_to_worker) + .to_worker_slot(¶llel_unit_to_worker)? .to_protobuf(); fragment_mapping_to_notify.push(FragmentWorkerSlotMapping { diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index b98788248a115..0766a1d59726d 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -16,7 +16,6 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::hash::ParallelUnitMapping; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; @@ -43,6 +42,7 @@ use sea_orm::{ Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement, }; +use crate::controller::catalog::CatalogController; use crate::{MetaError, MetaResult}; /// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object. @@ -852,17 +852,7 @@ where .all(db) .await?; - Ok(fragment_mappings - .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(¶llel_unit_to_worker) - .to_protobuf(), - ), - }) - .collect()) + CatalogController::convert_fragment_mappings(fragment_mappings, ¶llel_unit_to_worker) } /// `get_fragment_mappings_by_jobs` returns the fragment vnode mappings of the given job list. diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a8..9d3d558ac4839 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::error::BoxedError; +use risingwave_common::hash::ParallelUnitError; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -125,6 +126,13 @@ pub enum MetaErrorInner { // Indicates that recovery was triggered manually. #[error("adhoc recovery triggered")] AdhocRecovery, + + #[error("ParallelUnit error: {0}")] + ParallelUnit( + #[from] + #[backtrace] + ParallelUnitError, + ), } impl MetaError { diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 9cb0d25a09ae0..4b4075f6e3a71 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -69,10 +69,13 @@ impl FragmentManagerCore { .values() .map(move |fragment| FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, - mapping: Some(FragmentManager::convert_mapping( - &table_fragments.actor_status, - fragment.vnode_mapping.as_ref().unwrap(), - )), + mapping: Some( + FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + ) + .unwrap(), + ), }) }) } @@ -198,15 +201,23 @@ impl FragmentManager { async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) { // Notify all fragment mapping to frontend nodes for fragment in table_fragment.fragments.values() { - let fragment_mapping = FragmentWorkerSlotMapping { - fragment_id: fragment.fragment_id, - mapping: Some(Self::convert_mapping( - &table_fragment.actor_status, - fragment - .vnode_mapping - .as_ref() - .expect("no data distribution found"), - )), + let vnode_mapping = fragment + .vnode_mapping + .as_ref() + .expect("no data distribution found"); + + let fragment_mapping = if let Operation::Delete = operation { + FragmentWorkerSlotMapping { + fragment_id: fragment.fragment_id, + mapping: None, + } + } else { + FragmentWorkerSlotMapping { + fragment_id: fragment.fragment_id, + mapping: Some( + Self::convert_mapping(&table_fragment.actor_status, vnode_mapping).unwrap(), + ), + } }; self.env @@ -1273,7 +1284,7 @@ impl FragmentManager { *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone(); - let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping); + let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping)?; // Notify fragment mapping to frontend nodes. let fragment_mapping = FragmentWorkerSlotMapping { @@ -1410,7 +1421,7 @@ impl FragmentManager { fn convert_mapping( actor_status: &BTreeMap, vnode_mapping: &PbParallelUnitMapping, - ) -> PbWorkerSlotMapping { + ) -> MetaResult { let parallel_unit_to_worker = actor_status .values() .map(|actor_status| { @@ -1419,9 +1430,9 @@ impl FragmentManager { }) .collect(); - ParallelUnitMapping::from_protobuf(vnode_mapping) - .to_worker_slot(¶llel_unit_to_worker) - .to_protobuf() + Ok(ParallelUnitMapping::from_protobuf(vnode_mapping) + .to_worker_slot(¶llel_unit_to_worker)? + .to_protobuf()) } pub async fn table_node_actors(