diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs
index 28a3f04857555..24ff47958dd3f 100644
--- a/src/batch/src/executor/generic_exchange.rs
+++ b/src/batch/src/executor/generic_exchange.rs
@@ -12,10 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::time::Duration;
+
 use futures::StreamExt;
 use futures_async_stream::try_stream;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::PbExchangeSource;
@@ -97,13 +100,44 @@ impl CreateSource for DefaultCreateSource {
             task_output_id,
         );
 
+        let mask_failed_serving_worker = || {
+            if let Some(worker_node_manager) = context.worker_node_manager() {
+                if let Some(worker) =
+                    worker_node_manager
+                        .list_worker_nodes()
+                        .iter()
+                        .find(|worker| {
+                            worker
+                                .host
+                                .as_ref()
+                                .map_or(false, |h| HostAddr::from(h) == peer_addr)
+                                && worker.property.as_ref().map_or(false, |p| p.is_serving)
+                        })
+                {
+                    let duration = Duration::from_secs(std::cmp::max(
+                        context.get_config().mask_worker_temporary_secs as u64,
+                        1,
+                    ));
+                    worker_node_manager.mask_worker_node(worker.id, duration);
+                }
+            }
+        };
+
         Ok(ExchangeSourceImpl::Grpc(
             GrpcExchangeSource::create(
-                self.client_pool.get_by_addr(peer_addr).await?,
+                self.client_pool
+                    .get_by_addr(peer_addr.clone())
+                    .await
+                    .inspect_err(|_| mask_failed_serving_worker())?,
                 task_output_id.clone(),
                 prost_source.local_execute_plan.clone(),
             )
-            .await?,
+            .await
+            .inspect_err(|e| {
+                if matches!(e, BatchError::RpcError(_)) {
+                    mask_failed_serving_worker()
+                }
+            })?,
         ))
     }
 }
diff --git a/src/batch/src/task/context.rs b/src/batch/src/task/context.rs
index 45212b7ce8c65..89b7adec9d81b 100644
--- a/src/batch/src/task/context.rs
+++ b/src/batch/src/task/context.rs
@@ -27,6 +27,7 @@ use super::TaskId;
 use crate::error::Result;
 use crate::monitor::{BatchMetricsWithTaskLabels, BatchMetricsWithTaskLabelsInner};
 use crate::task::{BatchEnvironment, TaskOutput, TaskOutputId};
+use crate::worker_manager::worker_node_manager::WorkerNodeManagerRef;
 
 /// Context for batch task execution.
 ///
@@ -65,6 +66,8 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {
     fn mem_usage(&self) -> usize;
 
     fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext;
+
+    fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef>;
 }
 
 /// Batch task context on compute node.
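Note on the pattern in the exchange executor above: `Result::inspect_err` runs a closure on the error without consuming the `Result`, which is what lets the masking hook fire as a side effect before `?` propagates the failure. A minimal, self-contained sketch of that pattern — the address, error type, and closure body here are illustrative stand-ins, not RisingWave code:

```rust
// Stand-in for a fallible connection attempt; the address is made up.
fn connect(addr: &str) -> Result<(), String> {
    Err(format!("connection to {addr} refused"))
}

fn main() {
    // Side effect to run on failure, analogous to mask_failed_serving_worker.
    let mask_failed_worker = || println!("masking failed worker temporarily");

    // inspect_err fires the closure on Err but leaves the Result intact,
    // so the caller can still propagate or handle the error afterwards.
    let result = connect("127.0.0.1:5688").inspect_err(|_| mask_failed_worker());
    assert!(result.is_err());
}
```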
@@ -149,6 +152,10 @@ impl BatchTaskContext for ComputeNodeContext {
             MemoryContext::none()
         }
     }
+
+    fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
+        None
+    }
 }
 
 impl ComputeNodeContext {
diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs
index f116108433063..5b0813186fd1c 100644
--- a/src/batch/src/worker_manager/worker_node_manager.rs
+++ b/src/batch/src/worker_manager/worker_node_manager.rs
@@ -277,6 +277,11 @@ impl WorkerNodeManager {
     }
 
     pub fn mask_worker_node(&self, worker_node_id: u32, duration: Duration) {
+        tracing::info!(
+            "Masking worker node {} for {:?} temporarily",
+            worker_node_id,
+            duration
+        );
         let mut worker_node_mask = self.worker_node_mask.write().unwrap();
         if worker_node_mask.contains(&worker_node_id) {
             return;
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 9948f1c31b3a6..e07339e6724b7 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -500,6 +500,10 @@ pub struct BatchConfig {
     #[serde(default = "default::batch::frontend_compute_runtime_worker_threads")]
     /// frontend compute runtime worker threads
     pub frontend_compute_runtime_worker_threads: usize,
+
+    /// The number of seconds to mask a worker as temporarily unavailable.
+    #[serde(default = "default::batch::mask_worker_temporary_secs")]
+    pub mask_worker_temporary_secs: usize,
 }
 
 /// The section `[streaming]` in `risingwave.toml`.
@@ -1588,6 +1592,10 @@ pub mod default {
         pub fn frontend_compute_runtime_worker_threads() -> usize {
            4
         }
+
+        pub fn mask_worker_temporary_secs() -> usize {
+            30
+        }
     }
 
     pub mod compaction_config {
diff --git a/src/config/docs.md b/src/config/docs.md
index 8bfbe76c7d394..e7e0640a634d2 100644
--- a/src/config/docs.md
+++ b/src/config/docs.md
@@ -9,6 +9,7 @@ This page is automatically generated by `./risedev generate-example-config`
 | distributed_query_limit | This is the max number of queries per sql session. | |
 | enable_barrier_read | | false |
 | frontend_compute_runtime_worker_threads | frontend compute runtime worker threads | 4 |
+| mask_worker_temporary_secs | The number of seconds to mask a worker as temporarily unavailable. | 30 |
 | max_batch_queries_per_frontend_node | This is the max number of batch queries per frontend node. | |
 | statement_timeout_in_sec | Timeout for a batch query in seconds. | 3600 |
 | worker_threads_num | The thread number of the batch task runtime in the compute node. The default value is decided by `tokio`. | |
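For intuition, `mask_worker_node` adds the worker id to a shared mask set and lets the entry expire after `duration`, so the scheduler skips the worker only temporarily. A simplified sketch of that bookkeeping, assuming a plain `HashSet` plus a timer thread — RisingWave's actual implementation differs:

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

// Time-bounded masking: insert the id into a shared set, then
// schedule its removal after `duration`.
fn mask_worker_node(mask: Arc<RwLock<HashSet<u32>>>, worker_node_id: u32, duration: Duration) {
    if !mask.write().unwrap().insert(worker_node_id) {
        return; // already masked; keep the existing expiry, like the early return above
    }
    thread::spawn(move || {
        thread::sleep(duration);
        mask.write().unwrap().remove(&worker_node_id);
    });
}

fn main() {
    let mask = Arc::new(RwLock::new(HashSet::new()));
    mask_worker_node(mask.clone(), 42, Duration::from_millis(100));
    assert!(mask.read().unwrap().contains(&42));
    thread::sleep(Duration::from_millis(200));
    assert!(!mask.read().unwrap().contains(&42)); // mask expired
}
```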
diff --git a/src/config/example.toml b/src/config/example.toml
index b6605b5305cc9..4f95e929d4048 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -80,6 +80,7 @@ meta_enable_check_task_level_overlap = false
 enable_barrier_read = false
 statement_timeout_in_sec = 3600
 frontend_compute_runtime_worker_threads = 4
+mask_worker_temporary_secs = 30
 
 [batch.developer]
 batch_connector_message_buffer_size = 16
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 2538277b3bcf9..e957c81483fa1 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -1032,16 +1032,14 @@ impl StageRunner {
         if !worker.property.as_ref().map_or(false, |p| p.is_serving) {
             return;
         }
-        let duration = std::cmp::max(
-            Duration::from_secs(
-                self.ctx
-                    .session
-                    .env()
-                    .meta_config()
-                    .max_heartbeat_interval_secs as _,
-            ) / 10,
-            Duration::from_secs(1),
-        );
+        let duration = Duration::from_secs(std::cmp::max(
+            self.ctx
+                .session
+                .env()
+                .batch_config()
+                .mask_worker_temporary_secs as u64,
+            1,
+        ));
         self.worker_node_manager
             .manager
             .mask_worker_node(worker.id, duration);
diff --git a/src/frontend/src/scheduler/task_context.rs b/src/frontend/src/scheduler/task_context.rs
index dcfbf30a215a1..fb617ab1dd7ea 100644
--- a/src/frontend/src/scheduler/task_context.rs
+++ b/src/frontend/src/scheduler/task_context.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use risingwave_batch::error::Result;
 use risingwave_batch::monitor::BatchMetricsWithTaskLabels;
 use risingwave_batch::task::{BatchTaskContext, TaskOutput, TaskOutputId};
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
 use risingwave_common::catalog::SysCatalogReaderRef;
 use risingwave_common::config::BatchConfig;
 use risingwave_common::memory::MemoryContext;
@@ -95,4 +96,8 @@ impl BatchTaskContext for FrontendBatchTaskContext {
     fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext {
         MemoryContext::none()
     }
+
+    fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
+        Some(self.session.env().worker_node_manager_ref())
+    }
 }
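One behavioral change worth noting: both call sites — the frontend `StageRunner` above and the batch exchange executor — now derive the mask duration from the new `batch.mask_worker_temporary_secs` knob instead of the old `max_heartbeat_interval_secs / 10` heuristic, clamped to at least one second. A small sketch of that shared computation:

```rust
use std::time::Duration;

// The configured value is taken as whole seconds, with a one-second floor
// so that a zero (or misconfigured) setting still masks briefly.
fn mask_duration(mask_worker_temporary_secs: usize) -> Duration {
    Duration::from_secs(std::cmp::max(mask_worker_temporary_secs as u64, 1))
}

fn main() {
    assert_eq!(mask_duration(30), Duration::from_secs(30)); // default config
    assert_eq!(mask_duration(0), Duration::from_secs(1)); // floor applies
}
```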