feat: remove parallel unit keyword #17589

Merged · 2 commits · Jul 23, 2024
13 changes: 5 additions & 8 deletions proto/common.proto
@@ -35,8 +35,11 @@ enum WorkerType {
   WORKER_TYPE_META = 5;
 }
 
-message ParallelUnit {
-  uint32 id = 1;
+// renamed from `ParallelUnit`
+message ActorLocation {
+  // previous `id` field for parallel unit id is deprecated.
+  reserved "id";
+  reserved 1;
   uint32 worker_node_id = 2;
 }
 
@@ -90,12 +93,6 @@ message Buffer {
   bytes body = 2;
 }
 
-// Vnode mapping for stream fragments. Stores mapping from virtual node to parallel unit id.
-message ParallelUnitMapping {
-  repeated uint32 original_indices = 1;
-  repeated uint32 data = 2;
-}
-
 // Vnode mapping for stream fragments. Stores mapping from virtual node to (worker id, slot index).
 message WorkerSlotMapping {
   repeated uint32 original_indices = 1;
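A note for readers following the call sites later in this diff: actor locations are now constructed via `PbActorLocation::from_worker(..)`. That helper is not itself shown in the PR, but given the message above it plausibly reduces to a one-line constructor. The sketch below is an assumption based on how `prost` generates the `ActorLocation` struct and on the call sites assigning its result to an optional field; it is not the actual generated code.

```rust
/// Hypothetical sketch of the helper used at the call sites below.
/// Assumes the prost-generated struct for `ActorLocation` in common.proto,
/// whose only live field is `worker_node_id` (the reserved `id` is gone).
#[derive(Clone, Default, PartialEq)]
pub struct PbActorLocation {
    pub worker_node_id: u32,
}

impl PbActorLocation {
    /// Wrap a worker node id; returns `Some(..)` because the `location`
    /// field on the containing message is optional in proto3.
    pub fn from_worker(worker_node_id: u32) -> Option<Self> {
        Some(Self { worker_node_id })
    }
}
```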
14 changes: 2 additions & 12 deletions proto/meta.proto
@@ -65,8 +65,8 @@
     // Running normally
     RUNNING = 2;
   }
-  // Current on which parallel unit
-  common.ParallelUnit parallel_unit = 1;
+  // Current on which worker
+  common.ActorLocation location = 1;
   // Current state
   ActorState state = 2;
 }

Check failure on line 69 in proto/meta.proto (GitHub Actions / Check breaking changes in Protobuf files):
  Field "1" with name "location" on message "ActorStatus" changed option "json_name" from "parallelUnit" to "location".
  Field "1" with name "location" on message "ActorStatus" changed type from "common.ParallelUnit" to "common.ActorLocation".
  Field "1" on message "ActorStatus" changed name from "parallel_unit" to "location".

@@ -105,16 +105,6 @@
   TableParallelism parallelism = 7;
 }
 
-/// Parallel unit mapping with fragment id, used for notification.
-message FragmentParallelUnitMapping {
-  uint32 fragment_id = 1;
-  common.ParallelUnitMapping mapping = 2;
-}
-
-message FragmentParallelUnitMappings {
-  repeated FragmentParallelUnitMapping mappings = 1;
-}
-
/// Worker slot mapping with fragment id, used for notification.
 message FragmentWorkerSlotMapping {
   uint32 fragment_id = 1;
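The Rust changes below all read the placement through a `status.worker_id()` accessor instead of unwrapping the old `parallel_unit` field. The accessor itself is not part of this diff; a minimal sketch of what it presumably looks like, assuming `location` is generated as an `Option<PbActorLocation>` (the method name matches the call sites, the panic message is invented):

```rust
impl PbActorStatus {
    /// Returns the id of the worker node this actor is placed on.
    /// Panics if the meta service has not filled in the location yet.
    pub fn worker_id(&self) -> u32 {
        self.location
            .as_ref()
            .expect("actor location should be set")
            .worker_node_id
    }
}
```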
107 changes: 5 additions & 102 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -12,21 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeSet, HashMap};
 use std::fmt::{Debug, Display, Formatter};
 use std::hash::Hash;
-use std::ops::{Index, Sub};
+use std::ops::Index;
 
 use educe::Educe;
 use itertools::Itertools;
-use risingwave_pb::common::{
-    ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerSlotMapping,
-};
+use risingwave_pb::common::PbWorkerSlotMapping;
 use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
 
 use super::bitmap::VnodeBitmapExt;
-use super::vnode::{ParallelUnitId, VirtualNode};
 use crate::bitmap::{Bitmap, BitmapBuilder};
+use crate::hash::VirtualNode;
 use crate::util::compress::compress_data;
 use crate::util::iter_util::ZipEqDebug;
 
@@ -78,7 +76,7 @@ impl Debug for WorkerSlotId {
 pub trait VnodeMappingItem {
     /// The type of the item.
     ///
-    /// Currently, there are two types of items: [`ParallelUnitId`] and [`ActorId`]. We don't use
+    /// Currently, there are two types of items: [`WorkerSlotId`] and [`ActorId`]. We don't use
     /// them directly as the generic parameter because they're the same type aliases.
     type Item: Copy + Ord + Hash + Debug;
 }
@@ -292,12 +290,6 @@ pub mod marker {
         type Item = ActorId;
     }
 
-    /// A marker type for items of [`ParallelUnitId`].
-    pub struct ParallelUnit;
-    impl VnodeMappingItem for ParallelUnit {
-        type Item = ParallelUnitId;
-    }
-
     /// A marker type for items of [`WorkerSlotId`].
     pub struct WorkerSlot;
     impl VnodeMappingItem for WorkerSlot {
@@ -310,25 +302,12 @@ pub type ActorMapping = VnodeMapping<marker::Actor>;
 /// An expanded mapping from [`VirtualNode`] to [`ActorId`].
 pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;
 
-/// A mapping from [`VirtualNode`] to [`ParallelUnitId`].
-pub type ParallelUnitMapping = VnodeMapping<marker::ParallelUnit>;
-/// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`].
-pub type ExpandedParallelUnitMapping = ExpandedMapping<marker::ParallelUnit>;
-
 /// A mapping from [`VirtualNode`] to [`WorkerSlotId`].
 pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
 /// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`].
 pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;
 
 impl ActorMapping {
-    /// Transform this actor mapping to a parallel unit mapping, essentially `transform`.
-    pub fn to_parallel_unit<M>(&self, to_map: &M) -> ParallelUnitMapping
-    where
-        M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
-    {
-        self.transform(to_map)
-    }
-
     /// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
     pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
         let mut worker_actors = HashMap::new();
@@ -372,12 +351,6 @@ impl ActorMapping {
     }
 }
 
-#[derive(thiserror::Error, Debug)]
-pub enum ParallelUnitError {
-    #[error("parallel units {0:?} are not covered by the worker slot mapping")]
-    NotCovered(HashSet<ParallelUnitId>),
-}
-
 impl WorkerSlotMapping {
     /// Create a uniform worker mapping from the given worker ids
     pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId]) -> Self {
@@ -402,76 +375,6 @@
     }
 }
 
-impl ParallelUnitMapping {
-    /// Create a uniform parallel unit mapping from the given parallel units, essentially
-    /// `new_uniform`.
-    pub fn build(parallel_units: &[ParallelUnit]) -> Self {
-        Self::new_uniform(parallel_units.iter().map(|pu| pu.id))
-    }
-
-    /// Create a uniform parallel unit mapping from the given parallel units ids
-    pub fn build_from_ids(parallel_unit_ids: &[ParallelUnitId]) -> Self {
-        Self::new_uniform(parallel_unit_ids.iter().cloned())
-    }
-
-    /// Transform this parallel unit mapping to an actor mapping, essentially `transform`.
-    pub fn to_actor(&self, to_map: &HashMap<ParallelUnitId, ActorId>) -> ActorMapping {
-        self.transform(to_map)
-    }
-
-    /// Transform this parallel unit mapping to a worker slot mapping, essentially `transform`.
-    pub fn to_worker_slot(
-        &self,
-        to_map: &HashMap<ParallelUnitId, u32>,
-    ) -> Result<WorkerSlotMapping, ParallelUnitError> {
-        let mut worker_to_parallel_units = HashMap::<_, BTreeSet<_>>::new();
-        for (parallel_unit_id, worker_id) in to_map {
-            worker_to_parallel_units
-                .entry(*worker_id)
-                .or_default()
-                .insert(*parallel_unit_id);
-        }
-
-        let mut parallel_unit_to_worker_slot = HashMap::with_capacity(to_map.len());
-
-        for (worker_id, parallel_unit_ids) in worker_to_parallel_units {
-            for (index, &parallel_unit_id) in parallel_unit_ids.iter().enumerate() {
-                parallel_unit_to_worker_slot
-                    .insert(parallel_unit_id, WorkerSlotId::new(worker_id, index));
-            }
-        }
-
-        let available_parallel_unit_ids: HashSet<_> =
-            parallel_unit_to_worker_slot.keys().copied().collect();
-
-        let parallel_unit_ids: HashSet<_> = self.data.iter().copied().collect();
-
-        let sub_set = parallel_unit_ids.sub(&available_parallel_unit_ids);
-        if sub_set.is_empty() {
-            Ok(self.transform(&parallel_unit_to_worker_slot))
-        } else {
-            Err(ParallelUnitError::NotCovered(sub_set))
-        }
-    }
-
-    /// Create a parallel unit mapping from the protobuf representation.
-    pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self {
-        assert_eq!(proto.original_indices.len(), proto.data.len());
-        Self {
-            original_indices: proto.original_indices.clone(),
-            data: proto.data.clone(),
-        }
-    }
-
-    /// Convert this parallel unit mapping to the protobuf representation.
-    pub fn to_protobuf(&self) -> ParallelUnitMappingProto {
-        ParallelUnitMappingProto {
-            original_indices: self.original_indices.clone(),
-            data: self.data.clone(),
-        }
-    }
-}
-
 impl WorkerSlotMapping {
     /// Transform this worker slot mapping to an actor mapping, essentially `transform`.
     pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
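To see how the surviving `WorkerSlotMapping` API stands in for the deleted `ParallelUnitMapping` one, here is a hedged usage sketch built only from functions kept above (`WorkerSlotId::new`, `build_from_ids`, `to_actor`); the worker and actor ids are made up and the import path is approximate:

```rust
use std::collections::HashMap;

use risingwave_common::hash::{ActorId, ActorMapping, WorkerSlotId, WorkerSlotMapping};

fn example() -> ActorMapping {
    // Hypothetical cluster: worker 1 exposes two slots, worker 2 exposes one.
    let slots = [
        WorkerSlotId::new(1, 0),
        WorkerSlotId::new(1, 1),
        WorkerSlotId::new(2, 0),
    ];

    // Spread all vnodes uniformly over the worker slots.
    let mapping = WorkerSlotMapping::build_from_ids(&slots);

    // Relabel each slot with the actor scheduled on it to obtain
    // a vnode-to-actor mapping.
    let slot_to_actor: HashMap<WorkerSlotId, ActorId> =
        slots.iter().copied().zip([101, 102, 103]).collect();
    mapping.to_actor(&slot_to_actor)
}
```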
4 changes: 0 additions & 4 deletions src/common/src/hash/consistent_hash/vnode.rs
@@ -22,10 +22,6 @@ use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
 use crate::util::hash_util::Crc32FastBuilder;
 use crate::util::row_id::extract_vnode_id_from_row_id;
 
-/// Parallel unit is the minimal scheduling unit.
-// TODO: make it a newtype
-pub type ParallelUnitId = u32;
-
 /// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
 /// consistent hashing.
 #[repr(transparent)]
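As the retained doc comment says, a vnode is the minimal partition a set of keys belongs to: keys are hashed once into a fixed number of vnodes, and only the vnode-to-owner mapping (now `WorkerSlotMapping`) moves when the cluster is rescheduled. A minimal self-contained illustration of that idea follows; it is not RisingWave's actual hashing, which uses CRC32 (see `Crc32FastBuilder` above) and its own vnode count:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Number of virtual nodes; a power of two keeps the modulo cheap.
const VNODE_COUNT: u64 = 256;

/// Hash a key into its virtual node. Only this function touches the key;
/// placement changes only remap vnodes to owners, never rehash keys.
fn vnode_of(key: &impl Hash) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % VNODE_COUNT
}

fn main() {
    // The same key always lands in the same vnode, no matter how many
    // workers currently own the vnodes.
    assert_eq!(vnode_of(&"user_42"), vnode_of(&"user_42"));
    println!("vnode = {}", vnode_of(&"user_42"));
}
```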
4 changes: 1 addition & 3 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -100,9 +100,7 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
             .actor_status
             .get(&actor.actor_id)
             .unwrap()
-            .get_parallel_unit()
-            .unwrap()
-            .get_worker_node_id();
+            .worker_id();
 
         fragments
             .entry(id)
4 changes: 1 addition & 3 deletions src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -21,7 +21,6 @@ use itertools::Itertools;
 use regex::Regex;
 use risingwave_meta::manager::WorkerId;
 use risingwave_pb::common::WorkerNode;
-use risingwave_pb::meta::table_fragments::ActorStatus;
 use risingwave_pb::meta::{GetClusterInfoResponse, PbWorkerReschedule};
 use serde::{Deserialize, Serialize};
 use thiserror_ext::AsReport;
@@ -265,9 +264,8 @@ pub async fn unregister_workers(
                 table_fragments
                     .actor_status
                     .get(&actor.actor_id)
-                    .and_then(|ActorStatus { parallel_unit, .. }| parallel_unit.clone())
+                    .map(|actor_status| actor_status.worker_id())
                     .unwrap()
-                    .worker_node_id
             })
             .collect();
 
12 changes: 6 additions & 6 deletions src/ctl/src/lib.rs
@@ -405,16 +405,16 @@ enum MetaCommands {
     SourceSplitInfo,
     /// Reschedule the parallel unit in the stream graph
     ///
-    /// The format is `fragment_id-[removed]+[added]`
-    /// You can provide either `removed` only or `added` only, but `removed` should be preceded by
+    /// The format is `fragment_id-[worker_id:count]+[worker_id:count]`
+    /// You can provide either decreased `worker_ids` only or increased only, but decreased should be preceded by
     /// `added` when both are provided.
     ///
-    /// For example, for plan `100-[1,2,3]+[4,5]` the follow request will be generated:
+    /// For example, for plan `100-[1:1]+[4:1]` the follow request will be generated:
     /// ```text
     /// {
-    ///     100: Reschedule {
-    ///         added_parallel_units: [4,5],
-    ///         removed_parallel_units: [1,2,3],
+    ///     100: WorkerReschedule {
+    ///         increased_actor_count: { 1: 1 },
+    ///         decreased_actor_count: { 4: 1 },
    ///     }
     /// }
     /// ```
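Since this doc comment is the only specification of the new plan grammar, a hedged parsing sketch may help. It is illustrative only (risectl's real parser is `Regex`-based, per the import in reschedule.rs above), the type and field names merely mirror the doc comment, and it follows the original convention that the `-[..]` group shrinks a fragment while the `+[..]` group grows it, which the example in the diff appears to swap:

```rust
use std::collections::HashMap;

/// A parsed `fragment_id-[worker_id:count]+[worker_id:count]` entry.
/// Hypothetical type for illustration, not the actual risectl struct.
#[derive(Debug)]
struct WorkerReschedulePlan {
    fragment_id: u32,
    decreased_actor_count: HashMap<u32, u32>,
    increased_actor_count: HashMap<u32, u32>,
}

/// Parse a `[w1:c1,w2:c2,...]` group into a worker-id-to-count map.
fn parse_counts(s: &str) -> Option<HashMap<u32, u32>> {
    let inner = s.strip_prefix('[')?.strip_suffix(']')?;
    inner
        .split(',')
        .filter(|p| !p.is_empty())
        .map(|pair| {
            let (worker, count) = pair.split_once(':')?;
            Some((worker.trim().parse().ok()?, count.trim().parse().ok()?))
        })
        .collect()
}

fn parse_plan(plan: &str) -> Option<WorkerReschedulePlan> {
    // e.g. "100-[1:1]+[4:1]": fragment 100, one actor fewer on worker 1,
    // one actor more on worker 4 (under the `-`/`+` convention).
    let (fragment, rest) = plan.split_once('-')?;
    let (decreased, increased) = rest.split_once('+')?;
    Some(WorkerReschedulePlan {
        fragment_id: fragment.trim().parse().ok()?,
        decreased_actor_count: parse_counts(decreased.trim())?,
        increased_actor_count: parse_counts(increased.trim())?,
    })
}

fn main() {
    let plan = parse_plan("100-[1:1]+[4:1]").expect("valid plan");
    println!("{plan:?}");
}
```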
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/local.rs
@@ -566,7 +566,7 @@ impl LocalQueryExecution {
             self.get_fragment_id(&side_table_desc.table_id.into())?,
         )?;
 
-        // TODO: should we use `pb::ParallelUnitMapping` here?
+        // TODO: should we use `pb::WorkerSlotMapping` here?
         node.inner_side_vnode_mapping =
             mapping.to_expanded().into_iter().map(u64::from).collect();
         node.worker_nodes = self.worker_node_manager.manager.list_worker_nodes();
5 changes: 0 additions & 5 deletions src/meta/model_v2/src/lib.rs
@@ -397,11 +397,6 @@ derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
 derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
 derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
 
-derive_from_blob!(
-    FragmentVnodeMapping,
-    risingwave_pb::common::ParallelUnitMapping
-);
-
 derive_array_from_blob!(
     HummockVersionDeltaArray,
     risingwave_pb::hummock::PbHummockVersionDelta,
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
@@ -374,7 +374,7 @@ impl StreamManagerService for StreamServiceImpl {
                         actor_id,
                         fragment_id: actor_to_fragment[&actor_id],
                         state: status.state,
-                        worker_id: status.parallel_unit.as_ref().unwrap().worker_node_id,
+                        worker_id: status.worker_id(),
                     }
                 })
             })
8 changes: 2 additions & 6 deletions src/meta/src/barrier/command.rs
@@ -120,9 +120,7 @@ impl ReplaceTablePlan {
                         .actor_status
                         .get(&actor.actor_id)
                         .expect("should exist")
-                        .get_parallel_unit()
-                        .expect("should set")
-                        .worker_node_id,
+                        .worker_id(),
                     )
                 })
                 .collect(),
@@ -176,9 +174,7 @@ impl CreateStreamingJobCommandInfo {
                         .actor_status
                         .get(&actor.actor_id)
                         .expect("should exist")
-                        .get_parallel_unit()
-                        .expect("should set")
-                        .worker_node_id,
+                        .worker_id(),
                     )
                 })
                 .collect(),
21 changes: 6 additions & 15 deletions src/meta/src/controller/fragment.rs
@@ -29,7 +29,7 @@ use risingwave_meta_model_v2::{
     ConnectorSplits, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SinkId, SourceId,
     StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId,
 };
-use risingwave_pb::common::PbParallelUnit;
+use risingwave_pb::common::PbActorLocation;
 use risingwave_pb::meta::subscribe_response::{
     Info as NotificationInfo, Operation as NotificationOperation,
 };
@@ -244,7 +244,7 @@
             )
         })?;
 
-        let worker_id = status.parallel_unit.clone().unwrap().get_worker_node_id() as WorkerId;
+        let worker_id = status.worker_id() as _;
 
         assert_eq!(
             pb_upstream_actor_id
@@ -430,10 +430,7 @@
             pb_actor_status.insert(
                 actor_id as _,
                 PbActorStatus {
-                    parallel_unit: Some(PbParallelUnit {
-                        id: u32::MAX,
-                        worker_node_id: worker_id as _,
-                    }),
+                    location: PbActorLocation::from_worker(worker_id as u32),
                     state: PbActorState::from(status) as _,
                 },
             );
@@ -1059,10 +1056,7 @@
         let (table_fragments, actor_status, _) =
             Self::compose_fragment(fragment, actors, dispatcher_info)?;
         for actor in table_fragments.actors {
-            let node_id = actor_status[&actor.actor_id]
-                .get_parallel_unit()
-                .unwrap()
-                .worker_node_id as WorkerId;
+            let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
             node_actors
                 .entry(node_id)
                 .or_insert_with(Vec::new)
@@ -1429,7 +1423,7 @@ mod tests {
        actor, actor_dispatcher, fragment, ActorId, ActorUpstreamActors, ConnectorSplits,
        ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId, VnodeBitmap,
     };
-    use risingwave_pb::common::ParallelUnit;
+    use risingwave_pb::common::PbActorLocation;
     use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
     use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
     use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment};
@@ -1554,10 +1548,7 @@
                 (
                     actor_id,
                     PbActorStatus {
-                        parallel_unit: Some(ParallelUnit {
-                            id: u32::MAX,
-                            worker_node_id: 0,
-                        }),
+                        location: PbActorLocation::from_worker(0),
                         state: PbActorState::Running as _,
                     },
                 )