Refactor WorkerMapping to WorkerSlotMapping in protobuf and Rust code
Shanicky Chen committed May 17, 2024
1 parent: 3f4ac77 · commit: 93cb568
Showing 20 changed files with 118 additions and 138 deletions.
18 changes: 9 additions & 9 deletions proto/meta.proto
@@ -112,13 +112,13 @@ message FragmentParallelUnitMappings {
}

/// Worker mapping with fragment id, used for notification.
message FragmentWorkerMapping {
message FragmentWorkerSlotMapping {
uint32 fragment_id = 1;
common.WorkerSlotMapping mapping = 2;
}

message FragmentWorkerMappings {
repeated FragmentWorkerMapping mappings = 1;
message FragmentWorkerSlotMappings {
repeated FragmentWorkerSlotMapping mappings = 1;
}

// TODO: remove this when dashboard refactored.
@@ -391,7 +391,7 @@ message MetaSnapshot {
reserved 2;
reserved "parallel_unit_mapping_version";
uint64 worker_node_version = 3;
uint64 streaming_worker_mapping_version = 4;
uint64 streaming_worker_slot_mapping_version = 4;
}
repeated catalog.Database databases = 1;
repeated catalog.Schema schemas = 2;
@@ -416,8 +416,8 @@ message MetaSnapshot {
reserved "serving_parallel_unit_mappings";

// for streaming
repeated FragmentWorkerMapping streaming_worker_mappings = 21;
repeated FragmentWorkerMapping serving_worker_mappings = 22;
repeated FragmentWorkerSlotMapping streaming_worker_slot_mappings = 21;
repeated FragmentWorkerSlotMapping serving_worker_slot_mappings = 22;

SnapshotVersion version = 13;
}
@@ -467,8 +467,8 @@ message SubscribeResponse {
catalog.Connection connection = 22;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
FragmentWorkerMapping streaming_worker_mapping = 27;
FragmentWorkerMappings serving_worker_mappings = 28;
FragmentWorkerSlotMapping streaming_worker_slot_mapping = 27;
FragmentWorkerSlotMappings serving_worker_slot_mappings = 28;
}
reserved 12;
reserved "parallel_unit_mapping";
@@ -651,7 +651,7 @@ message GetServingVnodeMappingsResponse {
reserved 1;
reserved "mappings";
map<uint32, uint32> fragment_to_table = 2;
repeated FragmentWorkerMapping worker_mappings = 3;
repeated FragmentWorkerSlotMapping worker_slot_mappings = 3;
}

service ServingService {
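The renamed messages keep the same field layout, so producers only swap type names. Below is a minimal Rust sketch of building the batched serving notification from per-fragment mappings, assuming the usual prost-generated structs from risingwave_pb (message-typed fields become Option); the helper name is hypothetical and this is not the actual meta-node code.

use risingwave_pb::common::WorkerSlotMapping as PbWorkerSlotMapping;
use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};

// Wrap per-fragment worker-slot mappings into the batched notification
// message used for serving mappings.
fn to_fragment_worker_slot_mappings(
    mappings: Vec<(u32, PbWorkerSlotMapping)>,
) -> FragmentWorkerSlotMappings {
    FragmentWorkerSlotMappings {
        mappings: mappings
            .into_iter()
            .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
                fragment_id,
                mapping: Some(mapping),
            })
            .collect(),
    }
}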
16 changes: 8 additions & 8 deletions src/batch/src/executor/join/local_lookup_join.rs
@@ -61,7 +61,7 @@ struct InnerSideExecutorBuilder<C> {
task_id: TaskId,
epoch: BatchQueryEpoch,
worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
worker_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
@@ -91,7 +91,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
/// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
/// based on the passed `scan_range` and virtual node.
fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
let list = self.worker_to_scan_range_mapping.get(id).unwrap();
let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
let mut scan_ranges = vec![];
let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());

@@ -159,7 +159,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
#[async_trait::async_trait]
impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C> {
fn reset(&mut self) {
self.worker_to_scan_range_mapping = HashMap::new();
self.worker_slot_to_scan_range_mapping = HashMap::new();
}

/// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id
@@ -190,11 +190,11 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let worker_id = self.vnode_mapping[vnode.to_index()];
let worker_slot_id = self.vnode_mapping[vnode.to_index()];

let list = self
.worker_to_scan_range_mapping
.entry(worker_id)
.worker_slot_to_scan_range_mapping
.entry(worker_slot_id)
.or_default();
list.push((scan_range, vnode));

@@ -206,7 +206,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
async fn build_executor(&mut self) -> Result<BoxedExecutor> {
self.next_stage_id += 1;
let mut sources = vec![];
for id in self.worker_to_scan_range_mapping.keys() {
for id in self.worker_slot_to_scan_range_mapping.keys() {
sources.push(self.build_prost_exchange_source(id)?);
}

@@ -402,7 +402,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
worker_to_scan_range_mapping: HashMap::new(),
worker_slot_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
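After the rename, the inner-side builder groups each scan range under the worker slot that owns its vnode before building one exchange source per slot. A simplified, self-contained sketch of that grouping step with placeholder types (the real code uses risingwave's ScanRange, VirtualNode, and WorkerSlotId types):

use std::collections::HashMap;

// Placeholder stand-ins for the real types.
type WorkerSlotId = u64;
type VirtualNode = usize;
struct ScanRange; // key bounds elided

// Mirror of the add_scan_range flow above: look up the owning worker slot
// for the vnode, then append into worker_slot_to_scan_range_mapping.
fn group_by_worker_slot(
    ranges: Vec<(ScanRange, VirtualNode)>,
    vnode_mapping: &[WorkerSlotId], // indexed by vnode
) -> HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>> {
    let mut worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>> =
        HashMap::new();
    for (scan_range, vnode) in ranges {
        let worker_slot_id = vnode_mapping[vnode];
        worker_slot_to_scan_range_mapping
            .entry(worker_slot_id)
            .or_default()
            .push((scan_range, vnode));
    }
    worker_slot_to_scan_range_mapping
}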
8 changes: 5 additions & 3 deletions src/batch/src/worker_manager/worker_node_manager.rs
@@ -146,7 +146,7 @@ impl WorkerNodeManager {
/// If worker ids is empty, the scheduler may fail to schedule any task and stuck at
/// schedule next stage. If we do not return error in this case, needs more complex control
/// logic above. Report in this function makes the schedule root fail reason more clear.
pub fn get_workers_by_worker_ids(
pub fn get_workers_by_worker_slot_ids(
&self,
worker_slot_ids: &[WorkerSlotId],
) -> Result<Vec<WorkerNode>> {
@@ -156,7 +156,6 @@ impl WorkerNodeManager {

let guard = self.inner.read().unwrap();

// TODO: Does the return order of this function need to match the order of the parameters?
let worker_slot_index: HashMap<_, _> = guard
.worker_nodes
.iter()
@@ -171,7 +170,10 @@ impl WorkerNodeManager {
for worker_slot_id in worker_slot_ids {
match worker_slot_index.get(worker_slot_id) {
Some(worker) => workers.push((*worker).clone()),
None => bail!("No worker node found for worker id: {}", worker_slot_id),
None => bail!(
"No worker node found for worker slot id: {}",
worker_slot_id
),
}
}

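The lookup itself is unchanged apart from the names: index every slot of every worker, then resolve the requested slot ids, erroring on any miss. A simplified sketch under hypothetical types and a hypothetical slot-id encoding (the real implementation works on risingwave_pb::common::WorkerNode and reports errors via bail!):

use std::collections::HashMap;

// Placeholder stand-ins for the real types.
type WorkerSlotId = u64;
#[derive(Clone)]
struct WorkerNode {
    id: u32,
    parallelism: usize,
}

// Resolve worker slot ids to their owning worker nodes; fail if any
// requested slot is unknown, so the scheduler surfaces a clear root cause.
fn get_workers_by_worker_slot_ids(
    worker_nodes: &[WorkerNode],
    worker_slot_ids: &[WorkerSlotId],
) -> Result<Vec<WorkerNode>, String> {
    let mut worker_slot_index: HashMap<WorkerSlotId, &WorkerNode> = HashMap::new();
    for node in worker_nodes {
        for slot in 0..node.parallelism {
            // Hypothetical encoding: worker id in the high 32 bits, slot index low.
            worker_slot_index.insert(((node.id as u64) << 32) | slot as u64, node);
        }
    }

    worker_slot_ids
        .iter()
        .map(|id| {
            worker_slot_index
                .get(id)
                .map(|w| (*w).clone())
                .ok_or_else(|| format!("No worker node found for worker slot id: {}", id))
        })
        .collect()
}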
6 changes: 3 additions & 3 deletions src/common/common_service/src/observer_manager.rs
@@ -157,15 +157,15 @@ where
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Info::HummockStats(_) => true,
Info::Recovery(_) => true,
Info::StreamingWorkerMapping(_) => {
Info::StreamingWorkerSlotMapping(_) => {
notification.version
> info
.version
.as_ref()
.unwrap()
.streaming_worker_mapping_version
.streaming_worker_slot_mapping_version
}
Info::ServingWorkerMappings(_) => true,
Info::ServingWorkerSlotMappings(_) => true,
});

self.observer_states
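The filter above keeps a buffered streaming-mapping notification only if it is newer than what the snapshot already covers, while serving mappings are always re-applied. A minimal sketch of that gate with simplified stand-in types (the real code matches on the generated Info enum from the subscribe response):

// Simplified stand-ins for the notification types.
enum Info {
    StreamingWorkerSlotMapping,
    ServingWorkerSlotMappings,
    // other variants elided
}

struct Notification {
    version: u64,
    info: Info,
}

// Keep a notification only if the snapshot does not already cover it.
fn keep(notification: &Notification, streaming_worker_slot_mapping_version: u64) -> bool {
    match notification.info {
        Info::StreamingWorkerSlotMapping => {
            notification.version > streaming_worker_slot_mapping_version
        }
        Info::ServingWorkerSlotMappings => true,
    }
}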
2 changes: 0 additions & 2 deletions src/common/src/hash/consistent_hash/mapping.rs
@@ -33,8 +33,6 @@ use crate::util::iter_util::ZipEqDebug;
// TODO: find a better place for this.
pub type ActorId = u32;

// pub type WorkerSlotId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

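With the commented-out u64 alias removed, WorkerSlotId stands alone as a newtype over u64. Purely as an illustration, and not necessarily the layout mapping.rs actually uses, one plausible encoding packs the worker id into the high 32 bits and the slot index into the low 32 bits:

// Illustrative only: one way a u64 worker-slot id could be laid out.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
    pub fn new(worker_id: u32, slot_idx: u32) -> Self {
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }

    pub fn worker_id(self) -> u32 {
        (self.0 >> 32) as u32
    }

    pub fn slot_idx(self) -> u32 {
        self.0 as u32
    }
}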
19 changes: 0 additions & 19 deletions src/common/src/util/worker_util.rs
@@ -12,23 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_pb::common::WorkerNode;

use crate::hash::ParallelUnitId;

pub type WorkerNodeId = u32;

pub fn get_pu_to_worker_mapping(nodes: &[WorkerNode]) -> HashMap<ParallelUnitId, WorkerNode> {
let mut pu_to_worker = HashMap::new();

for node in nodes {
for pu in &node.parallel_units {
let res = pu_to_worker.insert(pu.id, node.clone());
assert!(res.is_none(), "duplicate parallel unit id");
}
}

pu_to_worker
}
16 changes: 4 additions & 12 deletions src/common/src/vnode_mapping/vnode_placement.rs
@@ -27,18 +27,10 @@ use crate::hash::{VirtualNode, WorkerSlotMapping};
/// The strategy is similar to `rebalance_actor_vnode` used in meta node, but is modified to
/// consider `max_parallelism` too.
pub fn place_vnode(
hint_worker_mapping: Option<&WorkerSlotMapping>,
hint_worker_slot_mapping: Option<&WorkerSlotMapping>,
workers: &[WorkerNode],
max_parallelism: Option<usize>,
) -> Option<WorkerSlotMapping> {
// #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
// struct WorkerSlot(u32, usize);
//
// impl WorkerSlot {
// fn worker_id(&self) -> u32 {
// self.0
// }
// }
// Get all serving worker slots from all available workers, grouped by worker id and ordered
// by worker slot id in each group.
let mut worker_slots: LinkedList<_> = workers
@@ -113,9 +105,9 @@ pub fn place_vnode(
builder: BitmapBuilder::zeroed(VirtualNode::COUNT),
is_temp: true,
};
match hint_worker_mapping {
Some(hint_worker_mapping) => {
for (vnode, worker_slot) in hint_worker_mapping.iter_with_vnode() {
match hint_worker_slot_mapping {
Some(hint_worker_slot_mapping) => {
for (vnode, worker_slot) in hint_worker_slot_mapping.iter_with_vnode() {
let b = if selected_slots_set.contains(&worker_slot) {
// Assign vnode to the same worker slot as hint.
balances.get_mut(&worker_slot).unwrap()
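The hint handling boils down to: keep a vnode on its hinted worker slot when that slot is still among the selected serving slots, otherwise move it to a selected slot. Below is a toy sketch of only that decision, with hypothetical names; the real place_vnode additionally balances vnode counts via bitmap builders and respects max_parallelism.

use std::collections::{HashMap, HashSet};

type VirtualNode = usize;
type WorkerSlotId = u64;

// Assign each vnode to a worker slot, preferring the hinted slot when it is
// still selected; fall back to round-robin over the selected slots.
fn place_with_hint(
    vnode_count: usize,
    hint: &HashMap<VirtualNode, WorkerSlotId>,
    selected_slots: &[WorkerSlotId],
) -> HashMap<VirtualNode, WorkerSlotId> {
    let selected: HashSet<WorkerSlotId> = selected_slots.iter().copied().collect();
    let mut round_robin = selected_slots.iter().copied().cycle();
    (0..vnode_count)
        .map(|vnode| {
            let slot = match hint.get(&vnode) {
                Some(slot) if selected.contains(slot) => *slot,
                // The real function returns None when no slots are available;
                // this sketch just panics.
                _ => round_robin.next().expect("no serving worker slots"),
            };
            (vnode, slot)
        })
        .collect()
}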
28 changes: 14 additions & 14 deletions src/frontend/src/observer/observer_manager.rs
@@ -27,7 +27,7 @@ use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentWorkerMapping, MetaSnapshot, SubscribeResponse};
use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
use risingwave_rpc_client::ComputeClientPoolRef;
use tokio::sync::watch::Sender;

@@ -102,8 +102,8 @@ impl ObserverState for FrontendObserverNode {
Info::HummockStats(stats) => {
self.handle_table_stats_notification(stats);
}
Info::StreamingWorkerMapping(_) => self.handle_fragment_mapping_notification(resp),
Info::ServingWorkerMappings(m) => {
Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
Info::ServingWorkerSlotMappings(m) => {
self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
}
Info::Recovery(_) => {
@@ -138,8 +138,8 @@ impl ObserverState for FrontendObserverNode {
hummock_version: _,
meta_backup_manifest_id: _,
hummock_write_limits: _,
streaming_worker_mappings,
serving_worker_mappings,
streaming_worker_slot_mappings,
serving_worker_slot_mappings,
session_params,
version,
} = snapshot;
@@ -180,8 +180,8 @@ impl ObserverState for FrontendObserverNode {

self.worker_node_manager.refresh(
nodes,
convert_worker_mapping(&streaming_worker_mappings),
convert_worker_mapping(&serving_worker_mappings),
convert_worker_slot_mapping(&streaming_worker_slot_mappings),
convert_worker_slot_mapping(&serving_worker_slot_mappings),
);
self.hummock_snapshot_manager
.update(hummock_snapshot.unwrap());
@@ -388,7 +388,7 @@ impl FrontendObserverNode {
return;
};
match info {
Info::StreamingWorkerMapping(streaming_worker_mapping) => {
Info::StreamingWorkerSlotMapping(streaming_worker_mapping) => {
let fragment_id = streaming_worker_mapping.fragment_id;
let mapping = || {
WorkerSlotMapping::from_protobuf(
Expand Down Expand Up @@ -418,20 +418,20 @@ impl FrontendObserverNode {

fn handle_fragment_serving_mapping_notification(
&mut self,
mappings: Vec<FragmentWorkerMapping>,
mappings: Vec<FragmentWorkerSlotMapping>,
op: Operation,
) {
match op {
Operation::Add | Operation::Update => {
self.worker_node_manager
.upsert_serving_fragment_mapping(convert_worker_mapping(&mappings));
.upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
}
Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
&mappings.into_iter().map(|m| m.fragment_id).collect_vec(),
),
Operation::Snapshot => {
self.worker_node_manager
.set_serving_fragment_mapping(convert_worker_mapping(&mappings));
.set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
}
_ => panic!("receive an unsupported notify {:?}", op),
}
@@ -471,13 +471,13 @@ impl FrontendObserverNode {
}
}

fn convert_worker_mapping(
worker_mappings: &[FragmentWorkerMapping],
fn convert_worker_slot_mapping(
worker_mappings: &[FragmentWorkerSlotMapping],
) -> HashMap<FragmentId, WorkerSlotMapping> {
worker_mappings
.iter()
.map(
|FragmentWorkerMapping {
|FragmentWorkerSlotMapping {
fragment_id,
mapping,
}| {
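The body of convert_worker_slot_mapping is truncated in the hunk above. A plausible completion following the pattern visible in the diff — decode each protobuf mapping and key it by fragment id — written as a sketch rather than the exact committed code; it relies on the imports already present in this file:

fn convert_worker_slot_mapping(
    worker_mappings: &[FragmentWorkerSlotMapping],
) -> HashMap<FragmentId, WorkerSlotMapping> {
    worker_mappings
        .iter()
        .map(
            |FragmentWorkerSlotMapping {
                 fragment_id,
                 mapping,
             }| {
                // `mapping` is an Option in the generated type; a missing
                // mapping is treated as a bug here, hence the unwrap.
                let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
                (*fragment_id, mapping)
            },
        )
        .collect()
}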
6 changes: 3 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -357,7 +357,7 @@ impl StageRunner {
let workers = self
.worker_node_manager
.manager
.get_workers_by_worker_ids(&worker_ids)?;
.get_workers_by_worker_slot_ids(&worker_ids)?;

for (i, (worker_id, worker)) in worker_ids
.into_iter()
@@ -716,7 +716,7 @@ impl StageRunner {
let candidates = self
.worker_node_manager
.manager
.get_workers_by_worker_ids(&worker_ids)?;
.get_workers_by_worker_slot_ids(&worker_ids)?;
if candidates.is_empty() {
return Err(BatchError::EmptyWorkerNodes.into());
}
@@ -752,7 +752,7 @@ impl StageRunner {
let candidates = self
.worker_node_manager
.manager
.get_workers_by_worker_ids(&[worker_id])?;
.get_workers_by_worker_slot_ids(&[worker_id])?;
if candidates.is_empty() {
return Err(BatchError::EmptyWorkerNodes.into());
}
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/local.rs
@@ -318,7 +318,7 @@ impl LocalQueryExecution {
let workers = self
.worker_node_manager
.manager
.get_workers_by_worker_ids(&worker_ids)?;
.get_workers_by_worker_slot_ids(&worker_ids)?;
for (idx, (worker_node, partition)) in
(workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate()
{
@@ -615,7 +615,7 @@ impl LocalQueryExecution {
let candidates = self
.worker_node_manager
.manager
.get_workers_by_worker_ids(&worker_ids)?;
.get_workers_by_worker_slot_ids(&worker_ids)?;
if candidates.is_empty() {
return Err(BatchError::EmptyWorkerNodes.into());
}