Skip to content

Commit

Permalink
Refactor: Removed ParallelUnitId, simplified actor handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jul 9, 2024
1 parent 50c27b6 commit 34aa33a
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 243 deletions.
106 changes: 4 additions & 102 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashMap};
use std::fmt::{Debug, Display, Formatter};
use std::hash::Hash;
use std::ops::{Index, Sub};
use std::ops::Index;

use educe::Educe;
use itertools::Itertools;
use risingwave_pb::common::{
ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerSlotMapping,
};
use risingwave_pb::common::PbWorkerSlotMapping;
use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;

use super::bitmap::VnodeBitmapExt;
use super::vnode::{ParallelUnitId, VirtualNode};
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::util::compress::compress_data;
use crate::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -78,7 +75,7 @@ impl Debug for WorkerSlotId {
pub trait VnodeMappingItem {
/// The type of the item.
///
/// Currently, there are two types of items: [`ParallelUnitId`] and [`ActorId`]. We don't use
/// Currently, there are two types of items: [`WorkerSlotId`] and [`ActorId`]. We don't use
/// them directly as the generic parameter because they're the same type aliases.
type Item: Copy + Ord + Hash + Debug;
}
Expand Down Expand Up @@ -292,12 +289,6 @@ pub mod marker {
type Item = ActorId;
}

/// A marker type for items of [`ParallelUnitId`].
pub struct ParallelUnit;
impl VnodeMappingItem for ParallelUnit {
type Item = ParallelUnitId;
}

/// A marker type for items of [`WorkerSlotId`].
pub struct WorkerSlot;
impl VnodeMappingItem for WorkerSlot {
Expand All @@ -310,25 +301,12 @@ pub type ActorMapping = VnodeMapping<marker::Actor>;
/// An expanded mapping from [`VirtualNode`] to [`ActorId`].
pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;

/// A mapping from [`VirtualNode`] to [`ParallelUnitId`].
pub type ParallelUnitMapping = VnodeMapping<marker::ParallelUnit>;
/// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`].
pub type ExpandedParallelUnitMapping = ExpandedMapping<marker::ParallelUnit>;

/// A mapping from [`VirtualNode`] to [`WorkerSlotId`].
pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
/// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`].
pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;

impl ActorMapping {
/// Transform this actor mapping to a parallel unit mapping, essentially `transform`.
pub fn to_parallel_unit<M>(&self, to_map: &M) -> ParallelUnitMapping
where
M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
{
self.transform(to_map)
}

/// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
let mut worker_actors = HashMap::new();
Expand Down Expand Up @@ -367,12 +345,6 @@ impl ActorMapping {
}
}

#[derive(thiserror::Error, Debug)]
pub enum ParallelUnitError {
#[error("parallel units {0:?} are not covered by the worker slot mapping")]
NotCovered(HashSet<ParallelUnitId>),
}

impl WorkerSlotMapping {
/// Create a uniform worker mapping from the given worker ids
pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
Expand All @@ -397,76 +369,6 @@ impl WorkerSlotMapping {
}
}

impl ParallelUnitMapping {
/// Create a uniform parallel unit mapping from the given parallel units, essentially
/// `new_uniform`.
pub fn build(parallel_units: &[ParallelUnit]) -> Self {
Self::new_uniform(parallel_units.iter().map(|pu| pu.id))
}

/// Create a uniform parallel unit mapping from the given parallel units ids
pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self {
Self::new_uniform(parallel_unit_ids.iter().cloned())
}

/// Transform this parallel unit mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<ParallelUnitId, ActorId>) -> ActorMapping {
self.transform(to_map)
}

/// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`.
pub fn to_worker_slot(
&self,
to_map: &HashMap<ParallelUnitId, u32>,
) -> Result<WorkerSlotMapping, ParallelUnitError> {
let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new();
for (parallel_unit_id, worker_id) in to_map {
worker_to_parallel_units
.entry(*worker_id)
.or_default()
.insert(*parallel_unit_id);
}

let mut parallel_unit_to_worker_slot = HashMap::with_capacity(to_map.len());

for (worker_id, parallel_unit_ids) in worker_to_parallel_units {
for (index, &parallel_unit_id) in parallel_unit_ids.iter().enumerate() {
parallel_unit_to_worker_slot
.insert(parallel_unit_id, WorkerSlotId::new(worker_id, index));
}
}

let available_parallel_unit_ids: HashSet<_> =
parallel_unit_to_worker_slot.keys().copied().collect();

let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect();

let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids);
if sub_set.is_empty() {
Ok(self.transform(&parallel_unit_to_worker_slot))
} else {
Err(ParallelUnitError::NotCovered(sub_set))
}
}

/// Create a parallel unit mapping from the protobuf representation.
pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
Self {
original_indices: proto.original_indices.clone(),
data: proto.data.clone(),
}
}

/// Convert this parallel unit mapping to the protobuf representation.
pub fn to_protobuf(&self) -> ParallelUnitMappingProto {
ParallelUnitMappingProto {
original_indices: self.original_indices.clone(),
data: self.data.clone(),
}
}
}

impl WorkerSlotMapping {
/// Transform this worker slot mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
Expand Down
4 changes: 0 additions & 4 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

/// Parallel unit is the minimal scheduling unit.
// TODO: make it a newtype
pub type ParallelUnitId = u32;

/// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
/// consistent hashing.
#[repr(transparent)]
Expand Down
8 changes: 0 additions & 8 deletions src/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use risingwave_common::error::BoxedError;
use risingwave_common::hash::ParallelUnitError;
use risingwave_common::session_config::SessionConfigError;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::SinkError;
Expand Down Expand Up @@ -126,13 +125,6 @@ pub enum MetaErrorInner {
// Indicates that recovery was triggered manually.
#[error("adhoc recovery triggered")]
AdhocRecovery,

#[error("ParallelUnit error: {0}")]
ParallelUnit(
#[from]
#[backtrace]
ParallelUnitError,
),
}

impl MetaError {
Expand Down
Loading

0 comments on commit 34aa33a

Please sign in to comment.