Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): add get back pressure RPC for UI dashboard #14790

Merged
merged 7 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ message WaitEpochCommitResponse {
common.Status status = 1;
}

// Back pressure
message GetBackPressureRequest {}

message BackPressureInfo {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 downstream_fragment_id = 3;
double value = 4;
}

message GetBackPressureResponse {
repeated BackPressureInfo back_pressure_infos = 1;
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
Expand All @@ -112,6 +126,7 @@ service StreamService {
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's better to put it in MonitorService.

}

// TODO: Lifecycle management for actors.
36 changes: 36 additions & 0 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ use std::sync::Arc;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use prometheus::core::Collector;
use risingwave_common::config::MetricLevel;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::executor::Barrier;
use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -238,4 +241,37 @@ impl StreamService for StreamServiceImpl {

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

#[cfg_attr(coverage, coverage(off))]
async fn get_back_pressure(
&self,
_request: Request<GetBackPressureRequest>,
) -> Result<Response<GetBackPressureResponse>, Status> {
let metric_family = global_streaming_metrics(MetricLevel::Info)
.actor_output_buffer_blocking_duration_ns
.collect();
let metrics = metric_family.get(0).unwrap().get_metric();
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for label_pairs in metrics {
let mut back_pressure_info = BackPressureInfo::default();
for label_pair in label_pairs.get_label() {
if label_pair.get_name() == "actor_id" {
back_pressure_info.actor_id = label_pair.get_value().parse::<u32>().unwrap();
}
if label_pair.get_name() == "fragment_id" {
back_pressure_info.fragment_id = label_pair.get_value().parse::<u32>().unwrap();
}
if label_pair.get_name() == "downstream_fragment_id" {
back_pressure_info.downstream_fragment_id =
label_pair.get_value().parse::<u32>().unwrap();
}
}
back_pressure_info.value = label_pairs.get_counter().get_value();
back_pressure_infos.push(back_pressure_info);
}

Ok(Response::new(GetBackPressureResponse {
back_pressure_infos,
}))
}
}
27 changes: 27 additions & 0 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::heartbeat_request;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse};
use sea_orm::prelude::Expr;
use sea_orm::ActiveValue::Set;
use sea_orm::{
Expand Down Expand Up @@ -369,6 +370,32 @@ impl ClusterController {
.await
.get_worker_extra_info_by_id(worker_id)
}

pub async fn get_back_pressure(&self) -> MetaResult<GetBackPressureResponse> {
let nodes = self
.inner
.read()
.await
.list_active_serving_workers()
yufansong marked this conversation as resolved.
Show resolved Hide resolved
.await
.unwrap();
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for node in nodes {
let client = self.env.stream_client_pool().get(&node).await.unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can perform RPC calls concurrently.

let request = risingwave_pb::stream_service::GetBackPressureRequest {};
back_pressure_infos.extend(
client
.get_back_pressure(request)
.await
.unwrap()
yufansong marked this conversation as resolved.
Show resolved Hide resolved
.back_pressure_infos,
);
}

Ok(GetBackPressureResponse {
back_pressure_infos,
})
}
}

#[derive(Default, Clone)]
Expand Down
20 changes: 20 additions & 0 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use axum::routing::{get, get_service};
use axum::Router;
use hyper::Request;
use parking_lot::Mutex;
use risingwave_pb::stream_service::GetBackPressureResponse;
use risingwave_rpc_client::ComputeClientPool;
use tower::{ServiceBuilder, ServiceExt};
use tower_http::add_extension::AddExtensionLayer;
Expand Down Expand Up @@ -360,6 +361,24 @@ pub(super) mod handlers {

Ok(report)
}

pub async fn get_back_pressure(
// Path(worker_id): Path<WorkerId>,
Extension(srv): Extension<Service>,
) -> Result<Json<GetBackPressureResponse>> {
let back_pressure_infos = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can list all streaming worker nodes directly here by metadata manager, then we can call the RPC here and avoid to add interface for worker manager and controller.

mgr.cluster_manager.get_back_pressure().await.map_err(err)?
}
MetadataManager::V2(mgr) => mgr
.cluster_controller
.get_back_pressure()
.await
.map_err(err)?,
};

Ok(back_pressure_infos.into())
}
}

impl DashboardService {
Expand Down Expand Up @@ -388,6 +407,7 @@ impl DashboardService {
"/metrics/actor/back_pressures",
get(prometheus::list_prometheus_actor_back_pressure),
)
.route("/metrics/back_pressures", get(get_back_pressure))
.route("/monitor/await_tree/:worker_id", get(dump_await_tree))
.route("/monitor/await_tree/", get(dump_await_tree_all))
.route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
Expand Down
28 changes: 28 additions & 0 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::heartbeat_request;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
Expand Down Expand Up @@ -492,6 +493,33 @@ impl ClusterManager {
pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> Option<Worker> {
self.core.read().await.get_worker_by_id(worker_id)
}

pub async fn get_back_pressure(&self) -> MetaResult<GetBackPressureResponse> {
let mut core = self.core.write().await;
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for worker in core.workers.values_mut() {
if worker.worker_type() != WorkerType::ComputeNode {
yufansong marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
let client = self
.env
.stream_client_pool()
.get(&worker.worker_node)
.await
.unwrap();
let request = risingwave_pb::stream_service::GetBackPressureRequest {};
back_pressure_infos.extend(
client
.get_back_pressure(request)
.await
.unwrap()
.back_pressure_infos,
);
}
Ok(GetBackPressureResponse {
back_pressure_infos,
})
}
}

/// The cluster info used for scheduling a streaming job.
Expand Down
7 changes: 7 additions & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,13 @@ mod tests {
) -> std::result::Result<Response<WaitEpochCommitResponse>, Status> {
Ok(Response::new(WaitEpochCommitResponse::default()))
}

async fn get_back_pressure(
&self,
_request: Request<GetBackPressureRequest>,
) -> std::result::Result<Response<GetBackPressureResponse>, Status> {
Ok(Response::new(GetBackPressureResponse::default()))
}
}

struct MockServices {
Expand Down
1 change: 1 addition & 0 deletions src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ macro_rules! for_all_stream_rpc {
,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse }
,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse }
,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
,{ 0, get_back_pressure, GetBackPressureRequest, GetBackPressureResponse }
}
};
}
Expand Down
Loading