Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): improve create mview / scheduler readability #3748

Merged
merged 5 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#![feature(drain_filter)]
#![feature(custom_test_frameworks)]
#![feature(lint_reasons)]
#![feature(map_try_insert)]
#![cfg_attr(coverage, feature(no_coverage))]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]

Expand Down
17 changes: 5 additions & 12 deletions src/meta/src/manager/hash_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,6 @@ impl HashMappingManager {
.map(|info| info.vnode_mapping.clone())
}

pub fn set_need_consolidation(&self, newflag: bool) {
let mut core = self.core.lock();
core.need_sst_consolidation = newflag;
}

pub fn get_need_consolidation(&self) -> bool {
let core = self.core.lock();
core.need_sst_consolidation
}

/// For test.
#[cfg(test)]
fn get_fragment_mapping_info(&self, fragment_id: &FragmentId) -> Option<HashMappingInfo> {
Expand All @@ -121,7 +111,6 @@ struct HashMappingInfo {
}

struct HashMappingManagerCore {
need_sst_consolidation: bool,
/// Mapping from fragment to hash mapping information. One fragment will have exactly one vnode
/// mapping, which describes the data distribution of the fragment.
hash_mapping_infos: HashMap<FragmentId, HashMappingInfo>,
Expand All @@ -132,12 +121,15 @@ struct HashMappingManagerCore {
impl HashMappingManagerCore {
fn new() -> Self {
Self {
need_sst_consolidation: true,
hash_mapping_infos: HashMap::new(),
state_table_fragment_mapping: HashMap::new(),
}
}

/// Build a [`HashMappingInfo`] and record it for the fragment based on given `parallel_units`.
///
/// For example, if `parallel_units` is `[0, 1, 2]`, and the total vnode count is 10, we'll
/// generate a mapping like `[0, 0, 0, 0, 1, 1, 1, 2, 2, 2]`.
fn build_fragment_hash_mapping(
&mut self,
fragment_id: FragmentId,
Expand Down Expand Up @@ -187,6 +179,7 @@ impl HashMappingManagerCore {
vnode_mapping
}

/// Construct a [`HashMappingInfo`] with given `vnode_mapping` and record it for the fragment.
fn set_fragment_hash_mapping(
&mut self,
fragment_id: FragmentId,
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ where
.map(|tf| (tf.table_id(), tf))
.collect();

// Extract vnode mapping info from the listed `table_fragments` into the hash mapping manager.
Self::restore_vnode_mappings(env.hash_mapping_manager_ref(), &table_fragments)?;

Ok(Self {
Expand Down
164 changes: 76 additions & 88 deletions src/meta/src/stream/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::iter::empty;
use std::sync::atomic::{AtomicUsize, Ordering};

use anyhow::anyhow;
use risingwave_common::bail;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{internal_error, Result};
use risingwave_common::error::Result;
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_common::util::compress::compress_data;
use risingwave_pb::common::{ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType};
use risingwave_pb::common::{
ActorInfo, ParallelUnit, ParallelUnitMapping, ParallelUnitType, WorkerNode,
};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::table_fragments::Fragment;

Expand All @@ -38,38 +42,44 @@ pub struct Scheduler<S: MetaStore> {
/// Round robin counter for singleton fragments
single_rr: AtomicUsize,
}

/// [`ScheduledLocations`] represents the locations of the scheduling result.
pub struct ScheduledLocations {
/// actor location map.
pub actor_locations: BTreeMap<ActorId, ParallelUnit>,
/// worker location map.
pub node_locations: WorkerLocations,
pub worker_locations: WorkerLocations,
}

impl ScheduledLocations {
#[cfg_attr(not(test), expect(dead_code))]
pub fn new() -> Self {
Self::with_workers(empty())
}

pub fn with_workers(workers: impl IntoIterator<Item = WorkerNode>) -> Self {
Self {
actor_locations: BTreeMap::new(),
node_locations: HashMap::new(),
actor_locations: Default::default(),
worker_locations: workers.into_iter().map(|w| (w.id, w)).collect(),
}
}

/// [`Self::node_actors`] returns all actors for every node.
pub fn node_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
let mut node_actors = HashMap::new();
/// Returns all actors for every worker node.
pub fn worker_actors(&self) -> HashMap<WorkerId, Vec<ActorId>> {
let mut worker_actors = HashMap::new();
self.actor_locations
.iter()
.for_each(|(actor_id, parallel_unit)| {
node_actors
worker_actors
.entry(parallel_unit.worker_node_id)
.or_insert_with(Vec::new)
.push(*actor_id);
});

node_actors
worker_actors
}

/// [`Self::actor_info_map`] returns the `ActorInfo` map for every actor.
/// Returns the `ActorInfo` map for every actor.
pub fn actor_info_map(&self) -> HashMap<ActorId, ActorInfo> {
self.actor_locations
.iter()
Expand All @@ -78,7 +88,7 @@ impl ScheduledLocations {
*actor_id,
ActorInfo {
actor_id: *actor_id,
host: self.node_locations[&parallel_unit.worker_node_id]
host: self.worker_locations[&parallel_unit.worker_node_id]
.host
.clone(),
},
Expand All @@ -87,40 +97,37 @@ impl ScheduledLocations {
.collect::<HashMap<_, _>>()
}

/// [`Self::actor_infos`] returns the `ActorInfo` slice.
pub fn actor_infos(&self) -> Vec<ActorInfo> {
/// Returns an iterator of `ActorInfo`.
pub fn actor_infos(&self) -> impl Iterator<Item = ActorInfo> + '_ {
self.actor_locations
.iter()
.map(|(actor_id, parallel_unit)| ActorInfo {
actor_id: *actor_id,
host: self.node_locations[&parallel_unit.worker_node_id]
host: self.worker_locations[&parallel_unit.worker_node_id]
.host
.clone(),
})
.collect::<Vec<_>>()
}

/// Find a placement location that is on the same node of given actor ids.
/// Find a placement location that is on the same worker node of given actor ids.
pub fn schedule_colocate_with(&self, actor_ids: &[ActorId]) -> Result<ParallelUnit> {
let mut result_location = None;
for actor_id in actor_ids {
let location = self.actor_locations.get(actor_id);
if let Some(location) = location {
if result_location.is_none() {
result_location = Some(location.clone());
} else if location != result_location.as_ref().unwrap() {
return Err(internal_error(format!(
let location = self
.actor_locations
.get(actor_id)
.ok_or_else(|| anyhow!("actor location not found: {}", actor_id))?;
match &result_location {
None => result_location = Some(location.clone()),
Some(result_location) if result_location != location => {
bail!(
"cannot satisfy placement rule: {} is at {:?}, while others are on {:?}",
actor_id,
location,
result_location.as_ref().unwrap()
)));
result_location
);
}
} else {
return Err(internal_error(format!(
"actor location not found: {}",
actor_id
)));
_ => {}
}
}
Ok(result_location.unwrap())
Expand Down Expand Up @@ -154,51 +161,40 @@ where
locations: &mut ScheduledLocations,
) -> Result<()> {
if fragment.actors.is_empty() {
return Err(InternalError("fragment has no actor".to_string()).into());
bail!("fragment has no actor");
}

self.hash_mapping_manager.set_need_consolidation(true);

if fragment.distribution_type == FragmentDistributionType::Single as i32 {
// Singleton fragment
let actor = &fragment.actors[0];

if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
// Schedule the fragment to the same parallel unit as upstream.
let parallel_unit = locations.schedule_colocate_with(&actor.upstream_actor_id)?;
let parallel_unit =
if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
// Schedule the fragment to the same parallel unit as upstream.
locations.schedule_colocate_with(&actor.upstream_actor_id)?
} else {
// Choose one parallel unit to schedule from single parallel units.
let single_parallel_units = self
.cluster_manager
.list_parallel_units(Some(ParallelUnitType::Single))
.await;
let single_idx = self
.single_rr
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| {
Some((idx + 1) % single_parallel_units.len())
})
.unwrap();

single_parallel_units[single_idx].clone()
};

// Build vnode mapping.
self.set_fragment_vnode_mapping(fragment, &[parallel_unit.clone()])?;
// Build vnode mapping. However, we'll leave vnode field of actors unset for singletons.
self.set_fragment_vnode_mapping(fragment, &[parallel_unit.clone()])?;

// Record actor locations.
locations
.actor_locations
.insert(fragment.actors[0].actor_id, parallel_unit);
} else {
// Choose one parallel unit to schedule from single parallel units.
let single_parallel_units = self
.cluster_manager
.list_parallel_units(Some(ParallelUnitType::Single))
.await;
let single_idx = self
.single_rr
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |idx| {
Some((idx + 1) % single_parallel_units.len())
})
.map_err(|_| internal_error("failed to round robin id"))?;

// Build vnode mapping.
self.set_fragment_vnode_mapping(
fragment,
&[single_parallel_units[single_idx].clone()],
)?;

// Record actor locations.
locations.actor_locations.insert(
fragment.actors[0].actor_id,
single_parallel_units[single_idx].clone(),
);
}
// Record actor locations.
locations
.actor_locations
.insert(fragment.actors[0].actor_id, parallel_unit);
} else {
// Normal fragment

Expand Down Expand Up @@ -237,26 +233,18 @@ where

// Record actor locations and set vnodes into the actors.
for (idx, actor) in fragment.actors.iter_mut().enumerate() {
if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
let parallel_unit =
locations.schedule_colocate_with(&actor.upstream_actor_id)?;
actor.vnode_bitmap =
Some(vnode_bitmaps.get(&parallel_unit.id).unwrap().to_protobuf());
locations
.actor_locations
.insert(actor.actor_id, parallel_unit);
} else {
actor.vnode_bitmap = Some(
vnode_bitmaps
.get(&parallel_units[idx % parallel_units.len()].id)
.unwrap()
.to_protobuf(),
);
locations.actor_locations.insert(
actor.actor_id,
parallel_units[idx % parallel_units.len()].clone(),
);
}
let parallel_unit =
if actor.same_worker_node_as_upstream && !actor.upstream_actor_id.is_empty() {
locations.schedule_colocate_with(&actor.upstream_actor_id)?
} else {
parallel_units[idx % parallel_units.len()].clone()
};

actor.vnode_bitmap =
Some(vnode_bitmaps.get(&parallel_unit.id).unwrap().to_protobuf());
locations
.actor_locations
.insert(actor.actor_id, parallel_unit);
}
}

Expand Down
Loading