diff --git a/src/expr/impl/src/scalar/vnode.rs b/src/expr/impl/src/scalar/vnode.rs index 8fdecda841d85..7d44dfb0e03b1 100644 --- a/src/expr/impl/src/scalar/vnode.rs +++ b/src/expr/impl/src/scalar/vnode.rs @@ -64,7 +64,6 @@ impl Expression for VnodeExpression { #[cfg(test)] mod tests { use risingwave_common::array::{DataChunk, DataChunkTestExt}; - use risingwave_common::hash::VirtualNode; use risingwave_common::row::Row; use risingwave_expr::expr::build_from_pretty; use risingwave_expr::expr_context::VNODE_COUNT; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index b29c4cc207ebb..721e6da5fc7ff 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -56,7 +56,9 @@ use crate::controller::utils::{ get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors, FragmentDesc, PartialActorLocation, PartialFragmentStateTables, }; -use crate::manager::{ActorInfos, InflightFragmentInfo, LocalNotification}; +use crate::manager::{ + ActorInfos, FragmentParallelismInfo, InflightFragmentInfo, LocalNotification, +}; use crate::model::{TableFragments, TableParallelism}; use crate::stream::SplitAssignment; use crate::{MetaError, MetaResult}; @@ -475,8 +477,9 @@ impl CatalogController { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> MetaResult> { + ) -> MetaResult> { let inner = self.inner.read().await; + let mut select = Actor::find() .select_only() .column(actor::Column::FragmentId) @@ -485,11 +488,25 @@ impl CatalogController { if let Some(id_filter) = id_filter { select = select.having(actor::Column::FragmentId.is_in(id_filter)); } - let fragment_parallelisms: Vec<(FragmentId, i64)> = + select = select + .join(JoinType::InnerJoin, actor::Relation::Fragment.def()) + .column(fragment::Column::DistributionType) + .column(fragment::Column::VnodeCount); + + let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> = select.into_tuple().all(&inner.db).await?; Ok(fragment_parallelisms .into_iter() - .map(|(fragment_id, count)| (fragment_id, count as usize)) + .map(|(fragment_id, count, distribution_type, vnode_count)| { + ( + fragment_id, + FragmentParallelismInfo { + distribution_type: distribution_type.into(), + actor_count: count as usize, + vnode_count: vnode_count as usize, + }, + ) + }) .collect()) } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index b734cdb54602a..78d1b884926df 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, VnodeCountCompat, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::stream_graph_visitor::{ visit_stream_node, visit_stream_node_cont, visit_stream_node_cont_mut, }; @@ -137,7 +137,7 @@ impl FragmentManagerCore { fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> HashMap { + ) -> HashMap { self.table_fragments .values() .filter(|tf| tf.state() != State::Initial) @@ -149,13 +149,14 @@ impl FragmentManagerCore { return None; } - let parallelism = match fragment.get_distribution_type().unwrap() { - FragmentDistributionType::Unspecified => unreachable!(), - FragmentDistributionType::Single => 1, - FragmentDistributionType::Hash => fragment.get_actors().len(), - }; - - Some((fragment.fragment_id, parallelism)) + Some(( + fragment.fragment_id, + FragmentParallelismInfo { + distribution_type: fragment.get_distribution_type().unwrap(), + actor_count: fragment.actors.len(), + vnode_count: fragment.vnode_count(), + }, + )) }) }) .collect() @@ -190,6 +191,13 @@ impl ActorInfos { } } +#[derive(Clone, Debug)] +pub struct FragmentParallelismInfo { + pub distribution_type: FragmentDistributionType, + pub actor_count: usize, + pub vnode_count: usize, +} + pub type FragmentManagerRef = Arc; impl FragmentManager { @@ -1685,7 +1693,7 @@ impl FragmentManager { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> HashMap { + ) -> HashMap { self.core .read() .await diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 935d4773865ed..e11568781db69 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -32,6 +32,7 @@ use tokio::sync::oneshot; use tokio::time::{sleep, Instant}; use tracing::warn; +use super::FragmentParallelismInfo; use crate::barrier::Reschedule; use crate::controller::catalog::CatalogControllerRef; use crate::controller::cluster::{ClusterControllerRef, WorkerExtraInfo}; @@ -437,15 +438,12 @@ impl MetadataManager { pub async fn running_fragment_parallelisms( &self, id_filter: Option>, - ) -> MetaResult> { + ) -> MetaResult> { match self { MetadataManager::V1(mgr) => Ok(mgr .fragment_manager .running_fragment_parallelisms(id_filter) - .await - .into_iter() - .map(|(k, v)| (k as FragmentId, v)) - .collect()), + .await), MetadataManager::V2(mgr) => { let id_filter = id_filter.map(|ids| ids.into_iter().map(|id| id as _).collect()); Ok(mgr diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index a7d719369d7c3..c7e39ca8a0a8d 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -16,15 +16,18 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_common::hash::{VirtualNode, WorkerSlotMapping}; +use risingwave_common::hash::WorkerSlotMapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType; use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef}; +use crate::manager::{ + FragmentParallelismInfo, LocalNotification, MetadataManager, NotificationManagerRef, +}; use crate::model::FragmentId; pub type ServingVnodeMappingRef = Arc; @@ -43,27 +46,21 @@ impl ServingVnodeMapping { /// Returns (successful updates, failed updates). pub fn upsert( &self, - streaming_parallelisms: HashMap, + streaming_parallelisms: HashMap, workers: &[WorkerNode], ) -> (HashMap, Vec) { let mut serving_vnode_mappings = self.serving_vnode_mappings.write(); let mut upserted: HashMap = HashMap::default(); let mut failed: Vec = vec![]; - for (fragment_id, streaming_parallelism) in streaming_parallelisms { + for (fragment_id, info) in streaming_parallelisms { let new_mapping = { let old_mapping = serving_vnode_mappings.get(&fragment_id); - let max_parallelism = if streaming_parallelism == 1 { - Some(1) - } else { - None + let max_parallelism = match info.distribution_type { + FragmentDistributionType::Unspecified => unreachable!(), + FragmentDistributionType::Single => Some(1), + FragmentDistributionType::Hash => None, }; - // TODO(var-vnode): also fetch vnode count for each fragment - place_vnode( - old_mapping, - workers, - max_parallelism, - VirtualNode::COUNT_FOR_COMPAT, - ) + place_vnode(old_mapping, workers, max_parallelism, info.vnode_count) }; match new_mapping { None => { @@ -134,7 +131,10 @@ pub async fn on_meta_start( async fn fetch_serving_infos( metadata_manager: &MetadataManager, -) -> (Vec, HashMap) { +) -> ( + Vec, + HashMap, +) { match metadata_manager { MetadataManager::V1(mgr) => ( mgr.cluster_manager @@ -160,7 +160,7 @@ async fn fetch_serving_infos( serving_compute_nodes, parallelisms .into_iter() - .map(|(fragment_id, cnt)| (fragment_id as FragmentId, cnt)) + .map(|(fragment_id, info)| (fragment_id as FragmentId, info)) .collect(), ) } @@ -198,9 +198,9 @@ pub async fn start_serving_vnode_mapping_worker( continue; } let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; - let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id|{ + let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| { match streaming_parallelisms.get(frag_id) { - Some(parallelism) => Some((*frag_id, *parallelism)), + Some(info) => Some((*frag_id, info.clone())), None => { tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found"); None