move worker node manager from frontend to batch
chenzl25 committed Mar 27, 2024
1 parent 1e84852 commit bc819e6
Showing 18 changed files with 70 additions and 46 deletions.
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
@@ -33,6 +33,7 @@ memcomparable = "0.2"
 parking_lot = { version = "0.12", features = ["arc_lock"] }
 paste = "1"
 prometheus = { version = "0.13", features = ["process"] }
+rand = "0.8"
 risingwave_common = { workspace = true }
 risingwave_common_estimate_size = { workspace = true }
 risingwave_connector = { workspace = true }
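The new `rand` dependency presumably backs the random worker selection that moves into this crate with `WorkerNodeSelector::next_random_worker` (shown further down). A minimal sketch of the underlying pattern, independent of RisingWave's types:

```rust
use rand::seq::SliceRandom;

fn main() {
    // Stand-in for a list of live worker nodes.
    let workers = vec!["worker-1", "worker-2", "worker-3"];

    // `choose` returns None when the slice is empty, which is exactly the
    // case the moved code maps to `BatchError::EmptyWorkerNodes`.
    match workers.choose(&mut rand::thread_rng()) {
        Some(w) => println!("picked {w}"),
        None => eprintln!("no workers available"),
    }
}
```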
11 changes: 11 additions & 0 deletions src/batch/src/error.rs
@@ -30,6 +30,8 @@ use thiserror::Error;
 use thiserror_ext::Construct;
 use tonic::Status;
 
+use crate::worker_manager::worker_node_manager::FragmentId;
+
 pub type Result<T> = std::result::Result<T, BatchError>;
 /// Batch result with shared error.
 pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>;
@@ -125,6 +127,15 @@ pub enum BatchError {
         #[backtrace]
         Arc<Self>,
     ),
+
+    #[error("Empty workers found")]
+    EmptyWorkerNodes,
+
+    #[error("Serving vnode mapping not found for fragment {0}")]
+    ServingVnodeMappingNotFound(FragmentId),
+
+    #[error("Streaming vnode mapping not found for fragment {0}")]
+    StreamingVnodeMappingNotFound(FragmentId),
 }
 
 // Serialize/deserialize error.
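A self-contained sketch of how these `thiserror` variants behave once defined on `BatchError`; the trimmed enum shell and local `FragmentId` alias here are stand-ins for the real definitions:

```rust
use thiserror::Error;

// Stand-in for the alias re-exported from the worker manager module.
type FragmentId = u32;

#[derive(Error, Debug)]
enum BatchError {
    #[error("Empty workers found")]
    EmptyWorkerNodes,

    #[error("Serving vnode mapping not found for fragment {0}")]
    ServingVnodeMappingNotFound(FragmentId),

    #[error("Streaming vnode mapping not found for fragment {0}")]
    StreamingVnodeMappingNotFound(FragmentId),
}

fn main() {
    // The `{0}` placeholder pulls the fragment id into the message.
    let err = BatchError::ServingVnodeMappingNotFound(42);
    assert_eq!(
        err.to_string(),
        "Serving vnode mapping not found for fragment 42"
    );
}
```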
2 changes: 2 additions & 0 deletions src/batch/src/lib.rs
@@ -33,6 +33,7 @@
 #![feature(lazy_cell)]
 #![feature(array_methods)]
 #![feature(error_generic_member_access)]
+#![feature(map_try_insert)]
 
 pub mod error;
 pub mod exchange_source;
@@ -41,6 +42,7 @@ pub mod executor;
 pub mod monitor;
 pub mod rpc;
 pub mod task;
+pub mod worker_manager;
 
 #[macro_use]
 extern crate tracing;
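`map_try_insert` gates the nightly `HashMap::try_insert`, which inserts only when the key is vacant and reports the conflict otherwise; presumably the relocated mapping code uses it to reject duplicate fragment registrations instead of silently overwriting. A nightly-only sketch:

```rust
#![feature(map_try_insert)]

use std::collections::HashMap;

fn main() {
    let mut mappings: HashMap<u32, &str> = HashMap::new();

    // First insert succeeds and yields a mutable reference to the value.
    assert!(mappings.try_insert(1, "fragment-1 mapping").is_ok());

    // A second insert under the same key fails instead of overwriting,
    // surfacing the duplicate registration to the caller.
    let err = mappings.try_insert(1, "conflicting mapping").unwrap_err();
    assert_eq!(err.entry.key(), &1);
}
```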
15 changes: 15 additions & 0 deletions src/batch/src/worker_manager/mod.rs
@@ -0,0 +1,15 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod worker_node_manager;
src/{frontend/src/scheduler → batch/src/worker_manager}/worker_node_manager.rs
@@ -23,8 +23,9 @@ use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
 use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
 use risingwave_pb::common::{WorkerNode, WorkerType};
 
-use crate::catalog::FragmentId;
-use crate::scheduler::{SchedulerError, SchedulerResult};
+use crate::error::{BatchError, Result};
 
+pub(crate) type FragmentId = u32;
+
 /// `WorkerNodeManager` manages live worker nodes and table vnode mapping information.
 pub struct WorkerNodeManager {
@@ -160,9 +161,9 @@ impl WorkerNodeManager {
     pub fn get_workers_by_parallel_unit_ids(
         &self,
         parallel_unit_ids: &[ParallelUnitId],
-    ) -> SchedulerResult<Vec<WorkerNode>> {
+    ) -> Result<Vec<WorkerNode>> {
         if parallel_unit_ids.is_empty() {
-            return Err(SchedulerError::EmptyWorkerNodes);
+            return Err(BatchError::EmptyWorkerNodes);
         }
 
         let guard = self.inner.read().unwrap();
@@ -183,14 +184,14 @@ impl WorkerNodeManager {
     pub fn get_streaming_fragment_mapping(
         &self,
         fragment_id: &FragmentId,
-    ) -> SchedulerResult<ParallelUnitMapping> {
+    ) -> Result<ParallelUnitMapping> {
         self.inner
             .read()
             .unwrap()
             .streaming_fragment_vnode_mapping
             .get(fragment_id)
             .cloned()
-            .ok_or_else(|| SchedulerError::StreamingVnodeMappingNotFound(*fragment_id))
+            .ok_or_else(|| BatchError::StreamingVnodeMappingNotFound(*fragment_id))
     }
 
     pub fn insert_streaming_fragment_mapping(
@@ -227,15 +228,12 @@
     }
 
     /// Returns fragment's vnode mapping for serving.
-    fn serving_fragment_mapping(
-        &self,
-        fragment_id: FragmentId,
-    ) -> SchedulerResult<ParallelUnitMapping> {
+    fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result<ParallelUnitMapping> {
         self.inner
             .read()
             .unwrap()
             .get_serving_fragment_mapping(fragment_id)
-            .ok_or_else(|| SchedulerError::ServingVnodeMappingNotFound(fragment_id))
+            .ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id))
     }
 
     pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) {
@@ -339,10 +337,7 @@ impl WorkerNodeSelector {
             .sum()
     }
 
-    pub fn fragment_mapping(
-        &self,
-        fragment_id: FragmentId,
-    ) -> SchedulerResult<ParallelUnitMapping> {
+    pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<ParallelUnitMapping> {
         if self.enable_barrier_read {
             self.manager.get_streaming_fragment_mapping(&fragment_id)
         } else {
@@ -357,7 +352,7 @@
                     (Some(o), max_parallelism)
                 }
                 Err(e) => {
-                    if !matches!(e, SchedulerError::ServingVnodeMappingNotFound(_)) {
+                    if !matches!(e, BatchError::ServingVnodeMappingNotFound(_)) {
                         return Err(e);
                     }
                     // We cannot tell whether it's a singleton, set max_parallelism=1 for place_vnode as if it's a singleton.
@@ -375,19 +370,19 @@
             // 2. Temporary mapping that filters out unavailable workers.
             let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes());
             let masked_mapping = place_vnode(hint.as_ref(), &new_workers, parallelism);
-            masked_mapping.ok_or_else(|| SchedulerError::EmptyWorkerNodes)
+            masked_mapping.ok_or_else(|| BatchError::EmptyWorkerNodes)
         }
     }
 
-    pub fn next_random_worker(&self) -> SchedulerResult<WorkerNode> {
+    pub fn next_random_worker(&self) -> Result<WorkerNode> {
         let worker_nodes = if self.enable_barrier_read {
             self.manager.list_streaming_worker_nodes()
         } else {
             self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
         };
         worker_nodes
             .choose(&mut rand::thread_rng())
-            .ok_or_else(|| SchedulerError::EmptyWorkerNodes)
+            .ok_or_else(|| BatchError::EmptyWorkerNodes)
             .map(|w| (*w).clone())
     }
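After the move, frontend code reaches the selector through the `risingwave_batch` crate instead of `crate::scheduler`. A sketch of the intended call pattern; the `WorkerNodeSelector::new` signature below is an assumption for illustration (inferred from the `manager` and `enable_barrier_read` fields used above), not confirmed by this diff:

```rust
use std::sync::Arc;

use risingwave_batch::worker_manager::worker_node_manager::{
    WorkerNodeManager, WorkerNodeSelector,
};

// Hypothetical wiring: in the frontend the manager is built once per
// process and kept fresh by the observer manager.
fn pick_worker(manager: Arc<WorkerNodeManager>) -> risingwave_batch::error::Result<()> {
    // `enable_barrier_read = false` routes selection through the serving
    // worker list (see `next_random_worker` in the moved file).
    let selector = WorkerNodeSelector::new(manager, false); // assumed constructor

    // Fails with `BatchError::EmptyWorkerNodes` when no serving worker is up.
    let worker = selector.next_random_worker()?;
    println!("scheduling on {:?}", worker.host);
    Ok(())
}
```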
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/system_catalog/mod.rs
@@ -23,6 +23,7 @@ use async_trait::async_trait;
 use futures::future::BoxFuture;
 use itertools::Itertools;
 use parking_lot::RwLock;
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
 use risingwave_common::acl::AclMode;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::{
@@ -40,7 +41,6 @@ use risingwave_pb::user::grant_privilege::Object;
 use crate::catalog::catalog_service::CatalogReader;
 use crate::catalog::view_catalog::ViewCatalog;
 use crate::meta_client::FrontendMetaClient;
-use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
 use crate::session::AuthContext;
 use crate::user::user_catalog::UserCatalog;
 use crate::user::user_privilege::available_prost_privilege;
2 changes: 1 addition & 1 deletion src/frontend/src/handler/explain.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use pgwire::pg_response::{PgResponse, StatementType};
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::bail_not_implemented;
 use risingwave_common::types::Fields;
 use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement};
@@ -32,7 +33,6 @@ use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::GenericPlanRef;
 use crate::optimizer::plan_node::{Convention, Explain};
 use crate::optimizer::OptimizerContext;
-use crate::scheduler::worker_node_manager::WorkerNodeSelector;
 use crate::scheduler::BatchPlanFragmenter;
 use crate::stream_fragmenter::build_graph;
 use crate::utils::explain_stream_graph;
2 changes: 1 addition & 1 deletion src/frontend/src/handler/query.rs
@@ -22,6 +22,7 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::{PgResponse, StatementType};
 use pgwire::types::Format;
 use postgres_types::FromSql;
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::catalog::Schema;
 use risingwave_common::session_config::QueryMode;
 use risingwave_common::types::{DataType, Datum};
@@ -43,7 +44,6 @@ use crate::optimizer::{
 };
 use crate::planner::Planner;
 use crate::scheduler::plan_fragmenter::Query;
-use crate::scheduler::worker_node_manager::WorkerNodeSelector;
 use crate::scheduler::{
     BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
     LocalQueryExecution, LocalQueryStream,
2 changes: 1 addition & 1 deletion src/frontend/src/observer/observer_manager.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 
 use itertools::Itertools;
 use parking_lot::RwLock;
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
 use risingwave_common::catalog::CatalogVersion;
 use risingwave_common::hash::ParallelUnitMapping;
 use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
@@ -30,7 +31,6 @@ use tokio::sync::watch::Sender;
 
 use crate::catalog::root_catalog::Catalog;
 use crate::catalog::FragmentId;
-use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
 use crate::scheduler::HummockSnapshotManagerRef;
 use crate::user::user_manager::UserInfoManager;
 use crate::user::UserInfoVersion;
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/property/distribution.rs
@@ -49,6 +49,7 @@ use std::fmt::Debug;
 use fixedbitset::FixedBitSet;
 use generic::PhysicalPlanRef;
 use itertools::Itertools;
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::catalog::{FieldDisplay, Schema, TableId};
 use risingwave_common::hash::ParallelUnitId;
 use risingwave_pb::batch_plan::exchange_info::{
@@ -62,7 +63,6 @@ use crate::catalog::FragmentId;
 use crate::error::Result;
 use crate::optimizer::property::Order;
 use crate::optimizer::PlanRef;
-use crate::scheduler::worker_node_manager::WorkerNodeSelector;
 
 /// the distribution property provided by a operator.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
6 changes: 4 additions & 2 deletions src/frontend/src/scheduler/distributed/query.rs
@@ -23,6 +23,7 @@ use futures::executor::block_on;
 use petgraph::dot::{Config, Dot};
 use petgraph::Graph;
 use pgwire::pg_server::SessionId;
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::array::DataChunk;
 use risingwave_pb::batch_plan::{TaskId as TaskIdPb, TaskOutputId as TaskOutputIdPb};
 use risingwave_pb::common::HostAddress;
@@ -40,7 +41,6 @@ use crate::scheduler::distributed::stage::StageEvent::ScheduledRoot;
 use crate::scheduler::distributed::StageEvent::Scheduled;
 use crate::scheduler::distributed::StageExecution;
 use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID};
-use crate::scheduler::worker_node_manager::WorkerNodeSelector;
 use crate::scheduler::{ExecutionContextRef, ReadSnapshot, SchedulerError, SchedulerResult};
 
 /// Message sent to a `QueryRunner` to control its execution.
@@ -470,6 +470,9 @@ pub(crate) mod tests {
     use std::sync::{Arc, RwLock};
 
     use fixedbitset::FixedBitSet;
+    use risingwave_batch::worker_manager::worker_node_manager::{
+        WorkerNodeManager, WorkerNodeSelector,
+    };
     use risingwave_common::catalog::{
         ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SUPER_USER_ID,
     };
@@ -491,7 +494,6 @@
     use crate::optimizer::{OptimizerContext, PlanRef};
     use crate::scheduler::distributed::QueryExecution;
     use crate::scheduler::plan_fragmenter::{BatchPlanFragmenter, Query};
-    use crate::scheduler::worker_node_manager::{WorkerNodeManager, WorkerNodeSelector};
     use crate::scheduler::{
         DistributedQueryMetrics, ExecutionContext, HummockSnapshotManager, QueryExecutionInfo,
         ReadSnapshot,
4 changes: 3 additions & 1 deletion src/frontend/src/scheduler/distributed/query_manager.rs
@@ -20,6 +20,9 @@ use std::task::{Context, Poll};
 
 use futures::Stream;
 use pgwire::pg_server::{BoxedError, Session, SessionId};
+use risingwave_batch::worker_manager::worker_node_manager::{
+    WorkerNodeManagerRef, WorkerNodeSelector,
+};
 use risingwave_common::array::DataChunk;
 use risingwave_common::session_config::QueryMode;
 use risingwave_pb::batch_plan::TaskOutputId;
@@ -31,7 +34,6 @@ use super::stats::DistributedQueryMetrics;
 use super::QueryExecution;
 use crate::catalog::catalog_service::CatalogReader;
 use crate::scheduler::plan_fragmenter::{Query, QueryId};
-use crate::scheduler::worker_node_manager::{WorkerNodeManagerRef, WorkerNodeSelector};
 use crate::scheduler::{ExecutionContextRef, SchedulerResult};
 
 pub struct DistributedQueryStream {
8 changes: 5 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
@@ -27,8 +27,10 @@ use futures::stream::Fuse;
 use futures::{stream, StreamExt, TryStreamExt};
 use futures_async_stream::for_await;
 use itertools::Itertools;
+use risingwave_batch::error::BatchError;
 use risingwave_batch::executor::ExecutorBuilder;
 use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch};
+use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
 use risingwave_common::array::DataChunk;
 use risingwave_common::hash::ParallelUnitMapping;
 use risingwave_common::util::addr::HostAddr;
@@ -61,7 +63,6 @@ use crate::scheduler::distributed::QueryMessage;
 use crate::scheduler::plan_fragmenter::{
     ExecutionPlanNode, PartitionInfo, QueryStageRef, StageId, TaskId, ROOT_TASK_ID,
 };
-use crate::scheduler::worker_node_manager::WorkerNodeSelector;
 use crate::scheduler::SchedulerError::{TaskExecutionError, TaskRunningOutOfMemory};
 use crate::scheduler::{ExecutionContextRef, SchedulerError, SchedulerResult};
 
@@ -696,6 +697,7 @@ impl StageRunner {
         self.worker_node_manager
             .manager
             .get_streaming_fragment_mapping(fragment_id)
+            .map_err(|e| e.into())
     }
 
     fn choose_worker(
@@ -714,7 +716,7 @@
             .manager
             .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?;
         if candidates.is_empty() {
-            return Err(SchedulerError::EmptyWorkerNodes);
+            return Err(BatchError::EmptyWorkerNodes.into());
         }
         let candidate = if self.stage.batch_enable_distributed_dml {
             // If distributed dml is enabled, we need to try our best to distribute dml tasks evenly to each worker.
@@ -750,7 +752,7 @@
             .manager
             .get_workers_by_parallel_unit_ids(&[pu])?;
         if candidates.is_empty() {
-            return Err(SchedulerError::EmptyWorkerNodes);
+            return Err(BatchError::EmptyWorkerNodes.into());
         }
         Ok(Some(candidates[0].clone()))
     } else {
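Both `.map_err(|e| e.into())` and `BatchError::EmptyWorkerNodes.into()` lean on a conversion from `BatchError` into `SchedulerError`. That impl is not shown in this excerpt; a minimal sketch of the shape it would take, with trimmed stand-in enums:

```rust
use thiserror::Error;

// Trimmed stand-ins for the real error types.
#[derive(Error, Debug)]
enum BatchError {
    #[error("Empty workers found")]
    EmptyWorkerNodes,
}

#[derive(Error, Debug)]
enum SchedulerError {
    // Wrap the batch-side error instead of duplicating its variants.
    #[error(transparent)]
    Batch(#[from] BatchError),

    #[error("{0}")]
    TaskExecutionError(String),
}

fn choose_worker() -> Result<(), SchedulerError> {
    // `.into()` (or `?`) routes the batch error through the `#[from]` impl,
    // mirroring `BatchError::EmptyWorkerNodes.into()` in the diff above.
    Err(BatchError::EmptyWorkerNodes.into())
}

fn main() {
    // Prints "Empty workers found" via the transparent wrapper.
    println!("{}", choose_worker().unwrap_err());
}
```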
10 changes: 0 additions & 10 deletions src/frontend/src/scheduler/error.rs
@@ -19,7 +19,6 @@ use risingwave_rpc_client::error::RpcError;
 use thiserror::Error;
 use tonic::{Code, Status};
 
-use crate::catalog::FragmentId;
 use crate::error::{ErrorCode, RwError};
 use crate::scheduler::plan_fragmenter::QueryId;
 
@@ -35,15 +34,6 @@ pub enum SchedulerError {
         RpcError,
     ),
 
-    #[error("Empty workers found")]
-    EmptyWorkerNodes,
-
-    #[error("Serving vnode mapping not found for fragment {0}")]
-    ServingVnodeMappingNotFound(FragmentId),
-
-    #[error("Streaming vnode mapping not found for fragment {0}")]
-    StreamingVnodeMappingNotFound(FragmentId),
-
     #[error("{0}")]
     TaskExecutionError(String),