From c75c65adfca92b2146e80abe0e54c21a791ff413 Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 23 Jan 2024 11:01:14 -0800 Subject: [PATCH 1/7] add test grpc --- proto/stream_service.proto | 8 ++++++++ src/compute/src/rpc/service/stream_service.rs | 13 +++++++++++++ src/meta/src/controller/cluster.rs | 18 ++++++++++++++++++ src/meta/src/dashboard/mod.rs | 12 ++++++++++++ src/meta/src/manager/cluster.rs | 16 ++++++++++++++++ src/rpc_client/src/stream_client.rs | 1 + 6 files changed, 68 insertions(+) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 462f5ff0256a6..fb05b74b230e6 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -103,6 +103,13 @@ message WaitEpochCommitResponse { common.Status status = 1; } +// Back pressure +message GetBackPressureRequest { +} +message GetBackPressureResponse { + uint64 back_pressure = 1; +} + service StreamService { rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse); rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse); @@ -112,6 +119,7 @@ service StreamService { rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse); rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse); rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse); + rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse); } // TODO: Lifecycle management for actors. diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 2640b505b7873..c9e339d1aab36 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use itertools::Itertools; +use risingwave_common::config::MetricLevel; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::LocalSstableInfo; use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo; @@ -23,6 +24,7 @@ use risingwave_pb::stream_service::stream_service_server::StreamService; use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; use risingwave_stream::error::StreamError; +use risingwave_stream::executor::monitor::global_streaming_metrics; use risingwave_stream::executor::Barrier; use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment}; use thiserror_ext::AsReport; @@ -238,4 +240,15 @@ impl StreamService for StreamServiceImpl { Ok(Response::new(WaitEpochCommitResponse { status: None })) } + + #[cfg_attr(coverage, coverage(off))] + async fn get_back_pressure( + &self, + _request: Request, + ) -> Result, Status> { + let back_pressure = global_streaming_metrics(MetricLevel::Info); + Ok(Response::new(GetBackPressureResponse { + back_pressure: 199, + })) + } } diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 042993bd5bd4c..d73db51e7e3a3 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -369,6 +369,24 @@ impl ClusterController { .await .get_worker_extra_info_by_id(worker_id) } + + pub async fn get_back_pressure_rate(&self) -> MetaResult { + let nodes = self + .inner + .read() + .await + .list_active_serving_workers() + .await + .unwrap(); + for node in nodes { + let client = self.env.stream_client_pool().get(&node).await.unwrap(); + let request = risingwave_pb::stream_service::GetBackPressureRequest {}; + let back_pressure = client.get_back_pressure(request).await.unwrap(); + break; + } + + Ok(196) + } } #[derive(Default, Clone)] diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index c1cdb5ef1ec19..4fc5379fe269a 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -360,6 +360,17 @@ pub(super) mod handlers { Ok(report) } + + pub async fn get_back_pressure_rate( + // Path(worker_id): Path, + Extension(srv): Extension, + ) -> Result { + let result = match &srv.metadata_manager { + MetadataManager::V1(mgr) => mgr.cluster_manager.get_back_pressure_rate().await, + MetadataManager::V2(mgr) => mgr.cluster_controller.get_back_pressure_rate().await, + }; + Ok("198".to_string()) + } } impl DashboardService { @@ -388,6 +399,7 @@ impl DashboardService { "/metrics/actor/back_pressures", get(prometheus::list_prometheus_actor_back_pressure), ) + .route("/metrics/back_pressures_rate", get(get_back_pressure_rate)) .route("/monitor/await_tree/:worker_id", get(dump_await_tree)) .route("/monitor/await_tree/", get(dump_await_tree_all)) .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile)) diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 8640088f30193..bc6012f8f52b5 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -492,6 +492,22 @@ impl ClusterManager { pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> Option { self.core.read().await.get_worker_by_id(worker_id) } + + pub async fn get_back_pressure_rate(&self) -> MetaResult { + let mut core = self.core.write().await; + for worker in core.workers.values_mut() { + let client = self + .env + .stream_client_pool() + .get(&worker.worker_node) + .await + .unwrap(); + let request = risingwave_pb::stream_service::GetBackPressureRequest {}; + let result = client.get_back_pressure(request).await.unwrap(); + break; + } + Ok(197) + } } /// The cluster info used for scheduling a streaming job. diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index 3a271b5660bbd..af716fb42a508 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -72,6 +72,7 @@ macro_rules! for_all_stream_rpc { ,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse } ,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse } ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } + ,{ 0, get_back_pressure, GetBackPressureRequest, GetBackPressureResponse } } }; } From 2925031660526a9e57535a963055bc2094aaa37f Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 24 Jan 2024 20:35:02 -0800 Subject: [PATCH 2/7] add client function logic --- proto/stream_service.proto | 10 ++++++- src/compute/src/rpc/service/stream_service.rs | 27 +++++++++++++++++-- src/meta/src/controller/cluster.rs | 5 ++-- src/meta/src/manager/cluster.rs | 16 ++++++++--- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index fb05b74b230e6..2cd8f29364eea 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -106,8 +106,16 @@ message WaitEpochCommitResponse { // Back pressure message GetBackPressureRequest { } + +message BackPressureInfo { + string actor_id = 1; + string fragment_id = 2; + string downstream_fragment_id = 3; + double value = 4; +} + message GetBackPressureResponse { - uint64 back_pressure = 1; + repeated BackPressureInfo back_pressure_infos = 1; } service StreamService { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index c9e339d1aab36..b20d262a49ccf 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use itertools::Itertools; +use prometheus::core::Collector; use risingwave_common::config::MetricLevel; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::LocalSstableInfo; @@ -246,9 +247,31 @@ impl StreamService for StreamServiceImpl { &self, _request: Request, ) -> Result, Status> { - let back_pressure = global_streaming_metrics(MetricLevel::Info); + let metric_family = global_streaming_metrics(MetricLevel::Info) + .actor_output_buffer_blocking_duration_ns + .collect(); + let metrics = metric_family.get(0).unwrap().get_metric(); + let mut back_pressure_infos: Vec = Vec::new(); + for label_pairs in metrics { + let mut back_pressure_info = BackPressureInfo::default(); + for label_pair in label_pairs.get_label() { + if label_pair.get_name() == "actor_id" { + back_pressure_info.actor_id = label_pair.get_value().to_string(); + } + if label_pair.get_name() == "fragment_id" { + back_pressure_info.fragment_id = label_pair.get_value().to_string(); + } + if label_pair.get_name() == "downstream_fragment_id" { + back_pressure_info.downstream_fragment_id = label_pair.get_value().to_string(); + } + } + back_pressure_info.value = label_pairs.get_counter().get_value(); + back_pressure_infos.push(back_pressure_info); + } + + Ok(Response::new(GetBackPressureResponse { - back_pressure: 199, + back_pressure_infos, })) } } diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index d73db51e7e3a3..b8b1fb7de4ef3 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -36,6 +36,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; +use risingwave_pb::stream_service::BackPressureInfo; use sea_orm::prelude::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ @@ -378,11 +379,11 @@ impl ClusterController { .list_active_serving_workers() .await .unwrap(); + let mut back_pressures : Vec = Vec::new(); for node in nodes { let client = self.env.stream_client_pool().get(&node).await.unwrap(); let request = risingwave_pb::stream_service::GetBackPressureRequest {}; - let back_pressure = client.get_back_pressure(request).await.unwrap(); - break; + back_pressures.extend(client.get_back_pressure(request).await.unwrap().back_pressure_infos); } Ok(196) diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index bc6012f8f52b5..a3c28af7ef14d 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -31,6 +31,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; +use risingwave_pb::stream_service::BackPressureInfo; use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -493,9 +494,13 @@ impl ClusterManager { self.core.read().await.get_worker_by_id(worker_id) } - pub async fn get_back_pressure_rate(&self) -> MetaResult { + pub async fn get_back_pressure_rate(&self) -> MetaResult { let mut core = self.core.write().await; + let mut back_pressures: Vec = Vec::new(); for worker in core.workers.values_mut() { + if worker.worker_type() != WorkerType::ComputeNode { + continue; + } let client = self .env .stream_client_pool() @@ -503,8 +508,13 @@ impl ClusterManager { .await .unwrap(); let request = risingwave_pb::stream_service::GetBackPressureRequest {}; - let result = client.get_back_pressure(request).await.unwrap(); - break; + back_pressures.extend( + client + .get_back_pressure(request) + .await + .unwrap() + .back_pressure_infos, + ); } Ok(197) } From 6c56a3a4c46e344f1be83870af53157eeb08deff Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 24 Jan 2024 22:15:25 -0800 Subject: [PATCH 3/7] add server logic --- src/meta/src/controller/cluster.rs | 18 +++++++++++++----- src/meta/src/dashboard/mod.rs | 20 +++++++++++++++----- src/meta/src/manager/cluster.rs | 12 +++++++----- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index b8b1fb7de4ef3..64181840f59b1 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -36,7 +36,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; -use risingwave_pb::stream_service::BackPressureInfo; +use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse}; use sea_orm::prelude::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ @@ -371,7 +371,7 @@ impl ClusterController { .get_worker_extra_info_by_id(worker_id) } - pub async fn get_back_pressure_rate(&self) -> MetaResult { + pub async fn get_back_pressure_rate(&self) -> MetaResult { let nodes = self .inner .read() @@ -379,14 +379,22 @@ impl ClusterController { .list_active_serving_workers() .await .unwrap(); - let mut back_pressures : Vec = Vec::new(); + let mut back_pressure_infos: Vec = Vec::new(); for node in nodes { let client = self.env.stream_client_pool().get(&node).await.unwrap(); let request = risingwave_pb::stream_service::GetBackPressureRequest {}; - back_pressures.extend(client.get_back_pressure(request).await.unwrap().back_pressure_infos); + back_pressure_infos.extend( + client + .get_back_pressure(request) + .await + .unwrap() + .back_pressure_infos, + ); } - Ok(196) + Ok(GetBackPressureResponse { + back_pressure_infos, + }) } } diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 4fc5379fe269a..de5f41cb46053 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -29,6 +29,7 @@ use axum::routing::{get, get_service}; use axum::Router; use hyper::Request; use parking_lot::Mutex; +use risingwave_pb::stream_service::GetBackPressureResponse; use risingwave_rpc_client::ComputeClientPool; use tower::{ServiceBuilder, ServiceExt}; use tower_http::add_extension::AddExtensionLayer; @@ -364,12 +365,21 @@ pub(super) mod handlers { pub async fn get_back_pressure_rate( // Path(worker_id): Path, Extension(srv): Extension, - ) -> Result { - let result = match &srv.metadata_manager { - MetadataManager::V1(mgr) => mgr.cluster_manager.get_back_pressure_rate().await, - MetadataManager::V2(mgr) => mgr.cluster_controller.get_back_pressure_rate().await, + ) -> Result> { + let back_pressure_infos = match &srv.metadata_manager { + MetadataManager::V1(mgr) => mgr + .cluster_manager + .get_back_pressure_rate() + .await + .map_err(err)?, + MetadataManager::V2(mgr) => mgr + .cluster_controller + .get_back_pressure_rate() + .await + .map_err(err)?, }; - Ok("198".to_string()) + + Ok(back_pressure_infos.into()) } } diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index a3c28af7ef14d..a25a7ccc640bf 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -31,7 +31,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; -use risingwave_pb::stream_service::BackPressureInfo; +use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse}; use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; @@ -494,9 +494,9 @@ impl ClusterManager { self.core.read().await.get_worker_by_id(worker_id) } - pub async fn get_back_pressure_rate(&self) -> MetaResult { + pub async fn get_back_pressure_rate(&self) -> MetaResult { let mut core = self.core.write().await; - let mut back_pressures: Vec = Vec::new(); + let mut back_pressure_infos: Vec = Vec::new(); for worker in core.workers.values_mut() { if worker.worker_type() != WorkerType::ComputeNode { continue; @@ -508,7 +508,7 @@ impl ClusterManager { .await .unwrap(); let request = risingwave_pb::stream_service::GetBackPressureRequest {}; - back_pressures.extend( + back_pressure_infos.extend( client .get_back_pressure(request) .await @@ -516,7 +516,9 @@ impl ClusterManager { .back_pressure_infos, ); } - Ok(197) + Ok(GetBackPressureResponse { + back_pressure_infos, + }) } } From a8a73282186231129c21807d6d82908c32a7bf25 Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 24 Jan 2024 22:33:06 -0800 Subject: [PATCH 4/7] format --- src/compute/src/rpc/service/stream_service.rs | 1 - src/meta/src/controller/cluster.rs | 2 +- src/meta/src/dashboard/mod.rs | 14 ++++++-------- src/meta/src/manager/cluster.rs | 2 +- 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index b20d262a49ccf..88966bfa635c1 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -269,7 +269,6 @@ impl StreamService for StreamServiceImpl { back_pressure_infos.push(back_pressure_info); } - Ok(Response::new(GetBackPressureResponse { back_pressure_infos, })) diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 64181840f59b1..d536e5869d693 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -371,7 +371,7 @@ impl ClusterController { .get_worker_extra_info_by_id(worker_id) } - pub async fn get_back_pressure_rate(&self) -> MetaResult { + pub async fn get_back_pressure(&self) -> MetaResult { let nodes = self .inner .read() diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index de5f41cb46053..12b4cdb33f1e0 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -362,19 +362,17 @@ pub(super) mod handlers { Ok(report) } - pub async fn get_back_pressure_rate( + pub async fn get_back_pressure( // Path(worker_id): Path, Extension(srv): Extension, ) -> Result> { let back_pressure_infos = match &srv.metadata_manager { - MetadataManager::V1(mgr) => mgr - .cluster_manager - .get_back_pressure_rate() - .await - .map_err(err)?, + MetadataManager::V1(mgr) => { + mgr.cluster_manager.get_back_pressure().await.map_err(err)? + } MetadataManager::V2(mgr) => mgr .cluster_controller - .get_back_pressure_rate() + .get_back_pressure() .await .map_err(err)?, }; @@ -409,7 +407,7 @@ impl DashboardService { "/metrics/actor/back_pressures", get(prometheus::list_prometheus_actor_back_pressure), ) - .route("/metrics/back_pressures_rate", get(get_back_pressure_rate)) + .route("/metrics/back_pressures_rate", get(get_back_pressure)) .route("/monitor/await_tree/:worker_id", get(dump_await_tree)) .route("/monitor/await_tree/", get(dump_await_tree_all)) .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile)) diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index a25a7ccc640bf..a8cf96d95a8b5 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -494,7 +494,7 @@ impl ClusterManager { self.core.read().await.get_worker_by_id(worker_id) } - pub async fn get_back_pressure_rate(&self) -> MetaResult { + pub async fn get_back_pressure(&self) -> MetaResult { let mut core = self.core.write().await; let mut back_pressure_infos: Vec = Vec::new(); for worker in core.workers.values_mut() { From b17c9c81f80fb9d294d9ddd5ec50014451325ee8 Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 24 Jan 2024 22:41:47 -0800 Subject: [PATCH 5/7] change path name --- src/meta/src/dashboard/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 12b4cdb33f1e0..9ed5c1975436c 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -407,7 +407,7 @@ impl DashboardService { "/metrics/actor/back_pressures", get(prometheus::list_prometheus_actor_back_pressure), ) - .route("/metrics/back_pressures_rate", get(get_back_pressure)) + .route("/metrics/back_pressures", get(get_back_pressure)) .route("/monitor/await_tree/:worker_id", get(dump_await_tree)) .route("/monitor/await_tree/", get(dump_await_tree_all)) .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile)) From 5c27fc471cfd8df4d5a11732efa76fea236ee8df Mon Sep 17 00:00:00 2001 From: yufansong Date: Thu, 25 Jan 2024 11:25:23 -0800 Subject: [PATCH 6/7] change from string to int --- proto/stream_service.proto | 6 +++--- src/compute/src/rpc/service/stream_service.rs | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 2cd8f29364eea..35b38eb50da8d 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -108,9 +108,9 @@ message GetBackPressureRequest { } message BackPressureInfo { - string actor_id = 1; - string fragment_id = 2; - string downstream_fragment_id = 3; + uint32 actor_id = 1; + uint32 fragment_id = 2; + uint32 downstream_fragment_id = 3; double value = 4; } diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 88966bfa635c1..78572f8f09727 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -256,13 +256,14 @@ impl StreamService for StreamServiceImpl { let mut back_pressure_info = BackPressureInfo::default(); for label_pair in label_pairs.get_label() { if label_pair.get_name() == "actor_id" { - back_pressure_info.actor_id = label_pair.get_value().to_string(); + back_pressure_info.actor_id = label_pair.get_value().parse::().unwrap(); } if label_pair.get_name() == "fragment_id" { - back_pressure_info.fragment_id = label_pair.get_value().to_string(); + back_pressure_info.fragment_id = label_pair.get_value().parse::().unwrap(); } if label_pair.get_name() == "downstream_fragment_id" { - back_pressure_info.downstream_fragment_id = label_pair.get_value().to_string(); + back_pressure_info.downstream_fragment_id = + label_pair.get_value().parse::().unwrap(); } } back_pressure_info.value = label_pairs.get_counter().get_value(); From 7c2f859bcbb36804d80b07621e5a88e575f9bbb9 Mon Sep 17 00:00:00 2001 From: yufansong Date: Thu, 25 Jan 2024 11:32:14 -0800 Subject: [PATCH 7/7] format code --- proto/stream_service.proto | 11 +++++------ src/meta/src/stream/stream_manager.rs | 7 +++++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 35b38eb50da8d..a632445aef7da 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -104,14 +104,13 @@ message WaitEpochCommitResponse { } // Back pressure -message GetBackPressureRequest { -} +message GetBackPressureRequest {} message BackPressureInfo { - uint32 actor_id = 1; - uint32 fragment_id = 2; - uint32 downstream_fragment_id = 3; - double value = 4; + uint32 actor_id = 1; + uint32 fragment_id = 2; + uint32 downstream_fragment_id = 3; + double value = 4; } message GetBackPressureResponse { diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 1950a27013f3f..ff1758aa20b96 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -915,6 +915,13 @@ mod tests { ) -> std::result::Result, Status> { Ok(Response::new(WaitEpochCommitResponse::default())) } + + async fn get_back_pressure( + &self, + _request: Request, + ) -> std::result::Result, Status> { + Ok(Response::new(GetBackPressureResponse::default())) + } } struct MockServices {