From 94e3b080bee2a1e9382b1be2bb5494f2e1e2dc8e Mon Sep 17 00:00:00 2001 From: Shanicky Chen <> Date: Tue, 18 Jun 2024 19:10:48 +0800 Subject: [PATCH 1/4] Add ParallelUnitError and update mappings in Rust files --- .../src/hash/consistent_hash/mapping.rs | 36 +++++++++-- src/meta/src/controller/catalog.rs | 8 +-- src/meta/src/controller/fragment.rs | 59 ++++++++++--------- src/meta/src/controller/streaming_job.rs | 2 +- src/meta/src/controller/utils.rs | 13 +--- src/meta/src/error.rs | 8 +++ src/meta/src/manager/catalog/fragment.rs | 51 ++++++++++------ 7 files changed, 108 insertions(+), 69 deletions(-) diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index c057cf847c7db..d08b82dd8bb1e 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; -use std::ops::Index; +use std::ops::{Index, Sub}; use educe::Educe; use itertools::Itertools; @@ -341,6 +341,12 @@ impl ActorMapping { } } +#[derive(thiserror::Error, Debug)] +pub enum ParallelUnitError { + #[error("parallel units {0:?} are not covered by the worker slot mapping")] + NotCovered(HashSet), +} + impl WorkerSlotMapping { /// Create a uniform worker mapping from the given worker ids pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self { @@ -382,8 +388,18 @@ impl ParallelUnitMapping { self.transform(to_map) } - /// Transform this parallel unit mapping to an worker mapping, essentially `transform`. 
- pub fn to_worker_slot(&self, to_map: &HashMap) -> WorkerSlotMapping { + pub fn as_delete_worker_slot_mapping(&self) -> WorkerSlotMapping { + VnodeMapping { + original_indices: self.original_indices.clone(), + data: self.data.iter().map(|_| WorkerSlotId(u64::MAX)).collect(), + } + } + + /// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`. + pub fn to_worker_slot( + &self, + to_map: &HashMap, + ) -> Result { let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new(); for (parallel_unit_id, worker_id) in to_map { worker_to_parallel_units @@ -401,7 +417,17 @@ impl ParallelUnitMapping { } } - self.transform(&parallel_unit_to_worker_slot) + let available_parallel_unit_ids: HashSet<_> = + parallel_unit_to_worker_slot.keys().copied().collect(); + + let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect(); + + let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids); + if sub_set.is_empty() { + Ok(self.transform(&parallel_unit_to_worker_slot)) + } else { + Err(ParallelUnitError::NotCovered(sub_set)) + } } /// Create a parallel unit mapping from the protobuf representation. 
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 050539b12c06f..5f057c076a6a0 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -281,8 +281,6 @@ impl CatalogController { .all(&txn) .await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; let fragment_mappings = fragment_mappings @@ -296,7 +294,7 @@ impl CatalogController { fragment_id, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker_slot(&parallel_unit_to_worker) + .as_delete_worker_slot_mapping() .to_protobuf(), ), } @@ -2111,8 +2109,6 @@ impl CatalogController { } let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; - let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - txn.commit().await?; // notify about them. @@ -2199,7 +2195,7 @@ impl CatalogController { fragment_id, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .to_worker_slot(&parallel_unit_to_worker) + .as_delete_worker_slot_mapping() .to_protobuf(), ), } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index fb8810071f162..8fd1dc59df71c 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,7 +19,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; @@ -79,19 +79,12 @@ impl CatalogControllerInner { let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; - Ok(fragment_mappings - 
.into_iter() - .map(move |(fragment_id, mapping)| { - let worker_slot_mapping = - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf(); + let mappings = CatalogController::convert_fragment_mappings( + fragment_mappings, + &parallel_unit_to_worker, + )?; - FragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some(worker_slot_mapping), - } - })) + Ok(mappings.into_iter()) } } @@ -960,27 +953,37 @@ impl CatalogController { let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + let fragment_worker_slot_mapping = + Self::convert_fragment_mappings(fragment_mapping, &parallel_unit_to_worker)?; + txn.commit().await?; - self.notify_fragment_mapping( - NotificationOperation::Update, - fragment_mapping - .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf(), - ), - }) - .collect(), - ) - .await; + self.notify_fragment_mapping(NotificationOperation::Update, fragment_worker_slot_mapping) + .await; Ok(()) } + pub(crate) fn convert_fragment_mappings( + fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)>, + parallel_unit_to_worker: &HashMap, + ) -> MetaResult> { + let mut result = vec![]; + + for (fragment_id, mapping) in fragment_mappings { + result.push(PbFragmentWorkerSlotMapping { + fragment_id: fragment_id as _, + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker_slot(&parallel_unit_to_worker)? 
+ .to_protobuf(), + ), + }) + } + + Ok(result) + } + pub async fn all_inuse_parallel_units(&self) -> MetaResult> { let inner = self.inner.read().await; let parallel_units: Vec = Actor::find() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index ad852cf8ac51c..3e2c5db4776db 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -1443,7 +1443,7 @@ impl CatalogController { fragment.update(&txn).await?; let worker_slot_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping) - .to_worker_slot(&parallel_unit_to_worker) + .to_worker_slot(&parallel_unit_to_worker)? .to_protobuf(); fragment_mapping_to_notify.push(FragmentWorkerSlotMapping { diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 4597170ec3646..0f5af2816b062 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -43,6 +43,7 @@ use sea_orm::{ Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement, }; +use crate::controller::catalog::CatalogController; use crate::{MetaError, MetaResult}; /// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object. @@ -852,17 +853,7 @@ where .all(db) .await?; - Ok(fragment_mappings - .into_iter() - .map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping { - fragment_id: fragment_id as _, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf(), - ), - }) - .collect()) + CatalogController::convert_fragment_mappings(fragment_mappings, &parallel_unit_to_worker) } /// `get_fragment_mappings_by_jobs` returns the fragment vnode mappings of the given job list. 
diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 8aeaed2f9c5a8..9d3d558ac4839 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::error::BoxedError; +use risingwave_common::hash::ParallelUnitError; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -125,6 +126,13 @@ pub enum MetaErrorInner { // Indicates that recovery was triggered manually. #[error("adhoc recovery triggered")] AdhocRecovery, + + #[error("ParallelUnit error: {0}")] + ParallelUnit( + #[from] + #[backtrace] + ParallelUnitError, + ), } impl MetaError { diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 6c8390ff92991..3ef9ab8e191ae 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -70,10 +70,13 @@ impl FragmentManagerCore { .values() .map(move |fragment| FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, - mapping: Some(FragmentManager::convert_mapping( - &table_fragments.actor_status, - fragment.vnode_mapping.as_ref().unwrap(), - )), + mapping: Some( + FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + ) + .unwrap(), + ), }) }) } @@ -208,15 +211,27 @@ impl FragmentManager { async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) { // Notify all fragment mapping to frontend nodes for fragment in table_fragment.fragments.values() { - let fragment_mapping = FragmentWorkerSlotMapping { - fragment_id: fragment.fragment_id, - mapping: Some(Self::convert_mapping( - &table_fragment.actor_status, - fragment - .vnode_mapping - .as_ref() - .expect("no data distribution found"), - )), + let vnode_mapping = fragment + .vnode_mapping + .as_ref() + .expect("no data distribution found"); + + let 
fragment_mapping = if let Operation::Delete = operation { + FragmentWorkerSlotMapping { + fragment_id: fragment.fragment_id, + mapping: Some( + ParallelUnitMapping::from_protobuf(vnode_mapping) + .as_delete_worker_slot_mapping() + .to_protobuf(), + ), + } + } else { + FragmentWorkerSlotMapping { + fragment_id: fragment.fragment_id, + mapping: Some( + Self::convert_mapping(&table_fragment.actor_status, vnode_mapping).unwrap(), + ), + } }; self.env @@ -1289,7 +1304,7 @@ impl FragmentManager { *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone(); - let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping); + let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping)?; // Notify fragment mapping to frontend nodes. let fragment_mapping = FragmentWorkerSlotMapping { @@ -1426,7 +1441,7 @@ impl FragmentManager { fn convert_mapping( actor_status: &BTreeMap, vnode_mapping: &PbParallelUnitMapping, - ) -> PbWorkerSlotMapping { + ) -> MetaResult { let parallel_unit_to_worker = actor_status .values() .map(|actor_status| { @@ -1435,9 +1450,9 @@ impl FragmentManager { }) .collect(); - ParallelUnitMapping::from_protobuf(vnode_mapping) - .to_worker_slot(&parallel_unit_to_worker) - .to_protobuf() + Ok(ParallelUnitMapping::from_protobuf(vnode_mapping) + .to_worker_slot(&parallel_unit_to_worker)? 
+ .to_protobuf()) } pub async fn table_node_actors( From b95207892dd29578869533d797d6d39ae0faa18d Mon Sep 17 00:00:00 2001 From: Shanicky Chen <> Date: Wed, 19 Jun 2024 15:09:27 +0800 Subject: [PATCH 2/4] Refactor: Remove funcs, set PbFragment mapping to None --- src/meta/src/controller/catalog.rs | 18 +++++------------- src/meta/src/controller/fragment.rs | 4 ++-- src/meta/src/controller/utils.rs | 1 - src/meta/src/manager/catalog/fragment.rs | 6 +----- 4 files changed, 8 insertions(+), 21 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 5f057c076a6a0..73f63985001e3 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -67,9 +67,9 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, - ensure_user_id, get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, - get_referring_objects, get_referring_objects_cascade, get_user_privilege, - list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject, + ensure_user_id, get_fragment_mappings_by_jobs, get_referring_objects, + get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, + resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -292,11 +292,7 @@ impl CatalogController { }| { PbFragmentWorkerSlotMapping { fragment_id, - mapping: Some( - ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .as_delete_worker_slot_mapping() - .to_protobuf(), - ), + mapping: None, } }, ) @@ -2193,11 +2189,7 @@ impl CatalogController { }| { PbFragmentWorkerSlotMapping { fragment_id, - mapping: Some( - 
ParallelUnitMapping::from_protobuf(&mapping.unwrap()) - .as_delete_worker_slot_mapping() - .to_protobuf(), - ), + mapping: None, } }, ) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 8fd1dc59df71c..e88046ed17467 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,7 +19,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; @@ -975,7 +975,7 @@ impl CatalogController { fragment_id: fragment_id as _, mapping: Some( ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) - .to_worker_slot(&parallel_unit_to_worker)? + .to_worker_slot(parallel_unit_to_worker)? 
.to_protobuf(), ), }) diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 0f5af2816b062..8a8f1b2d71cda 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -16,7 +16,6 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use anyhow::anyhow; use itertools::Itertools; -use risingwave_common::hash::ParallelUnitMapping; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 3ef9ab8e191ae..537e2fac01f22 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -219,11 +219,7 @@ impl FragmentManager { let fragment_mapping = if let Operation::Delete = operation { FragmentWorkerSlotMapping { fragment_id: fragment.fragment_id, - mapping: Some( - ParallelUnitMapping::from_protobuf(vnode_mapping) - .as_delete_worker_slot_mapping() - .to_protobuf(), - ), + mapping: None, } } else { FragmentWorkerSlotMapping { From be8eb793a2cd5fc6d5218730acd3ea0c44654f25 Mon Sep 17 00:00:00 2001 From: Shanicky Chen <> Date: Wed, 19 Jun 2024 15:12:09 +0800 Subject: [PATCH 3/4] Simplify PbFragmentWorkerSlotMapping creation in CatalogController --- src/meta/src/controller/catalog.rs | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 73f63985001e3..2301f7699ed26 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; -use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut; use 
risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; @@ -281,19 +280,13 @@ impl CatalogController { .all(&txn) .await?; - let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; - - let fragment_mappings = fragment_mappings + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()) + .await? .into_iter() .map( - |FragmentParallelUnitMapping { - fragment_id, - mapping, - }| { - PbFragmentWorkerSlotMapping { - fragment_id, - mapping: None, - } + |FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping { + fragment_id, + mapping: None, }, ) .collect(); @@ -2183,14 +2176,9 @@ impl CatalogController { let fragment_mappings = fragment_mappings .into_iter() .map( - |FragmentParallelUnitMapping { - fragment_id, - mapping, - }| { - PbFragmentWorkerSlotMapping { - fragment_id, - mapping: None, - } + |FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping { + fragment_id, + mapping: None, }, ) .collect(); From 5850b8c53140c51259e0b8963e8d3f447ea3bba6 Mon Sep 17 00:00:00 2001 From: Shanicky Chen <> Date: Thu, 20 Jun 2024 15:02:56 +0800 Subject: [PATCH 4/4] Removed `as_delete_worker_slot_mapping` from `ParallelUnitMapping` --- src/common/src/hash/consistent_hash/mapping.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index d08b82dd8bb1e..59460e7966220 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -388,13 +388,6 @@ impl ParallelUnitMapping { self.transform(to_map) } - pub fn as_delete_worker_slot_mapping(&self) -> WorkerSlotMapping { - VnodeMapping { - original_indices: self.original_indices.clone(), - data: self.data.iter().map(|_| WorkerSlotId(u64::MAX)).collect(), - } - } - /// Transform this parallel unit mapping to a worker slot mapping, 
essentially `transform`. pub fn to_worker_slot( &self,