feat(meta): inform executors of the vnodes they own (#2887)
* feat(meta): inform executors of the vnodes they own

* remove the maplen field from VNodeBitmap and move the message to common.proto

* use bitmap instead of Vec<Vnode>

* fmt proto

* refine code
xx01cyx authored May 30, 2022
1 parent a0a167d commit b6d24f9
Showing 24 changed files with 150 additions and 66 deletions.
6 changes: 6 additions & 0 deletions proto/common.proto
@@ -67,3 +67,9 @@ message ParallelUnitMapping {
repeated uint64 original_indices = 2;
repeated uint32 data = 3;
}

// Bitmap that records whether vnodes are present.
message VNodeBitmap {
uint32 table_id = 1;
bytes bitmap = 2;
}
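A minimal sketch of how a bitmap in this layout could be built and queried, assuming the byte/bit convention used by the scheduler code later in this commit (byte index = vnode >> 3, bit index = vnode & 0b111); the helper names are illustrative and not part of the commit:

// Illustrative helpers for the VNodeBitmap layout (assumed convention, not committed code).
const VIRTUAL_NODE_COUNT: usize = 2048;
const VNODE_BITMAP_LEN: usize = VIRTUAL_NODE_COUNT / 8; // one bit per vnode -> 256 bytes

fn build_bitmap(owned_vnodes: &[usize]) -> Vec<u8> {
    let mut bitmap = vec![0u8; VNODE_BITMAP_LEN];
    for &vnode in owned_vnodes {
        bitmap[vnode >> 3] |= 1u8 << (vnode & 0b111);
    }
    bitmap
}

fn is_set(bitmap: &[u8], vnode: usize) -> bool {
    bitmap[vnode >> 3] & (1u8 << (vnode & 0b111)) != 0
}

fn main() {
    let bitmap = build_bitmap(&[0, 7, 8, 2047]);
    assert!(is_set(&bitmap, 8));
    assert!(!is_set(&bitmap, 9));
}
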
8 changes: 1 addition & 7 deletions proto/hummock.proto
@@ -20,17 +20,11 @@ message SstableIdInfo {
uint64 meta_delete_timestamp = 4;
}

message VNodeBitmap {
uint32 table_id = 1;
uint32 maplen = 2;
bytes bitmap = 3;
}

message SstableInfo {
uint64 id = 1;
KeyRange key_range = 2;
uint64 file_size = 3;
repeated VNodeBitmap vnode_bitmaps = 4;
repeated common.VNodeBitmap vnode_bitmaps = 4;
}

enum LevelType {
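With per-table vnode bitmaps attached to each SST, a reader that knows which vnodes it owns could skip SSTs that hold none of its data. The following is a hedged sketch of that pruning idea only; the structs mirror the proto messages, and may_contain is a hypothetical helper, not the actual Hummock read path:

// Hypothetical pruning check (not in this commit): does this SST contain any
// data for `table_id` within the vnodes this reader owns?
struct VNodeBitmap {
    table_id: u32,
    bitmap: Vec<u8>,
}

struct SstableInfo {
    vnode_bitmaps: Vec<VNodeBitmap>,
}

fn may_contain(sst: &SstableInfo, table_id: u32, owned_bitmap: &[u8]) -> bool {
    sst.vnode_bitmaps
        .iter()
        .filter(|b| b.table_id == table_id)
        .any(|b| {
            // Any overlapping set bit means the SST and the reader share a vnode.
            b.bitmap
                .iter()
                .zip(owned_bitmap.iter())
                .any(|(sst_byte, own_byte)| sst_byte & own_byte != 0)
        })
}
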
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
@@ -296,6 +296,9 @@ message StreamActor {
repeated uint32 upstream_actor_id = 6;
// Placement rule for actor, need to stay on the same node as upstream.
bool same_worker_node_as_upstream = 7;
// Vnodes that the executors in this actor own. If this actor is the only actor in its fragment, `vnode_bitmap`
// will be empty.
bytes vnode_bitmap = 8;
}

enum FragmentType {
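On the executor side, the actor's vnode_bitmap says which vnodes it owns; combined with the hash-to-vnode mapping, an executor can decide whether a key belongs to it. A sketch under assumptions: hash_of is a stand-in for whatever hash function the system actually uses (not shown in this diff), and treating an empty bitmap as owning all vnodes follows the singleton-fragment comment above.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const VIRTUAL_NODE_COUNT: u64 = 2048;

// Placeholder hash; the real hash function is not part of this diff.
fn hash_of<K: Hash>(key: &K) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// Map a key to its vnode, then test the actor's ownership bitmap.
fn owns_key<K: Hash>(vnode_bitmap: &[u8], key: &K) -> bool {
    // A singleton actor carries an empty bitmap (see the proto comment above);
    // this sketch treats that as owning every vnode.
    if vnode_bitmap.is_empty() {
        return true;
    }
    let vnode = (hash_of(key) % VIRTUAL_NODE_COUNT) as usize;
    vnode_bitmap[vnode >> 3] & (1u8 << (vnode & 0b111)) != 0
}
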
4 changes: 3 additions & 1 deletion src/common/src/hash/dispatcher.rs
@@ -19,7 +19,9 @@ use crate::types::{DataSize, DataType};
/// `VirtualNode` is the logical key for consistent hash. Virtual nodes stand for the intermediate
/// layer between data and physical nodes.
pub type VirtualNode = u16;
pub const VIRTUAL_NODE_COUNT: usize = 2048;
pub const VNODE_BITS: usize = 11;
pub const VIRTUAL_NODE_COUNT: usize = 1 << VNODE_BITS;
pub const VNODE_BITMAP_LEN: usize = 1 << (VNODE_BITS - 3);

/// An enum to help to dynamically dispatch [`HashKey`] template.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
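The new constants keep the vnode count and the bitmap length in sync: 2^11 = 2048 vnodes, and one bit per vnode gives 2^(11-3) = 256 bytes. A quick standalone arithmetic check mirroring the constants above:

const VNODE_BITS: usize = 11;
const VIRTUAL_NODE_COUNT: usize = 1 << VNODE_BITS; // 2048 vnodes
const VNODE_BITMAP_LEN: usize = 1 << (VNODE_BITS - 3); // 2048 / 8 = 256 bytes

fn main() {
    assert_eq!(VIRTUAL_NODE_COUNT, 2048);
    assert_eq!(VNODE_BITMAP_LEN, 256);
    assert_eq!(VNODE_BITMAP_LEN * 8, VIRTUAL_NODE_COUNT);
}
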
2 changes: 1 addition & 1 deletion src/meta/src/cluster/mod.rs
@@ -39,7 +39,7 @@ pub type ParallelUnitId = u32;
pub type WorkerLocations = HashMap<WorkerId, WorkerNode>;
pub type ClusterManagerRef<S> = Arc<ClusterManager<S>>;

const DEFAULT_WORK_NODE_PARALLEL_DEGREE: usize = 4;
pub const DEFAULT_WORK_NODE_PARALLEL_DEGREE: usize = 4;

#[derive(Debug)]
pub struct WorkerKey(pub HostAddress);
3 changes: 2 additions & 1 deletion src/meta/src/hummock/level_handler.rs
@@ -17,8 +17,9 @@ use std::collections::HashMap;
use itertools::Itertools;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::HummockSSTableId;
use risingwave_pb::common::VNodeBitmap;
use risingwave_pb::hummock::level_handler::SstTask;
use risingwave_pb::hummock::{SstableInfo, VNodeBitmap};
use risingwave_pb::hummock::SstableInfo;

#[derive(Clone, Debug, PartialEq)]
pub struct SSTableInfo {
6 changes: 2 additions & 4 deletions src/meta/src/hummock/test_utils.rs
@@ -18,8 +18,8 @@ use std::time::Duration;
use itertools::Itertools;
use risingwave_hummock_sdk::key::key_with_epoch;
use risingwave_hummock_sdk::{HummockContextId, HummockEpoch, HummockSSTableId};
use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
use risingwave_pb::hummock::{HummockVersion, KeyRange, SstableInfo, VNodeBitmap};
use risingwave_pb::common::{HostAddress, VNodeBitmap, WorkerNode, WorkerType};
use risingwave_pb::hummock::{HummockVersion, KeyRange, SstableInfo};

use crate::cluster::{ClusterManager, ClusterManagerRef};
use crate::hummock::{HummockManager, HummockManagerRef};
@@ -95,12 +95,10 @@ pub fn generate_test_tables(epoch: u64, sst_ids: Vec<HummockSSTableId>) -> Vec<S
vnode_bitmaps: vec![
VNodeBitmap {
table_id: (i + 1) as u32,
maplen: 0,
bitmap: vec![],
},
VNodeBitmap {
table_id: (i + 2) as u32,
maplen: 0,
bitmap: vec![],
},
],
8 changes: 6 additions & 2 deletions src/meta/src/stream/meta.rs
@@ -30,7 +30,7 @@ use crate::cluster::{ParallelUnitId, WorkerId};
use crate::manager::{HashMappingManagerRef, MetaSrvEnv};
use crate::model::{ActorId, MetadataModel, TableFragments, Transactional};
use crate::storage::{MetaStore, Transaction};
use crate::stream::set_table_vnode_mappings;
use crate::stream::record_table_vnode_mappings;

struct FragmentManagerCore {
table_fragments: HashMap<TableId, TableFragments>,
@@ -442,7 +442,11 @@ where
// identical state table id.
let actor = fragment.actors.first().unwrap();
let stream_node = actor.get_nodes()?;
set_table_vnode_mappings(&hash_mapping_manager, stream_node, fragment.fragment_id)?;
record_table_vnode_mappings(
&hash_mapping_manager,
stream_node,
fragment.fragment_id,
)?;
}
}
Ok(())
6 changes: 3 additions & 3 deletions src/meta/src/stream/mod.rs
@@ -33,8 +33,8 @@ pub use stream_manager::*;
use crate::manager::HashMappingManagerRef;
use crate::model::FragmentId;

/// Set vnode mapping for stateful operators.
pub fn set_table_vnode_mappings(
/// Record vnode mapping for stateful operators in meta.
pub fn record_table_vnode_mappings(
hash_mapping_manager: &HashMappingManagerRef,
stream_node: &StreamNode,
fragment_id: FragmentId,
@@ -64,7 +64,7 @@ pub fn set_table_vnode_mappings(
}
let input_nodes = stream_node.get_input();
for input_node in input_nodes {
set_table_vnode_mappings(hash_mapping_manager, input_node, fragment_id)?;
record_table_vnode_mappings(hash_mapping_manager, input_node, fragment_id)?;
}
Ok(())
}
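The renamed helper walks the stream node tree depth-first: it records a mapping for each stateful operator it encounters and then recurses into every input. A stripped-down sketch of that traversal pattern; the types are placeholders, not the real meta-service structs:

// Placeholder node type; the real StreamNode lives in risingwave_pb.
struct StreamNode {
    state_table_id: Option<u32>, // Some(..) for stateful operators
    input: Vec<StreamNode>,
}

fn record_table_vnode_mappings(
    records: &mut Vec<(u32, u32)>, // (state table id, fragment id) pairs recorded so far
    node: &StreamNode,
    fragment_id: u32,
) {
    if let Some(table_id) = node.state_table_id {
        records.push((table_id, fragment_id));
    }
    for input in &node.input {
        record_table_vnode_mappings(records, input, fragment_id);
    }
}
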
84 changes: 70 additions & 14 deletions src/meta/src/stream/scheduler.rs
@@ -17,12 +17,13 @@ use std::sync::atomic::{AtomicUsize, Ordering};

use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::VNODE_BITMAP_LEN;
use risingwave_common::util::compress::compress_data;
use risingwave_pb::common::{ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::Fragment;

use super::set_table_vnode_mappings;
use super::record_table_vnode_mappings;
use crate::cluster::{ClusterManagerRef, WorkerId, WorkerLocations};
use crate::manager::HashMappingManagerRef;
use crate::model::ActorId;
@@ -142,10 +143,10 @@ where

/// [`Self::schedule`] schedules input fragments to different parallel units (workers).
/// The schedule procedure is two-fold:
/// (1) For normal fragments, we schedule them to all the hash parallel units in the cluster.
/// (2) For singleton fragments, we apply the round robin strategy. One single parallel unit in
/// (1) For singleton fragments, we apply the round robin strategy. One single parallel unit in
/// the cluster is assigned to a singleton fragment once, and all the single parallel units take
/// turns.
/// (2) For normal fragments, we schedule them to all the hash parallel units in the cluster.
pub async fn schedule(
&self,
fragment: &mut Fragment,
@@ -160,58 +161,98 @@
let actor = &fragment.actors[0];

if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
// Schedule the fragment to the same parallel unit as upstream.
let parallel_unit = locations.schedule_colocate_with(&actor.upstream_actor_id)?;

// Build vnode mapping.
self.set_fragment_vnode_mapping(fragment, &[parallel_unit.clone()])?;

// Record actor locations.
locations
.actor_locations
.insert(fragment.actors[0].actor_id, parallel_unit);
} else {
// Choose one parallel unit to schedule from single parallel units.
let single_parallel_units = self
.cluster_manager
.list_parallel_units(Some(ParallelUnitType::Single))
.await;

let single_idx = self
.single_rr
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| {
Some((idx + 1) % single_parallel_units.len())
})
.map_err(|_| internal_error("failed to round robin id"))?;

locations.actor_locations.insert(
fragment.actors[0].actor_id,
single_parallel_units[single_idx].clone(),
);
// Build vnode mapping.
self.set_fragment_vnode_mapping(
fragment,
&[single_parallel_units[single_idx].clone()],
)?;

// Record actor locations.
locations.actor_locations.insert(
fragment.actors[0].actor_id,
single_parallel_units[single_idx].clone(),
);
}
} else {
// Normal fragment

// Find out all the hash parallel units in the cluster.
let parallel_units = self
.cluster_manager
.list_parallel_units(Some(ParallelUnitType::Hash))
.await;
for (idx, actor) in fragment.actors.iter().enumerate() {

// Build vnode mapping according to the parallel units.
self.set_fragment_vnode_mapping(fragment, &parallel_units)?;

// Find out the vnodes that a parallel unit owns.
let vnode_mapping = self
.hash_mapping_manager
.get_fragment_hash_mapping(&fragment.fragment_id)
.unwrap();
let mut vnode_bitmaps = HashMap::new();
vnode_mapping
.iter()
.enumerate()
.for_each(|(vnode, parallel_unit)| {
vnode_bitmaps
.entry(*parallel_unit)
.or_insert([0; VNODE_BITMAP_LEN])[(vnode >> 3) as usize] |=
1 << (vnode & 0b111);
});

// Record actor locations and set vnodes into the actors.
for (idx, actor) in fragment.actors.iter_mut().enumerate() {
if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
let parallel_unit =
locations.schedule_colocate_with(&actor.upstream_actor_id)?;
actor.vnode_bitmap = vnode_bitmaps.get(&parallel_unit.id).unwrap().to_vec();
locations
.actor_locations
.insert(actor.actor_id, parallel_unit);
} else {
actor.vnode_bitmap = vnode_bitmaps
.get(&parallel_units[idx % parallel_units.len()].id)
.unwrap()
.to_vec();
locations.actor_locations.insert(
actor.actor_id,
parallel_units[idx % parallel_units.len()].clone(),
);
}
}
self.set_fragment_vnode_mapping(fragment, &parallel_units)?;
}

Ok(())
}
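The heart of the new scheduling step is the inversion above: the fragment's vnode mapping (one parallel-unit id per vnode) is turned into one 256-byte bitmap per parallel unit, and each actor then receives the bitmap of the parallel unit it is placed on. A self-contained sketch of that inversion, with simplified types and an arbitrary example mapping rather than the real hash-mapping output:

use std::collections::HashMap;

const VNODE_BITMAP_LEN: usize = 256; // 2048 vnodes / 8 bits per byte

/// Invert a vnode -> parallel-unit mapping into per-parallel-unit ownership bitmaps.
fn build_vnode_bitmaps(vnode_mapping: &[u32]) -> HashMap<u32, [u8; VNODE_BITMAP_LEN]> {
    let mut vnode_bitmaps: HashMap<u32, [u8; VNODE_BITMAP_LEN]> = HashMap::new();
    for (vnode, &parallel_unit) in vnode_mapping.iter().enumerate() {
        vnode_bitmaps.entry(parallel_unit).or_insert([0; VNODE_BITMAP_LEN])[vnode >> 3] |=
            1u8 << (vnode & 0b111);
    }
    vnode_bitmaps
}

fn main() {
    // Arbitrary example: 2048 vnodes spread over 4 parallel units.
    let vnode_mapping: Vec<u32> = (0..2048u32).map(|v| v % 4).collect();
    let bitmaps = build_vnode_bitmaps(&vnode_mapping);
    assert_eq!(bitmaps.len(), 4);
    let total: u32 = bitmaps.values().flatten().map(|byte| byte.count_ones()).sum();
    assert_eq!(total, 2048);
}
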

/// `set_fragment_vnode_mapping` works by following steps:
/// 1. Build a vnode mapping according to parallel units where the fragment is scheduled.
/// 2. Set the vnode mapping into the fragment.
/// 3. Record the relationship between state tables and vnode mappings.
fn set_fragment_vnode_mapping(
&self,
fragment: &mut Fragment,
@@ -230,7 +271,7 @@
// state table id.
let actor = fragment.actors.first().unwrap();
let stream_node = actor.get_nodes()?;
set_table_vnode_mappings(
record_table_vnode_mappings(
&self.hash_mapping_manager,
stream_node,
fragment.fragment_id,
@@ -245,14 +286,15 @@ mod test {
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::hash::VIRTUAL_NODE_COUNT;
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::plan_common::TableRefId;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{MaterializeNode, StreamActor, StreamNode, TopNNode};

use super::*;
use crate::cluster::ClusterManager;
use crate::cluster::{ClusterManager, DEFAULT_WORK_NODE_PARALLEL_DEGREE};
use crate::manager::MetaSrvEnv;

#[tokio::test]
@@ -295,6 +337,7 @@ mod test {
dispatcher: vec![],
upstream_actor_id: vec![],
same_worker_node_as_upstream: false,
vnode_bitmap: vec![],
}],
vnode_mapping: None,
};
@@ -303,9 +346,10 @@
})
.collect_vec();

let parallel_degree = DEFAULT_WORK_NODE_PARALLEL_DEGREE - 1;
let mut normal_fragments = (6..8u32)
.map(|fragment_id| {
let actors = (actor_id..actor_id + node_count * 7)
let actors = (actor_id..actor_id + node_count * parallel_degree as u32)
.map(|id| StreamActor {
actor_id: id,
fragment_id,
@@ -322,6 +366,7 @@
dispatcher: vec![],
upstream_actor_id: vec![],
same_worker_node_as_upstream: false,
vnode_bitmap: vec![],
})
.collect_vec();
actor_id += node_count * 7;
@@ -356,6 +401,9 @@
.get_table_hash_mapping(&fragment.fragment_id),
None
);
for actor in fragment.actors {
assert!(actor.vnode_bitmap.is_empty());
}
}

// Test normal schedule for other fragments
@@ -374,7 +422,7 @@
.contains(actor_id)
})
.count(),
(node_count * 7) as usize
node_count as usize * parallel_degree
);
for fragment in normal_fragments {
assert_ne!(
@@ -388,6 +436,14 @@
.get_table_hash_mapping(&fragment.fragment_id),
None
);
let mut vnode_sum = 0;
for actor in fragment.actors {
assert!(!actor.vnode_bitmap.is_empty());
for byte in actor.vnode_bitmap {
vnode_sum += byte.count_ones();
}
}
assert_eq!(vnode_sum as usize, VIRTUAL_NODE_COUNT);
}

Ok(())
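The new test assertion checks that the set bits across all actor bitmaps in a fragment sum to VIRTUAL_NODE_COUNT. A stricter variant, not part of this commit, could additionally verify that the actors' bitmaps are pairwise disjoint and together cover every vnode:

// Hypothetical stricter check: each vnode must be owned by exactly one actor.
fn check_vnode_partition(actor_bitmaps: &[Vec<u8>]) {
    let len = actor_bitmaps[0].len();
    let mut combined = vec![0u8; len];
    for bitmap in actor_bitmaps {
        for (i, byte) in bitmap.iter().enumerate() {
            // A bit already present in the union means two actors own the same vnode.
            assert_eq!(combined[i] & byte, 0, "vnode owned by more than one actor");
            combined[i] |= byte;
        }
    }
    // With 2048 vnodes and 256 bytes, full coverage means every byte is 0xff.
    assert!(combined.iter().all(|&b| b == 0xff), "some vnode is unowned");
}
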
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_graph.rs
@@ -335,6 +335,7 @@ impl StreamActorBuilder {
},
)| *same_worker_node,
),
vnode_bitmap: vec![],
}
}
}
23 changes: 12 additions & 11 deletions src/meta/src/stream/stream_manager.rs
@@ -963,17 +963,18 @@ mod tests {
.await?;

for actor in actors {
assert_eq!(
services
.state
.actor_streams
.lock()
.unwrap()
.get(&actor.get_actor_id())
.cloned()
.unwrap(),
actor
);
let mut scheduled_actor = services
.state
.actor_streams
.lock()
.unwrap()
.get(&actor.get_actor_id())
.cloned()
.unwrap()
.clone();
assert!(!scheduled_actor.vnode_bitmap.is_empty());
scheduled_actor.vnode_bitmap.clear();
assert_eq!(scheduled_actor, actor);
assert!(services
.state
.actor_ids
(The remaining changed files are not shown here.)
