Skip to content

Commit

Permalink
Move parallelism to WorkerNode.Property, update related structs and references
Browse files Browse the repository at this point in the history

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Oct 29, 2024
1 parent c59fab5 commit 101327d
Show file tree
Hide file tree
Showing 23 changed files with 70 additions and 75 deletions.
4 changes: 2 additions & 2 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ message WorkerNode {
// Meta may assign labels to worker nodes to partition workload by label.
// This is used for serverless backfilling of materialized views.
optional string node_label = 5;

uint32 parallelism = 6;
}
message Resource {
string rw_version = 1;
Expand All @@ -85,8 +87,6 @@ message WorkerNode {
// It's populated by meta node, when the worker node is added by meta node.
// It's not persistent in meta store.
optional uint64 started_at = 9;

uint32 parallelism = 10;
}

message Buffer {
Expand Down
11 changes: 1 addition & 10 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -340,19 +340,10 @@ service StreamManagerService {
// Below for cluster service.

message AddWorkerNodeRequest {
message Property {
uint64 worker_node_parallelism = 1;
bool is_streaming = 2;
bool is_serving = 3;
bool is_unschedulable = 4;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 5;
optional string label = 6;
}
common.WorkerType worker_type = 1;
common.HostAddress host = 2;
reserved 3;
Property property = 4;
common.WorkerNode.Property property = 4;
common.WorkerNode.Resource resource = 5;
}

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ mod tests {
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallelism: 0,
property: Some(Property {
parallelism: 0,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
Expand All @@ -428,8 +428,8 @@ mod tests {
r#type: WorkerType::ComputeNode as i32,
host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
parallelism: 0,
property: Some(Property {
parallelism: 0,
is_unschedulable: false,
is_serving: true,
is_streaming: false,
Expand Down
18 changes: 10 additions & 8 deletions src/common/src/vnode_mapping/vnode_placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
};

let count_same_vnode_mapping = |wm1: &WorkerSlotMapping, wm2: &WorkerSlotMapping| {
Expand All @@ -249,10 +248,11 @@ mod tests {
count
};

let mut property = serving_property.clone();
property.parallelism = 1;
let worker_1 = WorkerNode {
id: 1,
parallelism: 1,
property: Some(serving_property.clone()),
property: Some(property),
..Default::default()
};

Expand All @@ -264,10 +264,11 @@ mod tests {
let re_worker_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap();
assert_eq!(re_worker_mapping_2.iter_unique().count(), 1);

let mut property = serving_property.clone();
property.parallelism = 50;
let worker_2 = WorkerNode {
id: 2,
parallelism: 50,
property: Some(serving_property.clone()),
property: Some(property),
..Default::default()
};

Expand All @@ -283,10 +284,11 @@ mod tests {
let score = count_same_vnode_mapping(&re_worker_mapping_2, &re_worker_mapping);
assert!(score >= 5);

let mut property = serving_property.clone();
property.parallelism = 60;
let worker_3 = WorkerNode {
id: 3,
parallelism: 60,
property: Some(serving_property.clone()),
property: Some(property),
..Default::default()
};
let re_pu_mapping_2 = place_vnode(
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager, TracingExtractLayer};
use risingwave_connector::source::monitor::GLOBAL_SOURCE_METRICS;
use risingwave_dml::dml_manager::DmlManager;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compute::config_service_server::ConfigServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::stream_service::stream_service_server::StreamServiceServer;
use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
Expand Down Expand Up @@ -124,12 +124,12 @@ pub async fn compute_node_serve(
WorkerType::ComputeNode,
&advertise_addr,
Property {
worker_node_parallelism: opts.parallelism as u64,
parallelism: opts.parallelism as u32,
is_streaming: opts.role.for_streaming(),
is_serving: opts.role.for_serving(),
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
label: Some(opts.node_label.clone()),
node_label: Some(opts.node_label.clone()),
},
&config.meta,
)
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/common/meta_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::env;
use anyhow::{bail, Result};
use risingwave_common::config::MetaConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::worker_node::Property;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_rpc_client::MetaClient;

pub struct MetaServiceOpts {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ pub async fn handle_show_object(
addr: addr.to_string(),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().to_string(),
parallelism: worker.get_parallelism() as _,
parallelism: worker.parallelism() as _,
is_streaming: property.map(|p| p.is_streaming),
is_serving: property.map(|p| p.is_serving),
is_unschedulable: property.map(|p| p.is_unschedulable),
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ pub(crate) mod tests {
port: 5687,
}),
state: risingwave_pb::common::worker_node::State::Running as i32,
parallelism: 8,
property: Some(Property {
parallelism: 8,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
Expand All @@ -688,8 +688,8 @@ pub(crate) mod tests {
port: 5688,
}),
state: risingwave_pb::common::worker_node::State::Running as i32,
parallelism: 8,
property: Some(Property {
parallelism: 8,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
Expand All @@ -707,8 +707,8 @@ pub(crate) mod tests {
port: 5689,
}),
state: risingwave_pb::common::worker_node::State::Running as i32,
parallelism: 8,
property: Some(Property {
parallelism: 8,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
use risingwave_connector::source::monitor::{SourceMetrics, GLOBAL_SOURCE_METRICS};
use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
use risingwave_pb::common::WorkerType;
use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::meta::add_worker_node_request::Property as AddWorkerNodeProperty;
use risingwave_pb::user::auth_info::EncryptionType;
use risingwave_pb::user::grant_privilege::Object;
use risingwave_rpc_client::{ComputeClientPool, ComputeClientPoolRef, MetaClient};
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,6 @@ impl GlobalBarrierWorker {
id: node.id,
r#type: node.r#type,
host: node.host.clone(),
parallelism: node.parallelism,
property: node.property.clone(),
resource: node.resource.clone(),
..Default::default()
Expand Down
12 changes: 5 additions & 7 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,7 @@ impl GlobalBarrierWorkerContext {
let active_worker_slots: HashSet<_> = active_nodes
.current()
.values()
.flat_map(|node| {
(0..node.parallelism).map(|idx| WorkerSlotId::new(node.id, idx as usize))
})
.flat_map(|node| (0..node.parallelism()).map(|idx| WorkerSlotId::new(node.id, idx)))
.collect();

let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
Expand Down Expand Up @@ -443,7 +441,7 @@ impl GlobalBarrierWorkerContext {
.current()
.values()
.flat_map(|worker| {
(0..worker.parallelism).map(move |i| WorkerSlotId::new(worker.id, i as _))
(0..worker.parallelism()).map(move |i| WorkerSlotId::new(worker.id, i as _))
})
.collect_vec();

Expand All @@ -461,7 +459,7 @@ impl GlobalBarrierWorkerContext {
.current()
.values()
.flat_map(|worker| {
(0..worker.parallelism * factor)
(0..worker.parallelism() * factor)
.map(move |i| WorkerSlotId::new(worker.id, i as _))
})
.collect_vec();
Expand Down Expand Up @@ -517,7 +515,7 @@ impl GlobalBarrierWorkerContext {
let current_nodes = active_nodes
.current()
.values()
.map(|node| (node.id, &node.host, node.parallelism))
.map(|node| (node.id, &node.host, node.parallelism()))
.collect_vec();
warn!(
current_nodes = ?current_nodes,
Expand Down Expand Up @@ -558,7 +556,7 @@ impl GlobalBarrierWorkerContext {
let available_parallelism = active_nodes
.current()
.values()
.map(|worker_node| worker_node.parallelism as usize)
.map(|worker_node| worker_node.parallelism())
.sum();

let table_parallelisms: HashMap<_, _> = {
Expand Down
28 changes: 14 additions & 14 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ use risingwave_license::LicenseManager;
use risingwave_meta_model::prelude::{Worker, WorkerProperty};
use risingwave_meta_model::worker::{WorkerStatus, WorkerType};
use risingwave_meta_model::{worker, worker_property, TransactionId, WorkerId};
use risingwave_pb::common::worker_node::{PbProperty, PbResource, PbState};
use risingwave_pb::common::worker_node::{
PbProperty, PbProperty as AddNodeProperty, PbResource, PbState,
};
use risingwave_pb::common::{HostAddress, PbHostAddress, PbWorkerNode, PbWorkerType, WorkerNode};
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use sea_orm::prelude::Expr;
Expand Down Expand Up @@ -76,13 +77,13 @@ impl From<WorkerInfo> for PbWorkerNode {
port: info.0.port,
}),
state: PbState::from(info.0.status) as _,
parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
property: info.1.as_ref().map(|p| PbProperty {
is_streaming: p.is_streaming,
is_serving: p.is_serving,
is_unschedulable: p.is_unschedulable,
internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
node_label: p.label.clone(),
parallelism: info.1.as_ref().map(|p| p.parallelism).unwrap_or_default() as u32,
}),
transactional_id: info.0.transaction_id.map(|id| id as _),
resource: info.2.resource,
Expand Down Expand Up @@ -394,7 +395,7 @@ impl StreamingClusterInfo {
pub fn parallelism(&self) -> usize {
self.worker_nodes
.values()
.map(|worker| worker.parallelism as usize)
.map(|worker| worker.parallelism())
.sum()
}
}
Expand Down Expand Up @@ -443,7 +444,6 @@ fn meta_node_info(host: &str, started_at: Option<u64>) -> PbWorkerNode {
.map(HostAddr::to_protobuf)
.ok(),
state: PbState::Running as _,
parallelism: 0,
property: None,
transactional_id: None,
resource: Some(risingwave_pb::common::worker_node::Resource {
Expand Down Expand Up @@ -627,7 +627,7 @@ impl ClusterControllerInner {
return if worker.worker_type == WorkerType::ComputeNode {
let property = property.unwrap();
let mut current_parallelism = property.parallelism as usize;
let new_parallelism = add_property.worker_node_parallelism as usize;
let new_parallelism = add_property.parallelism as usize;
match new_parallelism.cmp(&current_parallelism) {
Ordering::Less => {
if !self.disable_automatic_parallelism_control {
Expand Down Expand Up @@ -677,7 +677,7 @@ impl ClusterControllerInner {
let worker_property = worker_property::ActiveModel {
worker_id: Set(worker.worker_id),
parallelism: Set(add_property
.worker_node_parallelism
.parallelism
.try_into()
.expect("invalid parallelism")),
is_streaming: Set(add_property.is_streaming),
Expand Down Expand Up @@ -713,15 +713,15 @@ impl ClusterControllerInner {
let property = worker_property::ActiveModel {
worker_id: Set(worker_id),
parallelism: Set(add_property
.worker_node_parallelism
.parallelism
.try_into()
.expect("invalid parallelism")),
is_streaming: Set(add_property.is_streaming),
is_serving: Set(add_property.is_serving),
is_unschedulable: Set(add_property.is_unschedulable),
internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
label: if r#type == PbWorkerType::ComputeNode {
Set(add_property.label.clone())
Set(add_property.node_label.clone())
} else {
Set(None)
},
Expand Down Expand Up @@ -957,12 +957,12 @@ mod tests {
let parallelism_num = 4_usize;
let worker_count = 5_usize;
let property = AddNodeProperty {
worker_node_parallelism: parallelism_num as _,
parallelism: parallelism_num as _,
is_streaming: true,
is_serving: true,
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
label: None,
node_label: None,
};
let hosts = mock_worker_hosts_for_test(worker_count);
let mut worker_ids = vec![];
Expand Down Expand Up @@ -1005,7 +1005,7 @@ mod tests {

// re-register existing worker node with larger parallelism and change its serving mode.
let mut new_property = property.clone();
new_property.worker_node_parallelism = (parallelism_num * 2) as _;
new_property.parallelism = (parallelism_num * 2) as _;
new_property.is_serving = false;
cluster_ctl
.add_worker(
Expand Down Expand Up @@ -1049,12 +1049,12 @@ mod tests {
port: 5001,
};
let mut property = AddNodeProperty {
worker_node_parallelism: 4,
is_streaming: true,
is_serving: true,
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
label: None,
node_label: None,
parallelism: 4,
};
let worker_id = cluster_ctl
.add_worker(
Expand Down
Loading

0 comments on commit 101327d

Please sign in to comment.