Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): add heartbeat between frontend and compute #16014

Closed
wants to merge 11 commits into from
5 changes: 5 additions & 0 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,16 @@ message ExecuteRequest {
plan_common.ExprContext expr_context = 5;
}

message HeartbeatRequest {}

message HeartbeatResponse {}

service TaskService {
rpc CreateTask(CreateTaskRequest) returns (stream TaskInfoResponse);
// Cancel an already-died (self execution-failure, previous aborted, completed) task will still succeed.
rpc CancelTask(CancelTaskRequest) returns (CancelTaskResponse);
rpc Execute(ExecuteRequest) returns (stream GetDataResponse);
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
}

message GetDataRequest {
Expand Down
10 changes: 9 additions & 1 deletion src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_pb::batch_plan::TaskOutputId;
use risingwave_pb::task_service::task_service_server::TaskService;
use risingwave_pb::task_service::{
CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest, GetDataResponse,
TaskInfoResponse,
HeartbeatRequest, HeartbeatResponse, TaskInfoResponse,
};
use thiserror_ext::AsReport;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -122,6 +122,14 @@ impl TaskService for BatchServiceImpl {
let mgr = self.mgr.clone();
BatchServiceImpl::get_execute_stream(env, mgr, req).await
}

#[cfg_attr(coverage, coverage(off))]
async fn heartbeat(
&self,
_req: Request<HeartbeatRequest>,
) -> Result<Response<HeartbeatResponse>, Status> {
Ok(Response::new(HeartbeatResponse {}))
}
}

impl BatchServiceImpl {
Expand Down
36 changes: 35 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl FrontendEnv {
Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let query_manager = QueryManager::new(
worker_node_manager.clone(),
compute_client_pool,
compute_client_pool.clone(),
catalog_reader.clone(),
Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
batch_config.distributed_query_limit,
Expand Down Expand Up @@ -383,6 +383,40 @@ impl FrontendEnv {
});
join_handles.push(join_handle);

// Heartbeat
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved
let heartbeat_worker_node_manager = worker_node_manager.clone();
let join_handle = tokio::spawn(async move {
let duration = Duration::from_secs(std::cmp::max(
batch_config.mask_worker_temporary_secs as u64,
1,
));

let mut check_heartbeat_interval =
tokio::time::interval(core::time::Duration::from_secs(2));
check_heartbeat_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
check_heartbeat_interval.reset();
loop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to listen on shutdown_senders

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops... The shutdown_senders and join_handers in FrontendEnv were not used... Let's fix it later 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the frontend node process, shutdown_senders and those background tasks has the same lifecycle, we don't need to handle it actually🤔

check_heartbeat_interval.tick().await;
for worker in heartbeat_worker_node_manager
.list_worker_nodes()
.into_iter()
.filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
fuyufjh marked this conversation as resolved.
Show resolved Hide resolved
{
if let Ok(client) = compute_client_pool.get(&worker).await {
if client.heartbeat().await.is_err() {
info!("Worker node {} is not reachable, mask it", worker.id);
heartbeat_worker_node_manager.mask_worker_node(worker.id, duration);
}
} else {
info!("Worker node {} is not reachable, mask it", worker.id);
heartbeat_worker_node_manager.mask_worker_node(worker.id, duration);
}
}
}
});
join_handles.push(join_handle);

let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
let heap_profiler =
HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
Expand Down
13 changes: 11 additions & 2 deletions src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient;
use risingwave_pb::task_service::task_service_client::TaskServiceClient;
use risingwave_pb::task_service::{
permits, CancelTaskRequest, CancelTaskResponse, CreateTaskRequest, ExecuteRequest,
GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
TaskInfoResponse,
GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, HeartbeatRequest,
HeartbeatResponse, PbPermits, TaskInfoResponse,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -189,6 +189,15 @@ impl ComputeClient {
.into_inner())
}

pub async fn heartbeat(&self) -> Result<HeartbeatResponse> {
Ok(self
.task_client
.to_owned()
.heartbeat(HeartbeatRequest {})
.await?
.into_inner())
}

pub async fn stack_trace(&self) -> Result<StackTraceResponse> {
Ok(self
.monitor_client
Expand Down
Loading