-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(meta): add get back pressure RPC for UI dashboard #14790
Changes from 5 commits
c75c65a
2925031
6c56a3a
a8a7328
b17c9c8
5c27fc4
7c2f859
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,6 +103,21 @@ message WaitEpochCommitResponse { | |
common.Status status = 1; | ||
} | ||
|
||
// Back pressure | ||
message GetBackPressureRequest { | ||
} | ||
|
||
message BackPressureInfo { | ||
string actor_id = 1; | ||
string fragment_id = 2; | ||
string downstream_fragment_id = 3; | ||
double value = 4; | ||
} | ||
|
||
message GetBackPressureResponse { | ||
repeated BackPressureInfo back_pressure_infos = 1; | ||
} | ||
|
||
service StreamService { | ||
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse); | ||
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse); | ||
|
@@ -112,6 +127,7 @@ service StreamService { | |
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse); | ||
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse); | ||
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); | ||
rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe it's better to put it in |
||
} | ||
|
||
// TODO: Lifecycle management for actors. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; | |
use risingwave_pb::meta::heartbeat_request; | ||
use risingwave_pb::meta::subscribe_response::{Info, Operation}; | ||
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; | ||
use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse}; | ||
use sea_orm::prelude::Expr; | ||
use sea_orm::ActiveValue::Set; | ||
use sea_orm::{ | ||
|
@@ -369,6 +370,32 @@ impl ClusterController { | |
.await | ||
.get_worker_extra_info_by_id(worker_id) | ||
} | ||
|
||
pub async fn get_back_pressure(&self) -> MetaResult<GetBackPressureResponse> { | ||
let nodes = self | ||
.inner | ||
.read() | ||
.await | ||
.list_active_serving_workers() | ||
yufansong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.await | ||
.unwrap(); | ||
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new(); | ||
for node in nodes { | ||
let client = self.env.stream_client_pool().get(&node).await.unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can perform RPC calls concurrently. |
||
let request = risingwave_pb::stream_service::GetBackPressureRequest {}; | ||
back_pressure_infos.extend( | ||
client | ||
.get_back_pressure(request) | ||
.await | ||
.unwrap() | ||
yufansong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.back_pressure_infos, | ||
); | ||
} | ||
|
||
Ok(GetBackPressureResponse { | ||
back_pressure_infos, | ||
}) | ||
} | ||
} | ||
|
||
#[derive(Default, Clone)] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ use axum::routing::{get, get_service}; | |
use axum::Router; | ||
use hyper::Request; | ||
use parking_lot::Mutex; | ||
use risingwave_pb::stream_service::GetBackPressureResponse; | ||
use risingwave_rpc_client::ComputeClientPool; | ||
use tower::{ServiceBuilder, ServiceExt}; | ||
use tower_http::add_extension::AddExtensionLayer; | ||
|
@@ -360,6 +361,24 @@ pub(super) mod handlers { | |
|
||
Ok(report) | ||
} | ||
|
||
pub async fn get_back_pressure( | ||
// Path(worker_id): Path<WorkerId>, | ||
Extension(srv): Extension<Service>, | ||
) -> Result<Json<GetBackPressureResponse>> { | ||
let back_pressure_infos = match &srv.metadata_manager { | ||
MetadataManager::V1(mgr) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can list all streaming worker nodes directly here via the metadata manager, then call the RPC here and avoid adding an interface to the worker manager and controller. |
||
mgr.cluster_manager.get_back_pressure().await.map_err(err)? | ||
} | ||
MetadataManager::V2(mgr) => mgr | ||
.cluster_controller | ||
.get_back_pressure() | ||
.await | ||
.map_err(err)?, | ||
}; | ||
|
||
Ok(back_pressure_infos.into()) | ||
} | ||
} | ||
|
||
impl DashboardService { | ||
|
@@ -388,6 +407,7 @@ impl DashboardService { | |
"/metrics/actor/back_pressures", | ||
get(prometheus::list_prometheus_actor_back_pressure), | ||
) | ||
.route("/metrics/back_pressures", get(get_back_pressure)) | ||
.route("/monitor/await_tree/:worker_id", get(dump_await_tree)) | ||
.route("/monitor/await_tree/", get(dump_await_tree_all)) | ||
.route("/monitor/dump_heap_profile/:worker_id", get(heap_profile)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The data types can be integers.
We use strings in Prometheus because Prometheus requires all labels to be strings. But since this interface is designed specifically for back pressure (as it's named
BackPressureInfo
), that is not the case here.