diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto index 7c7769da6b7ff..aa9880917d725 100644 --- a/proto/monitor_service.proto +++ b/proto/monitor_service.proto @@ -48,10 +48,25 @@ message AnalyzeHeapResponse { bytes result = 1; } +// Back pressure +message GetBackPressureRequest {} + +message BackPressureInfo { + uint32 actor_id = 1; + uint32 fragment_id = 2; + uint32 downstream_fragment_id = 3; + double value = 4; +} + +message GetBackPressureResponse { + repeated BackPressureInfo back_pressure_infos = 1; +} + service MonitorService { rpc StackTrace(StackTraceRequest) returns (StackTraceResponse); rpc Profiling(ProfilingRequest) returns (ProfilingResponse); rpc HeapProfiling(HeapProfilingRequest) returns (HeapProfilingResponse); rpc ListHeapProfiling(ListHeapProfilingRequest) returns (ListHeapProfilingResponse); rpc AnalyzeHeap(AnalyzeHeapRequest) returns (AnalyzeHeapResponse); + rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse); } diff --git a/proto/stream_service.proto b/proto/stream_service.proto index a632445aef7da..462f5ff0256a6 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -103,20 +103,6 @@ message WaitEpochCommitResponse { common.Status status = 1; } -// Back pressure -message GetBackPressureRequest {} - -message BackPressureInfo { - uint32 actor_id = 1; - uint32 fragment_id = 2; - uint32 downstream_fragment_id = 3; - double value = 4; -} - -message GetBackPressureResponse { - repeated BackPressureInfo back_pressure_infos = 1; -} - service StreamService { rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse); rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse); @@ -126,7 +112,6 @@ service StreamService { rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse); rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse); rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); - rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse); } // TODO: Lifecycle management for actors. diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 82c254e58d62d..b832b7827adda 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -19,15 +19,18 @@ use std::sync::Arc; use std::time::Duration; use itertools::Itertools; -use risingwave_common::config::ServerConfig; +use prometheus::core::Collector; +use risingwave_common::config::{MetricLevel, ServerConfig}; use risingwave_common_heap_profiling::{AUTO_DUMP_SUFFIX, COLLAPSED_SUFFIX, MANUALLY_DUMP_SUFFIX}; use risingwave_pb::monitor_service::monitor_service_server::MonitorService; use risingwave_pb::monitor_service::{ - AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse, - ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, - StackTraceRequest, StackTraceResponse, + AnalyzeHeapRequest, AnalyzeHeapResponse, BackPressureInfo, GetBackPressureRequest, + GetBackPressureResponse, HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest, + ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest, + StackTraceResponse, }; use risingwave_rpc_client::error::ToTonicStatus; +use risingwave_stream::executor::monitor::global_streaming_metrics; use risingwave_stream::task::LocalStreamManager; use thiserror_ext::AsReport; use tonic::{Code, Request, Response, Status}; @@ -229,6 +232,39 @@ impl MonitorService for MonitorServiceImpl { let file = fs::read(Path::new(&collapsed_path_str))?; Ok(Response::new(AnalyzeHeapResponse { result: file })) } + + #[cfg_attr(coverage, coverage(off))] + async fn get_back_pressure( + &self, + _request: Request, + ) -> Result, Status> { + let metric_family = global_streaming_metrics(MetricLevel::Info) + .actor_output_buffer_blocking_duration_ns + .collect(); + let metrics = metric_family.get(0).unwrap().get_metric(); + let mut back_pressure_infos: Vec = Vec::new(); + for label_pairs in metrics { + let mut back_pressure_info = BackPressureInfo::default(); + for label_pair in label_pairs.get_label() { + if label_pair.get_name() == "actor_id" { + back_pressure_info.actor_id = label_pair.get_value().parse::().unwrap(); + } + if label_pair.get_name() == "fragment_id" { + back_pressure_info.fragment_id = label_pair.get_value().parse::().unwrap(); + } + if label_pair.get_name() == "downstream_fragment_id" { + back_pressure_info.downstream_fragment_id = + label_pair.get_value().parse::().unwrap(); + } + } + back_pressure_info.value = label_pairs.get_counter().get_value(); + back_pressure_infos.push(back_pressure_info); + } + + Ok(Response::new(GetBackPressureResponse { + back_pressure_infos, + })) + } } pub use grpc_middleware::*; diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 78572f8f09727..2640b505b7873 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -16,8 +16,6 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use itertools::Itertools; -use prometheus::core::Collector; -use risingwave_common::config::MetricLevel; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo; @@ -25,7 +23,6 @@ use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; use risingwave_stream::error::StreamError; -use risingwave_stream::executor::monitor::global_streaming_metrics; use risingwave_stream::executor::Barrier; use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment}; use thiserror_ext::AsReport; @@ -241,37 +238,4 @@ impl StreamService for StreamServiceImpl { Ok(Response::new(WaitEpochCommitResponse { status: None })) } - - #[cfg_attr(coverage, coverage(off))] - async fn get_back_pressure( - &self, - _request: Request, - ) -> Result, Status> { - let metric_family = global_streaming_metrics(MetricLevel::Info) - .actor_output_buffer_blocking_duration_ns - .collect(); - let metrics = metric_family.get(0).unwrap().get_metric(); - let mut back_pressure_infos: Vec = Vec::new(); - for label_pairs in metrics { - let mut back_pressure_info = BackPressureInfo::default(); - for label_pair in label_pairs.get_label() { - if label_pair.get_name() == "actor_id" { - back_pressure_info.actor_id = label_pair.get_value().parse::().unwrap(); - } - if label_pair.get_name() == "fragment_id" { - back_pressure_info.fragment_id = label_pair.get_value().parse::().unwrap(); - } - if label_pair.get_name() == "downstream_fragment_id" { - back_pressure_info.downstream_fragment_id = - label_pair.get_value().parse::().unwrap(); - } - } - back_pressure_info.value = label_pairs.get_counter().get_value(); - back_pressure_infos.push(back_pressure_info); - } - - Ok(Response::new(GetBackPressureResponse { - back_pressure_infos, - })) - } } diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index d536e5869d693..042993bd5bd4c 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -36,7 +36,6 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; -use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse}; use sea_orm::prelude::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ @@ -370,32 +369,6 @@ impl ClusterController { .await .get_worker_extra_info_by_id(worker_id) } - - pub async fn get_back_pressure(&self) -> MetaResult { - let nodes = self - .inner - .read() - .await - .list_active_serving_workers() - .await - .unwrap(); - let mut back_pressure_infos: Vec = Vec::new(); - for node in nodes { - let client = self.env.stream_client_pool().get(&node).await.unwrap(); - let request = risingwave_pb::stream_service::GetBackPressureRequest {}; - back_pressure_infos.extend( - client - .get_back_pressure(request) - .await - .unwrap() - .back_pressure_infos, - ); - } - - Ok(GetBackPressureResponse { - back_pressure_infos, - }) - } } #[derive(Default, Clone)] diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 9ed5c1975436c..814e14d42af9e 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -29,7 +29,6 @@ use axum::routing::{get, get_service}; use axum::Router; use hyper::Request; use parking_lot::Mutex; -use risingwave_pb::stream_service::GetBackPressureResponse; use risingwave_rpc_client::ComputeClientPool; use tower::{ServiceBuilder, ServiceExt}; use tower_http::add_extension::AddExtensionLayer; @@ -56,6 +55,7 @@ pub type Service = Arc; pub(super) mod handlers { use anyhow::Context; use axum::Json; + use futures::future::join_all; use itertools::Itertools; use risingwave_common::bail; use risingwave_common_heap_profiling::COLLAPSED_SUFFIX; @@ -64,7 +64,8 @@ pub(super) mod handlers { use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::{ActorLocation, PbTableFragments}; use risingwave_pb::monitor_service::{ - HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse, + GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse, + StackTraceResponse, }; use serde_json::json; use thiserror_ext::AsReport; @@ -363,21 +364,37 @@ pub(super) mod handlers { } pub async fn get_back_pressure( - // Path(worker_id): Path, Extension(srv): Extension, ) -> Result> { - let back_pressure_infos = match &srv.metadata_manager { - MetadataManager::V1(mgr) => { - mgr.cluster_manager.get_back_pressure().await.map_err(err)? - } - MetadataManager::V2(mgr) => mgr - .cluster_controller - .get_back_pressure() - .await - .map_err(err)?, - }; + let worker_nodes = srv + .metadata_manager + .list_active_streaming_compute_nodes() + .await + .map_err(err)?; - Ok(back_pressure_infos.into()) + let mut futures = Vec::new(); + + for worker_node in worker_nodes { + let client = srv.compute_clients.get(&worker_node).await.map_err(err)?; + let client = Arc::new(client); + let fut = async move { + let result = client.get_back_pressure().await.map_err(err)?; + Ok::<_, DashboardError>(result) + }; + futures.push(fut); + } + let results = join_all(futures).await; + + let mut all = GetBackPressureResponse::default(); + + for result in results { + let result = result + .map_err(|_| anyhow!("Failed to get back pressure")) + .map_err(err)?; + all.back_pressure_infos.extend(result.back_pressure_infos); + } + + Ok(all.into()) } } diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index a8cf96d95a8b5..8640088f30193 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -31,7 +31,6 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; -use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse}; use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -493,33 +492,6 @@ impl ClusterManager { pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> Option { self.core.read().await.get_worker_by_id(worker_id) } - - pub async fn get_back_pressure(&self) -> MetaResult { - let mut core = self.core.write().await; - let mut back_pressure_infos: Vec = Vec::new(); - for worker in core.workers.values_mut() { - if worker.worker_type() != WorkerType::ComputeNode { - continue; - } - let client = self - .env - .stream_client_pool() - .get(&worker.worker_node) - .await - .unwrap(); - let request = risingwave_pb::stream_service::GetBackPressureRequest {}; - back_pressure_infos.extend( - client - .get_back_pressure(request) - .await - .unwrap() - .back_pressure_infos, - ); - } - Ok(GetBackPressureResponse { - back_pressure_infos, - }) - } } /// The cluster info used for scheduling a streaming job. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index ff1758aa20b96..1950a27013f3f 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -915,13 +915,6 @@ mod tests { ) -> std::result::Result, Status> { Ok(Response::new(WaitEpochCommitResponse::default())) } - - async fn get_back_pressure( - &self, - _request: Request, - ) -> std::result::Result, Status> { - Ok(Response::new(GetBackPressureResponse::default())) - } } struct MockServices { diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index 9d37625b20eb4..8e96f7a81702d 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -27,9 +27,10 @@ use risingwave_pb::compute::config_service_client::ConfigServiceClient; use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse}; use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient; use risingwave_pb::monitor_service::{ - AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse, - ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, - StackTraceRequest, StackTraceResponse, + AnalyzeHeapRequest, AnalyzeHeapResponse, GetBackPressureRequest, GetBackPressureResponse, + HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest, + ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest, + StackTraceResponse, }; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient; @@ -197,6 +198,15 @@ impl ComputeClient { .into_inner()) } + pub async fn get_back_pressure(&self) -> Result { + Ok(self + .monitor_client + .to_owned() + .get_back_pressure(GetBackPressureRequest::default()) + .await? + .into_inner()) + } + pub async fn profile(&self, sleep_s: u64) -> Result { Ok(self .monitor_client diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index af716fb42a508..3a271b5660bbd 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -72,7 +72,6 @@ macro_rules! for_all_stream_rpc { ,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse } ,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse } ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } - ,{ 0, get_back_pressure, GetBackPressureRequest, GetBackPressureResponse } } }; } diff --git a/src/storage/compactor/src/rpc.rs b/src/storage/compactor/src/rpc.rs index d72e3de4f79ee..7990b2810f536 100644 --- a/src/storage/compactor/src/rpc.rs +++ b/src/storage/compactor/src/rpc.rs @@ -21,9 +21,10 @@ use risingwave_pb::compactor::{ }; use risingwave_pb::monitor_service::monitor_service_server::MonitorService; use risingwave_pb::monitor_service::{ - AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse, - ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, - StackTraceRequest, StackTraceResponse, + AnalyzeHeapRequest, AnalyzeHeapResponse, GetBackPressureRequest, GetBackPressureResponse, + HeapProfilingRequest, HeapProfilingResponse, ListHeapProfilingRequest, + ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse, StackTraceRequest, + StackTraceResponse, }; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; @@ -133,4 +134,13 @@ impl MonitorService for MonitorServiceImpl { "Heap profiling unimplemented in compactor", )) } + + async fn get_back_pressure( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented( + "Get Back Pressure unimplemented in compactor", + )) + } }