From 0562b8d1793b128d22982396382f9087d06c65fa Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 29 Mar 2024 11:54:37 +0800 Subject: [PATCH 1/6] add heartbeat between frontend and compute --- proto/task_service.proto | 9 +++++ src/batch/src/rpc/service/task_service.rs | 10 +++++- src/frontend/src/session.rs | 40 ++++++++++++++++++++++- src/rpc_client/src/compute_client.rs | 13 ++++++-- 4 files changed, 68 insertions(+), 4 deletions(-) diff --git a/proto/task_service.proto b/proto/task_service.proto index 121d189c923df..e4fd5b5dab53d 100644 --- a/proto/task_service.proto +++ b/proto/task_service.proto @@ -68,11 +68,20 @@ message ExecuteRequest { plan_common.ExprContext expr_context = 5; } +message HeartbeatRequest { + +} + +message HeartbeatResponse { + +} + service TaskService { rpc CreateTask(CreateTaskRequest) returns (stream TaskInfoResponse); // Cancel an already-died (self execution-failure, previous aborted, completed) task will still succeed. rpc CancelTask(CancelTaskRequest) returns (CancelTaskResponse); rpc Execute(ExecuteRequest) returns (stream GetDataResponse); + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); } message GetDataRequest { diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs index 816b62a02fc4d..95f8a4bcbb887 100644 --- a/src/batch/src/rpc/service/task_service.rs +++ b/src/batch/src/rpc/service/task_service.rs @@ -20,7 +20,7 @@ use risingwave_pb::batch_plan::TaskOutputId; use risingwave_pb::task_service::task_service_server::TaskService; use risingwave_pb::task_service::{ CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest, GetDataResponse, - TaskInfoResponse, + HeartbeatRequest, HeartbeatResponse, TaskInfoResponse, }; use thiserror_ext::AsReport; use tokio_stream::wrappers::ReceiverStream; @@ -122,6 +122,14 @@ impl TaskService for BatchServiceImpl { let mgr = self.mgr.clone(); BatchServiceImpl::get_execute_stream(env, mgr, req).await } + + #[cfg_attr(coverage, coverage(off))] + async fn heartbeat( + &self, + _req: Request, + ) -> Result, Status> { + Ok(Response::new(HeartbeatResponse {})) + } } impl BatchServiceImpl { diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 47a8ede81884a..57a45da87fe38 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -279,7 +279,7 @@ impl FrontendEnv { Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); let query_manager = QueryManager::new( worker_node_manager.clone(), - compute_client_pool, + compute_client_pool.clone(), catalog_reader.clone(), Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()), batch_config.distributed_query_limit, @@ -382,6 +382,44 @@ impl FrontendEnv { }); join_handles.push(join_handle); + // Heartbeat + let heartbeat_worker_node_manager = worker_node_manager.clone(); + let join_handle = tokio::spawn(async move { + + let duration = std::cmp::max( + Duration::from_secs( + meta_config + .max_heartbeat_interval_secs as _, + ) / 10, + Duration::from_secs(1), + ); + + let mut check_heartbeat_interval = + tokio::time::interval(core::time::Duration::from_secs(2)); + check_heartbeat_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + check_heartbeat_interval.reset(); + loop { + check_heartbeat_interval.tick().await; + for worker in heartbeat_worker_node_manager + .list_worker_nodes() + .into_iter() + .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving)) + { + if let Ok(client) = compute_client_pool.get(&worker).await { + if let Err(_) = client.heartbeat().await { + info!("Worker node {} is not reachable, mask it", worker.id); + heartbeat_worker_node_manager.mask_worker_node(worker.id, duration); + } + } else { + info!("Worker node {} is not reachable, mask it", worker.id); + heartbeat_worker_node_manager.mask_worker_node(worker.id, duration); + } + } + } + }); + join_handles.push(join_handle); + let total_memory_bytes = resource_util::memory::system_memory_available_bytes(); let heap_profiler = HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs index 8e96f7a81702d..1782ea59cc9dd 100644 --- a/src/rpc_client/src/compute_client.rs +++ b/src/rpc_client/src/compute_client.rs @@ -37,8 +37,8 @@ use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient; use risingwave_pb::task_service::task_service_client::TaskServiceClient; use risingwave_pb::task_service::{ permits, CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest, - GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits, - TaskInfoResponse, + GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, HeartbeatRequest, + HeartbeatResponse, PbPermits, TaskInfoResponse, }; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -189,6 +189,15 @@ impl ComputeClient { .into_inner()) } + pub async fn heartbeat(&self) -> Result { + Ok(self + .task_client + .to_owned() + .heartbeat(HeartbeatRequest {}) + .await? + .into_inner()) + } + pub async fn stack_trace(&self) -> Result { Ok(self .monitor_client From d70e05b515ad672ae2252c1549ee4666018182b4 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 29 Mar 2024 12:00:23 +0800 Subject: [PATCH 2/6] fix --- src/frontend/src/session.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 57a45da87fe38..4a9d5473dec8c 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -385,12 +385,8 @@ impl FrontendEnv { // Heartbeat let heartbeat_worker_node_manager = worker_node_manager.clone(); let join_handle = tokio::spawn(async move { - let duration = std::cmp::max( - Duration::from_secs( - meta_config - .max_heartbeat_interval_secs as _, - ) / 10, + Duration::from_secs(meta_config.max_heartbeat_interval_secs as _) / 10, Duration::from_secs(1), ); @@ -407,7 +403,7 @@ impl FrontendEnv { .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving)) { if let Ok(client) = compute_client_pool.get(&worker).await { - if let Err(_) = client.heartbeat().await { + if client.heartbeat().await.is_err() { info!("Worker node {} is not reachable, mask it", worker.id); heartbeat_worker_node_manager.mask_worker_node(worker.id, duration); } From 01defe6b8663b026d49937922d1ece413b71c6bd Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 29 Mar 2024 12:19:05 +0800 Subject: [PATCH 3/6] fmt --- proto/task_service.proto | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/proto/task_service.proto b/proto/task_service.proto index e4fd5b5dab53d..44ea88123b5aa 100644 --- a/proto/task_service.proto +++ b/proto/task_service.proto @@ -68,13 +68,9 @@ message ExecuteRequest { plan_common.ExprContext expr_context = 5; } -message HeartbeatRequest { +message HeartbeatRequest {} -} - -message HeartbeatResponse { - -} +message HeartbeatResponse {} service TaskService { rpc CreateTask(CreateTaskRequest) returns (stream TaskInfoResponse); From 045495d86ee25bd3aafd1e4e3559432353463ecd Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 1 Apr 2024 16:31:46 +0800 Subject: [PATCH 4/6] refine --- src/frontend/src/session.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 0cc0804236d47..b22b63c812f18 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -387,10 +387,10 @@ impl FrontendEnv { // Heartbeat let heartbeat_worker_node_manager = worker_node_manager.clone(); let join_handle = tokio::spawn(async move { - let duration = std::cmp::max( - Duration::from_secs(meta_config.max_heartbeat_interval_secs as _) / 10, - Duration::from_secs(1), - ); + let duration = Duration::from_secs(std::cmp::max( + batch_config.mask_worker_temporary_secs as u64, + 1, + )); let mut check_heartbeat_interval = tokio::time::interval(core::time::Duration::from_secs(2)); From 0e152f33921f370531dd80daaf37c336d079ba3a Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Sun, 7 Apr 2024 11:32:28 +0800 Subject: [PATCH 5/6] fmt --- src/frontend/src/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index c926b8f113064..830c45a76130b 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -383,7 +383,7 @@ impl FrontendEnv { }); join_handles.push(join_handle); - // Heartbeat + // Heartbeat between compute nodes (worker nodes) and frontend let heartbeat_worker_node_manager = worker_node_manager.clone(); let join_handle = tokio::spawn(async move { let duration = Duration::from_secs(std::cmp::max( From aa176b8c89ed354fce6511326fef5217cd27e255 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 8 Apr 2024 15:16:13 +0800 Subject: [PATCH 6/6] add invalidate --- src/frontend/src/session.rs | 26 +++++++++++++++++++------- src/rpc_client/src/lib.rs | 5 +++++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index cca71269ad13f..25701a2768c81 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -406,6 +406,7 @@ impl FrontendEnv { check_heartbeat_interval .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); check_heartbeat_interval.reset(); + let heartbeat_fail_retry_num = 3; loop { check_heartbeat_interval.tick().await; for worker in heartbeat_worker_node_manager @@ -413,14 +414,25 @@ impl FrontendEnv { .into_iter() .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving)) { - if let Ok(client) = compute_client_pool.get(&worker).await { - if client.heartbeat().await.is_err() { - info!("Worker node {} is not reachable, mask it", worker.id); - heartbeat_worker_node_manager.mask_worker_node(worker.id, duration); + let addr: HostAddr = worker.get_host().unwrap().into(); + for i in 0..heartbeat_fail_retry_num { + let success = if let Ok(client) = + compute_client_pool.get_by_addr(addr.clone()).await + { + client.heartbeat().await.is_ok() + } else { + false + }; + + if success { + break; + } else { + compute_client_pool.invalidate(&addr).await; + if i == heartbeat_fail_retry_num - 1 { + info!("Worker node {} is not reachable, mask it", worker.id); + heartbeat_worker_node_manager.mask_worker_node(worker.id, duration); + } } - } else { - info!("Worker node {} is not reachable, mask it", worker.id); - heartbeat_worker_node_manager.mask_worker_node(worker.id, duration); } } } diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index fabd1dabeca01..188c3b47d9b94 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -129,6 +129,11 @@ where .unwrap() .clone()) } + + /// Invalidates the clients for the given addr. + pub async fn invalidate(&self, addr: &HostAddr) { + self.clients.invalidate(addr).await; + } } /// `ExtraInfoSource` is used by heartbeat worker to pull extra info that needs to be piggybacked.