Skip to content

Commit

Permalink
Add ParallelUnitError and update mappings in Rust files
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanicky Chen committed Jun 18, 2024
1 parent fbb597f commit 94e3b08
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 69 deletions.
36 changes: 31 additions & 5 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::hash::Hash;
use std::ops::Index;
use std::ops::{Index, Sub};

use educe::Educe;
use itertools::Itertools;
Expand Down Expand Up @@ -341,6 +341,12 @@ impl ActorMapping {
}
}

/// Errors that can occur while translating a parallel-unit based mapping
/// into a worker-slot based mapping.
#[derive(thiserror::Error, Debug)]
pub enum ParallelUnitError {
    /// Returned by `to_worker_slot` when the mapping references parallel
    /// units that have no entry in the provided parallel-unit → worker map;
    /// carries the exact set of uncovered parallel unit ids.
    #[error("parallel units {0:?} are not covered by the worker slot mapping")]
    NotCovered(HashSet<ParallelUnitId>),
}

impl WorkerSlotMapping {
/// Create a uniform worker mapping from the given worker ids
pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
Expand Down Expand Up @@ -382,8 +388,18 @@ impl ParallelUnitMapping {
self.transform(to_map)
}

/// Transform this parallel unit mapping to an worker mapping, essentially `transform`.
pub fn to_worker_slot(&self, to_map: &HashMap<ParallelUnitId, u32>) -> WorkerSlotMapping {
/// Produce a worker-slot mapping that stands in for a deleted one: the
/// compressed vnode layout (`original_indices`) is preserved, but every
/// slot entry is replaced with the sentinel `WorkerSlotId(u64::MAX)`.
pub fn as_delete_worker_slot_mapping(&self) -> WorkerSlotMapping {
    // Keep the run-length indices intact; only the payload changes.
    let indices = self.original_indices.clone();
    // One sentinel slot per existing data entry.
    let slots = self.data.iter().map(|_| WorkerSlotId(u64::MAX)).collect();
    VnodeMapping {
        original_indices: indices,
        data: slots,
    }
}

/// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`.
pub fn to_worker_slot(
&self,
to_map: &HashMap<ParallelUnitId, u32>,
) -> Result<WorkerSlotMapping, ParallelUnitError> {
let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new();
for (parallel_unit_id, worker_id) in to_map {
worker_to_parallel_units
Expand All @@ -401,7 +417,17 @@ impl ParallelUnitMapping {
}
}

self.transform(&parallel_unit_to_worker_slot)
let available_parallel_unit_ids: HashSet<_> =
parallel_unit_to_worker_slot.keys().copied().collect();

let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect();

let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids);
if sub_set.is_empty() {
Ok(self.transform(&parallel_unit_to_worker_slot))
} else {
Err(ParallelUnitError::NotCovered(sub_set))
}
}

/// Create a parallel unit mapping from the protobuf representation.
Expand Down
8 changes: 2 additions & 6 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,6 @@ impl CatalogController {
.all(&txn)
.await?;

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?;

let fragment_mappings = fragment_mappings
Expand All @@ -296,7 +294,7 @@ impl CatalogController {
fragment_id,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.unwrap())
.to_worker_slot(&parallel_unit_to_worker)
.as_delete_worker_slot_mapping()
.to_protobuf(),
),
}
Expand Down Expand Up @@ -2111,8 +2109,6 @@ impl CatalogController {
}
let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?;

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

txn.commit().await?;

// notify about them.
Expand Down Expand Up @@ -2199,7 +2195,7 @@ impl CatalogController {
fragment_id,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.unwrap())
.to_worker_slot(&parallel_unit_to_worker)
.as_delete_worker_slot_mapping()
.to_protobuf(),
),
}
Expand Down
59 changes: 31 additions & 28 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::mem::swap;
use anyhow::Context;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob};
Expand Down Expand Up @@ -79,19 +79,12 @@ impl CatalogControllerInner {

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

Ok(fragment_mappings
.into_iter()
.map(move |(fragment_id, mapping)| {
let worker_slot_mapping =
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf();
let mappings = CatalogController::convert_fragment_mappings(
fragment_mappings,
&parallel_unit_to_worker,
)?;

FragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: Some(worker_slot_mapping),
}
}))
Ok(mappings.into_iter())
}
}

Expand Down Expand Up @@ -960,27 +953,37 @@ impl CatalogController {

let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?;

let fragment_worker_slot_mapping =
Self::convert_fragment_mappings(fragment_mapping, &parallel_unit_to_worker)?;

txn.commit().await?;

self.notify_fragment_mapping(
NotificationOperation::Update,
fragment_mapping
.into_iter()
.map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf(),
),
})
.collect(),
)
.await;
self.notify_fragment_mapping(NotificationOperation::Update, fragment_worker_slot_mapping)
.await;

Ok(())
}

/// Convert fragment vnode (parallel-unit) mappings into protobuf
/// worker-slot mappings.
///
/// `parallel_unit_to_worker` maps each parallel unit id to its worker id.
/// Returns an error (propagated from `to_worker_slot`) if any fragment's
/// mapping references a parallel unit missing from that map.
pub(crate) fn convert_fragment_mappings(
    fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)>,
    parallel_unit_to_worker: &HashMap<u32, u32>,
) -> MetaResult<Vec<PbFragmentWorkerSlotMapping>> {
    // Fallible collect: short-circuits on the first conversion error.
    fragment_mappings
        .into_iter()
        .map(|(fragment_id, mapping)| {
            // NOTE: pass the reference directly — the parameter is already
            // `&HashMap`, so re-borrowing (`&parallel_unit_to_worker`) was a
            // needless double reference.
            let worker_slot_mapping = ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
                .to_worker_slot(parallel_unit_to_worker)?
                .to_protobuf();
            Ok(PbFragmentWorkerSlotMapping {
                fragment_id: fragment_id as _,
                mapping: Some(worker_slot_mapping),
            })
        })
        .collect()
}

pub async fn all_inuse_parallel_units(&self) -> MetaResult<Vec<i32>> {
let inner = self.inner.read().await;
let parallel_units: Vec<i32> = Actor::find()
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,7 @@ impl CatalogController {
fragment.update(&txn).await?;

let worker_slot_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping)
.to_worker_slot(&parallel_unit_to_worker)
.to_worker_slot(&parallel_unit_to_worker)?
.to_protobuf();

fragment_mapping_to_notify.push(FragmentWorkerSlotMapping {
Expand Down
13 changes: 2 additions & 11 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use sea_orm::{
Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement,
};

use crate::controller::catalog::CatalogController;
use crate::{MetaError, MetaResult};

/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
Expand Down Expand Up @@ -852,17 +853,7 @@ where
.all(db)
.await?;

Ok(fragment_mappings
.into_iter()
.map(|(fragment_id, mapping)| PbFragmentWorkerSlotMapping {
fragment_id: fragment_id as _,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.to_protobuf())
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf(),
),
})
.collect())
CatalogController::convert_fragment_mappings(fragment_mappings, &parallel_unit_to_worker)
}

/// `get_fragment_mappings_by_jobs` returns the fragment vnode mappings of the given job list.
Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use risingwave_common::error::BoxedError;
use risingwave_common::hash::ParallelUnitError;
use risingwave_common::session_config::SessionConfigError;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::SinkError;
Expand Down Expand Up @@ -125,6 +126,13 @@ pub enum MetaErrorInner {
// Indicates that recovery was triggered manually.
#[error("adhoc recovery triggered")]
AdhocRecovery,

#[error("ParallelUnit error: {0}")]
ParallelUnit(
#[from]
#[backtrace]
ParallelUnitError,
),
}

impl MetaError {
Expand Down
51 changes: 33 additions & 18 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ impl FragmentManagerCore {
.values()
.map(move |fragment| FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(FragmentManager::convert_mapping(
&table_fragments.actor_status,
fragment.vnode_mapping.as_ref().unwrap(),
)),
mapping: Some(
FragmentManager::convert_mapping(
&table_fragments.actor_status,
fragment.vnode_mapping.as_ref().unwrap(),
)
.unwrap(),
),
})
})
}
Expand Down Expand Up @@ -208,15 +211,27 @@ impl FragmentManager {
async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) {
// Notify all fragment mapping to frontend nodes
for fragment in table_fragment.fragments.values() {
let fragment_mapping = FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(Self::convert_mapping(
&table_fragment.actor_status,
fragment
.vnode_mapping
.as_ref()
.expect("no data distribution found"),
)),
let vnode_mapping = fragment
.vnode_mapping
.as_ref()
.expect("no data distribution found");

let fragment_mapping = if let Operation::Delete = operation {
FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(
ParallelUnitMapping::from_protobuf(vnode_mapping)
.as_delete_worker_slot_mapping()
.to_protobuf(),
),
}
} else {
FragmentWorkerSlotMapping {
fragment_id: fragment.fragment_id,
mapping: Some(
Self::convert_mapping(&table_fragment.actor_status, vnode_mapping).unwrap(),
),
}
};

self.env
Expand Down Expand Up @@ -1289,7 +1304,7 @@ impl FragmentManager {

*fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone();

let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping);
let worker_slot_mapping = Self::convert_mapping(&actor_status, &vnode_mapping)?;

// Notify fragment mapping to frontend nodes.
let fragment_mapping = FragmentWorkerSlotMapping {
Expand Down Expand Up @@ -1426,7 +1441,7 @@ impl FragmentManager {
fn convert_mapping(
actor_status: &BTreeMap<ActorId, ActorStatus>,
vnode_mapping: &PbParallelUnitMapping,
) -> PbWorkerSlotMapping {
) -> MetaResult<PbWorkerSlotMapping> {
let parallel_unit_to_worker = actor_status
.values()
.map(|actor_status| {
Expand All @@ -1435,9 +1450,9 @@ impl FragmentManager {
})
.collect();

ParallelUnitMapping::from_protobuf(vnode_mapping)
.to_worker_slot(&parallel_unit_to_worker)
.to_protobuf()
Ok(ParallelUnitMapping::from_protobuf(vnode_mapping)
.to_worker_slot(&parallel_unit_to_worker)?
.to_protobuf())
}

pub async fn table_node_actors(
Expand Down

0 comments on commit 94e3b08

Please sign in to comment.