Refactor: remove `ParallelUnitId`, simplify actor handling
Remove the `ParallelUnit` message and `parallel_unit` field usages.

Signed-off-by: Shanicky Chen <[email protected]>

Remove `ParallelUnitMapping`; update comments and fields for `WorkerSlotMapping`.

Refactor protobuf definitions and update Rust actor-status usage.
shanicky committed Jul 16, 2024
1 parent b8ed09d commit b1ee6d4
Showing 24 changed files with 133 additions and 400 deletions.
13 changes: 5 additions & 8 deletions proto/common.proto
@@ -35,8 +35,11 @@ enum WorkerType {
   WORKER_TYPE_META = 5;
 }
 
-message ParallelUnit {
-  uint32 id = 1;
+// renamed from `ParallelUnit`
+message ActorLocation {
+  // previous `id` field for parallel unit id is deprecated.
+  reserved "id";
+  reserved 1;
   uint32 worker_node_id = 2;
 }

@@ -90,12 +93,6 @@ message Buffer {
   bytes body = 2;
 }
 
-// Vnode mapping for stream fragments. Stores mapping from virtual node to parallel unit id.
-message ParallelUnitMapping {
-  repeated uint32 original_indices = 1;
-  repeated uint32 data = 2;
-}
-
 // Vnode mapping for stream fragments. Stores mapping from virtual node to (worker id, slot index).
 message WorkerSlotMapping {
   repeated uint32 original_indices = 1;
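For context on the two parallel arrays kept by `WorkerSlotMapping`: the vnode-to-item mapping is stored run-length compressed. Below is a minimal sketch, assuming a scheme in which `original_indices[i]` records the last vnode index covered by `data[i]`; the exact convention of RisingWave's `compress_data` may differ.

```rust
// Sketch only: one plausible run-length encoding of an expanded vnode mapping.
// `original_indices[i]` is assumed to be the last vnode index of the i-th run,
// and `data[i]` the item (e.g. a worker slot) repeated over that run.
fn compress(expanded: &[u32]) -> (Vec<u32>, Vec<u32>) {
    let mut original_indices = Vec::new();
    let mut data = Vec::new();
    for (i, &item) in expanded.iter().enumerate() {
        if data.last() == Some(&item) {
            // Extend the current run to cover this vnode.
            *original_indices.last_mut().unwrap() = i as u32;
        } else {
            // Start a new run.
            data.push(item);
            original_indices.push(i as u32);
        }
    }
    (original_indices, data)
}

fn main() {
    // Vnodes 0..=2 owned by slot 7, vnodes 3..=4 owned by slot 9.
    let (original_indices, data) = compress(&[7, 7, 7, 9, 9]);
    assert_eq!(original_indices, vec![2, 4]);
    assert_eq!(data, vec![7, 9]);
}
```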
14 changes: 2 additions & 12 deletions proto/meta.proto
@@ -65,8 +65,8 @@ message TableFragments {
       // Running normally
       RUNNING = 2;
     }
-    // Current on which parallel unit
-    common.ParallelUnit parallel_unit = 1;
+    // The worker that the actor is currently located on
+    common.ActorLocation location = 1;
     // Current state
     ActorState state = 2;
   }

@@ -105,16 +105,6 @@ message TableFragments {
   TableParallelism parallelism = 7;
 }
 
-/// Parallel unit mapping with fragment id, used for notification.
-message FragmentParallelUnitMapping {
-  uint32 fragment_id = 1;
-  common.ParallelUnitMapping mapping = 2;
-}
-
-message FragmentParallelUnitMappings {
-  repeated FragmentParallelUnitMapping mappings = 1;
-}
-
 /// Worker slot mapping with fragment id, used for notification.
 message FragmentWorkerSlotMapping {
   uint32 fragment_id = 1;
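The Rust hunks later in this commit replace the old `get_parallel_unit().unwrap().worker_node_id` chain with a `worker_id()` accessor on the actor status. A hedged sketch of the shape such an accessor plausibly has, using hypothetical stand-in types rather than the prost-generated ones, and assuming the singular `location` field maps to an `Option`:

```rust
// Stand-in types for illustration; the real ones are generated into risingwave_pb.
struct PbActorLocation {
    worker_node_id: u32,
}

struct PbActorStatus {
    location: Option<PbActorLocation>,
    state: i32,
}

impl PbActorStatus {
    /// One-call replacement for the old unwrap chain over `parallel_unit`.
    fn worker_id(&self) -> u32 {
        self.location
            .as_ref()
            .expect("actor location should be set")
            .worker_node_id
    }
}

fn main() {
    let status = PbActorStatus {
        location: Some(PbActorLocation { worker_node_id: 3 }),
        state: 2, // e.g. RUNNING
    };
    assert_eq!(status.worker_id(), 3);
}
```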
107 changes: 5 additions & 102 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -12,21 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeSet, HashMap};
 use std::fmt::{Debug, Display, Formatter};
 use std::hash::Hash;
-use std::ops::{Index, Sub};
+use std::ops::Index;
 
 use educe::Educe;
 use itertools::Itertools;
-use risingwave_pb::common::{
-    ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerSlotMapping,
-};
+use risingwave_pb::common::PbWorkerSlotMapping;
 use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
 
 use super::bitmap::VnodeBitmapExt;
-use super::vnode::{ParallelUnitId, VirtualNode};
 use crate::bitmap::{Bitmap, BitmapBuilder};
+use crate::hash::VirtualNode;
 use crate::util::compress::compress_data;
 use crate::util::iter_util::ZipEqDebug;
 
@@ -78,7 +76,7 @@ impl Debug for WorkerSlotId {
 pub trait VnodeMappingItem {
     /// The type of the item.
     ///
-    /// Currently, there are two types of items: [`ParallelUnitId`] and [`ActorId`]. We don't use
+    /// Currently, there are two types of items: [`WorkerSlotId`] and [`ActorId`]. We don't use
     /// them directly as the generic parameter because they're the same type aliases.
     type Item: Copy + Ord + Hash + Debug;
 }
@@ -292,12 +290,6 @@ pub mod marker {
         type Item = ActorId;
     }
 
-    /// A marker type for items of [`ParallelUnitId`].
-    pub struct ParallelUnit;
-    impl VnodeMappingItem for ParallelUnit {
-        type Item = ParallelUnitId;
-    }
-
     /// A marker type for items of [`WorkerSlotId`].
     pub struct WorkerSlot;
     impl VnodeMappingItem for WorkerSlot {
@@ -310,25 +302,12 @@ pub type ActorMapping = VnodeMapping<marker::Actor>;
 /// An expanded mapping from [`VirtualNode`] to [`ActorId`].
 pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;
 
-/// A mapping from [`VirtualNode`] to [`ParallelUnitId`].
-pub type ParallelUnitMapping = VnodeMapping<marker::ParallelUnit>;
-/// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`].
-pub type ExpandedParallelUnitMapping = ExpandedMapping<marker::ParallelUnit>;
-
 /// A mapping from [`VirtualNode`] to [`WorkerSlotId`].
 pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
 /// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`].
 pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;
 
 impl ActorMapping {
-    /// Transform this actor mapping to a parallel unit mapping, essentially `transform`.
-    pub fn to_parallel_unit<M>(&self, to_map: &M) -> ParallelUnitMapping
-    where
-        M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
-    {
-        self.transform(to_map)
-    }
-
     /// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
     pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
         let mut worker_actors = HashMap::new();
@@ -367,12 +346,6 @@ impl ActorMapping {
     }
 }
 
-#[derive(thiserror::Error, Debug)]
-pub enum ParallelUnitError {
-    #[error("parallel units {0:?} are not covered by the worker slot mapping")]
-    NotCovered(HashSet<ParallelUnitId>),
-}
-
 impl WorkerSlotMapping {
     /// Create a uniform worker mapping from the given worker ids
     pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
@@ -397,76 +370,6 @@ impl WorkerSlotMapping {
     }
 }
 
-impl ParallelUnitMapping {
-    /// Create a uniform parallel unit mapping from the given parallel units, essentially
-    /// `new_uniform`.
-    pub fn build(parallel_units: &[ParallelUnit]) -> Self {
-        Self::new_uniform(parallel_units.iter().map(|pu| pu.id))
-    }
-
-    /// Create a uniform parallel unit mapping from the given parallel unit ids
-    pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self {
-        Self::new_uniform(parallel_unit_ids.iter().cloned())
-    }
-
-    /// Transform this parallel unit mapping to an actor mapping, essentially `transform`.
-    pub fn to_actor(&self, to_map: &HashMap<ParallelUnitId, ActorId>) -> ActorMapping {
-        self.transform(to_map)
-    }
-
-    /// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`.
-    pub fn to_worker_slot(
-        &self,
-        to_map: &HashMap<ParallelUnitId, u32>,
-    ) -> Result<WorkerSlotMapping, ParallelUnitError> {
-        let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new();
-        for (parallel_unit_id, worker_id) in to_map {
-            worker_to_parallel_units
-                .entry(*worker_id)
-                .or_default()
-                .insert(*parallel_unit_id);
-        }
-
-        let mut parallel_unit_to_worker_slot = HashMap::with_capacity(to_map.len());
-
-        for (worker_id, parallel_unit_ids) in worker_to_parallel_units {
-            for (index, &parallel_unit_id) in parallel_unit_ids.iter().enumerate() {
-                parallel_unit_to_worker_slot
-                    .insert(parallel_unit_id, WorkerSlotId::new(worker_id, index));
-            }
-        }
-
-        let available_parallel_unit_ids: HashSet<_> =
-            parallel_unit_to_worker_slot.keys().copied().collect();
-
-        let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect();
-
-        let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids);
-        if sub_set.is_empty() {
-            Ok(self.transform(&parallel_unit_to_worker_slot))
-        } else {
-            Err(ParallelUnitError::NotCovered(sub_set))
-        }
-    }
-
-    /// Create a parallel unit mapping from the protobuf representation.
-    pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self {
-        assert_eq!(proto.original_indices.len(), proto.data.len());
-        Self {
-            original_indices: proto.original_indices.clone(),
-            data: proto.data.clone(),
-        }
-    }
-
-    /// Convert this parallel unit mapping to the protobuf representation.
-    pub fn to_protobuf(&self) -> ParallelUnitMappingProto {
-        ParallelUnitMappingProto {
-            original_indices: self.original_indices.clone(),
-            data: self.data.clone(),
-        }
-    }
-}
-
 impl WorkerSlotMapping {
     /// Transform this worker slot mapping to an actor mapping, essentially `transform`.
     pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
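The retained `WorkerSlotId::new(worker, index)` calls above pair a worker id with a slot index on that worker. One plausible packing into a single integer, assuming a `u64` representation (the crate's actual layout may differ):

```rust
// Assumption: high 32 bits hold the worker id, low 32 bits the slot index.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
struct WorkerSlotId(u64);

impl WorkerSlotId {
    fn new(worker_id: u32, slot_index: usize) -> Self {
        Self(((worker_id as u64) << 32) | slot_index as u64)
    }

    fn worker_id(self) -> u32 {
        (self.0 >> 32) as u32
    }

    fn slot_index(self) -> u32 {
        self.0 as u32
    }
}

fn main() {
    let slot = WorkerSlotId::new(5, 2);
    assert_eq!((slot.worker_id(), slot.slot_index()), (5, 2));
}
```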
4 changes: 0 additions & 4 deletions src/common/src/hash/consistent_hash/vnode.rs
@@ -22,10 +22,6 @@ use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
 use crate::util::hash_util::Crc32FastBuilder;
 use crate::util::row_id::extract_vnode_id_from_row_id;
 
-/// Parallel unit is the minimal scheduling unit.
-// TODO: make it a newtype
-pub type ParallelUnitId = u32;
-
 /// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
 /// consistent hashing.
 #[repr(transparent)]
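For readers new to the `VirtualNode` doc retained above: keys hash onto a fixed set of vnodes, and a mapping such as `WorkerSlotMapping` then assigns each vnode an owner. An illustrative sketch with a hypothetical vnode count and the standard-library hasher (the imports above show RisingWave itself uses a CRC32 hasher and its own constant):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical count for illustration; not RisingWave's actual constant.
const VNODE_COUNT: u64 = 256;

// Map a key to a vnode; a vnode-to-owner mapping then decides which
// worker slot serves that vnode.
fn vnode_of<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % VNODE_COUNT
}

fn main() {
    let vnode = vnode_of(&"user_42");
    assert!(vnode < VNODE_COUNT);
}
```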
4 changes: 1 addition & 3 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -100,9 +100,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
             .actor_status
             .get(&actor.actor_id)
             .unwrap()
-            .get_parallel_unit()
-            .unwrap()
-            .get_worker_node_id();
+            .worker_id();
 
         fragments
             .entry(id)
4 changes: 1 addition & 3 deletions src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -21,7 +21,6 @@ use itertools::Itertools;
 use regex::{Match, Regex};
 use risingwave_meta::manager::WorkerId;
 use risingwave_pb::common::WorkerNode;
-use risingwave_pb::meta::table_fragments::ActorStatus;
 use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
 use serde::{Deserialize, Serialize};
 use thiserror_ext::AsReport;

@@ -301,9 +300,8 @@ pub async fn unregister_workers(
                 table_fragments
                     .actor_status
                     .get(&actor.actor_id)
-                    .and_then(|ActorStatus { parallel_unit, .. }| parallel_unit.clone())
+                    .map(|actor_status| actor_status.worker_id())
                     .unwrap()
-                    .worker_node_id
             })
             .collect();
12 changes: 6 additions & 6 deletions src/ctl/src/lib.rs
@@ -405,16 +405,16 @@ enum MetaCommands {
     SourceSplitInfo,
     /// Reschedule the parallel unit in the stream graph
     ///
-    /// The format is `fragment_id-[removed]+[added]`
-    /// You can provide either `removed` only or `added` only, but `removed` should be preceded by
+    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
+    /// You can provide either the decreased counts only or the increased counts only, but the decrease group should be followed by
     /// `added` when both are provided.
     ///
-    /// For example, for plan `100-[1,2,3]+[4,5]` the follow request will be generated:
+    /// For example, for plan `100-[1:1]+[4:1]` the following request will be generated:
     /// ```text
     /// {
-    ///     100: Reschedule {
-    ///         added_parallel_units: [4,5],
-    ///         removed_parallel_units: [1,2,3],
+    ///     100: WorkerReschedule {
+    ///         increased_actor_count: { 4: 1 },
+    ///         decreased_actor_count: { 1: 1 },
     ///     }
     /// }
    /// ```
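A hedged sketch of parsing the plan syntax documented above, e.g. `100-[1:1]+[4:1]`. This is not the actual parser in `reschedule.rs`; the struct and its fields are assumptions taken from the doc comment.

```rust
use std::collections::HashMap;

// Field names follow the doc comment above; hypothetical, not the real type.
#[derive(Debug, Default, PartialEq)]
struct WorkerReschedule {
    increased_actor_count: HashMap<u32, u32>,
    decreased_actor_count: HashMap<u32, u32>,
}

// Parse one bracketed group like "[1:1,2:3]" into worker_id -> count.
fn parse_group(group: &str) -> HashMap<u32, u32> {
    group
        .trim_matches(|c| c == '[' || c == ']')
        .split(',')
        .filter(|s| !s.is_empty())
        .filter_map(|pair| {
            let (worker, count) = pair.split_once(':')?;
            Some((worker.trim().parse().ok()?, count.trim().parse().ok()?))
        })
        .collect()
}

// Parse a full plan like "100-[1:1]+[4:1]".
fn parse_plan(plan: &str) -> Option<(u32, WorkerReschedule)> {
    let sep = plan.find(&['-', '+'][..])?;
    let fragment_id: u32 = plan[..sep].trim().parse().ok()?;
    let rest = &plan[sep..];

    let mut reschedule = WorkerReschedule::default();
    if let Some(stripped) = rest.strip_prefix('-') {
        // Decrease group first; an increase group may follow.
        match stripped.split_once('+') {
            Some((dec, inc)) => {
                reschedule.decreased_actor_count = parse_group(dec);
                reschedule.increased_actor_count = parse_group(inc);
            }
            None => reschedule.decreased_actor_count = parse_group(stripped),
        }
    } else {
        // Increase group only.
        reschedule.increased_actor_count = parse_group(rest.strip_prefix('+')?);
    }
    Some((fragment_id, reschedule))
}

fn main() {
    let (fragment_id, r) = parse_plan("100-[1:1]+[4:1]").unwrap();
    assert_eq!(fragment_id, 100);
    assert_eq!(r.decreased_actor_count, HashMap::from([(1, 1)]));
    assert_eq!(r.increased_actor_count, HashMap::from([(4, 1)]));
}
```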
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/local.rs
@@ -532,7 +532,7 @@ impl LocalQueryExecution {
             self.get_fragment_id(&side_table_desc.table_id.into())?,
         )?;
 
-        // TODO: should we use `pb::ParallelUnitMapping` here?
+        // TODO: should we use `pb::WorkerSlotMapping` here?
         node.inner_side_vnode_mapping =
             mapping.to_expanded().into_iter().map(u64::from).collect();
         node.worker_nodes = self.worker_node_manager.manager.list_worker_nodes();
5 changes: 0 additions & 5 deletions src/meta/model_v2/src/lib.rs
@@ -397,11 +397,6 @@ derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
 derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
 derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
 
-derive_from_blob!(
-    FragmentVnodeMapping,
-    risingwave_pb::common::ParallelUnitMapping
-);
-
 derive_array_from_blob!(
     HummockVersionDeltaArray,
     risingwave_pb::hummock::PbHummockVersionDelta,
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
@@ -374,7 +374,7 @@ impl StreamManagerService for StreamServiceImpl {
                         actor_id,
                         fragment_id: actor_to_fragment[&actor_id],
                         state: status.state,
-                        worker_id: status.parallel_unit.as_ref().unwrap().worker_node_id,
+                        worker_id: status.worker_id(),
                     }
                 })
             })
8 changes: 2 additions & 6 deletions src/meta/src/barrier/command.rs
@@ -114,9 +114,7 @@ impl ReplaceTablePlan {
                         .actor_status
                         .get(&actor.actor_id)
                         .expect("should exist")
-                        .get_parallel_unit()
-                        .expect("should set")
-                        .worker_node_id,
+                        .worker_id(),
                 )
             })
             .collect(),

@@ -294,9 +292,7 @@ impl Command {
                         .actor_status
                         .get(&actor.actor_id)
                         .expect("should exist")
-                        .get_parallel_unit()
-                        .expect("should set")
-                        .worker_node_id,
+                        .worker_id(),
                 )
             })
             .collect(),
21 changes: 6 additions & 15 deletions src/meta/src/controller/fragment.rs
@@ -29,7 +29,7 @@ use risingwave_meta_model_v2::{
     ConnectorSplits, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SinkId, SourceId,
     StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
 };
-use risingwave_pb::common::PbParallelUnit;
+use risingwave_pb::common::PbActorLocation;
 use risingwave_pb::meta::subscribe_response::{
     Info as NotificationInfo, Operation as NotificationOperation,
 };

@@ -244,7 +244,7 @@ impl CatalogController {
                 )
             })?;
 
-            let worker_id = status.parallel_unit.clone().unwrap().get_worker_node_id() as WorkerId;
+            let worker_id = status.worker_id() as _;
 
             assert_eq!(
                 pb_upstream_actor_id

@@ -430,10 +430,7 @@ impl CatalogController {
             pb_actor_status.insert(
                 actor_id as _,
                 PbActorStatus {
-                    parallel_unit: Some(PbParallelUnit {
-                        id: u32::MAX,
-                        worker_node_id: worker_id as _,
-                    }),
+                    location: PbActorLocation::from_worker(worker_id as u32),
                     state: PbActorState::from(status) as _,
                 },
             );

@@ -1050,10 +1047,7 @@ impl CatalogController {
             let (table_fragments, actor_status, _) =
                 Self::compose_fragment(fragment, actors, dispatcher_info)?;
             for actor in table_fragments.actors {
-                let node_id = actor_status[&actor.actor_id]
-                    .get_parallel_unit()
-                    .unwrap()
-                    .worker_node_id as WorkerId;
+                let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
                 node_actors
                     .entry(node_id)
                     .or_insert_with(Vec::new)

@@ -1420,7 +1414,7 @@ mod tests {
         actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits,
         ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
     };
-    use risingwave_pb::common::ParallelUnit;
+    use risingwave_pb::common::PbActorLocation;
     use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
     use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
     use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};

@@ -1545,10 +1539,7 @@ mod tests {
                 (
                     actor_id,
                     PbActorStatus {
-                        parallel_unit: Some(ParallelUnit {
-                            id: u32::MAX,
-                            worker_node_id: 0,
-                        }),
+                        location: PbActorLocation::from_worker(0),
                         state: PbActorState::Running as _,
                     },
                 )
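The `node_actors` hunk above buckets actors by the worker that hosts them using the `entry(..).or_insert_with(Vec::new)` pattern. A self-contained sketch of that grouping, with plain `u32` ids assumed for both workers and actors:

```rust
use std::collections::HashMap;

// Group (actor_id, worker_id) pairs into worker_id -> [actor_id, ..].
fn group_by_worker(actor_workers: &[(u32, u32)]) -> HashMap<u32, Vec<u32>> {
    let mut node_actors: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(actor_id, worker_id) in actor_workers {
        node_actors.entry(worker_id).or_default().push(actor_id);
    }
    node_actors
}

fn main() {
    let grouped = group_by_worker(&[(1, 10), (2, 10), (3, 11)]);
    assert_eq!(grouped[&10], vec![1, 2]);
    assert_eq!(grouped[&11], vec![3]);
}
```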
(Diffs for the remaining 12 of the 24 changed files did not load in this view.)