serving info be aware of vnode count
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao committed Sep 12, 2024
1 parent 2bd3f59 commit dd4b4f7
Showing 5 changed files with 61 additions and 39 deletions.
1 change: 0 additions & 1 deletion src/expr/impl/src/scalar/vnode.rs
@@ -64,7 +64,6 @@ impl Expression for VnodeExpression {
 #[cfg(test)]
 mod tests {
     use risingwave_common::array::{DataChunk, DataChunkTestExt};
-    use risingwave_common::hash::VirtualNode;
     use risingwave_common::row::Row;
     use risingwave_expr::expr::build_from_pretty;
     use risingwave_expr::expr_context::VNODE_COUNT;
25 changes: 21 additions & 4 deletions src/meta/src/controller/fragment.rs
@@ -56,7 +56,9 @@ use crate::controller::utils::{
     get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors,
     FragmentDesc, PartialActorLocation, PartialFragmentStateTables,
 };
-use crate::manager::{ActorInfos, InflightFragmentInfo, LocalNotification};
+use crate::manager::{
+    ActorInfos, FragmentParallelismInfo, InflightFragmentInfo, LocalNotification,
+};
 use crate::model::{TableFragments, TableParallelism};
 use crate::stream::SplitAssignment;
 use crate::{MetaError, MetaResult};
@@ -475,8 +477,9 @@ impl CatalogController {
     pub async fn running_fragment_parallelisms(
         &self,
         id_filter: Option<HashSet<FragmentId>>,
-    ) -> MetaResult<HashMap<FragmentId, usize>> {
+    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
         let inner = self.inner.read().await;
+
         let mut select = Actor::find()
             .select_only()
             .column(actor::Column::FragmentId)
@@ -485,11 +488,25 @@
         if let Some(id_filter) = id_filter {
             select = select.having(actor::Column::FragmentId.is_in(id_filter));
         }
-        let fragment_parallelisms: Vec<(FragmentId, i64)> =
+        select = select
+            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
+            .column(fragment::Column::DistributionType)
+            .column(fragment::Column::VnodeCount);
+
+        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
             select.into_tuple().all(&inner.db).await?;
         Ok(fragment_parallelisms
             .into_iter()
-            .map(|(fragment_id, count)| (fragment_id, count as usize))
+            .map(|(fragment_id, count, distribution_type, vnode_count)| {
+                (
+                    fragment_id,
+                    FragmentParallelismInfo {
+                        distribution_type: distribution_type.into(),
+                        actor_count: count as usize,
+                        vnode_count: vnode_count as usize,
+                    },
+                )
+            })
             .collect())
     }

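For readers following the controller change: the new query joins each actor row to its fragment to pick up the distribution type and vnode count, then folds the per-fragment rows into the new struct. The standalone sketch below mirrors that fold under simplified assumptions; FragmentId, DistributionType, and the struct are stand-ins for the crate's types (the real code also converts the SeaORM enum to protobuf via .into(), skipped here), and to_parallelism_infos is a hypothetical helper, not part of the commit.

use std::collections::HashMap;

type FragmentId = i32;

// Stand-in for the catalog's distribution type enum.
#[derive(Clone, Copy, Debug, PartialEq)]
enum DistributionType {
    Single,
    Hash,
}

// Stand-in for the FragmentParallelismInfo struct this commit introduces.
#[derive(Clone, Debug)]
struct FragmentParallelismInfo {
    distribution_type: DistributionType,
    actor_count: usize,
    vnode_count: usize,
}

// Mirrors the new query's row shape: fragment id, actor count (i64 from
// COUNT), distribution type, and vnode count (i32 column).
fn to_parallelism_infos(
    rows: Vec<(FragmentId, i64, DistributionType, i32)>,
) -> HashMap<FragmentId, FragmentParallelismInfo> {
    rows.into_iter()
        .map(|(fragment_id, count, distribution_type, vnode_count)| {
            (
                fragment_id,
                FragmentParallelismInfo {
                    distribution_type,
                    actor_count: count as usize,
                    vnode_count: vnode_count as usize,
                },
            )
        })
        .collect()
}

fn main() {
    let rows = vec![
        (1, 4, DistributionType::Hash, 256),
        (2, 1, DistributionType::Single, 1),
    ];
    let infos = to_parallelism_infos(rows);
    assert_eq!(infos[&1].actor_count, 4);
    assert_eq!(infos[&1].vnode_count, 256);
    assert_eq!(infos[&2].distribution_type, DistributionType::Single);
    println!("{infos:?}");
}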
28 changes: 18 additions & 10 deletions src/meta/src/manager/catalog/fragment.rs
@@ -20,7 +20,7 @@ use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping};
+use risingwave_common::hash::{ActorMapping, VnodeCountCompat, WorkerSlotId, WorkerSlotMapping};
 use risingwave_common::util::stream_graph_visitor::{
     visit_stream_node, visit_stream_node_cont, visit_stream_node_cont_mut,
 };
@@ -137,7 +137,7 @@ impl FragmentManagerCore {
     fn running_fragment_parallelisms(
         &self,
         id_filter: Option<HashSet<FragmentId>>,
-    ) -> HashMap<FragmentId, usize> {
+    ) -> HashMap<FragmentId, FragmentParallelismInfo> {
         self.table_fragments
             .values()
             .filter(|tf| tf.state() != State::Initial)
@@ -149,13 +149,14 @@
                         return None;
                    }

-                    let parallelism = match fragment.get_distribution_type().unwrap() {
-                        FragmentDistributionType::Unspecified => unreachable!(),
-                        FragmentDistributionType::Single => 1,
-                        FragmentDistributionType::Hash => fragment.get_actors().len(),
-                    };
-
-                    Some((fragment.fragment_id, parallelism))
+                    Some((
+                        fragment.fragment_id,
+                        FragmentParallelismInfo {
+                            distribution_type: fragment.get_distribution_type().unwrap(),
+                            actor_count: fragment.actors.len(),
+                            vnode_count: fragment.vnode_count(),
+                        },
+                    ))
                })
            })
            .collect()
@@ -190,6 +191,13 @@ impl ActorInfos {
     }
 }

+#[derive(Clone, Debug)]
+pub struct FragmentParallelismInfo {
+    pub distribution_type: FragmentDistributionType,
+    pub actor_count: usize,
+    pub vnode_count: usize,
+}
+
 pub type FragmentManagerRef = Arc<FragmentManager>;

 impl FragmentManager {
@@ -1685,7 +1693,7 @@ impl FragmentManager {
     pub async fn running_fragment_parallelisms(
         &self,
         id_filter: Option<HashSet<FragmentId>>,
-    ) -> HashMap<FragmentId, usize> {
+    ) -> HashMap<FragmentId, FragmentParallelismInfo> {
         self.core
             .read()
             .await

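Note that the wider return type strictly supersedes the old one: the scalar parallelism that the deleted match computed is still derivable from the struct's fields, so count-only callers lose nothing while gaining the vnode count. A minimal sketch of that equivalence, with a stand-in enum for the protobuf FragmentDistributionType and a hypothetical legacy_parallelism helper:

#[derive(Clone, Copy)]
enum FragmentDistributionType {
    #[allow(dead_code)] // present in the protobuf, never expected at runtime
    Unspecified,
    Single,
    Hash,
}

// Reproduces the value the removed `parallelism` match used to return,
// from the fields now carried by FragmentParallelismInfo.
fn legacy_parallelism(distribution_type: FragmentDistributionType, actor_count: usize) -> usize {
    match distribution_type {
        FragmentDistributionType::Unspecified => unreachable!(),
        FragmentDistributionType::Single => 1,
        FragmentDistributionType::Hash => actor_count,
    }
}

fn main() {
    // A hash-distributed fragment's parallelism is its actor count...
    assert_eq!(legacy_parallelism(FragmentDistributionType::Hash, 4), 4);
    // ...while a singleton fragment always reports 1.
    assert_eq!(legacy_parallelism(FragmentDistributionType::Single, 1), 1);
    println!("ok");
}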
8 changes: 3 additions & 5 deletions src/meta/src/manager/metadata.rs
@@ -32,6 +32,7 @@ use tokio::sync::oneshot;
 use tokio::time::{sleep, Instant};
 use tracing::warn;

+use super::FragmentParallelismInfo;
 use crate::barrier::Reschedule;
 use crate::controller::catalog::CatalogControllerRef;
 use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo};
@@ -437,15 +438,12 @@ impl MetadataManager {
     pub async fn running_fragment_parallelisms(
         &self,
         id_filter: Option<HashSet<FragmentId>>,
-    ) -> MetaResult<HashMap<FragmentId, usize>> {
+    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
         match self {
             MetadataManager::V1(mgr) => Ok(mgr
                 .fragment_manager
                 .running_fragment_parallelisms(id_filter)
-                .await
-                .into_iter()
-                .map(|(k, v)| (k as FragmentId, v))
-                .collect()),
+                .await),
             MetadataManager::V2(mgr) => {
                 let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect());
                 Ok(mgr

38 changes: 19 additions & 19 deletions src/meta/src/serving/mod.rs
@@ -16,15 +16,18 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use parking_lot::RwLock;
-use risingwave_common::hash::{VirtualNode, WorkerSlotMapping};
+use risingwave_common::hash::WorkerSlotMapping;
 use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
 use risingwave_pb::common::{WorkerNode, WorkerType};
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
+use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
 use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;

-use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
+use crate::manager::{
+    FragmentParallelismInfo, LocalNotification, MetadataManager, NotificationManagerRef,
+};
 use crate::model::FragmentId;

 pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;
@@ -43,27 +46,21 @@ impl ServingVnodeMapping {
     /// Returns (successful updates, failed updates).
     pub fn upsert(
         &self,
-        streaming_parallelisms: HashMap<FragmentId, usize>,
+        streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
         workers: &[WorkerNode],
     ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
         let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
         let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
         let mut failed: Vec<FragmentId> = vec![];
-        for (fragment_id, streaming_parallelism) in streaming_parallelisms {
+        for (fragment_id, info) in streaming_parallelisms {
             let new_mapping = {
                 let old_mapping = serving_vnode_mappings.get(&fragment_id);
-                let max_parallelism = if streaming_parallelism == 1 {
-                    Some(1)
-                } else {
-                    None
+                let max_parallelism = match info.distribution_type {
+                    FragmentDistributionType::Unspecified => unreachable!(),
+                    FragmentDistributionType::Single => Some(1),
+                    FragmentDistributionType::Hash => None,
                 };
-                // TODO(var-vnode): also fetch vnode count for each fragment
-                place_vnode(
-                    old_mapping,
-                    workers,
-                    max_parallelism,
-                    VirtualNode::COUNT_FOR_COMPAT,
-                )
+                place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
             };
             match new_mapping {
                 None => {
@@ -134,7 +131,10 @@ pub async fn on_meta_start(

 async fn fetch_serving_infos(
     metadata_manager: &MetadataManager,
-) -> (Vec<WorkerNode>, HashMap<FragmentId, usize>) {
+) -> (
+    Vec<WorkerNode>,
+    HashMap<FragmentId, FragmentParallelismInfo>,
+) {
     match metadata_manager {
         MetadataManager::V1(mgr) => (
             mgr.cluster_manager
@@ -160,7 +160,7 @@ async fn fetch_serving_infos(
                 serving_compute_nodes,
                 parallelisms
                     .into_iter()
-                    .map(|(fragment_id, cnt)| (fragment_id as FragmentId, cnt))
+                    .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
                     .collect(),
             )
         }
@@ -198,9 +198,9 @@ pub async fn start_serving_vnode_mapping_worker(
                     continue;
                 }
                 let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
-                let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id|{
+                let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| {
                     match streaming_parallelisms.get(frag_id) {
-                        Some(parallelism) => Some((*frag_id, *parallelism)),
+                        Some(info) => Some((*frag_id, info.clone())),
                         None => {
                             tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
                             None
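The upsert hunk above is the point of the commit: the old code inferred a singleton from streaming_parallelism == 1, which conflated it with a hash-distributed fragment that happens to run one actor, and it always placed vnodes against the global VirtualNode::COUNT_FOR_COMPAT; the new code branches on the actual distribution type and passes the fragment's own vnode count, resolving the TODO(var-vnode) it deletes. Below is a minimal sketch of that decision under stand-in types; placement_inputs is hypothetical, and actor_count is omitted from the struct for brevity.

// Stand-ins for the crate's types, trimmed to what this decision needs.
#[derive(Clone, Copy)]
enum FragmentDistributionType {
    #[allow(dead_code)] // present in the protobuf, never expected at runtime
    Unspecified,
    Single,
    Hash,
}

struct FragmentParallelismInfo {
    distribution_type: FragmentDistributionType,
    vnode_count: usize,
}

// Derives the placement inputs the way the new `upsert` does: a serving
// parallelism cap of 1 for singleton fragments, no cap for hash-distributed
// ones, paired with the fragment's own vnode count instead of a global
// compatibility constant.
fn placement_inputs(info: &FragmentParallelismInfo) -> (Option<usize>, usize) {
    let max_parallelism = match info.distribution_type {
        FragmentDistributionType::Unspecified => unreachable!(),
        FragmentDistributionType::Single => Some(1),
        FragmentDistributionType::Hash => None,
    };
    (max_parallelism, info.vnode_count)
}

fn main() {
    let hash_fragment = FragmentParallelismInfo {
        distribution_type: FragmentDistributionType::Hash,
        vnode_count: 256,
    };
    let singleton = FragmentParallelismInfo {
        distribution_type: FragmentDistributionType::Single,
        vnode_count: 1,
    };
    // Hash fragment: unbounded serving parallelism, per-fragment vnode count.
    assert_eq!(placement_inputs(&hash_fragment), (None, 256));
    // Singleton: capped at one worker slot.
    assert_eq!(placement_inputs(&singleton), (Some(1), 1));
    println!("ok");
}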
