refactor(meta): improve create mview / scheduler readability (risingwavelabs#3748)

* refactor(meta): improve create mview / scheduler readability

Signed-off-by: Bugen Zhao <[email protected]>

* add some comments for hash mapping

Signed-off-by: Bugen Zhao <[email protected]>

* refine docs

Signed-off-by: Bugen Zhao <[email protected]>

* make yuanxin happy

Signed-off-by: Bugen Zhao <[email protected]>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
2 people authored and nasnoisaac committed Aug 9, 2022
1 parent 168bde6 commit ec22e86
Showing 5 changed files with 263 additions and 256 deletions.
1 change: 1 addition & 0 deletions src/meta/src/lib.rs
@@ -36,6 +36,7 @@
#![feature(drain_filter)]
#![feature(custom_test_frameworks)]
#![feature(lint_reasons)]
#![feature(map_try_insert)]
#![cfg_attr(coverage, feature(no_coverage))]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]

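The new `#![feature(map_try_insert)]` gate suggests the refactored code calls the nightly `HashMap::try_insert` API somewhere in this change, which inserts only when the key is vacant and reports an error instead of silently overwriting. A minimal sketch of that API on a nightly toolchain (the map contents here are made up for illustration):

#![feature(map_try_insert)]
use std::collections::HashMap;

fn main() {
    let mut vnode_mappings: HashMap<u32, Vec<u32>> = HashMap::new();
    // Succeeds because the key is vacant; returns a mutable reference to the value.
    vnode_mappings.try_insert(1, vec![0, 0, 1, 1]).unwrap();
    // Fails with an `OccupiedError` instead of silently overwriting the existing entry.
    assert!(vnode_mappings.try_insert(1, vec![2, 2]).is_err());
}
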
17 changes: 5 additions & 12 deletions src/meta/src/manager/hash_mapping.rs
@@ -88,16 +88,6 @@ impl HashMappingManager {
.map(|info| info.vnode_mapping.clone())
}

pub fn set_need_consolidation(&self, newflag: bool) {
let mut core = self.core.lock();
core.need_sst_consolidation = newflag;
}

pub fn get_need_consolidation(&self) -> bool {
let core = self.core.lock();
core.need_sst_consolidation
}

/// For test.
#[cfg(test)]
fn get_fragment_mapping_info(&self, fragment_id: &FragmentId) -> Option<HashMappingInfo> {
@@ -121,7 +111,6 @@ struct HashMappingInfo {
}

struct HashMappingManagerCore {
need_sst_consolidation: bool,
/// Mapping from fragment to hash mapping information. One fragment will have exactly one vnode
/// mapping, which describes the data distribution of the fragment.
hash_mapping_infos: HashMap<FragmentId, HashMappingInfo>,
@@ -132,12 +121,15 @@ struct HashMappingManagerCore {
impl HashMappingManagerCore {
fn new() -> Self {
Self {
need_sst_consolidation: true,
hash_mapping_infos: HashMap::new(),
state_table_fragment_mapping: HashMap::new(),
}
}

/// Build a [`HashMappingInfo`] and record it for the fragment based on given `parallel_units`.
///
/// For example, if `parallel_units` is `[0, 1, 2]`, and the total vnode count is 10, we'll
/// generate mapping like `[0, 0, 0, 0, 1, 1, 1, 2, 2, 2]`.
fn build_fragment_hash_mapping(
&mut self,
fragment_id: FragmentId,
@@ -187,6 +179,7 @@ impl HashMappingManagerCore {
vnode_mapping
}

/// Construct a [`HashMappingInfo`] with given `vnode_mapping` and record it for the fragment.
fn set_fragment_hash_mapping(
&mut self,
fragment_id: FragmentId,
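The doc comment added to `build_fragment_hash_mapping` above describes how virtual nodes are spread over a fragment's parallel units. A minimal standalone sketch of that distribution (not the actual manager code; `build_vnode_mapping` and its arguments are made up for illustration):

fn build_vnode_mapping(parallel_unit_ids: &[u32], vnode_count: usize) -> Vec<u32> {
    let units = parallel_unit_ids.len();
    let base = vnode_count / units; // every unit gets at least this many vnodes
    let remainder = vnode_count % units; // the first `remainder` units get one extra
    let mut mapping = Vec::with_capacity(vnode_count);
    for (idx, &unit_id) in parallel_unit_ids.iter().enumerate() {
        let count = base + usize::from(idx < remainder);
        mapping.extend(std::iter::repeat(unit_id).take(count));
    }
    mapping
}

fn main() {
    // Reproduces the example from the doc comment: units [0, 1, 2] and 10 vnodes.
    assert_eq!(
        build_vnode_mapping(&[0, 1, 2], 10),
        vec![0, 0, 0, 0, 1, 1, 1, 2, 2, 2]
    );
}
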
1 change: 1 addition & 0 deletions src/meta/src/stream/meta.rs
@@ -76,6 +76,7 @@ where
.map(|tf| (tf.table_id(), tf))
.collect();

// Extract vnode mapping info from listed `table_fragments` to hash mapping manager.
Self::restore_vnode_mappings(env.hash_mapping_manager_ref(), &table_fragments)?;

Ok(Self {
164 changes: 76 additions & 88 deletions src/meta/src/stream/scheduler.rs
@@ -13,14 +13,18 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::iter::empty;
use std::sync::atomic::{AtomicUsize, Ordering};

use anyhow::anyhow;
use risingwave_common::bail;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{internal_error, Result};
use risingwave_common::error::Result;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_common::util::compress::compress_data;
use risingwave_pb::common::{ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType};
use risingwave_pb::common::{
ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType, WorkerNode,
};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::Fragment;

@@ -38,38 +42,44 @@ pub struct Scheduler<S: MetaStore> {
/// Round robin counter for singleton fragments
single_rr: AtomicUsize,
}

/// [`ScheduledLocations`] represents the location of scheduled result.
pub struct ScheduledLocations {
/// actor location map.
pub actor_locations: BTreeMap<ActorId, ParallelUnit>,
/// worker location map.
pub node_locations: WorkerLocations,
pub worker_locations: WorkerLocations,
}

impl ScheduledLocations {
#[cfg_attr(not(test), expect(dead_code))]
pub fn new() -> Self {
Self::with_workers(empty())
}

pub fn with_workers(workers: impl IntoIterator<Item = WorkerNode>) -> Self {
Self {
actor_locations: BTreeMap::new(),
node_locations: HashMap::new(),
actor_locations: Default::default(),
worker_locations: workers.into_iter().map(|w| (w.id, w)).collect(),
}
}

/// [`Self::node_actors`] returns all actors for every node.
pub fn node_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
let mut node_actors = HashMap::new();
/// Returns all actors for every worker node.
pub fn worker_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
let mut worker_actors = HashMap::new();
self.actor_locations
.iter()
.for_each(|(actor_id, parallel_unit)| {
node_actors
worker_actors
.entry(parallel_unit.worker_node_id)
.or_insert_with(Vec::new)
.push(*actor_id);
});

node_actors
worker_actors
}

/// [`Self::actor_info_map`] returns the `ActorInfo` map for every actor.
/// Returns the `ActorInfo` map for every actor.
pub fn actor_info_map(&self) -> HashMap<ActorId, ActorInfo> {
self.actor_locations
.iter()
@@ -78,7 +88,7 @@ impl ScheduledLocations {
*actor_id,
ActorInfo {
actor_id: *actor_id,
host: self.node_locations[&parallel_unit.worker_node_id]
host: self.worker_locations[&parallel_unit.worker_node_id]
.host
.clone(),
},
@@ -87,40 +97,37 @@ impl ScheduledLocations {
.collect::<HashMap<_, _>>()
}

/// [`Self::actor_infos`] returns the `ActorInfo` slice.
pub fn actor_infos(&self) -> Vec<ActorInfo> {
/// Returns an iterator of `ActorInfo`.
pub fn actor_infos(&self) -> impl Iterator<Item = ActorInfo> + '_ {
self.actor_locations
.iter()
.map(|(actor_id, parallel_unit)| ActorInfo {
actor_id: *actor_id,
host: self.node_locations[&parallel_unit.worker_node_id]
host: self.worker_locations[&parallel_unit.worker_node_id]
.host
.clone(),
})
.collect::<Vec<_>>()
}

/// Find a placement location that is on the same node of given actor ids.
/// Find a placement location that is on the same worker node of given actor ids.
pub fn schedule_colocate_with(&self, actor_ids: &[ActorId]) -> Result<ParallelUnit> {
let mut result_location = None;
for actor_id in actor_ids {
let location = self.actor_locations.get(actor_id);
if let Some(location) = location {
if result_location.is_none() {
result_location = Some(location.clone());
} else if location != result_location.as_ref().unwrap() {
return Err(internal_error(format!(
let location = self
.actor_locations
.get(actor_id)
.ok_or_else(|| anyhow!("actor location not found: {}", actor_id))?;
match &result_location {
None => result_location = Some(location.clone()),
Some(result_location) if result_location != location => {
bail!(
"cannot satisfy placement rule: {} is at {:?}, while others are on {:?}",
actor_id,
location,
result_location.as_ref().unwrap()
)));
result_location
);
}
} else {
return Err(internal_error(format!(
"actor location not found: {}",
actor_id
)));
_ => {}
}
}
Ok(result_location.unwrap())
@@ -154,51 +161,40 @@ where
locations: &mut ScheduledLocations,
) -> Result<()> {
if fragment.actors.is_empty() {
return Err(InternalError("fragment has no actor".to_string()).into());
bail!("fragment has no actor");
}

self.hash_mapping_manager.set_need_consolidation(true);

if fragment.distribution_type == FragmentDistributionType::Single as i32 {
// Singleton fragment
let actor = &fragment.actors[0];

if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
// Schedule the fragment to the same parallel unit as upstream.
let parallel_unit = locations.schedule_colocate_with(&actor.upstream_actor_id)?;
let parallel_unit =
if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
// Schedule the fragment to the same parallel unit as upstream.
locations.schedule_colocate_with(&actor.upstream_actor_id)?
} else {
// Choose one parallel unit to schedule from single parallel units.
let single_parallel_units = self
.cluster_manager
.list_parallel_units(Some(ParallelUnitType::Single))
.await;
let single_idx = self
.single_rr
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| {
Some((idx + 1) % single_parallel_units.len())
})
.unwrap();

single_parallel_units[single_idx].clone()
};

// Build vnode mapping.
self.set_fragment_vnode_mapping(fragment, &[parallel_unit.clone()])?;
// Build vnode mapping. However, we'll leave vnode field of actors unset for singletons.
self.set_fragment_vnode_mapping(fragment, &[parallel_unit.clone()])?;

// Record actor locations.
locations
.actor_locations
.insert(fragment.actors[0].actor_id, parallel_unit);
} else {
// Choose one parallel unit to schedule from single parallel units.
let single_parallel_units = self
.cluster_manager
.list_parallel_units(Some(ParallelUnitType::Single))
.await;
let single_idx = self
.single_rr
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| {
Some((idx + 1) % single_parallel_units.len())
})
.map_err(|_| internal_error("failed to round robin id"))?;

// Build vnode mapping.
self.set_fragment_vnode_mapping(
fragment,
&[single_parallel_units[single_idx].clone()],
)?;

// Record actor locations.
locations.actor_locations.insert(
fragment.actors[0].actor_id,
single_parallel_units[single_idx].clone(),
);
}
// Record actor locations.
locations
.actor_locations
.insert(fragment.actors[0].actor_id, parallel_unit);
} else {
// Normal fragment

@@ -237,26 +233,18 @@

// Record actor locations and set vnodes into the actors.
for (idx, actor) in fragment.actors.iter_mut().enumerate() {
if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
let parallel_unit =
locations.schedule_colocate_with(&actor.upstream_actor_id)?;
actor.vnode_bitmap =
Some(vnode_bitmaps.get(&parallel_unit.id).unwrap().to_protobuf());
locations
.actor_locations
.insert(actor.actor_id, parallel_unit);
} else {
actor.vnode_bitmap = Some(
vnode_bitmaps
.get(&parallel_units[idx % parallel_units.len()].id)
.unwrap()
.to_protobuf(),
);
locations.actor_locations.insert(
actor.actor_id,
parallel_units[idx % parallel_units.len()].clone(),
);
}
let parallel_unit =
if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
locations.schedule_colocate_with(&actor.upstream_actor_id)?
} else {
parallel_units[idx % parallel_units.len()].clone()
};

actor.vnode_bitmap =
Some(vnode_bitmaps.get(&parallel_unit.id).unwrap().to_protobuf());
locations
.actor_locations
.insert(actor.actor_id, parallel_unit);
}
}

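The singleton branch above picks a parallel unit round-robin by bumping an `AtomicUsize` with `fetch_update`; since the closure always returns `Some`, the result can be unwrapped, which is why the refactor drops the earlier `map_err`. A minimal standalone sketch of that pattern (the helper name and candidate list are made up for illustration):

use std::sync::atomic::{AtomicUsize, Ordering};

fn round_robin_pick<'a, T>(counter: &AtomicUsize, candidates: &'a [T]) -> &'a T {
    let idx = counter
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| {
            // Always `Some`, so `fetch_update` never returns `Err`.
            Some((idx + 1) % candidates.len())
        })
        .unwrap();
    // `fetch_update` yields the previous value, which is the slot we schedule to.
    &candidates[idx]
}

fn main() {
    let counter = AtomicUsize::new(0);
    let single_units = ["pu-0", "pu-1", "pu-2"];
    let picks: Vec<_> = (0..5)
        .map(|_| *round_robin_pick(&counter, &single_units))
        .collect();
    assert_eq!(picks, ["pu-0", "pu-1", "pu-2", "pu-0", "pu-1"]);
}
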
