diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index c057cf847c7d..d08b82dd8bb1 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; -use std::ops::Index; +use std::ops::{Index, Sub}; use educe::Educe; use itertools::Itertools; @@ -341,6 +341,12 @@ impl ActorMapping { } } +#[derive(thiserror::Error, Debug)] +pub enum ParallelUnitError { + #[error("parallel units {0:?} are not covered by the worker slot mapping")] + NotCovered(HashSet<ParallelUnitId>), +} + impl WorkerSlotMapping { /// Create a uniform worker mapping from the given worker ids pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self { @@ -382,8 +388,18 @@ impl ParallelUnitMapping { self.transform(to_map) } - /// Transform this parallel unit mapping to an worker mapping, essentially `transform`. - pub fn to_worker_slot(&self, to_map: &HashMap<ParallelUnitId, u32>) -> WorkerSlotMapping { + pub fn as_delete_worker_slot_mapping(&self) -> WorkerSlotMapping { + VnodeMapping { + original_indices: self.original_indices.clone(), + data: self.data.iter().map(|_| WorkerSlotId(u64::MAX)).collect(), + } + } + + /// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`. 
+ pub fn to_worker_slot( + &self, + to_map: &HashMap<ParallelUnitId, u32>, + ) -> Result<WorkerSlotMapping, ParallelUnitError> { let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new(); for (parallel_unit_id, worker_id) in to_map { worker_to_parallel_units @@ -401,7 +417,17 @@ impl ParallelUnitMapping { } } - self.transform(&parallel_unit_to_worker_slot) + let available_parallel_unit_ids: HashSet<_> = + parallel_unit_to_worker_slot.keys().copied().collect(); + + let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect(); + + let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids); + if sub_set.is_empty() { + Ok(self.transform(&parallel_unit_to_worker_slot)) + } else { + Err(ParallelUnitError::NotCovered(sub_set)) + } } /// Create a parallel unit mapping from the protobuf representation. diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 050539b12c06..5f057c076a6a 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -281,8 +281,6 @@ impl CatalogController { .all(&txn) .await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; let fragment_mappings = fragment_mappings @@ -296,7 +294,7 @@ impl CatalogController { fragment_id, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker_slot(&parallel_unit_to_worker) + .as_delete_worker_slot_mapping() .to_protobuf(), ), } @@ -2111,8 +2109,6 @@ impl CatalogController { } let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - txn.commit().await?; // notify about them. 
@@ -2199,7 +2195,7 @@ impl CatalogController { fragment_id, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker_slot(&parallel_unit_to_worker) + .as_delete_worker_slot_mapping() .to_protobuf(), ), } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index fb8810071f16..8fd1dc59df71 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,7 +19,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; @@ -79,19 +79,12 @@ impl CatalogControllerInner { let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - Ok(fragment_mappings - .into_iter() - .map(move |(fragment_id, mapping)| { - let worker_slot_mapping = - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf(); + let mappings = CatalogController::convert_fragment_mappings( + fragment_mappings, + &parallel_unit_to_worker, + )?; - FragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some(worker_slot_mapping), - } - })) + Ok(mappings.into_iter()) } } @@ -960,27 +953,37 @@ impl CatalogController { let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + let fragment_worker_slot_mapping = + Self::convert_fragment_mappings(fragment_mapping, &parallel_unit_to_worker)?; + txn.commit().await?; - self.notify_fragment_mapping( - NotificationOperation::Update, - fragment_mapping - .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some( - 
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf(), - ), - }) - .collect(), - ) - .await; + self.notify_fragment_mapping(NotificationOperation::Update, fragment_worker_slot_mapping) + .await; Ok(()) } + pub(crate) fn convert_fragment_mappings( + fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)>, + parallel_unit_to_worker: &HashMap<ParallelUnitId, WorkerId>, + ) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>> { + let mut result = vec![]; + + for (fragment_id, mapping) in fragment_mappings { + result.push(PbFragmentWorkerSlotMapping { + fragment_id: fragment_id as _, + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker_slot(&parallel_unit_to_worker)? + .to_protobuf(), + ), + }) + } + + Ok(result) + } + pub async fn all_inuse_parallel_units(&self) -> MetaResult<Vec<i32>> { let inner = self.inner.read().await; let parallel_units: Vec<i32> = Actor::find() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index ad852cf8ac51..3e2c5db4776d 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1443,7 +1443,7 @@ impl CatalogController { fragment.update(&txn).await?; let worker_slot_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping) - .to_worker_slot(&parallel_unit_to_worker) + .to_worker_slot(&parallel_unit_to_worker)? .to_protobuf(); fragment_mapping_to_notify.push(FragmentWorkerSlotMapping { diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 4597170ec364..0f5af2816b06 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -43,6 +43,7 @@ use sea_orm::{ Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement, }; +use crate::controller::catalog::CatalogController; use crate::{MetaError, MetaResult}; /// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object. 
@@ -852,17 +853,7 @@ where .all(db) .await?; - Ok(fragment_mappings - .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf(), - ), - }) - .collect()) + CatalogController::convert_fragment_mappings(fragment_mappings, &parallel_unit_to_worker) } /// `get_fragment_mappings_by_jobs` returns the fragment vnode mappings of the given job list. diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a..9d3d558ac483 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::error::BoxedError; +use risingwave_common::hash::ParallelUnitError; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -125,6 +126,13 @@ pub enum MetaErrorInner { // Indicates that recovery was triggered manually. 
#[error("adhoc recovery triggered")] AdhocRecovery, + + #[error("ParallelUnit error: {0}")] + ParallelUnit( + #[from] + #[backtrace] + ParallelUnitError, + ), } impl MetaError { diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 6c8390ff9299..3ef9ab8e191a 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -70,10 +70,13 @@ impl FragmentManagerCore { .values() .map(move |fragment| FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, - mapping: Some(FragmentManager::convert_mapping( - &table_fragments.actor_status, - fragment.vnode_mapping.as_ref().unwrap(), - )), + mapping: Some( + FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + ) + .unwrap(), + ), }) }) } @@ -208,15 +211,27 @@ impl FragmentManager { async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) { // Notify all fragment mapping to frontend nodes for fragment in table_fragment.fragments.values() { - let fragment_mapping = FragmentWorkerSlotMapping { - fragment_id: fragment.fragment_id, - mapping: Some(Self::convert_mapping( - &table_fragment.actor_status, - fragment - .vnode_mapping - .as_ref() - .expect("no data distribution found"), - )), + let vnode_mapping = fragment + .vnode_mapping + .as_ref() + .expect("no data distribution found"); + + let fragment_mapping = if let Operation::Delete = operation { + FragmentWorkerSlotMapping { + fragment_id: fragment.fragment_id, + mapping: Some( + ParallelUnitMapping::from_protobuf(vnode_mapping) + .as_delete_worker_slot_mapping() + .to_protobuf(), + ), + } + } else { + FragmentWorkerSlotMapping { + fragment_id: fragment.fragment_id, + mapping: Some( + Self::convert_mapping(&table_fragment.actor_status, vnode_mapping).unwrap(), + ), + } }; self.env @@ -1289,7 +1304,7 @@ impl FragmentManager { *fragment.vnode_mapping.as_mut().unwrap() = 
vnode_mapping.clone(); - let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping); + let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping)?; // Notify fragment mapping to frontend nodes. let fragment_mapping = FragmentWorkerSlotMapping { @@ -1426,7 +1441,7 @@ impl FragmentManager { fn convert_mapping( actor_status: &BTreeMap<ActorId, ActorStatus>, vnode_mapping: &PbParallelUnitMapping, - ) -> PbWorkerSlotMapping { + ) -> MetaResult<PbWorkerSlotMapping> { let parallel_unit_to_worker = actor_status .values() .map(|actor_status| { }) .collect(); - ParallelUnitMapping::from_protobuf(vnode_mapping) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf() + Ok(ParallelUnitMapping::from_protobuf(vnode_mapping) + .to_worker_slot(&parallel_unit_to_worker)? + .to_protobuf()) } pub async fn table_node_actors(