diff --git a/java/common-utils/src/main/java/com/risingwave/java/utils/MetaClient.java b/java/common-utils/src/main/java/com/risingwave/java/utils/MetaClient.java
index 28c1383e668f3..75e9324c37e40 100644
--- a/java/common-utils/src/main/java/com/risingwave/java/utils/MetaClient.java
+++ b/java/common-utils/src/main/java/com/risingwave/java/utils/MetaClient.java
@@ -18,6 +18,7 @@
 import com.risingwave.proto.Catalog.Table;
 import com.risingwave.proto.ClusterServiceGrpc.ClusterServiceBlockingStub;
 import com.risingwave.proto.Common.HostAddress;
+import com.risingwave.proto.Common.WorkerNode.Property;
 import com.risingwave.proto.Common.WorkerType;
 import com.risingwave.proto.DdlServiceGrpc.DdlServiceBlockingStub;
 import com.risingwave.proto.DdlServiceOuterClass.GetTableRequest;
@@ -29,7 +30,6 @@
 import com.risingwave.proto.Hummock.UnpinVersionBeforeRequest;
 import com.risingwave.proto.HummockManagerServiceGrpc.HummockManagerServiceBlockingStub;
 import com.risingwave.proto.Meta.AddWorkerNodeRequest;
-import com.risingwave.proto.Meta.AddWorkerNodeRequest.Property;
 import com.risingwave.proto.Meta.AddWorkerNodeResponse;
 import com.risingwave.proto.Meta.HeartbeatRequest;
 import io.grpc.Grpc;
@@ -100,7 +100,6 @@ public MetaClient(String metaAddr, ScheduledExecutorService scheduler) {
                         Property.newBuilder()
                                 .setIsStreaming(false)
                                 .setIsServing(false)
-                                .setWorkerNodeParallelism(0)
                                 .build())
                 .build();
         AddWorkerNodeResponse resp = clusterStub.addWorkerNode(req);
diff --git a/proto/common.proto b/proto/common.proto
index 1030d07d7c343..05301494b5b44 100644
--- a/proto/common.proto
+++ b/proto/common.proto
@@ -56,6 +56,11 @@ message WorkerNode {
     bool is_unschedulable = 3;
     // This is used for frontend node to register its rpc address
     string internal_rpc_host_addr = 4;
+    // Meta may assign labels to worker nodes to partition workload by label.
+    // This is used for serverless backfilling of materialized views.
+    optional string node_label = 5;
+
+    uint32 parallelism = 6;
   }
   message Resource {
     string rw_version = 1;
@@ -83,11 +88,9 @@ message WorkerNode {
   // It's not persistent in meta store.
   optional uint64 started_at = 9;
 
-  uint32 parallelism = 10;
-
-  // Meta may assign labels to worker nodes to partition workload by label.
-  // This is used for serverless backfilling of materialized views.
-  string node_label = 11;
+  // Moved to `Property` message.
+  reserved 10;
+  reserved "parallelism";
 }
 
 message Buffer {
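For reference, after this proto change a worker registration carries its parallelism and optional label in `common.WorkerNode.Property` rather than in the removed `meta.AddWorkerNodeRequest.Property`. A minimal sketch, assuming the generated `risingwave_pb` crate from the protos above (the helper name is illustrative):

```rust
// Sketch only: assumes the `risingwave_pb` crate generated from common.proto above.
use risingwave_pb::common::worker_node::Property;

// Illustrative helper, not part of the patch.
fn register_property(parallelism: usize, label: &str) -> Property {
    Property {
        parallelism: parallelism as u32,
        is_streaming: true,
        is_serving: true,
        // `node_label` is `optional string` in the proto, hence `Option<String>` here.
        node_label: Some(label.to_string()),
        // Remaining fields (is_unschedulable, internal_rpc_host_addr) keep their defaults.
        ..Default::default()
    }
}
```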
diff --git a/proto/meta.proto b/proto/meta.proto
index 37527d6a87ac0..e5dda6b83a922 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -342,21 +342,13 @@ service StreamManagerService {
 }
 
 // Below for cluster service.
-
 message AddWorkerNodeRequest {
-  message Property {
-    uint64 worker_node_parallelism = 1;
-    bool is_streaming = 2;
-    bool is_serving = 3;
-    bool is_unschedulable = 4;
-    // This is used for frontend node to register its rpc address
-    string internal_rpc_host_addr = 5;
-  }
   common.WorkerType worker_type = 1;
   common.HostAddress host = 2;
   reserved 3;
-  Property property = 4;
+  reserved 4;
   common.WorkerNode.Resource resource = 5;
+  common.WorkerNode.Property property = 6;
 }
 
 message AddWorkerNodeResponse {
diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs
index 98923bac62ba5..ed8c30de61141 100644
--- a/src/batch/src/worker_manager/worker_node_manager.rs
+++ b/src/batch/src/worker_manager/worker_node_manager.rs
@@ -412,12 +412,11 @@ mod tests {
             r#type: WorkerType::ComputeNode as i32,
             host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
             state: worker_node::State::Running as i32,
-            parallelism: 0,
             property: Some(Property {
                 is_unschedulable: false,
                 is_serving: true,
                 is_streaming: true,
-                internal_rpc_host_addr: "".to_string(),
+                ..Default::default()
             }),
             transactional_id: Some(1),
             ..Default::default()
@@ -427,12 +426,11 @@ mod tests {
             r#type: WorkerType::ComputeNode as i32,
             host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
             state: worker_node::State::Running as i32,
-            parallelism: 0,
             property: Some(Property {
                 is_unschedulable: false,
                 is_serving: true,
                 is_streaming: false,
-                internal_rpc_host_addr: "".to_string(),
+                ..Default::default()
             }),
             transactional_id: Some(2),
             ..Default::default()
diff --git a/src/common/src/util/worker_util.rs b/src/common/src/util/worker_util.rs
index 80ecd3b822536..da11afca6e6b4 100644
--- a/src/common/src/util/worker_util.rs
+++ b/src/common/src/util/worker_util.rs
@@ -13,3 +13,5 @@
 // limitations under the License.
 
 pub type WorkerNodeId = u32;
+
+pub const DEFAULT_COMPUTE_NODE_LABEL: &str = "default";
diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs
index 1f9235bb862ae..33e544693d0ac 100644
--- a/src/common/src/vnode_mapping/vnode_placement.rs
+++ b/src/common/src/vnode_mapping/vnode_placement.rs
@@ -206,7 +206,7 @@ mod tests {
     use risingwave_common::hash::WorkerSlotMapping;
     use risingwave_pb::common::worker_node::Property;
-    use risingwave_pb::common::WorkerNode;
+    use risingwave_pb::common::{WorkerNode, WorkerType};
 
     use crate::hash::VirtualNode;
 
@@ -232,7 +232,7 @@ mod tests {
             is_unschedulable: false,
             is_serving: true,
             is_streaming: false,
-            internal_rpc_host_addr: "".to_string(),
+            ..Default::default()
         };
 
         let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
@@ -248,10 +248,12 @@ mod tests {
             count
         };
 
+        let mut property = serving_property.clone();
+        property.parallelism = 1;
         let worker_1 = WorkerNode {
             id: 1,
-            parallelism: 1,
-            property: Some(serving_property.clone()),
+            r#type: WorkerType::ComputeNode.into(),
+            property: Some(property),
             ..Default::default()
         };
 
@@ -263,10 +265,12 @@ mod tests {
         let re_worker_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap();
         assert_eq!(re_worker_mapping_2.iter_unique().count(), 1);
 
+        let mut property = serving_property.clone();
+        property.parallelism = 50;
         let worker_2 = WorkerNode {
             id: 2,
-            parallelism: 50,
-            property: Some(serving_property.clone()),
+            property: Some(property),
+            r#type: WorkerType::ComputeNode.into(),
             ..Default::default()
         };
 
@@ -282,10 +286,12 @@ mod tests {
         let score = count_same_vnode_mapping(&re_worker_mapping_2, &re_worker_mapping);
         assert!(score >= 5);
 
+        let mut property = serving_property.clone();
+        property.parallelism = 60;
         let worker_3 = WorkerNode {
             id: 3,
-            parallelism: 60,
-            property: Some(serving_property.clone()),
+            r#type: WorkerType::ComputeNode.into(),
+            property: Some(property),
             ..Default::default()
         };
         let re_pu_mapping_2 = place_vnode(
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index ef4b5c5e32d3b..52e7719afe6f5 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -37,6 +37,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
 use risingwave_common::util::resource_util::cpu::total_cpu_available;
 use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
 use risingwave_common::util::tokio_util::sync::CancellationToken;
+use risingwave_common::util::worker_util::DEFAULT_COMPUTE_NODE_LABEL;
 use serde::{Deserialize, Serialize};
 
 /// If `total_memory_bytes` is not specified, the default memory limit will be set to
@@ -104,6 +105,10 @@ pub struct ComputeNodeOpts {
     #[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
     pub parallelism: usize,
 
+    /// The node label that the compute node will register to the scheduler of the meta service.
+    #[clap(long, env = "RW_NODE_LABEL", default_value_t = default_node_label())]
+    pub node_label: String,
+
     /// Decides whether the compute node can be used for streaming and serving.
#[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())] pub role: Role, @@ -249,6 +254,10 @@ pub fn default_parallelism() -> usize { total_cpu_available().ceil() as usize } +pub fn default_node_label() -> String { + DEFAULT_COMPUTE_NODE_LABEL.to_string() +} + pub fn default_role() -> Role { Role::Both } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index d86a516771802..aae537271de48 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -42,10 +42,10 @@ use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer}; use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS; use risingwave_dml::dml_manager::DmlManager; +use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::WorkerType; use risingwave_pb::compute::config_service_server::ConfigServiceServer; use risingwave_pb::health::health_server::HealthServer; -use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer; use risingwave_pb::stream_service::stream_service_server::StreamServiceServer; use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer; @@ -124,11 +124,12 @@ pub async fn compute_node_serve( WorkerType::ComputeNode, &advertise_addr, Property { - worker_node_parallelism: opts.parallelism as u64, + parallelism: opts.parallelism as u32, is_streaming: opts.role.for_streaming(), is_serving: opts.role.for_serving(), is_unschedulable: false, internal_rpc_host_addr: "".to_string(), + node_label: Some(opts.node_label.clone()), }, &config.meta, ) diff --git a/src/ctl/src/common/meta_service.rs b/src/ctl/src/common/meta_service.rs index 6d70bdf942833..f91f47b6f2951 100644 --- a/src/ctl/src/common/meta_service.rs +++ b/src/ctl/src/common/meta_service.rs @@ -17,8 +17,8 @@ use std::env; use anyhow::{bail, Result}; use risingwave_common::config::MetaConfig; use risingwave_common::util::addr::HostAddr; +use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::WorkerType; -use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_rpc_client::MetaClient; pub struct MetaServiceOpts { diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs index b50c7e4cfd07b..cbe584bee53a4 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs @@ -38,6 +38,7 @@ struct RwWorkerNode { system_total_memory_bytes: Option, system_total_cpu_cores: Option, started_at: Option, + label: Option, } #[system_catalog(table, "rw_catalog.rw_worker_nodes")] @@ -58,7 +59,11 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result Result Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(WorkerProperty::Table) + .add_column(ColumnDef::new(WorkerProperty::Label).string()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(WorkerProperty::Table) + .drop_column(WorkerProperty::Label) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum WorkerProperty { + Table, + Label, +} diff --git a/src/meta/model/src/worker_property.rs b/src/meta/model/src/worker_property.rs index ff19cdeb6f65b..09fa662bb2ca7 100644 --- 
diff --git a/src/meta/model/src/worker_property.rs b/src/meta/model/src/worker_property.rs
index ff19cdeb6f65b..09fa662bb2ca7 100644
--- a/src/meta/model/src/worker_property.rs
+++ b/src/meta/model/src/worker_property.rs
@@ -27,6 +27,7 @@ pub struct Model {
     pub is_serving: bool,
     pub is_unschedulable: bool,
     pub internal_rpc_host_addr: Option,
+    pub label: Option,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs
index ee25f1ae84117..b352ece5012a1 100644
--- a/src/meta/src/barrier/context/recovery.rs
+++ b/src/meta/src/barrier/context/recovery.rs
@@ -267,9 +267,7 @@ impl GlobalBarrierWorkerContextImpl {
         let active_worker_slots: HashSet<_> = active_nodes
             .current()
             .values()
-            .flat_map(|node| {
-                (0..node.parallelism).map(|idx| WorkerSlotId::new(node.id, idx as usize))
-            })
+            .flat_map(|node| (0..node.parallelism()).map(|idx| WorkerSlotId::new(node.id, idx)))
             .collect();
 
         let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
@@ -298,7 +296,7 @@ impl GlobalBarrierWorkerContextImpl {
             .current()
             .values()
             .flat_map(|worker| {
-                (0..worker.parallelism).map(move |i| WorkerSlotId::new(worker.id, i as _))
+                (0..worker.parallelism()).map(move |i| WorkerSlotId::new(worker.id, i as _))
             })
             .collect_vec();
 
@@ -316,7 +314,7 @@ impl GlobalBarrierWorkerContextImpl {
             .current()
             .values()
             .flat_map(|worker| {
-                (0..worker.parallelism * factor)
+                (0..worker.parallelism() * factor)
                     .map(move |i| WorkerSlotId::new(worker.id, i as _))
             })
             .collect_vec();
@@ -372,7 +370,7 @@ impl GlobalBarrierWorkerContextImpl {
             let current_nodes = active_nodes
                 .current()
                 .values()
-                .map(|node| (node.id, &node.host, node.parallelism))
+                .map(|node| (node.id, &node.host, node.parallelism()))
                 .collect_vec();
             warn!(
                 current_nodes = ?current_nodes,
@@ -413,7 +411,7 @@ impl GlobalBarrierWorkerContextImpl {
         let available_parallelism = active_nodes
             .current()
             .values()
-            .map(|worker_node| worker_node.parallelism as usize)
+            .map(|worker_node| worker_node.parallelism())
             .sum();
 
         let table_parallelisms: HashMap<_, _> = {
diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs
index 8d58ba8fd6c1b..242cbbafddd41 100644
--- a/src/meta/src/controller/cluster.rs
+++ b/src/meta/src/controller/cluster.rs
@@ -24,14 +24,16 @@ use risingwave_common::hash::WorkerSlotId;
 use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::resource_util::cpu::total_cpu_available;
 use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
+use risingwave_common::util::worker_util::DEFAULT_COMPUTE_NODE_LABEL;
 use risingwave_common::RW_VERSION;
 use risingwave_license::LicenseManager;
 use risingwave_meta_model::prelude::{Worker, WorkerProperty};
 use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
 use risingwave_meta_model::{worker, worker_property, TransactionId, WorkerId};
-use risingwave_pb::common::worker_node::{PbProperty, PbResource, PbState};
+use risingwave_pb::common::worker_node::{
+    PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
+};
 use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode};
-use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
 use sea_orm::prelude::Expr;
@@ -76,17 +78,17 @@ impl From for PbWorkerNode {
                 port: info.0.port,
             }),
             state: PbState::from(info.0.status) as _,
-            parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
             property: info.1.as_ref().map(|p| PbProperty {
                 is_streaming: p.is_streaming,
                 is_serving: p.is_serving,
                 is_unschedulable: p.is_unschedulable,
                 internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
+                node_label: p.label.clone(),
+                parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
             }),
             transactional_id: info.0.transaction_id.map(|id| id as _),
             resource: info.2.resource,
             started_at: info.2.started_at,
-            node_label: "".to_string(),
         }
     }
 }
@@ -394,7 +396,7 @@ impl StreamingClusterInfo {
     pub fn parallelism(&self) -> usize {
         self.worker_nodes
             .values()
-            .map(|worker| worker.parallelism as usize)
+            .map(|worker| worker.parallelism())
             .sum()
     }
 }
@@ -443,7 +445,6 @@ fn meta_node_info(host: &str, started_at: Option) -> PbWorkerNode {
             .map(HostAddr::to_protobuf)
             .ok(),
         state: PbState::Running as _,
-        parallelism: 0,
        property: None,
         transactional_id: None,
         resource: Some(risingwave_pb::common::worker_node::Resource {
@@ -452,7 +453,6 @@
             total_cpu_cores: total_cpu_available() as _,
         }),
         started_at,
-        node_label: "".to_string(),
     }
 }
@@ -628,7 +628,7 @@ impl ClusterControllerInner {
         return if worker.worker_type == WorkerType::ComputeNode {
             let property = property.unwrap();
             let mut current_parallelism = property.parallelism as usize;
-            let new_parallelism = add_property.worker_node_parallelism as usize;
+            let new_parallelism = add_property.parallelism as usize;
             match new_parallelism.cmp(&current_parallelism) {
                 Ordering::Less => {
                     if !self.disable_automatic_parallelism_control {
@@ -668,6 +668,13 @@ impl ClusterControllerInner {
             property.is_streaming = Set(add_property.is_streaming);
             property.is_serving = Set(add_property.is_serving);
             property.parallelism = Set(current_parallelism as _);
+            property.label = Set(Some(add_property.node_label.unwrap_or_else(|| {
+                tracing::warn!(
+                    "node_label is not set for worker {}, fallback to `default`",
+                    worker.worker_id
+                );
+                DEFAULT_COMPUTE_NODE_LABEL.to_string()
+            })));
 
             WorkerProperty::update(property).exec(&txn).await?;
             txn.commit().await?;
@@ -678,13 +685,14 @@
             let worker_property = worker_property::ActiveModel {
                 worker_id: Set(worker.worker_id),
                 parallelism: Set(add_property
-                    .worker_node_parallelism
+                    .parallelism
                     .try_into()
                     .expect("invalid parallelism")),
                 is_streaming: Set(add_property.is_streaming),
                 is_serving: Set(add_property.is_serving),
                 is_unschedulable: Set(add_property.is_unschedulable),
                 internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
+                label: Set(None),
             };
             WorkerProperty::insert(worker_property).exec(&txn).await?;
             txn.commit().await?;
@@ -713,13 +721,18 @@
                 let property = worker_property::ActiveModel {
                     worker_id: Set(worker_id),
                     parallelism: Set(add_property
-                        .worker_node_parallelism
+                        .parallelism
                         .try_into()
                         .expect("invalid parallelism")),
                     is_streaming: Set(add_property.is_streaming),
                     is_serving: Set(add_property.is_serving),
                     is_unschedulable: Set(add_property.is_unschedulable),
                     internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
+                    label: if r#type == PbWorkerType::ComputeNode {
+                        Set(add_property.node_label.clone())
+                    } else {
+                        Set(None)
+                    },
                 };
                 WorkerProperty::insert(property).exec(&txn).await?;
             }
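The re-registration path above is the only place that substitutes a default: a compute node that re-registers without a label is stored with `default`, while the fresh-insert path keeps the reported label (or `NULL` for non-compute workers). A standalone sketch of that fallback, using only the constant from `worker_util.rs` (the helper function is illustrative, and it logs with `eprintln!` where the real patch uses `tracing::warn!`):

```rust
const DEFAULT_COMPUTE_NODE_LABEL: &str = "default";

/// Resolve the label to persist for a re-registering compute node.
/// `None` means the node did not report a label, so fall back to "default".
fn resolve_label(reported: Option<String>, worker_id: u64) -> String {
    reported.unwrap_or_else(|| {
        eprintln!("node_label is not set for worker {worker_id}, fallback to `default`");
        DEFAULT_COMPUTE_NODE_LABEL.to_string()
    })
}

fn main() {
    assert_eq!(resolve_label(Some("blue".into()), 1), "blue");
    assert_eq!(resolve_label(None, 2), "default");
}
```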
@@ -952,11 +965,11 @@ mod tests {
         let parallelism_num = 4_usize;
         let worker_count = 5_usize;
         let property = AddNodeProperty {
-            worker_node_parallelism: parallelism_num as _,
+            parallelism: parallelism_num as _,
             is_streaming: true,
             is_serving: true,
             is_unschedulable: false,
-            internal_rpc_host_addr: "".to_string(),
+            ..Default::default()
         };
         let hosts = mock_worker_hosts_for_test(worker_count);
         let mut worker_ids = vec![];
@@ -999,7 +1012,7 @@
 
         // re-register existing worker node with larger parallelism and change its serving mode.
         let mut new_property = property.clone();
-        new_property.worker_node_parallelism = (parallelism_num * 2) as _;
+        new_property.parallelism = (parallelism_num * 2) as _;
         new_property.is_serving = false;
         cluster_ctl
             .add_worker(
@@ -1043,11 +1056,11 @@
             port: 5001,
         };
         let mut property = AddNodeProperty {
-            worker_node_parallelism: 4,
             is_streaming: true,
             is_serving: true,
             is_unschedulable: false,
-            internal_rpc_host_addr: "".to_string(),
+            parallelism: 4,
+            ..Default::default()
         };
         let worker_id = cluster_ctl
             .add_worker(
diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs
index 428bf48343f55..c96911aea163d 100644
--- a/src/meta/src/hummock/manager/tests.rs
+++ b/src/meta/src/hummock/manager/tests.rs
@@ -36,10 +36,10 @@ use risingwave_hummock_sdk::{
     CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId,
     LocalSstableInfo, SyncResult, FIRST_VERSION_ID,
 };
+use risingwave_pb::common::worker_node::Property;
 use risingwave_pb::common::{HostAddress, WorkerType};
 use risingwave_pb::hummock::compact_task::TaskStatus;
 use risingwave_pb::hummock::HummockPinnedVersion;
-use risingwave_pb::meta::add_worker_node_request::Property;
 use risingwave_rpc_client::HummockMetaClient;
 use thiserror_ext::AsReport;
@@ -381,11 +381,11 @@ async fn test_release_context_resource() {
             WorkerType::ComputeNode,
             fake_host_address_2,
             Property {
-                worker_node_parallelism: fake_parallelism,
+                parallelism: fake_parallelism,
                 is_streaming: true,
                 is_serving: true,
                 is_unschedulable: false,
-                internal_rpc_host_addr: "".to_string(),
+                ..Default::default()
             },
             Default::default(),
         )
@@ -464,11 +464,11 @@ async fn test_hummock_manager_basic() {
             WorkerType::ComputeNode,
             fake_host_address_2,
             Property {
-                worker_node_parallelism: fake_parallelism,
+                parallelism: fake_parallelism,
                 is_streaming: true,
                 is_serving: true,
                 is_unschedulable: false,
-                internal_rpc_host_addr: "".to_string(),
+                ..Default::default()
             },
             Default::default(),
         )
diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs
index b6631a672e385..2e750a06aa222 100644
--- a/src/meta/src/hummock/test_utils.rs
+++ b/src/meta/src/hummock/test_utils.rs
@@ -32,10 +32,10 @@ use risingwave_hummock_sdk::{
     CompactionGroupId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo, SyncResult,
 };
 use risingwave_meta_model::WorkerId;
+use risingwave_pb::common::worker_node::Property;
 use risingwave_pb::common::{HostAddress, WorkerType};
 use risingwave_pb::hummock::compact_task::TaskStatus;
 use risingwave_pb::hummock::CompactionConfig;
-use risingwave_pb::meta::add_worker_node_request::Property;
 use risingwave_rpc_client::HummockMetaClient;
 
 use crate::controller::catalog::CatalogController;
@@ -347,11 +347,11 @@ pub async fn setup_compute_env_with_metric(
             WorkerType::ComputeNode,
             fake_host_address,
             Property {
-                worker_node_parallelism: fake_parallelism as _,
                 is_streaming: true,
                 is_serving: true,
                 is_unschedulable: false,
-                internal_rpc_host_addr: "".to_string(),
+                parallelism: fake_parallelism as _,
+                ..Default::default()
             },
             Default::default(),
         )
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index a10d405d31dff..f2510a5b75486 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -22,9 +22,8 @@ use futures::future::{select, Either};
 use risingwave_common::catalog::{DatabaseId, TableId, TableOption};
 use risingwave_meta_model::{ObjectId, SourceId, WorkerId};
 use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
-use risingwave_pb::common::worker_node::{PbResource, State};
+use risingwave_pb::common::worker_node::{PbResource, Property as AddNodeProperty, State};
 use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
-use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
 use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
 use risingwave_pb::meta::table_fragments::{Fragment, PbFragment};
 use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor};
@@ -225,7 +224,6 @@ impl ActiveStreamingWorkerNodes {
                     id: node.id,
                     r#type: node.r#type,
                     host: node.host.clone(),
-                    parallelism: node.parallelism,
                     property: node.property.clone(),
                     resource: node.resource.clone(),
                     ..Default::default()
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index d6f1d54b73849..c4a97d25d7392 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -1816,7 +1816,7 @@ impl ScaleController {
         let schedulable_worker_slots = workers
             .values()
-            .map(|worker| (worker.id as WorkerId, worker.parallelism as usize))
+            .map(|worker| (worker.id as WorkerId, worker.parallelism()))
             .collect::>();
 
         // index for no shuffle relation
@@ -2576,7 +2576,8 @@ impl GlobalStreamManager {
             let prev_worker = worker_cache.insert(worker.id, worker.clone());
 
             match prev_worker {
-                Some(prev_worker) if prev_worker.get_parallelism() != worker.get_parallelism() => {
+                // TODO: add label checking in further changes
+                Some(prev_worker) if prev_worker.parallelism() != worker.parallelism() => {
                     tracing::info!(worker = worker.id, "worker parallelism changed");
                     should_trigger = true;
                 }
diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs
index 7a14cb43a342e..0cae1abf24654 100644
--- a/src/meta/src/stream/stream_graph/schedule.rs
+++ b/src/meta/src/stream/stream_graph/schedule.rs
@@ -223,7 +223,7 @@ impl Scheduler {
         let slots = workers
             .iter()
-            .map(|(worker_id, worker)| (*worker_id as WorkerId, worker.parallelism as usize))
+            .map(|(worker_id, worker)| (*worker_id as WorkerId, worker.parallelism()))
             .collect();
 
         let parallelism = default_parallelism.get();
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index cd76e124a2d1f..b1bbc04fe5635 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -627,10 +627,7 @@ impl GlobalStreamManager {
             .collect::>();
 
         // Check if the provided parallelism is valid.
-        let available_parallelism = worker_nodes
-            .iter()
-            .map(|w| w.parallelism as usize)
-            .sum::();
+        let available_parallelism = worker_nodes.iter().map(|w| w.parallelism()).sum::();
         let max_parallelism = self
             .metadata_manager
             .get_job_max_parallelism(table_id)
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index db34e5fd312bd..cfde9187abc66 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -20,7 +20,10 @@ use itertools::Itertools;
 use risingwave_common::catalog::{DatabaseId, SchemaId, TableId};
 use risingwave_common::hash::VirtualNode;
 use risingwave_pb::catalog::PbTable;
-use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType, WorkerNode};
+use risingwave_pb::common::worker_node::Property;
+use risingwave_pb::common::{
+    PbColumnOrder, PbDirection, PbNullsAre, PbOrderType, WorkerNode, WorkerType,
+};
 use risingwave_pb::data::data_type::TypeName;
 use risingwave_pb::data::DataType;
 use risingwave_pb::ddl_service::TableJobType;
@@ -426,7 +429,11 @@ fn make_cluster_info() -> StreamingClusterInfo {
         0,
         WorkerNode {
             id: 0,
-            parallelism: 8,
+            property: Some(Property {
+                parallelism: 8,
+                ..Default::default()
+            }),
+            r#type: WorkerType::ComputeNode.into(),
             ..Default::default()
         },
     ))
diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs
index a4678df091270..15a0d4b4ff1ba 100644
--- a/src/prost/src/lib.rs
+++ b/src/prost/src/lib.rs
@@ -23,6 +23,7 @@ pub use prost::Message;
 use risingwave_error::tonic::ToTonicStatus;
 use thiserror::Error;
 
+use crate::common::WorkerType;
 
 #[rustfmt::skip]
 #[cfg_attr(madsim, path = "sim/catalog.rs")]
@@ -220,7 +221,11 @@ impl stream_plan::MaterializeNode {
 
 // Encapsulating the use of parallelism.
 impl common::WorkerNode {
     pub fn parallelism(&self) -> usize {
-        self.parallelism as usize
+        assert_eq!(self.r#type(), WorkerType::ComputeNode);
+        self.property
+            .as_ref()
+            .expect("property should exist")
+            .parallelism as usize
     }
 }
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index d027608e34600..8f4e6779b8e56 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -54,6 +54,7 @@ use risingwave_pb::catalog::{
 };
 use risingwave_pb::cloud_service::cloud_service_client::CloudServiceClient;
 use risingwave_pb::cloud_service::*;
+use risingwave_pb::common::worker_node::Property;
 use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType};
 use risingwave_pb::connector_service::sink_coordination_service_client::SinkCoordinationServiceClient;
 use risingwave_pb::ddl_service::alter_owner_request::Object;
@@ -68,7 +69,6 @@ use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_c
 use risingwave_pb::hummock::subscribe_compaction_event_request::Register;
 use risingwave_pb::hummock::write_limits::WriteLimit;
 use risingwave_pb::hummock::*;
-use risingwave_pb::meta::add_worker_node_request::Property;
 use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
 use risingwave_pb::meta::cluster_service_client::ClusterServiceClient;
 use risingwave_pb::meta::event_log_service_client::EventLogServiceClient;
diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index 3986a826e21e7..2a022165c8525 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -227,7 +227,7 @@ impl Fragment {
         self.r
             .worker_nodes
             .iter()
-            .map(|w| (w.id, w.parallelism as usize))
+            .map(|w| (w.id, w.parallelism()))
             .collect()
     }
 
diff --git a/src/tests/simulation/tests/integration_tests/scale/schedulability.rs b/src/tests/simulation/tests/integration_tests/scale/schedulability.rs
index a8a6f73eedd3d..77275a39df70a 100644
--- a/src/tests/simulation/tests/integration_tests/scale/schedulability.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/schedulability.rs
@@ -39,7 +39,7 @@ async fn test_cordon_normal() -> Result<()> {
     let rest_worker_slots: HashSet<_> = workers
         .iter()
         .flat_map(|worker| {
-            (0..worker.parallelism).map(|idx| WorkerSlotId::new(worker.id, idx as _))
+            (0..worker.parallelism()).map(|idx| WorkerSlotId::new(worker.id, idx as _))
         })
         .collect();
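Because `parallelism` now lives in `Property`, the accessor added in `src/prost/src/lib.rs` asserts the node is a compute node and expects the property to be present, so call sites like the ones above should filter by worker type before summing slots. A minimal sketch of that contract with stand-in types (not the generated prost structs):

```rust
// Stand-in types; the real ones are generated from common.proto.
#[derive(PartialEq, Debug, Clone, Copy)]
enum WorkerType {
    ComputeNode,
    Frontend,
}

struct Property {
    parallelism: u32,
}

struct WorkerNode {
    r#type: WorkerType,
    property: Option<Property>,
}

impl WorkerNode {
    /// Mirrors the new accessor: only meaningful for compute nodes.
    fn parallelism(&self) -> usize {
        assert_eq!(self.r#type, WorkerType::ComputeNode);
        self.property
            .as_ref()
            .expect("property should exist")
            .parallelism as usize
    }
}

fn main() {
    let workers = vec![
        WorkerNode { r#type: WorkerType::ComputeNode, property: Some(Property { parallelism: 4 }) },
        WorkerNode { r#type: WorkerType::Frontend, property: None },
    ];
    // Filter to compute nodes before summing slots; otherwise the assert fires.
    let total: usize = workers
        .iter()
        .filter(|w| w.r#type == WorkerType::ComputeNode)
        .map(|w| w.parallelism())
        .sum();
    assert_eq!(total, 4);
}
```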