Skip to content

Commit

Permalink
Remove parallelism, reorder fields, update tests, modify worker node …
Browse files Browse the repository at this point in the history
…info function

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Nov 14, 2024
1 parent f625140 commit d1f87cc
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 25 deletions.
4 changes: 4 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ message WorkerNode {
// It's populated by meta node, when the worker node is added by meta node.
// It's not persistent in meta store.
optional uint64 started_at = 9;

// Moved to `Property` message.
reserved 10;
reserved "parallelism";
}

message Buffer {
Expand Down
4 changes: 2 additions & 2 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,13 @@ service StreamManagerService {
}

// Below for cluster service.

message AddWorkerNodeRequest {
common.WorkerType worker_type = 1;
common.HostAddress host = 2;
reserved 3;
common.WorkerNode.Property property = 4;
reserved 4;
common.WorkerNode.Resource resource = 5;
common.WorkerNode.Property property = 6;
}

message AddWorkerNodeResponse {
Expand Down
8 changes: 2 additions & 6 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,10 @@ mod tests {
host: Some(HostAddr::try_from("127.0.0.1:1234").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
property: Some(Property {
parallelism: 0,
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -429,12 +427,10 @@ mod tests {
host: Some(HostAddr::try_from("127.0.0.1:1235").unwrap().to_protobuf()),
state: worker_node::State::Running as i32,
property: Some(Property {
parallelism: 0,
is_unschedulable: false,
is_serving: true,
is_streaming: false,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
}),
transactional_id: Some(2),
..Default::default()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
port: host.map(|h| h.port.to_string()),
r#type: worker.get_type().unwrap().as_str_name().into(),
state: worker.get_state().unwrap().as_str_name().into(),
parallelism: worker.parallelism() as i32,
parallelism: if is_compute {
worker.parallelism() as i32
} else {
0
},
is_streaming: if is_compute {
property.map(|p| p.is_streaming)
} else {
Expand Down
9 changes: 3 additions & 6 deletions src/frontend/src/scheduler/distributed/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,7 @@ pub(crate) mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
}),
transactional_id: Some(0),
..Default::default()
Expand All @@ -693,8 +692,7 @@ pub(crate) mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
}),
transactional_id: Some(1),
..Default::default()
Expand All @@ -712,8 +710,7 @@ pub(crate) mod tests {
is_unschedulable: false,
is_serving: true,
is_streaming: true,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
}),
transactional_id: Some(2),
..Default::default()
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,7 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
};
let hosts = mock_worker_hosts_for_test(worker_count);
let mut worker_ids = vec![];
Expand Down Expand Up @@ -1060,9 +1059,8 @@ mod tests {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
node_label: None,
parallelism: 4,
..Default::default()
};
let worker_id = cluster_ctl
.add_worker(
Expand Down
6 changes: 2 additions & 4 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,7 @@ async fn test_release_context_resource() {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
},
Default::default(),
)
Expand Down Expand Up @@ -469,8 +468,7 @@ async fn test_hummock_manager_basic() {
is_streaming: true,
is_serving: true,
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
node_label: None,
..Default::default()
},
Default::default(),
)
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,8 @@ pub async fn setup_compute_env_with_metric(
is_streaming: true,
is_serving: true,
is_unschedulable: false,
internal_rpc_host_addr: "".to_string(),
node_label: None,
parallelism: fake_parallelism as _,
..Default::default()
},
Default::default(),
)
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2588,6 +2588,7 @@ impl GlobalStreamManager {
let prev_worker = worker_cache.insert(worker.id, worker.clone());

match prev_worker {
// todo, add label checking in further changes
Some(prev_worker) if prev_worker.parallelism() != worker.parallelism() => {
tracing::info!(worker = worker.id, "worker parallelism changed");
should_trigger = true;
Expand Down

0 comments on commit d1f87cc

Please sign in to comment.