[ignore me] fix: try fix test failure in 17523 #17663

Closed
wants to merge 13 commits
753 changes: 8 additions & 745 deletions ci/workflows/pull-request.yml

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions proto/common.proto
@@ -61,8 +61,10 @@ message WorkerNode {
WorkerType type = 2;
HostAddress host = 3;
State state = 4;
// TODO #8940 `parallel_units` should be moved into `Property`
repeated ParallelUnit parallel_units = 5;

reserved 5;
reserved "parallel_units";

Property property = 6;

// Ranges from 0 to 1023, used to generate the machine ID field in the global unique ID.
@@ -75,6 +77,8 @@ message WorkerNode {
// It's populated by meta node, when the worker node is added by meta node.
// It's not persistent in meta store.
optional uint64 started_at = 9;

uint32 parallelism = 10;
}

message Buffer {
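
Note: the hunks above remove the repeated `parallel_units` field from `WorkerNode` (tag 5 is now reserved) and add a scalar `parallelism` count at tag 10. A minimal sketch of how a consumer might use the new field, assuming the prost-generated `risingwave_pb::common::WorkerNode`; `total_streaming_parallelism` is a hypothetical helper, not part of this PR:

```rust
use risingwave_pb::common::WorkerNode;

/// Hypothetical helper: sum the scalar `parallelism` field over streaming workers,
/// where callers previously counted `worker.parallel_units.len()`.
fn total_streaming_parallelism(workers: &[WorkerNode]) -> u64 {
    workers
        .iter()
        .filter(|w| w.property.as_ref().map_or(false, |p| p.is_streaming))
        .map(|w| w.parallelism as u64)
        .sum()
}
```
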
33 changes: 25 additions & 8 deletions proto/meta.proto
@@ -81,9 +81,13 @@ message TableFragments {
uint32 fragment_type_mask = 2;
FragmentDistributionType distribution_type = 3;
repeated stream_plan.StreamActor actors = 4;

// NOTE: vnode_mapping is deprecated; the vnode mapping is now generated from the actors' bitmaps
// Vnode mapping (which should be set in upstream dispatcher) of the fragment.
// This field is always set to `Some`. For singleton, the parallel unit for all vnodes will be the same.
common.ParallelUnitMapping vnode_mapping = 5;
reserved 5;
reserved "vnode_mapping";

repeated uint32 state_table_ids = 6;
// Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
// but in some scenarios (e.g. Scaling) it will lead to a lot of duplicate code,
@@ -128,8 +132,13 @@ message ActorLocation {
}

message MigrationPlan {
// NOTE: parallel_unit_migration_plan is deprecated; use worker_slot_migration_plan instead
// map<parallel_unit_id, parallel_unit>, the plan indicates that the actors will be migrated from old parallel unit to the new one.
map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 1;
reserved 1;
reserved "parallel_unit_migration_plan";

// map<src_worker_slot_id, dst_worker_slot_id>, the plan indicates that the actors will be migrated from old worker_slot to the new one.
map<uint64, uint64> worker_slot_migration_plan = 2;
}
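
The migration plan is now keyed by worker slot ids rather than parallel unit ids. A rough sketch of applying such a plan to actor locations, using plain `u64` slot ids and std maps only; `apply_migration` and the `actor_to_slot` map are illustrative assumptions, not code from this PR:

```rust
use std::collections::HashMap;

/// Illustrative only: move each actor whose current worker slot appears as a
/// source in the plan to the corresponding destination slot; other actors stay put.
fn apply_migration(
    actor_to_slot: &mut HashMap<u32, u64>,          // actor_id -> worker_slot_id
    worker_slot_migration_plan: &HashMap<u64, u64>, // src slot -> dst slot
) {
    for slot in actor_to_slot.values_mut() {
        if let Some(&dst) = worker_slot_migration_plan.get(slot) {
            *slot = dst;
        }
    }
}
```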

message FlushRequest {
@@ -244,8 +253,10 @@ message ListActorStatesResponse {
message ActorState {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 parallel_unit_id = 3;
reserved 3;
reserved "parallel_unit_id";
TableFragments.ActorStatus.ActorState state = 4;
uint32 worker_id = 5;
}
repeated ActorState states = 1;
}
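
With this change, `ListActorStatesResponse` locates actors by `worker_id` directly instead of by parallel unit. A tiny sketch of grouping the response by worker on the client side, using plain tuples in place of the generated struct (field meanings follow the proto above):

```rust
use std::collections::BTreeMap;

/// Group (actor_id, fragment_id, worker_id) triples by worker.
fn group_actors_by_worker(states: &[(u32, u32, u32)]) -> BTreeMap<u32, Vec<u32>> {
    let mut by_worker: BTreeMap<u32, Vec<u32>> = BTreeMap::new();
    for &(actor_id, _fragment_id, worker_id) in states {
        by_worker.entry(worker_id).or_default().push(actor_id);
    }
    by_worker
}
```
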
@@ -506,16 +517,22 @@ message GetClusterInfoResponse {
uint64 revision = 5;
}

message Reschedule {
repeated uint32 added_parallel_units = 1;
repeated uint32 removed_parallel_units = 2;
// For each fragment that needs to be rescheduled, there will be a WorkerReschedule,
// indicating on which workers the actors of this fragment need to be reduced and by how many,
// and on which workers they need to be increased and by how many.
message WorkerReschedule {
// worker_id -> increased_actor_count
map<uint32, uint32> increased_actor_count = 1;
// worker_id -> decreased_actor_count
map<uint32, uint32> decreased_actor_count = 2;
}

message RescheduleRequest {
// reschedule plan for each fragment
map<uint32, Reschedule> reschedules = 1;
reserved "reschedules";
reserved 1;
uint64 revision = 2;
bool resolve_no_shuffle_upstream = 3;
map<uint32, WorkerReschedule> worker_reschedules = 4;
}

message RescheduleResponse {
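
A reschedule now only states how many actors each worker gains or loses per fragment. A hedged example of building such a request with the prost-generated types, assuming the module paths `risingwave_pb::meta::{RescheduleRequest, WorkerReschedule}`; the fragment, worker, and revision values are made up:

```rust
use std::collections::HashMap;

use risingwave_pb::meta::{RescheduleRequest, WorkerReschedule};

/// Move two actors of fragment 7 off worker 1 and onto worker 3.
fn build_example_request() -> RescheduleRequest {
    let reschedule = WorkerReschedule {
        increased_actor_count: HashMap::from([(3u32, 2u32)]),
        decreased_actor_count: HashMap::from([(1u32, 2u32)]),
    };
    RescheduleRequest {
        worker_reschedules: HashMap::from([(7u32, reschedule)]),
        revision: 42,
        resolve_no_shuffle_upstream: false,
        ..Default::default()
    }
}
```
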
4 changes: 2 additions & 2 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -414,7 +414,7 @@ mod tests {
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallel_units: vec![],
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
@@ -428,7 +428,7 @@
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallel_units: vec![],
parallelism: 0,
property: Some(Property {
is_unschedulable: false,
is_serving: true,
27 changes: 27 additions & 0 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -329,6 +329,26 @@ impl ActorMapping {
self.transform(to_map)
}

/// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
let mut worker_actors = HashMap::new();
for (actor_id, worker_id) in actor_to_worker {
worker_actors
.entry(worker_id)
.or_insert(BTreeSet::new())
.insert(actor_id);
}

let mut actor_location = HashMap::new();
for (worker, actors) in worker_actors {
for (idx, &actor) in actors.iter().enumerate() {
actor_location.insert(*actor, WorkerSlotId::new(*worker, idx));
}
}

self.transform(&actor_location)
}

/// Create an actor mapping from the protobuf representation.
pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
@@ -447,6 +467,13 @@ impl ParallelUnitMapping {
}
}

impl WorkerSlotMapping {
/// Transform this worker slot mapping to an actor mapping, essentially `transform`.
pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
self.transform(to_map)
}
}

#[cfg(test)]
mod tests {
use std::iter::repeat_with;
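
The new `ActorMapping::to_worker_slot` above assigns slot indices per worker by collecting that worker's actors into a `BTreeSet`, so an actor's index depends only on the set of actor ids and not on iteration order. A self-contained sketch of just that assignment step, with `(worker_id, slot_index)` pairs standing in for the crate's `WorkerSlotId`:

```rust
use std::collections::{BTreeSet, HashMap};

/// Give each actor a (worker_id, slot_index) location. Actors on the same worker
/// get consecutive indices in ascending actor-id order, making the result
/// deterministic regardless of the input map's iteration order.
fn assign_worker_slots(actor_to_worker: &HashMap<u32, u32>) -> HashMap<u32, (u32, usize)> {
    let mut worker_actors: HashMap<u32, BTreeSet<u32>> = HashMap::new();
    for (&actor, &worker) in actor_to_worker {
        worker_actors.entry(worker).or_default().insert(actor);
    }

    let mut actor_location = HashMap::new();
    for (worker, actors) in worker_actors {
        for (idx, &actor) in actors.iter().enumerate() {
            actor_location.insert(actor, (worker, idx));
        }
    }
    actor_location
}
```
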
29 changes: 5 additions & 24 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -198,42 +198,23 @@ pub fn place_vnode(

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use risingwave_common::hash::WorkerSlotMapping;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::{ParallelUnit, WorkerNode};
use risingwave_pb::common::WorkerNode;

use crate::hash::{ParallelUnitId, VirtualNode};
use crate::hash::VirtualNode;
use crate::vnode_mapping::vnode_placement::place_vnode;
#[test]
fn test_place_vnode() {
assert_eq!(VirtualNode::COUNT, 256);

let mut pu_id_counter: ParallelUnitId = 0;
let mut pu_to_worker: HashMap<ParallelUnitId, u32> = Default::default();
let serving_property = Property {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
};

let mut gen_pus_for_worker =
|worker_node_id: u32, number: u32, pu_to_worker: &mut HashMap<ParallelUnitId, u32>| {
let mut results = vec![];
for i in 0..number {
results.push(ParallelUnit {
id: pu_id_counter + i,
worker_node_id,
})
}
pu_id_counter += number;
for pu in &results {
pu_to_worker.insert(pu.id, pu.worker_node_id);
}
results
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
assert_eq!(wm1.len(), 256);
assert_eq!(wm2.len(), 256);
@@ -249,7 +230,7 @@ mod tests {

let worker_1 = WorkerNode {
id: 1,
parallel_units: gen_pus_for_worker(1, 1, &mut pu_to_worker),
parallelism: 1,
property: Some(serving_property.clone()),
..Default::default()
};
@@ -264,7 +245,7 @@

let worker_2 = WorkerNode {
id: 2,
parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker),
parallelism: 50,
property: Some(serving_property.clone()),
..Default::default()
};
@@ -283,7 +264,7 @@

let worker_3 = WorkerNode {
id: 3,
parallel_units: gen_pus_for_worker(3, 60, &mut pu_to_worker),
parallelism: 60,
property: Some(serving_property),
..Default::default()
};
47 changes: 21 additions & 26 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};

use comfy_table::{Attribute, Cell, Row, Table};
use itertools::Itertools;
@@ -88,40 +88,33 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
revision,
} = get_cluster_info(context).await?;

// Fragment ID -> [Parallel Unit ID -> (Parallel Unit, Actor)]
// Fragment ID -> [Worker ID -> [Actor ID]]
let mut fragments = BTreeMap::new();
// Fragment ID -> Table Fragments' State
let mut fragment_states = HashMap::new();

for table_fragment in &table_fragments {
for (&id, fragment) in &table_fragment.fragments {
for actor in &fragment.actors {
let parallel_unit = table_fragment
let worker_id = table_fragment
.actor_status
.get(&actor.actor_id)
.unwrap()
.get_parallel_unit()
.unwrap();
.unwrap()
.get_worker_node_id();

fragments
.entry(id)
.or_insert_with(HashMap::new)
.insert(parallel_unit.id, (parallel_unit, actor));
.or_insert_with(BTreeMap::new)
.entry(worker_id)
.or_insert(BTreeSet::new())
.insert(actor.actor_id);
}
fragment_states.insert(id, table_fragment.state());
}
}

// Parallel Unit ID -> Worker Node
let all_parallel_units: BTreeMap<_, _> = worker_nodes
.iter()
.flat_map(|worker_node| {
worker_node
.parallel_units
.iter()
.map(|parallel_unit| (parallel_unit.id, worker_node.clone()))
})
.collect();

let mut table = Table::new();

let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
@@ -132,11 +125,10 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
}
};

// Compute Node, Parallel Unit, Frag 1, Frag 2, ..., Frag N
// Compute Node, Frag 1, Frag 2, ..., Frag N
table.set_header({
let mut row = Row::new();
row.add_cell("Compute Node".into());
row.add_cell("Parallel Unit".into());
for &fid in fragments.keys() {
let cell = Cell::new(format!("Frag {fid}"));
let cell = cross_out_if_creating(cell, fid);
@@ -146,8 +138,8 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
});

let mut last_worker_id = None;
for (pu, worker) in all_parallel_units {
// Compute Node, Parallel Unit, Actor 1, Actor 11, -, ..., Actor N
for worker in worker_nodes {
// Compute Node, Actor 1, Actor 11, -, ..., Actor N
let mut row = Row::new();
row.add_cell(if last_worker_id == Some(worker.id) {
"".into()
@@ -166,14 +158,17 @@ pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
))
.add_attribute(Attribute::Bold)
});
row.add_cell(pu.into());
for (&fid, f) in &fragments {
let cell = if let Some((_pu, actor)) = f.get(&pu) {
actor.actor_id.into()
for (&fragment_id, worker_actors) in &fragments {
let cell = if let Some(actors) = worker_actors.get(&worker.id) {
actors
.iter()
.map(|actor| format!("{}", actor))
.join(",")
.into()
} else {
"-".into()
};
let cell = cross_out_if_creating(cell, fid);
let cell = cross_out_if_creating(cell, fragment_id);
row.add_cell(cell);
}
table.add_row(row);
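
`risingwave ctl meta cluster-info` now groups actors as fragment -> worker -> actors (nested `BTreeMap`/`BTreeSet`) instead of fragment -> parallel unit -> actor, so a table cell lists every actor the worker runs for that fragment. A small std-only sketch of the cell rendering; `render_cell` and the type aliases are illustrative, not code from this PR:

```rust
use std::collections::{BTreeMap, BTreeSet};

type FragmentId = u32;
type WorkerId = u32;
type ActorId = u32;

/// One cell per (fragment, worker): a comma-separated list of actor ids, or "-"
/// when the worker runs no actor of that fragment.
fn render_cell(
    fragments: &BTreeMap<FragmentId, BTreeMap<WorkerId, BTreeSet<ActorId>>>,
    fragment_id: FragmentId,
    worker_id: WorkerId,
) -> String {
    match fragments.get(&fragment_id).and_then(|w| w.get(&worker_id)) {
        Some(actors) => actors
            .iter()
            .map(|a| a.to_string())
            .collect::<Vec<_>>()
            .join(","),
        None => "-".to_string(),
    }
}
```
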
8 changes: 1 addition & 7 deletions src/ctl/src/cmd_impl/meta/migration.rs
@@ -156,18 +156,12 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.await?;
if worker.worker_type() == WorkerType::ComputeNode {
let pb_property = worker.worker_node.property.as_ref().unwrap();
let parallel_unit_ids = worker
.worker_node
.parallel_units
.iter()
.map(|pu| pu.id as i32)
.collect_vec();
let property = worker_property::ActiveModel {
worker_id: Set(worker.worker_id() as _),
parallel_unit_ids: Set(parallel_unit_ids.into()),
is_streaming: Set(pb_property.is_streaming),
is_serving: Set(pb_property.is_serving),
is_unschedulable: Set(pb_property.is_unschedulable),
parallelism: Set(worker.worker_node.parallelism() as _),
};
WorkerProperty::insert(property)
.exec(&meta_store_sql.conn)