feat(meta): deprecate parallel unit #17523

Merged
merged 19 commits on Jul 23, 2024
Changes from all commits
Commits
19 commits
d2a6099
Refactor parallelism handling: Shift from parallel units to paralleli…
shanicky Jul 1, 2024
def21c2
Refactor worker manager & meta mgmt, simplify worker map & enhance ba…
shanicky Jul 1, 2024
7518c2e
Add WorkerReschedule; rm/add rw_ modules; rename rs file.
shanicky Jul 3, 2024
fa877cb
Deprecate `vnode_mapping`, reserve field 5, update `MigrationPlan`
shanicky Jul 3, 2024
4804087
Add parallelism column to WorkerProperty migration
shanicky Jul 4, 2024
4ef6db4
Refactor, docs update, and tweak parallelism access
shanicky Jul 5, 2024
4969ce1
Cleanup imports and update Bitmap path in 4 Rust files
shanicky Jul 8, 2024
6c6c73d
Removed misnamed `get_fragment_ids_by_jobs` function
shanicky Jul 8, 2024
77e738a
Renamed DDL vars for clarity
shanicky Jul 9, 2024
55e5819
Refactor ddl_controller: simplify rand, update imports, clean code
shanicky Jul 16, 2024
e858923
Refactor: Remove worker slot mgmt, update actor/worker ID types
shanicky Jul 16, 2024
d43ba9d
Clean up imports in recovery and streaming_job.rs
shanicky Jul 17, 2024
32bb7b9
Refactor worker rescheduling logic with unified actor diff mapping
shanicky Jul 16, 2024
0e7efb3
Add resource string format comment; skip non-Running actors.
shanicky Jul 17, 2024
f66bf63
Add Copy to DistributionType, fix worker_id cast, rm Bitmap import
shanicky Jul 17, 2024
9a5cb0a
Updated job_ids to ObjectId; refactored loop and get_fragment_mappings.
shanicky Jul 17, 2024
65f9452
Refactor ActorMapping iteration and worker lookup in mapping.rs
shanicky Jul 17, 2024
d59bc6d
Add migration timeout; update logging terminology; scale worker slots…
shanicky Jul 19, 2024
36f3c83
Update assert in sink recovery test to allow equal counts
shanicky Jul 22, 2024
8 changes: 6 additions & 2 deletions proto/common.proto
@@ -61,8 +61,10 @@ message WorkerNode {
WorkerType type = 2;
HostAddress host = 3;
State state = 4;
// TODO #8940 `parallel_units` should be moved into `Property`
repeated ParallelUnit parallel_units = 5;

reserved 5;
reserved "parallel_units";

Property property = 6;

// Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
@@ -75,6 +77,8 @@ message WorkerNode {
// It's populated by meta node, when the worker node is added by meta node.
// It's not persistent in meta store.
optional uint64 started_at = 9;

uint32 parallelism = 10;
}

message Buffer {
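To illustrate what this field change means for callers, a minimal sketch (the stand-in struct and helper below are illustrative only, not part of this PR or of the generated protobuf code):

// Stand-in for the generated `WorkerNode` message, illustrative only.
struct WorkerNodeLike {
    parallelism: u32,
}

// Cluster capacity is now the sum of per-worker `parallelism` values, where callers
// previously would have counted entries in the deprecated `parallel_units` list.
fn total_parallelism(workers: &[WorkerNodeLike]) -> u32 {
    workers.iter().map(|w| w.parallelism).sum()
}
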
30 changes: 22 additions & 8 deletions proto/meta.proto
@@ -81,9 +81,13 @@ message TableFragments {
uint32 fragment_type_mask = 2;
FragmentDistributionType distribution_type = 3;
repeated stream_plan.StreamActor actors = 4;

// NOTE: vnode_mapping is deprecated, we will generate the vnode_mapping by actors' bitmaps
// Vnode mapping (which should be set in upstream dispatcher) of the fragment.
// This field is always set to `Some`. For singleton, the parallel unit for all vnodes will be the same.
common.ParallelUnitMapping vnode_mapping = 5;
reserved 5;
reserved "vnode_mapping";

repeated uint32 state_table_ids = 6;
// Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
// but in some scenarios (e.g. Scaling) it will lead to a lot of duplicate code,
@@ -128,8 +132,13 @@ message ActorLocation {
}

message MigrationPlan {
// NOTE: parallel_unit_migration_plan is deprecated, using worker_slot_migration_plan instead
// map<parallel_unit_id, parallel_unit>, the plan indicates that the actors will be migrated from old parallel unit to the new one.
map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 1;
reserved 1;
reserved "parallel_unit_migration_plan";

// map<src_worker_slot_id, dst_worker_slot_id>, the plan indicates that the actors will be migrated from old worker_slot to the new one.
map<uint64, uint64> worker_slot_migration_plan = 2;
}

message FlushRequest {
@@ -244,8 +253,10 @@ message ListActorStatesResponse {
message ActorState {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 parallel_unit_id = 3;
reserved 3;
reserved "parallel_unit_id";
TableFragments.ActorStatus.ActorState state = 4;
uint32 worker_id = 5;
}
repeated ActorState states = 1;
}
@@ -507,16 +518,19 @@ message GetClusterInfoResponse {
uint64 revision = 5;
}

message Reschedule {
repeated uint32 added_parallel_units = 1;
repeated uint32 removed_parallel_units = 2;
// For each fragment that needs to be rescheduled, there will be a WorkerReschedule,
// indicating on which workers the actors of this fragment need to be changed and by how many.
message WorkerReschedule {
// worker_id -> actor_diff
map<uint32, int32> worker_actor_diff = 1;
}

message RescheduleRequest {
// reschedule plan for each fragment
map<uint32, Reschedule> reschedules = 1;
reserved "reschedules";
reserved 1;
Comment on lines 528 to +530

Member: This doesn't seem to have ever been persisted. We can make breaking changes.

Contributor Author: will remove this part before merging.

uint64 revision = 2;
bool resolve_no_shuffle_upstream = 3;
map<uint32, WorkerReschedule> worker_reschedules = 4;
}

message RescheduleResponse {
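For context on the `WorkerReschedule` shape introduced above: a reschedule is now expressed per fragment as per-worker actor-count deltas rather than as explicit parallel-unit assignments. A minimal sketch of building such a plan, using plain `HashMap`s in place of the generated protobuf types (the fragment id and diff values are made-up examples):

use std::collections::HashMap;

// Build `fragment_id -> worker_actor_diff` for one fragment: moving two actors
// of fragment 42 from worker 1 to worker 3 is written as a per-worker delta.
fn example_reschedule_plan() -> HashMap<u32, HashMap<u32, i32>> {
    let fragment_id: u32 = 42;

    // worker_id -> actor_diff: worker 1 loses 2 actors, worker 3 gains 2.
    let worker_actor_diff: HashMap<u32, i32> = HashMap::from([(1, -2), (3, 2)]);

    // In this example the fragment's total actor count is unchanged, so the
    // per-worker diffs sum to zero.
    debug_assert_eq!(worker_actor_diff.values().sum::<i32>(), 0);

    HashMap::from([(fragment_id, worker_actor_diff)])
}
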
4 changes: 2 additions & 2 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -425,7 +425,7 @@ mod tests {
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallel_units: vec![],
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
@@ -439,7 +439,7 @@
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallel_units: vec![],
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
32 changes: 32 additions & 0 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -329,6 +329,31 @@ impl ActorMapping {
self.transform(to_map)
}

/// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
let mut worker_actors = HashMap::new();
for actor_id in self.iter_unique() {
let worker_id = actor_to_worker
.get(&actor_id)
.cloned()
.unwrap_or_else(|| panic!("location for actor {} not found", actor_id));

worker_actors
.entry(worker_id)
.or_insert(BTreeSet::new())
.insert(actor_id);
}

let mut actor_location = HashMap::new();
for (worker, actors) in worker_actors {
for (idx, &actor) in actors.iter().enumerate() {
actor_location.insert(actor, WorkerSlotId::new(worker, idx));
}
}

self.transform(&actor_location)
}

/// Create an actor mapping from the protobuf representation.
pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
@@ -447,6 +472,13 @@ impl ParallelUnitMapping {
}
}

impl WorkerSlotMapping {
/// Transform this worker slot mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
self.transform(to_map)
}
}

#[cfg(test)]
mod tests {
use std::iter::repeat_with;
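The slot-assignment rule in the new `to_worker_slot` above can be read in isolation: actors are grouped per worker, ordered by actor id, and the position within that ordered set becomes the slot index on the worker. A minimal sketch of the same rule using only std types, with `(worker_id, slot_idx)` tuples standing in for `WorkerSlotId` and a made-up function name:

use std::collections::{BTreeSet, HashMap};

// actor_id -> worker_id in, actor_id -> (worker_id, slot index) out.
fn assign_worker_slots(actor_to_worker: &HashMap<u32, u32>) -> HashMap<u32, (u32, usize)> {
    // Group actors per worker; BTreeSet keeps them ordered by actor id.
    let mut worker_actors: HashMap<u32, BTreeSet<u32>> = HashMap::new();
    for (&actor_id, &worker_id) in actor_to_worker {
        worker_actors.entry(worker_id).or_default().insert(actor_id);
    }

    // The enumeration order within each worker becomes the slot index.
    let mut actor_location = HashMap::new();
    for (worker_id, actors) in worker_actors {
        for (idx, &actor_id) in actors.iter().enumerate() {
            actor_location.insert(actor_id, (worker_id, idx));
        }
    }
    actor_location
}
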
29 changes: 5 additions & 24 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -198,42 +198,23 @@ pub fn place_vnode(

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use risingwave_common::hash::WorkerSlotMapping;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{ParallelUnit, WorkerNode};
use risingwave_pb::common::WorkerNode;

use crate::hash::{ParallelUnitId, VirtualNode};
use crate::hash::VirtualNode;
use crate::vnode_mapping::vnode_placement::place_vnode;
#[test]
fn test_place_vnode() {
assert_eq!(VirtualNode::COUNT, 256);

let mut pu_id_counter: ParallelUnitId = 0;
let mut pu_to_worker: HashMap<ParallelUnitId, u32> = Default::default();
let serving_property = Property {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
};

let mut gen_pus_for_worker =
|worker_node_id: u32, number: u32, pu_to_worker: &mut HashMap<ParallelUnitId, u32>| {
let mut results = vec![];
for i in 0..number {
results.push(ParallelUnit {
id: pu_id_counter + i,
worker_node_id,
})
}
pu_id_counter += number;
for pu in &results {
pu_to_worker.insert(pu.id, pu.worker_node_id);
}
results
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
assert_eq!(wm1.len(), 256);
assert_eq!(wm2.len(), 256);
@@ -249,7 +230,7 @@ mod tests {

let worker_1 = WorkerNode {
id: 1,
parallel_units: gen_pus_for_worker(1, 1, &mut pu_to_worker),
parallelism: 1,
property: Some(serving_property.clone()),
..Default::default()
};
@@ -264,7 +245,7 @@

let worker_2 = WorkerNode {
id: 2,
parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker),
parallelism: 50,
property: Some(serving_property.clone()),
..Default::default()
};
@@ -283,7 +264,7 @@

let worker_3 = WorkerNode {
id: 3,
parallel_units: gen_pus_for_worker(3, 60, &mut pu_to_worker),
parallelism: 60,
property: Some(serving_property),
..Default::default()
};
47 changes: 21 additions & 26 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};

use comfy_table::{Attribute, Cell, Row, Table};
use itertools::Itertools;
@@ -88,40 +88,33 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
revision,
} = get_cluster_info(context).await?;

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
// Fragment ID -> [Worker ID -> [Actor ID]]
let mut fragments = BTreeMap::new();
// Fragment ID -> Table Fragments' State
let mut fragment_states = HashMap::new();

for table_fragment in &table_fragments {
for (&id, fragment) in &table_fragment.fragments {
for actor in &fragment.actors {
let parallel_unit = table_fragment
let worker_id = table_fragment
.actor_status
.get(&actor.actor_id)
.unwrap()
.get_parallel_unit()
.unwrap();
.unwrap()
.get_worker_node_id();

fragments
.entry(id)
.or_insert_with(HashMap::new)
.insert(parallel_unit.id, (parallel_unit, actor));
.or_insert_with(BTreeMap::new)
.entry(worker_id)
.or_insert(BTreeSet::new())
.insert(actor.actor_id);
}
fragment_states.insert(id, table_fragment.state());
}
}

// Parallel Unit ID -> Worker Node
let all_parallel_units: BTreeMap<_, _> = worker_nodes
.iter()
.flat_map(|worker_node| {
worker_node
.parallel_units
.iter()
.map(|parallel_unit| (parallel_unit.id, worker_node.clone()))
})
.collect();

let mut table = Table::new();

let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
@@ -132,11 +125,10 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
}
};

// Compute Node, Parallel Unit, Frag 1, Frag 2, ..., Frag N
// Compute Node, Frag 1, Frag 2, ..., Frag N
table.set_header({
let mut row = Row::new();
row.add_cell("Compute Node".into());
row.add_cell("Parallel Unit".into());
for &fid in fragments.keys() {
let cell = Cell::new(format!("Frag {fid}"));
let cell = cross_out_if_creating(cell, fid);
@@ -146,8 +138,8 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
});

let mut last_worker_id = None;
for (pu, worker) in all_parallel_units {
// Compute Node, Parallel Unit, Actor 1, Actor 11, -, ..., Actor N
for worker in worker_nodes {
// Compute Node, Actor 1, Actor 11, -, ..., Actor N
let mut row = Row::new();
row.add_cell(if last_worker_id == Some(worker.id) {
"".into()
@@ -166,14 +158,17 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
))
.add_attribute(Attribute::Bold)
});
row.add_cell(pu.into());
for (&fid, f) in &fragments {
let cell = if let Some((_pu, actor)) = f.get(&pu) {
actor.actor_id.into()
for (&fragment_id, worker_actors) in &fragments {
let cell = if let Some(actors) = worker_actors.get(&worker.id) {
actors
.iter()
.map(|actor| format!("{}", actor))
.join(",")
.into()
} else {
"-".into()
};
let cell = cross_out_if_creating(cell, fid);
let cell = cross_out_if_creating(cell, fragment_id);
row.add_cell(cell);
}
table.add_row(row);
8 changes: 1 addition & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -156,18 +156,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.await?;
if worker.worker_type() == WorkerType::ComputeNode {
let pb_property = worker.worker_node.property.as_ref().unwrap();
let parallel_unit_ids = worker
.worker_node
.parallel_units
.iter()
.map(|pu| pu.id as i32)
.collect_vec();
let property = worker_property::ActiveModel {
worker_id: Set(worker.worker_id() as _),
parallel_unit_ids: Set(parallel_unit_ids.into()),
is_streaming: Set(pb_property.is_streaming),
is_serving: Set(pb_property.is_serving),
is_unschedulable: Set(pb_property.is_unschedulable),
parallelism: Set(worker.worker_node.parallelism() as _),
};
WorkerProperty::insert(property)
.exec(&meta_store_sql.conn)