Skip to content

Commit

Permalink
fix: Fix the panic during parallel unit mapping conversion. (#17318)
Browse files Browse the repository at this point in the history
Co-authored-by: Shanicky Chen <>
  • Loading branch information
shanicky authored Jun 20, 2024
1 parent 05268bc commit 7df77a4
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 98 deletions.
29 changes: 24 additions & 5 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::hash::Hash;
use std::ops::Index;
use std::ops::{Index, Sub};

use educe::Educe;
use itertools::Itertools;
Expand Down Expand Up @@ -341,6 +341,12 @@ impl ActorMapping {
}
}

#[derive(thiserror::Error, Debug)]
pub enum ParallelUnitError {
#[error("parallel units {0:?} are not covered by the worker slot mapping")]
NotCovered(HashSet<ParallelUnitId>),
}

impl WorkerSlotMapping {
/// Create a uniform worker mapping from the given worker ids
pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
Expand Down Expand Up @@ -382,8 +388,11 @@ impl ParallelUnitMapping {
self.transform(to_map)
}

/// Transform this parallel unit mapping to an worker mapping, essentially `transform`.
pub fn to_worker_slot(&self, to_map: &HashMap<ParallelUnitId, u32>) -> WorkerSlotMapping {
/// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`.
pub fn to_worker_slot(
&self,
to_map: &HashMap<ParallelUnitId, u32>,
) -> Result<WorkerSlotMapping, ParallelUnitError> {
let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new();
for (parallel_unit_id, worker_id) in to_map {
worker_to_parallel_units
Expand All @@ -401,7 +410,17 @@ impl ParallelUnitMapping {
}
}

self.transform(&parallel_unit_to_worker_slot)
let available_parallel_unit_ids: HashSet<_> =
parallel_unit_to_worker_slot.keys().copied().collect();

let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect();

let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids);
if sub_set.is_empty() {
Ok(self.transform(&parallel_unit_to_worker_slot))
} else {
Err(ParallelUnitError::NotCovered(sub_set))
}
}

/// Create a parallel unit mapping from the protobuf representation.
Expand Down
46 changes: 11 additions & 35 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;
use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS};
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
Expand Down Expand Up @@ -67,9 +66,9 @@ use crate::controller::utils::{
check_connection_name_duplicate, check_database_name_duplicate,
check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
check_secret_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty,
ensure_user_id, get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map,
get_referring_objects, get_referring_objects_cascade, get_user_privilege,
list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject,
ensure_user_id, get_fragment_mappings_by_jobs, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
Expand Down Expand Up @@ -281,25 +280,13 @@ impl CatalogController {
.all(&txn)
.await?;

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?;

let fragment_mappings = fragment_mappings
let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone())
.await?
.into_iter()
.map(
|FragmentParallelUnitMapping {
fragment_id,
mapping,
}| {
PbFragmentWorkerSlotMapping {
fragment_id,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.unwrap())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf(),
),
}
|FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping {
fragment_id,
mapping: None,
},
)
.collect();
Expand Down Expand Up @@ -2111,8 +2098,6 @@ impl CatalogController {
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

txn.commit().await?;

// notify about them.
Expand Down Expand Up @@ -2191,18 +2176,9 @@ impl CatalogController {
let fragment_mappings = fragment_mappings
.into_iter()
.map(
|FragmentParallelUnitMapping {
fragment_id,
mapping,
}| {
PbFragmentWorkerSlotMapping {
fragment_id,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.unwrap())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf(),
),
}
|FragmentParallelUnitMapping { fragment_id, .. }| PbFragmentWorkerSlotMapping {
fragment_id,
mapping: None,
},
)
.collect();
Expand Down
57 changes: 30 additions & 27 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,12 @@ impl CatalogControllerInner {

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

Ok(fragment_mappings
.into_iter()
.map(move |(fragment_id, mapping)| {
let worker_slot_mapping =
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf();
let mappings = CatalogController::convert_fragment_mappings(
fragment_mappings,
&parallel_unit_to_worker,
)?;

FragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: Some(worker_slot_mapping),
}
}))
Ok(mappings.into_iter())
}
}

Expand Down Expand Up @@ -960,27 +953,37 @@ impl CatalogController {

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

let fragment_worker_slot_mapping =
Self::convert_fragment_mappings(fragment_mapping, &parallel_unit_to_worker)?;

txn.commit().await?;

self.notify_fragment_mapping(
NotificationOperation::Update,
fragment_mapping
.into_iter()
.map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf(),
),
})
.collect(),
)
.await;
self.notify_fragment_mapping(NotificationOperation::Update, fragment_worker_slot_mapping)
.await;

Ok(())
}

pub(crate) fn convert_fragment_mappings(
fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)>,
parallel_unit_to_worker: &HashMap<u32, u32>,
) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>> {
let mut result = vec![];

for (fragment_id, mapping) in fragment_mappings {
result.push(PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
.to_worker_slot(parallel_unit_to_worker)?
.to_protobuf(),
),
})
}

Ok(result)
}

pub async fn all_inuse_parallel_units(&self) -> MetaResult<Vec<i32>> {
let inner = self.inner.read().await;
let parallel_units: Vec<i32> = Actor::find()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,7 @@ impl CatalogController {
fragment.update(&txn).await?;

let worker_slot_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping)
.to_worker_slot(&parallel_unit_to_worker)
.to_worker_slot(&parallel_unit_to_worker)?
.to_protobuf();

fragment_mapping_to_notify.push(FragmentWorkerSlotMapping {
Expand Down
14 changes: 2 additions & 12 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{BTreeSet, HashMap, HashSet};

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_meta_model_migration::WithQuery;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::DistributionType;
Expand All @@ -43,6 +42,7 @@ use sea_orm::{
Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement,
};

use crate::controller::catalog::CatalogController;
use crate::{MetaError, MetaResult};

/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
Expand Down Expand Up @@ -852,17 +852,7 @@ where
.all(db)
.await?;

Ok(fragment_mappings
.into_iter()
.map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf(),
),
})
.collect())
CatalogController::convert_fragment_mappings(fragment_mappings, &parallel_unit_to_worker)
}

/// `get_fragment_mappings_by_jobs` returns the fragment vnode mappings of the given job list.
Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use risingwave_common::error::BoxedError;
use risingwave_common::hash::ParallelUnitError;
use risingwave_common::session_config::SessionConfigError;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::SinkError;
Expand Down Expand Up @@ -125,6 +126,13 @@ pub enum MetaErrorInner {
// Indicates that recovery was triggered manually.
#[error("adhoc recovery triggered")]
AdhocRecovery,

#[error("ParallelUnit error: {0}")]
ParallelUnit(
#[from]
#[backtrace]
ParallelUnitError,
),
}

impl MetaError {
Expand Down
47 changes: 29 additions & 18 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ impl FragmentManagerCore {
.values()
.map(move |fragment| FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(FragmentManager::convert_mapping(
&table_fragments.actor_status,
fragment.vnode_mapping.as_ref().unwrap(),
)),
mapping: Some(
FragmentManager::convert_mapping(
&table_fragments.actor_status,
fragment.vnode_mapping.as_ref().unwrap(),
)
.unwrap(),
),
})
})
}
Expand Down Expand Up @@ -208,15 +211,23 @@ impl FragmentManager {
async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
// Notify all fragment mapping to frontend nodes
for fragment in table_fragment.fragments.values() {
let fragment_mapping = FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(Self::convert_mapping(
&table_fragment.actor_status,
fragment
.vnode_mapping
.as_ref()
.expect("no data distribution found"),
)),
let vnode_mapping = fragment
.vnode_mapping
.as_ref()
.expect("no data distribution found");

let fragment_mapping = if let Operation::Delete = operation {
FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: None,
}
} else {
FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(
Self::convert_mapping(&table_fragment.actor_status, vnode_mapping).unwrap(),
),
}
};

self.env
Expand Down Expand Up @@ -1289,7 +1300,7 @@ impl FragmentManager {

*fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone();

let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping);
let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping)?;

// Notify fragment mapping to frontend nodes.
let fragment_mapping = FragmentWorkerSlotMapping {
Expand Down Expand Up @@ -1426,7 +1437,7 @@ impl FragmentManager {
fn convert_mapping(
actor_status: &BTreeMap<ActorId, ActorStatus>,
vnode_mapping: &PbParallelUnitMapping,
) -> PbWorkerSlotMapping {
) -> MetaResult<PbWorkerSlotMapping> {
let parallel_unit_to_worker = actor_status
.values()
.map(|actor_status| {
Expand All @@ -1435,9 +1446,9 @@ impl FragmentManager {
})
.collect();

ParallelUnitMapping::from_protobuf(vnode_mapping)
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf()
Ok(ParallelUnitMapping::from_protobuf(vnode_mapping)
.to_worker_slot(&parallel_unit_to_worker)?
.to_protobuf())
}

pub async fn table_node_actors(
Expand Down

0 comments on commit 7df77a4

Please sign in to comment.