Skip to content

Commit

Permalink
feat(batch): mask worker node for local query (#15951)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Mar 29, 2024
1 parent 2ab5aef commit f756b72
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 12 deletions.
38 changes: 36 additions & 2 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::PbExchangeSource;
Expand Down Expand Up @@ -97,13 +100,44 @@ impl CreateSource for DefaultCreateSource {
task_output_id,
);

// Best-effort helper: temporarily masks the serving worker whose host address
// equals `peer_addr`. It does nothing when the context exposes no worker node
// manager, or when no serving worker matches that address.
let mask_failed_serving_worker = || {
    let Some(manager) = context.worker_node_manager() else {
        return;
    };

    // Keep the listing in a local so the matched worker can borrow from it.
    let workers = manager.list_worker_nodes();
    let failed = workers.iter().find(|node| {
        node.host
            .as_ref()
            .is_some_and(|h| HostAddr::from(h) == peer_addr)
            && node.property.as_ref().is_some_and(|p| p.is_serving)
    });

    if let Some(node) = failed {
        // The configured mask time is clamped to at least one second.
        let secs = (context.get_config().mask_worker_temporary_secs as u64).max(1);
        manager.mask_worker_node(node.id, Duration::from_secs(secs));
    }
};

Ok(ExchangeSourceImpl::Grpc(
GrpcExchangeSource::create(
self.client_pool.get_by_addr(peer_addr).await?,
self.client_pool
.get_by_addr(peer_addr.clone())
.await
.inspect_err(|_| mask_failed_serving_worker())?,
task_output_id.clone(),
prost_source.local_execute_plan.clone(),
)
.await?,
.await
.inspect_err(|e| {
if matches!(e, BatchError::RpcError(_)) {
mask_failed_serving_worker()
}
})?,
))
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::TaskId;
use crate::error::Result;
use crate::monitor::{BatchMetricsWithTaskLabels, BatchMetricsWithTaskLabelsInner};
use crate::task::{BatchEnvironment, TaskOutput, TaskOutputId};
use crate::worker_manager::worker_node_manager::WorkerNodeManagerRef;

/// Context for batch task execution.
///
Expand Down Expand Up @@ -65,6 +66,8 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {
fn mem_usage(&self) -> usize;

fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext;

/// Returns the worker node manager available to this context, or `None` when
/// the context does not have one.
fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef>;
}

/// Batch task context on compute node.
Expand Down Expand Up @@ -149,6 +152,10 @@ impl BatchTaskContext for ComputeNodeContext {
MemoryContext::none()
}
}

/// Always returns `None`: this context does not provide a worker node manager.
fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
None
}
}

impl ComputeNodeContext {
Expand Down
5 changes: 5 additions & 0 deletions src/batch/src/worker_manager/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ impl WorkerNodeManager {
}

pub fn mask_worker_node(&self, worker_node_id: u32, duration: Duration) {
tracing::info!(
"Mask worker node {} for {:?} temporarily",
worker_node_id,
duration
);
let mut worker_node_mask = self.worker_node_mask.write().unwrap();
if worker_node_mask.contains(&worker_node_id) {
return;
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,10 @@ pub struct BatchConfig {
#[serde(default = "default::batch::frontend_compute_runtime_worker_threads")]
/// frontend compute runtime worker threads
pub frontend_compute_runtime_worker_threads: usize,

/// The duration, in seconds, for which a worker is temporarily masked as unavailable.
#[serde(default = "default::batch::mask_worker_temporary_secs")]
pub mask_worker_temporary_secs: usize,
}

/// The section `[streaming]` in `risingwave.toml`.
Expand Down Expand Up @@ -1588,6 +1592,10 @@ pub mod default {
/// Default for `frontend_compute_runtime_worker_threads`: 4 worker threads.
pub fn frontend_compute_runtime_worker_threads() -> usize {
4
}

/// Default for `mask_worker_temporary_secs`: 30 seconds.
pub fn mask_worker_temporary_secs() -> usize {
30
}
}

pub mod compaction_config {
Expand Down
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This page is automatically generated by `./risedev generate-example-config`
| distributed_query_limit | This is the max number of queries per sql session. | |
| enable_barrier_read | | false |
| frontend_compute_runtime_worker_threads | frontend compute runtime worker threads | 4 |
| mask_worker_temporary_secs | This is the secs used to mask a worker unavailable temporarily. | 30 |
| max_batch_queries_per_frontend_node | This is the max number of batch queries per frontend node. | |
| statement_timeout_in_sec | Timeout for a batch query in seconds. | 3600 |
| worker_threads_num | The thread number of the batch task runtime in the compute node. The default value is decided by `tokio`. | |
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ meta_enable_check_task_level_overlap = false
enable_barrier_read = false
statement_timeout_in_sec = 3600
frontend_compute_runtime_worker_threads = 4
mask_worker_temporary_secs = 30

[batch.developer]
batch_connector_message_buffer_size = 16
Expand Down
18 changes: 8 additions & 10 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,16 +1032,14 @@ impl StageRunner {
if !worker.property.as_ref().map_or(false, |p| p.is_serving) {
return;
}
let duration = std::cmp::max(
Duration::from_secs(
self.ctx
.session
.env()
.meta_config()
.max_heartbeat_interval_secs as _,
) / 10,
Duration::from_secs(1),
);
let duration = Duration::from_secs(std::cmp::max(
self.ctx
.session
.env()
.batch_config()
.mask_worker_temporary_secs as u64,
1,
));
self.worker_node_manager
.manager
.mask_worker_node(worker.id, duration);
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use risingwave_batch::error::Result;
use risingwave_batch::monitor::BatchMetricsWithTaskLabels;
use risingwave_batch::task::{BatchTaskContext, TaskOutput, TaskOutputId};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
use risingwave_common::catalog::SysCatalogReaderRef;
use risingwave_common::config::BatchConfig;
use risingwave_common::memory::MemoryContext;
Expand Down Expand Up @@ -95,4 +96,8 @@ impl BatchTaskContext for FrontendBatchTaskContext {
/// Returns `MemoryContext::none()`; `_executor_id` is unused.
fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext {
MemoryContext::none()
}

/// Returns the worker node manager reference obtained from the session's
/// environment; this implementation always yields `Some`.
fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
    let env = self.session.env();
    Some(env.worker_node_manager_ref())
}
}

0 comments on commit f756b72

Please sign in to comment.