Refine comments on migration and worker ID terms
shanicky committed Apr 11, 2024
1 parent a2d1e72 commit be9203d
Showing 6 changed files with 34 additions and 53 deletions.
1 change: 1 addition & 0 deletions proto/meta.proto
@@ -128,6 +128,7 @@ message ActorLocation {
}

message MigrationPlan {
// map<parallel_unit_id, parallel_unit>: the plan indicates that the actors will be migrated from the old parallel unit to the new one.
map<uint32, common.ParallelUnit> parallel_unit_migration_plan = 1;
}
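
As context for the new comment (not part of the commit), here is a minimal Rust sketch of how such a plan could be consumed. `ParallelUnit` is a hand-written stand-in for the generated `common.ParallelUnit` type, and `migration_target` is a hypothetical helper name:

```rust
use std::collections::HashMap;

/// Hand-written stand-in for the generated `common.ParallelUnit` message.
#[derive(Clone, Debug)]
struct ParallelUnit {
    id: u32,
    worker_node_id: u32,
}

/// Look up where an actor should move: the key is its old parallel unit id,
/// the value is the parallel unit it migrates to.
fn migration_target(
    plan: &HashMap<u32, ParallelUnit>,
    old_parallel_unit_id: u32,
) -> Option<&ParallelUnit> {
    plan.get(&old_parallel_unit_id)
}
```
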

2 changes: 1 addition & 1 deletion src/batch/src/worker_manager/worker_node_manager.rs
@@ -143,7 +143,7 @@ impl WorkerNodeManager {
write_guard.serving_fragment_vnode_mapping = serving_mapping;
}

/// If parallel unit ids is empty, the scheduler may fail to schedule any task and stuck at
/// If worker ids are empty, the scheduler may fail to schedule any task and get stuck at
/// scheduling the next stage. If we do not return an error in this case, more complex control
/// logic is needed above. Reporting the error in this function makes the scheduler's root failure reason clearer.
pub fn get_workers_by_worker_ids(&self, worker_ids: &[WorkerId]) -> Result<Vec<WorkerNode>> {
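
For illustration only, a simplified sketch of the lookup that this doc comment describes, with hypothetical types in place of the real `WorkerNodeManager` internals. Failing fast on an empty or unknown id set is what lets the scheduler surface a clear root cause instead of silently stalling:

```rust
use std::collections::HashMap;

type WorkerId = u32;

#[derive(Clone, Debug)]
struct WorkerNode {
    id: WorkerId,
}

/// Illustrative only: error on an empty id set (and on unknown ids) so that the
/// scheduler reports a clear failure reason rather than hanging on the next stage.
fn get_workers_by_worker_ids(
    all_workers: &HashMap<WorkerId, WorkerNode>,
    worker_ids: &[WorkerId],
) -> Result<Vec<WorkerNode>, String> {
    if worker_ids.is_empty() {
        return Err("no worker ids to schedule on".to_owned());
    }
    worker_ids
        .iter()
        .map(|id| {
            all_workers
                .get(id)
                .cloned()
                .ok_or_else(|| format!("worker node {id} not found"))
        })
        .collect()
}
```
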
24 changes: 12 additions & 12 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -39,23 +39,23 @@ pub fn place_vnode(
self.0
}
}
// Get all serving parallel units from all available workers, grouped by worker id and ordered
// by parallel unit id in each group.
// Get all serving worker slots from all available workers, grouped by worker id and ordered
// by worker slot id in each group.
let mut worker_slots: LinkedList<_> = workers
.iter()
.filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
.sorted_by_key(|w| w.id)
.map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlot(w.id, idx)))
.collect();

// Set serving parallelism to the minimum of total number of parallel units, specified
// Set serving parallelism to the minimum of total number of worker slots, specified
// `max_parallelism` and total number of virtual nodes.
let serving_parallelism = std::cmp::min(
worker_slots.iter().map(|slots| slots.len()).sum(),
std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT),
);

// Select `serving_parallelism` parallel units in a round-robin fashion, to distribute workload
// Select `serving_parallelism` worker slots in a round-robin fashion, to distribute workload
// evenly among workers.
let mut selected_slots = Vec::new();
while !worker_slots.is_empty() {
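
The hunk above groups the serving worker slots per worker, caps the parallelism, and then drains the groups round-robin (the loop body is elided in the diff). Below is a self-contained sketch of that selection under assumed `(worker_id, slot_count)` input, not the real `WorkerNode`/`WorkerSlot` types:

```rust
use std::collections::LinkedList;

/// Hypothetical input: (worker_id, slot_count) pairs for the serving workers.
/// Returns the worker slots selected for serving, as (worker_id, slot_idx) pairs.
fn select_serving_slots(
    workers: &[(u32, usize)],
    max_parallelism: Option<usize>,
    vnode_count: usize, // stand-in for VirtualNode::COUNT
) -> Vec<(u32, usize)> {
    // Group slots by worker, ordered by worker id and slot index within each group.
    let mut sorted: Vec<_> = workers.to_vec();
    sorted.sort_by_key(|(id, _)| *id);
    let mut worker_slots: LinkedList<_> = sorted
        .into_iter()
        .map(|(id, n)| (0..n).map(move |idx| (id, idx)).collect::<Vec<_>>())
        .collect();

    // Serving parallelism = min(total slots, max_parallelism, vnode count).
    let total: usize = worker_slots.iter().map(|s| s.len()).sum();
    let parallelism = total.min(max_parallelism.unwrap_or(usize::MAX).min(vnode_count));

    // Round-robin: repeatedly take one slot from each worker's group.
    let mut selected = Vec::new();
    while selected.len() < parallelism {
        if let Some(mut group) = worker_slots.pop_front() {
            if !group.is_empty() {
                selected.push(group.remove(0));
                if !group.is_empty() {
                    worker_slots.push_back(group);
                }
            }
        }
    }
    selected
}

fn main() {
    // Three hypothetical workers with 2, 3 and 1 slots; cap parallelism at 4.
    let slots = select_serving_slots(&[(1, 2), (2, 3), (3, 1)], Some(4), 256);
    println!("{slots:?}"); // [(1, 0), (2, 0), (3, 0), (1, 1)]
}
```
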
@@ -76,9 +76,9 @@ pub fn place_vnode(
return None;
}

// Calculate balance for each selected parallel unit. Initially, each parallel unit is assigned
// Calculate balance for each selected worker slot. Initially, each worker slot is assigned
// no vnodes. Thus its negative balance means that many vnodes should be assigned to it later.
// `is_temp` is a mark for a special temporary parallel unit, only to simplify implementation.
// `is_temp` is a mark for a special temporary worker slot, only to simplify implementation.
#[derive(Debug)]
struct Balance {
slot: WorkerSlot,
@@ -119,10 +119,10 @@ pub fn place_vnode(
let worker_slot = WorkerSlot(worker_id, 0);

let b = if selected_slots_set.contains(&worker_slot) {
// Assign vnode to the same parallel unit as hint.
// Assign vnode to the same worker slot as hint.
balances.get_mut(&worker_slot).unwrap()
} else {
// Assign vnode that doesn't belong to any parallel unit to `temp_pu`
// Assign vnodes that don't belong to any worker slot to `temp_slot`
// temporarily. They will be reassigned later.
&mut temp_slot
};
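
A hedged sketch of the hint-based counting above, using a plain map of balances and a separate counter for the temporary slot; the names and the simplified balance semantics here are illustrative rather than the actual implementation:

```rust
use std::collections::{HashMap, HashSet};

type WorkerSlot = (u32, usize); // (worker_id, slot_idx)

/// Count how many vnodes each selected slot already holds according to the hint.
/// Vnodes whose hinted slot was not selected are parked in a temporary bucket
/// (the `temp_slot` of the real code) and redistributed later.
fn count_hinted_vnodes(
    hinted_slots: &[WorkerSlot], // one entry per vnode: the slot the hint points at
    selected: &HashSet<WorkerSlot>,
) -> (HashMap<WorkerSlot, i32>, i32) {
    let mut balances: HashMap<WorkerSlot, i32> =
        selected.iter().map(|s| (*s, 0)).collect();
    let mut temp_balance = 0;
    for slot in hinted_slots {
        match balances.get_mut(slot) {
            Some(b) => *b += 1,
            None => temp_balance += 1, // hinted slot not selected
        }
    }
    (balances, temp_balance)
}
```
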
@@ -140,10 +140,10 @@
}
}

// The final step is to move vnodes from parallel units with positive balance to parallel units
// with negative balance, until all parallel units are of 0 balance.
// A double-ended queue with parallel units ordered by balance in descending order is consumed:
// 1. Peek 2 parallel units from front and back.
// The final step is to move vnodes from worker slots with positive balance to worker slots
// with negative balance, until all worker slots are of 0 balance.
// A double-ended queue with worker slots ordered by balance in descending order is consumed:
// 1. Peek 2 worker slots from front and back.
// 2. If any of them is of 0 balance, pop it and go to step 1.
// 3. Otherwise, move vnodes from front to back.
let mut balances: VecDeque<_> = balances
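
The redistribution described in the final comment of this file can be sketched on plain integers, independent of the real `Balance` struct. This only illustrates the deque walk, assuming the balances sum to zero and arrive sorted in descending order:

```rust
use std::collections::VecDeque;

/// Move "vnodes" from entries with positive balance to entries with negative balance
/// until every balance reaches zero. Assumes the balances sum to zero and are sorted
/// in descending order, mirroring the comment above.
fn rebalance(
    balances: &mut VecDeque<(&'static str, i32)>,
) -> Vec<(&'static str, &'static str, i32)> {
    let mut moves = Vec::new();
    while balances.len() > 1 {
        // 1. Peek the largest (front) and smallest (back) balances.
        let (src, src_bal) = *balances.front().unwrap();
        let (dst, dst_bal) = *balances.back().unwrap();
        // 2. Pop entries that are already settled.
        if src_bal == 0 {
            balances.pop_front();
            continue;
        }
        if dst_bal == 0 {
            balances.pop_back();
            continue;
        }
        // 3. Move as many vnodes as possible from front to back.
        let n = src_bal.min(-dst_bal);
        moves.push((src, dst, n));
        balances.front_mut().unwrap().1 -= n;
        balances.back_mut().unwrap().1 += n;
    }
    moves
}

fn main() {
    // Hypothetical slots: one overloaded by 2 vnodes, two short by 1 each.
    let mut q = VecDeque::from([("slot_a", 2), ("slot_b", -1), ("slot_c", -1)]);
    println!("{:?}", rebalance(&mut q)); // [("slot_a", "slot_c", 1), ("slot_a", "slot_b", 1)]
}
```
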
1 change: 0 additions & 1 deletion src/meta/service/src/notification_service.rs
@@ -242,7 +242,6 @@ impl NotificationServiceImpl {
self.get_worker_mapping_snapshot().await?;
let serving_worker_mappings = self.get_serving_vnode_mappings();

// let serving_parallel_unit_mappings = self.get_serving_vnode_mappings();
let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;

let hummock_snapshot = Some(self.hummock_manager.latest_snapshot());
15 changes: 8 additions & 7 deletions src/meta/src/controller/catalog.rs
@@ -54,9 +54,9 @@ use risingwave_pb::user::PbUserInfo;
use sea_orm::sea_query::{Expr, SimpleExpr};
use sea_orm::ActiveValue::Set;
use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
TransactionTrait, Value,
ActiveModelTrait, ColumnTrait, ConnectionTrait, DatabaseConnection, DatabaseTransaction,
EntityTrait, IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect,
RelationTrait, TransactionTrait, Value,
};
use tokio::sync::{RwLock, RwLockReadGuard};

@@ -335,17 +335,18 @@ impl CatalogController {
))
}

pub(crate) async fn get_parallel_unit_to_worker_map(
txn: &DatabaseTransaction,
) -> MetaResult<HashMap<u32, u32>> {
pub(crate) async fn get_parallel_unit_to_worker_map<C>(db: &C) -> MetaResult<HashMap<u32, u32>>
where
C: ConnectionTrait,
{
let worker_parallel_units = WorkerProperty::find()
.select_only()
.columns([
worker_property::Column::WorkerId,
worker_property::Column::ParallelUnitIds,
])
.into_tuple::<(WorkerId, I32Array)>()
.all(txn)
.all(db)
.await?;

let parallel_unit_to_worker = worker_parallel_units
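
The point of this refactor is that a helper generic over `sea_orm::ConnectionTrait` accepts either a plain `DatabaseConnection` or an open `DatabaseTransaction`. A minimal sketch of the pattern follows; the query body is a placeholder, not the real implementation:

```rust
use sea_orm::{ConnectionTrait, DatabaseConnection, DbErr, TransactionTrait};
use std::collections::HashMap;

// Generic over `ConnectionTrait`, so the same helper works on a connection or a transaction.
async fn parallel_unit_to_worker_map<C>(db: &C) -> Result<HashMap<u32, u32>, DbErr>
where
    C: ConnectionTrait,
{
    // Placeholder body: the real helper queries `WorkerProperty` through `db`.
    let _ = db;
    Ok(HashMap::new())
}

async fn caller(conn: &DatabaseConnection) -> Result<(), DbErr> {
    // Call it directly on the connection...
    let _map = parallel_unit_to_worker_map(conn).await?;
    // ...or from inside an open transaction.
    let txn = conn.begin().await?;
    let _map = parallel_unit_to_worker_map(&txn).await?;
    txn.commit().await?;
    Ok(())
}
```
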
44 changes: 12 additions & 32 deletions src/meta/src/controller/utils.rs
@@ -24,9 +24,9 @@ use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::{
actor, actor_dispatcher, connection, database, fragment, function, index, object,
object_dependency, schema, sink, source, table, user, user_privilege, view, worker_property,
ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId,
PrivilegeId, SchemaId, SourceId, UserId, WorkerId,
object_dependency, schema, sink, source, table, user, user_privilege, view, ActorId,
DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, PrivilegeId,
SchemaId, SourceId, UserId,
};
use risingwave_pb::catalog::{PbConnection, PbFunction};
use risingwave_pb::meta::{PbFragmentParallelUnitMapping, PbFragmentWorkerMapping};
@@ -43,6 +43,7 @@ use sea_orm::{
Order, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, Statement,
};

use crate::controller::catalog::CatalogController;
use crate::{MetaError, MetaResult};

/// This function will construct a query using recursive cte to find all objects[(id, `obj_type`)] that are used by the given object.
@@ -791,25 +792,7 @@ pub async fn get_fragment_mappings<C>(
where
C: ConnectionTrait,
{
let worker_parallel_unit_ids: Vec<(WorkerId, I32Array)> = WorkerProperty::find()
.select_only()
.columns([
worker_property::Column::WorkerId,
worker_property::Column::ParallelUnitIds,
])
.into_tuple()
.all(db)
.await?;

let parallel_unit_to_worker = worker_parallel_unit_ids
.into_iter()
.flat_map(|(worker_id, parallel_unit_ids)| {
parallel_unit_ids
.0
.into_iter()
.map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32))
})
.collect::<HashMap<_, _>>();
let parallel_unit_to_worker = CatalogController::get_parallel_unit_to_worker_map(db).await?;

let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find()
.select_only()
@@ -821,16 +804,13 @@ where

Ok(fragment_mappings
.into_iter()
.map(|(fragment_id, mapping)| {
let mapping1 = mapping.into_inner();
PbFragmentWorkerMapping {
fragment_id: fragment_id as _,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping1)
.to_worker(&parallel_unit_to_worker)
.to_protobuf(),
),
}
.map(|(fragment_id, mapping)| PbFragmentWorkerMapping {
fragment_id: fragment_id as _,
mapping: Some(
ParallelUnitMapping::from_protobuf(&mapping.into_inner())
.to_worker(&parallel_unit_to_worker)
.to_protobuf(),
),
})
.collect())
}
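
Setting aside the protobuf plumbing, the `to_worker` conversion used above boils down to replacing each parallel unit id in a vnode mapping with the worker that owns it. A simplified sketch with plain vectors; the types here are stand-ins, not the real `ParallelUnitMapping` API:

```rust
use std::collections::HashMap;

/// Each virtual node is assigned a parallel unit id; translate that into the owning
/// worker id using the map built by `get_parallel_unit_to_worker_map`.
fn to_worker_mapping(
    vnode_to_parallel_unit: &[u32],
    parallel_unit_to_worker: &HashMap<u32, u32>,
) -> Option<Vec<u32>> {
    vnode_to_parallel_unit
        .iter()
        .map(|pu| parallel_unit_to_worker.get(pu).copied())
        .collect() // None if any parallel unit has no owning worker
}
```
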
