refactor(batch): move worker node manager from frontend to batch #15947

Merged
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
@@ -33,6 +33,7 @@ memcomparable = "0.2"
parking_lot = { version = "0.12", features = ["arc_lock"] }
paste = "1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
risingwave_common = { workspace = true }
risingwave_common_estimate_size = { workspace = true }
risingwave_connector = { workspace = true }
11 changes: 11 additions & 0 deletions src/batch/src/error.rs
@@ -30,6 +30,8 @@ use thiserror::Error;
use thiserror_ext::Construct;
use tonic::Status;

use crate::worker_manager::worker_node_manager::FragmentId;

pub type Result<T> = std::result::Result<T, BatchError>;
/// Batch result with shared error.
pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;
@@ -125,6 +127,15 @@ pub enum BatchError {
#[backtrace]
Arc<Self>,
),

#[error("Empty workers found")]
EmptyWorkerNodes,

#[error("Serving vnode mapping not found for fragment {0}")]
ServingVnodeMappingNotFound(FragmentId),

#[error("Streaming vnode mapping not found for fragment {0}")]
StreamingVnodeMappingNotFound(FragmentId),
}

// Serialize/deserialize error.
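Note: these three variants move here verbatim from `SchedulerError` (see `src/frontend/src/scheduler/error.rs` below), so batch code can raise and match them directly. A minimal sketch; the `lookup_serving_mapping` helper is hypothetical:

```rust
use risingwave_batch::error::{BatchError, Result};

// Hypothetical helper: resolve a fragment's serving vnode mapping,
// surfacing the relocated error variant on a miss.
fn lookup_serving_mapping(fragment_id: u32) -> Result<()> {
    Err(BatchError::ServingVnodeMappingNotFound(fragment_id))
}

fn handle(fragment_id: u32) {
    match lookup_serving_mapping(fragment_id) {
        // `{0}` in the #[error(...)] attribute renders the FragmentId.
        Err(BatchError::ServingVnodeMappingNotFound(id)) => {
            eprintln!("no serving vnode mapping for fragment {id}");
        }
        Err(e) => eprintln!("other batch error: {e}"),
        Ok(()) => {}
    }
}
```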
2 changes: 2 additions & 0 deletions src/batch/src/lib.rs
@@ -33,6 +33,7 @@
#![feature(lazy_cell)]
#![feature(array_methods)]
#![feature(error_generic_member_access)]
#![feature(map_try_insert)]

pub mod error;
pub mod exchange_source;
@@ -41,6 +42,7 @@ pub mod executor;
pub mod monitor;
pub mod rpc;
pub mod task;
pub mod worker_manager;

#[macro_use]
extern crate tracing;
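Note: `map_try_insert` gates the nightly `HashMap::try_insert`, presumably used by the relocated manager to register a mapping only when the key is absent. A self-contained sketch of that API, under that assumption:

```rust
#![feature(map_try_insert)]

use std::collections::HashMap;

fn main() {
    let mut mappings: HashMap<u32, &str> = HashMap::new();
    // Unlike insert(), try_insert() refuses to overwrite: it returns
    // Err(OccupiedError) if the key is already present.
    assert!(mappings.try_insert(1, "fragment-1 mapping").is_ok());
    assert!(mappings.try_insert(1, "duplicate").is_err());
}
```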
15 changes: 15 additions & 0 deletions src/batch/src/worker_manager/mod.rs
@@ -0,0 +1,15 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod worker_node_manager;
src/{frontend/src/scheduler → batch/src/worker_manager}/worker_node_manager.rs
@@ -23,8 +23,9 @@ use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
use risingwave_pb::common::{WorkerNode, WorkerType};

use crate::catalog::FragmentId;
use crate::scheduler::{SchedulerError, SchedulerResult};
use crate::error::{BatchError, Result};

pub(crate) type FragmentId = u32;

/// `WorkerNodeManager` manages live worker nodes and table vnode mapping information.
pub struct WorkerNodeManager {
@@ -160,9 +161,9 @@ impl WorkerNodeManager {
pub fn get_workers_by_parallel_unit_ids(
&self,
parallel_unit_ids: &[ParallelUnitId],
) -> SchedulerResult<Vec<WorkerNode>> {
) -> Result<Vec<WorkerNode>> {
if parallel_unit_ids.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
return Err(BatchError::EmptyWorkerNodes);
}

let guard = self.inner.read().unwrap();
@@ -183,14 +184,14 @@
pub fn get_streaming_fragment_mapping(
&self,
fragment_id: &FragmentId,
) -> SchedulerResult<ParallelUnitMapping> {
) -> Result<ParallelUnitMapping> {
self.inner
.read()
.unwrap()
.streaming_fragment_vnode_mapping
.get(fragment_id)
.cloned()
.ok_or_else(|| SchedulerError::StreamingVnodeMappingNotFound(*fragment_id))
.ok_or_else(|| BatchError::StreamingVnodeMappingNotFound(*fragment_id))
}

pub fn insert_streaming_fragment_mapping(
@@ -227,15 +228,12 @@
}

/// Returns fragment's vnode mapping for serving.
fn serving_fragment_mapping(
&self,
fragment_id: FragmentId,
) -> SchedulerResult<ParallelUnitMapping> {
fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result<ParallelUnitMapping> {
self.inner
.read()
.unwrap()
.get_serving_fragment_mapping(fragment_id)
.ok_or_else(|| SchedulerError::ServingVnodeMappingNotFound(fragment_id))
.ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id))
}

pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) {
@@ -339,10 +337,7 @@ impl WorkerNodeSelector {
.sum()
}

pub fn fragment_mapping(
&self,
fragment_id: FragmentId,
) -> SchedulerResult<ParallelUnitMapping> {
pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<ParallelUnitMapping> {
if self.enable_barrier_read {
self.manager.get_streaming_fragment_mapping(&fragment_id)
} else {
@@ -357,7 +352,7 @@
(Some(o), max_parallelism)
}
Err(e) => {
if !matches!(e, SchedulerError::ServingVnodeMappingNotFound(_)) {
if !matches!(e, BatchError::ServingVnodeMappingNotFound(_)) {
return Err(e);
}
// We cannot tell whether it's a singleton, set max_parallelism=1 for place_vnode as if it's a singleton.
@@ -375,19 +370,19 @@
// 2. Temporary mapping that filters out unavailable workers.
let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes());
let masked_mapping = place_vnode(hint.as_ref(), &new_workers, parallelism);
masked_mapping.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
masked_mapping.ok_or_else(|| BatchError::EmptyWorkerNodes)
}
}

pub fn next_random_worker(&self) -> SchedulerResult<WorkerNode> {
pub fn next_random_worker(&self) -> Result<WorkerNode> {
let worker_nodes = if self.enable_barrier_read {
self.manager.list_streaming_worker_nodes()
} else {
self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
};
worker_nodes
.choose(&mut rand::thread_rng())
.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
.ok_or_else(|| BatchError::EmptyWorkerNodes)
.map(|w| (*w).clone())
}

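Note: after the move, frontend callers reach the manager through `risingwave_batch` instead of `crate::scheduler`. A rough usage sketch of the selector API visible in this diff; how `selector` is constructed is not part of the diff and is left out:

```rust
use risingwave_batch::error::Result;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_pb::common::WorkerNode;

// Pick a worker for a stage with no locality requirement. Barrier reads
// draw from streaming workers; serving reads draw from serving workers
// with the unavailable ones masked out.
fn pick_worker(selector: &WorkerNodeSelector) -> Result<WorkerNode> {
    selector.next_random_worker()
}
```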
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/system_catalog/mod.rs
@@ -23,6 +23,7 @@ use async_trait::async_trait;
use futures::future::BoxFuture;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
use risingwave_common::acl::AclMode;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{
@@ -40,7 +41,6 @@ use risingwave_pb::user::grant_privilege::Object;
use crate::catalog::catalog_service::CatalogReader;
use crate::catalog::view_catalog::ViewCatalog;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
use crate::session::AuthContext;
use crate::user::user_catalog::UserCatalog;
use crate::user::user_privilege::available_prost_privilege;
2 changes: 1 addition & 1 deletion src/frontend/src/handler/explain.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement};
@@ -32,7 +33,6 @@ use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Convention, Explain};
use crate::optimizer::OptimizerContext;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
use crate::scheduler::BatchPlanFragmenter;
use crate::stream_fragmenter::build_graph;
use crate::utils::explain_stream_graph;
2 changes: 1 addition & 1 deletion src/frontend/src/handler/query.rs
@@ -22,6 +22,7 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use postgres_types::FromSql;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, Datum};
@@ -43,7 +44,6 @@ use crate::optimizer::{
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
use crate::scheduler::{
BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
LocalQueryExecution, LocalQueryStream,
2 changes: 1 addition & 1 deletion src/frontend/src/observer/observer_manager.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
@@ -30,7 +31,6 @@ use tokio::sync::watch::Sender;

use crate::catalog::root_catalog::Catalog;
use crate::catalog::FragmentId;
use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
use crate::scheduler::HummockSnapshotManagerRef;
use crate::user::user_manager::UserInfoManager;
use crate::user::UserInfoVersion;
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/property/distribution.rs
@@ -49,6 +49,7 @@ use std::fmt::Debug;
use fixedbitset::FixedBitSet;
use generic::PhysicalPlanRef;
use itertools::Itertools;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::catalog::{FieldDisplay, Schema, TableId};
use risingwave_common::hash::ParallelUnitId;
use risingwave_pb::batch_plan::exchange_info::{
@@ -62,7 +63,6 @@ use crate::catalog::FragmentId;
use crate::error::Result;
use crate::optimizer::property::Order;
use crate::optimizer::PlanRef;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;

/// the distribution property provided by a operator.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
6 changes: 4 additions & 2 deletions src/frontend/src/scheduler/distributed/query.rs
@@ -23,6 +23,7 @@ use futures::executor::block_on;
use petgraph::dot::{Config, Dot};
use petgraph::Graph;
use pgwire::pg_server::SessionId;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::array::DataChunk;
use risingwave_pb::batch_plan::{TaskId as TaskIdPb, TaskOutputId as TaskOutputIdPb};
use risingwave_pb::common::HostAddress;
@@ -40,7 +41,6 @@ use crate::scheduler::distributed::stage::StageEvent::ScheduledRoot;
use crate::scheduler::distributed::StageEvent::Scheduled;
use crate::scheduler::distributed::StageExecution;
use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID};
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
use crate::scheduler::{ExecutionContextRef, ReadSnapshot, SchedulerError, SchedulerResult};

/// Message sent to a `QueryRunner` to control its execution.
@@ -470,6 +470,9 @@ pub(crate) mod tests {
use std::sync::{Arc, RwLock};

use fixedbitset::FixedBitSet;
use risingwave_batch::worker_manager::worker_node_manager::{
WorkerNodeManager, WorkerNodeSelector,
};
use risingwave_common::catalog::{
ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SUPER_USER_ID,
};
@@ -491,7 +494,6 @@
use crate::optimizer::{OptimizerContext, PlanRef};
use crate::scheduler::distributed::QueryExecution;
use crate::scheduler::plan_fragmenter::{BatchPlanFragmenter, Query};
use crate::scheduler::worker_node_manager::{WorkerNodeManager, WorkerNodeSelector};
use crate::scheduler::{
DistributedQueryMetrics, ExecutionContext, HummockSnapshotManager, QueryExecutionInfo,
ReadSnapshot,
4 changes: 3 additions & 1 deletion src/frontend/src/scheduler/distributed/query_manager.rs
@@ -20,6 +20,9 @@

use futures::Stream;
use pgwire::pg_server::{BoxedError, Session, SessionId};
use risingwave_batch::worker_manager::worker_node_manager::{
WorkerNodeManagerRef, WorkerNodeSelector,
};
use risingwave_common::array::DataChunk;
use risingwave_common::session_config::QueryMode;
use risingwave_pb::batch_plan::TaskOutputId;
@@ -31,7 +34,6 @@ use super::stats::DistributedQueryMetrics;
use super::QueryExecution;
use crate::catalog::catalog_service::CatalogReader;
use crate::scheduler::plan_fragmenter::{Query, QueryId};
use crate::scheduler::worker_node_manager::{WorkerNodeManagerRef, WorkerNodeSelector};
use crate::scheduler::{ExecutionContextRef, SchedulerResult};

pub struct DistributedQueryStream {
8 changes: 5 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -27,8 +27,10 @@ use futures::stream::Fuse;
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_batch::error::BatchError;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::array::DataChunk;
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::util::addr::HostAddr;
@@ -61,7 +63,6 @@ use crate::scheduler::distributed::QueryMessage;
use crate::scheduler::plan_fragmenter::{
ExecutionPlanNode, PartitionInfo, QueryStageRef, StageId, TaskId, ROOT_TASK_ID,
};
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
use crate::scheduler::SchedulerError::{TaskExecutionError, TaskRunningOutOfMemory};
use crate::scheduler::{ExecutionContextRef, SchedulerError, SchedulerResult};

@@ -696,6 +697,7 @@ impl StageRunner {
self.worker_node_manager
.manager
.get_streaming_fragment_mapping(fragment_id)
.map_err(|e| e.into())
}

fn choose_worker(
@@ -714,7 +716,7 @@
.manager
.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
if candidates.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
return Err(BatchError::EmptyWorkerNodes.into());
}
let candidate = if self.stage.batch_enable_distributed_dml {
// If distributed dml is enabled, we need to try our best to distribute dml tasks evenly to each worker.
@@ -750,7 +752,7 @@
.manager
.get_workers_by_parallel_unit_ids(&[pu])?;
if candidates.is_empty() {
return Err(SchedulerError::EmptyWorkerNodes);
return Err(BatchError::EmptyWorkerNodes.into());
}
Ok(Some(candidates[0].clone()))
} else {
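Note: the `.into()` / `e.into()` calls above imply a `From<BatchError> for SchedulerError` conversion on the frontend side. A sketch of the assumed shape; the actual variant in `src/frontend/src/scheduler/error.rs` may differ:

```rust
use risingwave_batch::error::BatchError;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum SchedulerError {
    // Assumed: a passthrough variant so BatchError converts via `?`/`into()`.
    #[error(transparent)]
    BatchError(#[from] BatchError),
    // ... existing variants elided ...
}
```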
10 changes: 0 additions & 10 deletions src/frontend/src/scheduler/error.rs
@@ -19,7 +19,6 @@ use risingwave_rpc_client::error::RpcError;
use thiserror::Error;
use tonic::{Code, Status};

use crate::catalog::FragmentId;
use crate::error::{ErrorCode, RwError};
use crate::scheduler::plan_fragmenter::QueryId;

@@ -35,15 +34,6 @@ pub enum SchedulerError {
RpcError,
),

#[error("Empty workers found")]
EmptyWorkerNodes,

#[error("Serving vnode mapping not found for fragment {0}")]
ServingVnodeMappingNotFound(FragmentId),

#[error("Streaming vnode mapping not found for fragment {0}")]
StreamingVnodeMappingNotFound(FragmentId),

#[error("{0}")]
TaskExecutionError(String),
