diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 462f5ff0256a6..a632445aef7da 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -103,6 +103,20 @@ message WaitEpochCommitResponse { common.Status status = 1; } +// Back pressure +message GetBackPressureRequest {} + +message BackPressureInfo { + uint32 actor_id = 1; + uint32 fragment_id = 2; + uint32 downstream_fragment_id = 3; + double value = 4; +} + +message GetBackPressureResponse { + repeated BackPressureInfo back_pressure_infos = 1; +} + service StreamService { rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse); rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse); @@ -112,6 +126,7 @@ service StreamService { rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse); rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse); rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); + rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse); } // TODO: Lifecycle management for actors. diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 2640b505b7873..78572f8f09727 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -16,6 +16,8 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use itertools::Itertools; +use prometheus::core::Collector; +use risingwave_common::config::MetricLevel; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo; @@ -23,6 +25,7 @@ use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; use risingwave_stream::error::StreamError; +use risingwave_stream::executor::monitor::global_streaming_metrics; use risingwave_stream::executor::Barrier; use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment}; use thiserror_ext::AsReport; @@ -238,4 +241,37 @@ impl StreamService for StreamServiceImpl { Ok(Response::new(WaitEpochCommitResponse { status: None })) } + + #[cfg_attr(coverage, coverage(off))] + async fn get_back_pressure( + &self, + _request: Request, + ) -> Result, Status> { + let metric_family = global_streaming_metrics(MetricLevel::Info) + .actor_output_buffer_blocking_duration_ns + .collect(); + let metrics = metric_family.get(0).unwrap().get_metric(); + let mut back_pressure_infos: Vec = Vec::new(); + for label_pairs in metrics { + let mut back_pressure_info = BackPressureInfo::default(); + for label_pair in label_pairs.get_label() { + if label_pair.get_name() == "actor_id" { + back_pressure_info.actor_id = label_pair.get_value().parse::().unwrap(); + } + if label_pair.get_name() == "fragment_id" { + back_pressure_info.fragment_id = label_pair.get_value().parse::().unwrap(); + } + if label_pair.get_name() == "downstream_fragment_id" { + back_pressure_info.downstream_fragment_id = + label_pair.get_value().parse::().unwrap(); + } + } + back_pressure_info.value = label_pairs.get_counter().get_value(); + back_pressure_infos.push(back_pressure_info); + } + + Ok(Response::new(GetBackPressureResponse { + back_pressure_infos, + })) + } } diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 042993bd5bd4c..d536e5869d693 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -36,6 +36,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; +use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse}; use sea_orm::prelude::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ @@ -369,6 +370,32 @@ impl ClusterController { .await .get_worker_extra_info_by_id(worker_id) } + + pub async fn get_back_pressure(&self) -> MetaResult { + let nodes = self + .inner + .read() + .await + .list_active_serving_workers() + .await + .unwrap(); + let mut back_pressure_infos: Vec = Vec::new(); + for node in nodes { + let client = self.env.stream_client_pool().get(&node).await.unwrap(); + let request = risingwave_pb::stream_service::GetBackPressureRequest {}; + back_pressure_infos.extend( + client + .get_back_pressure(request) + .await + .unwrap() + .back_pressure_infos, + ); + } + + Ok(GetBackPressureResponse { + back_pressure_infos, + }) + } } #[derive(Default, Clone)] diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index c1cdb5ef1ec19..9ed5c1975436c 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -29,6 +29,7 @@ use axum::routing::{get, get_service}; use axum::Router; use hyper::Request; use parking_lot::Mutex; +use risingwave_pb::stream_service::GetBackPressureResponse; use risingwave_rpc_client::ComputeClientPool; use tower::{ServiceBuilder, ServiceExt}; use tower_http::add_extension::AddExtensionLayer; @@ -360,6 +361,24 @@ pub(super) mod handlers { Ok(report) } + + pub async fn get_back_pressure( + // Path(worker_id): Path, + Extension(srv): Extension, + ) -> Result> { + let back_pressure_infos = match &srv.metadata_manager { + MetadataManager::V1(mgr) => { + mgr.cluster_manager.get_back_pressure().await.map_err(err)? + } + MetadataManager::V2(mgr) => mgr + .cluster_controller + .get_back_pressure() + .await + .map_err(err)?, + }; + + Ok(back_pressure_infos.into()) + } } impl DashboardService { @@ -388,6 +407,7 @@ impl DashboardService { "/metrics/actor/back_pressures", get(prometheus::list_prometheus_actor_back_pressure), ) + .route("/metrics/back_pressures", get(get_back_pressure)) .route("/monitor/await_tree/:worker_id", get(dump_await_tree)) .route("/monitor/await_tree/", get(dump_await_tree_all)) .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile)) diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 8640088f30193..a8cf96d95a8b5 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -31,6 +31,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; +use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse}; use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -492,6 +493,33 @@ impl ClusterManager { pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> Option { self.core.read().await.get_worker_by_id(worker_id) } + + pub async fn get_back_pressure(&self) -> MetaResult { + let mut core = self.core.write().await; + let mut back_pressure_infos: Vec = Vec::new(); + for worker in core.workers.values_mut() { + if worker.worker_type() != WorkerType::ComputeNode { + continue; + } + let client = self + .env + .stream_client_pool() + .get(&worker.worker_node) + .await + .unwrap(); + let request = risingwave_pb::stream_service::GetBackPressureRequest {}; + back_pressure_infos.extend( + client + .get_back_pressure(request) + .await + .unwrap() + .back_pressure_infos, + ); + } + Ok(GetBackPressureResponse { + back_pressure_infos, + }) + } } /// The cluster info used for scheduling a streaming job. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 1950a27013f3f..ff1758aa20b96 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -915,6 +915,13 @@ mod tests { ) -> std::result::Result, Status> { Ok(Response::new(WaitEpochCommitResponse::default())) } + + async fn get_back_pressure( + &self, + _request: Request, + ) -> std::result::Result, Status> { + Ok(Response::new(GetBackPressureResponse::default())) + } } struct MockServices { diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 3a271b5660bbd..af716fb42a508 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -72,6 +72,7 @@ macro_rules! for_all_stream_rpc { ,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse } ,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse } ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } + ,{ 0, get_back_pressure, GetBackPressureRequest, GetBackPressureResponse } } }; }