diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index f87db26ba84be..8f6429a4ca76b 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -33,6 +33,7 @@ memcomparable = "0.2" parking_lot = { version = "0.12", features = ["arc_lock"] } paste = "1" prometheus = { version = "0.13", features = ["process"] } +rand = "0.8" risingwave_common = { workspace = true } risingwave_common_estimate_size = { workspace = true } risingwave_connector = { workspace = true } diff --git a/src/batch/src/error.rs b/src/batch/src/error.rs index f4d7341cbc6dd..13803437bb075 100644 --- a/src/batch/src/error.rs +++ b/src/batch/src/error.rs @@ -30,6 +30,8 @@ use thiserror::Error; use thiserror_ext::Construct; use tonic::Status; +use crate::worker_manager::worker_node_manager::FragmentId; + pub type Result<T> = std::result::Result<T, BatchError>; /// Batch result with shared error. pub type SharedResult<T> = std::result::Result<T, Arc<BatchError>>; @@ -125,6 +127,15 @@ pub enum BatchError { #[backtrace] Arc<Self>, ), + + #[error("Empty workers found")] + EmptyWorkerNodes, + + #[error("Serving vnode mapping not found for fragment {0}")] + ServingVnodeMappingNotFound(FragmentId), + + #[error("Streaming vnode mapping not found for fragment {0}")] + StreamingVnodeMappingNotFound(FragmentId), } // Serialize/deserialize error.
diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index d1d9ab8f302f1..d937c64826550 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -33,6 +33,7 @@ #![feature(lazy_cell)] #![feature(array_methods)] #![feature(error_generic_member_access)] +#![feature(map_try_insert)] pub mod error; pub mod exchange_source; @@ -41,6 +42,7 @@ pub mod executor; pub mod monitor; pub mod rpc; pub mod task; +pub mod worker_manager; #[macro_use] extern crate tracing; diff --git a/src/batch/src/worker_manager/mod.rs b/src/batch/src/worker_manager/mod.rs new file mode 100644 index 0000000000000..26758db5443d3 --- /dev/null +++ b/src/batch/src/worker_manager/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod worker_node_manager; diff --git a/src/frontend/src/scheduler/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs similarity index 94% rename from src/frontend/src/scheduler/worker_node_manager.rs rename to src/batch/src/worker_manager/worker_node_manager.rs index bba3cd97911cc..f116108433063 100644 --- a/src/frontend/src/scheduler/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -23,8 +23,9 @@ use risingwave_common::util::worker_util::get_pu_to_worker_mapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; -use crate::catalog::FragmentId; -use crate::scheduler::{SchedulerError, SchedulerResult}; +use crate::error::{BatchError, Result}; + +pub(crate) type FragmentId = u32; /// `WorkerNodeManager` manages live worker nodes and table vnode mapping information. pub struct WorkerNodeManager { @@ -160,9 +161,9 @@ impl WorkerNodeManager { pub fn get_workers_by_parallel_unit_ids( &self, parallel_unit_ids: &[ParallelUnitId], - ) -> SchedulerResult<Vec<WorkerNode>> { + ) -> Result<Vec<WorkerNode>> { if parallel_unit_ids.is_empty() { - return Err(SchedulerError::EmptyWorkerNodes); + return Err(BatchError::EmptyWorkerNodes); } let guard = self.inner.read().unwrap(); @@ -183,14 +184,14 @@ impl WorkerNodeManager { pub fn get_streaming_fragment_mapping( &self, fragment_id: &FragmentId, - ) -> SchedulerResult<ParallelUnitMapping> { + ) -> Result<ParallelUnitMapping> { self.inner .read() .unwrap() .streaming_fragment_vnode_mapping .get(fragment_id) .cloned() - .ok_or_else(|| SchedulerError::StreamingVnodeMappingNotFound(*fragment_id)) + .ok_or_else(|| BatchError::StreamingVnodeMappingNotFound(*fragment_id)) } pub fn insert_streaming_fragment_mapping( @@ -227,15 +228,12 @@ } /// Returns fragment's vnode mapping for serving.
- fn serving_fragment_mapping( - &self, - fragment_id: FragmentId, - ) -> SchedulerResult<ParallelUnitMapping> { + fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result<ParallelUnitMapping> { self.inner .read() .unwrap() .get_serving_fragment_mapping(fragment_id) - .ok_or_else(|| SchedulerError::ServingVnodeMappingNotFound(fragment_id)) + .ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id)) } pub fn set_serving_fragment_mapping(&self, mappings: HashMap<FragmentId, ParallelUnitMapping>) { @@ -339,10 +337,7 @@ impl WorkerNodeSelector { .sum() } - pub fn fragment_mapping( - &self, - fragment_id: FragmentId, - ) -> SchedulerResult<ParallelUnitMapping> { + pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<ParallelUnitMapping> { if self.enable_barrier_read { self.manager.get_streaming_fragment_mapping(&fragment_id) } else { @@ -357,7 +352,7 @@ impl WorkerNodeSelector { (Some(o), max_parallelism) } Err(e) => { - if !matches!(e, SchedulerError::ServingVnodeMappingNotFound(_)) { + if !matches!(e, BatchError::ServingVnodeMappingNotFound(_)) { return Err(e); } // We cannot tell whether it's a singleton, set max_parallelism=1 for place_vnode as if it's a singleton. @@ -375,11 +370,11 @@ impl WorkerNodeSelector { // 2. Temporary mapping that filters out unavailable workers.
let new_workers = self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()); let masked_mapping = place_vnode(hint.as_ref(), &new_workers, parallelism); - masked_mapping.ok_or_else(|| SchedulerError::EmptyWorkerNodes) + masked_mapping.ok_or_else(|| BatchError::EmptyWorkerNodes) } } - pub fn next_random_worker(&self) -> SchedulerResult<WorkerNode> { + pub fn next_random_worker(&self) -> Result<WorkerNode> { let worker_nodes = if self.enable_barrier_read { self.manager.list_streaming_worker_nodes() } else { @@ -387,7 +382,7 @@ }; worker_nodes .choose(&mut rand::thread_rng()) - .ok_or_else(|| SchedulerError::EmptyWorkerNodes) + .ok_or_else(|| BatchError::EmptyWorkerNodes) .map(|w| (*w).clone()) } diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 3b73be3afbddc..4f5c8941c80df 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -23,6 +23,7 @@ use async_trait::async_trait; use futures::future::BoxFuture; use itertools::Itertools; use parking_lot::RwLock; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; use risingwave_common::acl::AclMode; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{ @@ -40,7 +41,6 @@ use risingwave_pb::user::grant_privilege::Object; use crate::catalog::catalog_service::CatalogReader; use crate::catalog::view_catalog::ViewCatalog; use crate::meta_client::FrontendMetaClient; -use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::session::AuthContext; use crate::user::user_catalog::UserCatalog; use crate::user::user_privilege::available_prost_privilege; diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index f5d7427f9cae0..4e021c894033a 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -13,6 +13,7 @@ // limitations under the License.
use pgwire::pg_response::{PgResponse, StatementType}; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::bail_not_implemented; use risingwave_common::types::Fields; use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement}; @@ -32,7 +33,6 @@ use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{Convention, Explain}; use crate::optimizer::OptimizerContext; -use crate::scheduler::worker_node_manager::WorkerNodeSelector; use crate::scheduler::BatchPlanFragmenter; use crate::stream_fragmenter::build_graph; use crate::utils::explain_stream_graph; diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 321b318fa0068..61d8c31cb1467 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -22,6 +22,7 @@ use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Format; use postgres_types::FromSql; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::catalog::Schema; use risingwave_common::session_config::QueryMode; use risingwave_common::types::{DataType, Datum}; @@ -43,7 +44,6 @@ use crate::optimizer::{ }; use crate::planner::Planner; use crate::scheduler::plan_fragmenter::Query; -use crate::scheduler::worker_node_manager::WorkerNodeSelector; use crate::scheduler::{ BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef, LocalQueryExecution, LocalQueryStream, diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index b9baa6d8868ad..c66c602394e69 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use itertools::Itertools; use parking_lot::RwLock; +use 
risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; use risingwave_common::catalog::CatalogVersion; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; @@ -30,7 +31,6 @@ use tokio::sync::watch::Sender; use crate::catalog::root_catalog::Catalog; use crate::catalog::FragmentId; -use crate::scheduler::worker_node_manager::WorkerNodeManagerRef; use crate::scheduler::HummockSnapshotManagerRef; use crate::user::user_manager::UserInfoManager; use crate::user::UserInfoVersion; diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index 1b009e3bb9698..4999e1d8630bf 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -49,6 +49,7 @@ use std::fmt::Debug; use fixedbitset::FixedBitSet; use generic::PhysicalPlanRef; use itertools::Itertools; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::catalog::{FieldDisplay, Schema, TableId}; use risingwave_common::hash::ParallelUnitId; use risingwave_pb::batch_plan::exchange_info::{ @@ -62,7 +63,6 @@ use crate::catalog::FragmentId; use crate::error::Result; use crate::optimizer::property::Order; use crate::optimizer::PlanRef; -use crate::scheduler::worker_node_manager::WorkerNodeSelector; /// the distribution property provided by a operator. 
#[derive(Debug, Clone, PartialEq, Eq, Hash)] diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 1d96dc9f123a1..2aa52fa96cebb 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -23,6 +23,7 @@ use futures::executor::block_on; use petgraph::dot::{Config, Dot}; use petgraph::Graph; use pgwire::pg_server::SessionId; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_pb::batch_plan::{TaskId as TaskIdPb, TaskOutputId as TaskOutputIdPb}; use risingwave_pb::common::HostAddress; @@ -40,7 +41,6 @@ use crate::scheduler::distributed::stage::StageEvent::ScheduledRoot; use crate::scheduler::distributed::StageEvent::Scheduled; use crate::scheduler::distributed::StageExecution; use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID}; -use crate::scheduler::worker_node_manager::WorkerNodeSelector; use crate::scheduler::{ExecutionContextRef, ReadSnapshot, SchedulerError, SchedulerResult}; /// Message sent to a `QueryRunner` to control its execution. 
@@ -470,6 +470,9 @@ pub(crate) mod tests { use std::sync::{Arc, RwLock}; use fixedbitset::FixedBitSet; + use risingwave_batch::worker_manager::worker_node_manager::{ + WorkerNodeManager, WorkerNodeSelector, + }; use risingwave_common::catalog::{ ColumnCatalog, ColumnDesc, ConflictBehavior, DEFAULT_SUPER_USER_ID, }; @@ -491,7 +494,6 @@ pub(crate) mod tests { use crate::optimizer::{OptimizerContext, PlanRef}; use crate::scheduler::distributed::QueryExecution; use crate::scheduler::plan_fragmenter::{BatchPlanFragmenter, Query}; - use crate::scheduler::worker_node_manager::{WorkerNodeManager, WorkerNodeSelector}; use crate::scheduler::{ DistributedQueryMetrics, ExecutionContext, HummockSnapshotManager, QueryExecutionInfo, ReadSnapshot, diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs index abfc75b386ac0..69d48eb70d976 100644 --- a/src/frontend/src/scheduler/distributed/query_manager.rs +++ b/src/frontend/src/scheduler/distributed/query_manager.rs @@ -20,6 +20,9 @@ use std::task::{Context, Poll}; use futures::Stream; use pgwire::pg_server::{BoxedError, Session, SessionId}; +use risingwave_batch::worker_manager::worker_node_manager::{ + WorkerNodeManagerRef, WorkerNodeSelector, +}; use risingwave_common::array::DataChunk; use risingwave_common::session_config::QueryMode; use risingwave_pb::batch_plan::TaskOutputId; @@ -31,7 +34,6 @@ use super::stats::DistributedQueryMetrics; use super::QueryExecution; use crate::catalog::catalog_service::CatalogReader; use crate::scheduler::plan_fragmenter::{Query, QueryId}; -use crate::scheduler::worker_node_manager::{WorkerNodeManagerRef, WorkerNodeSelector}; use crate::scheduler::{ExecutionContextRef, SchedulerResult}; pub struct DistributedQueryStream { diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 0343867b1f81f..2538277b3bcf9 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs 
+++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -27,8 +27,10 @@ use futures::stream::Fuse; use futures::{stream, StreamExt, TryStreamExt}; use futures_async_stream::for_await; use itertools::Itertools; +use risingwave_batch::error::BatchError; use risingwave_batch::executor::ExecutorBuilder; use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch}; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::addr::HostAddr; @@ -61,7 +63,6 @@ use crate::scheduler::distributed::QueryMessage; use crate::scheduler::plan_fragmenter::{ ExecutionPlanNode, PartitionInfo, QueryStageRef, StageId, TaskId, ROOT_TASK_ID, }; -use crate::scheduler::worker_node_manager::WorkerNodeSelector; use crate::scheduler::SchedulerError::{TaskExecutionError, TaskRunningOutOfMemory}; use crate::scheduler::{ExecutionContextRef, SchedulerError, SchedulerResult}; @@ -696,6 +697,7 @@ impl StageRunner { self.worker_node_manager .manager .get_streaming_fragment_mapping(fragment_id) + .map_err(|e| e.into()) } fn choose_worker( @@ -714,7 +716,7 @@ impl StageRunner { .manager .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; if candidates.is_empty() { - return Err(SchedulerError::EmptyWorkerNodes); + return Err(BatchError::EmptyWorkerNodes.into()); } let candidate = if self.stage.batch_enable_distributed_dml { // If distributed dml is enabled, we need to try our best to distribute dml tasks evenly to each worker. 
@@ -750,7 +752,7 @@ impl StageRunner { .manager .get_workers_by_parallel_unit_ids(&[pu])?; if candidates.is_empty() { - return Err(SchedulerError::EmptyWorkerNodes); + return Err(BatchError::EmptyWorkerNodes.into()); } Ok(Some(candidates[0].clone())) } else { diff --git a/src/frontend/src/scheduler/error.rs b/src/frontend/src/scheduler/error.rs index 590c235e13901..f841a27b6b42d 100644 --- a/src/frontend/src/scheduler/error.rs +++ b/src/frontend/src/scheduler/error.rs @@ -19,7 +19,6 @@ use risingwave_rpc_client::error::RpcError; use thiserror::Error; use tonic::{Code, Status}; -use crate::catalog::FragmentId; use crate::error::{ErrorCode, RwError}; use crate::scheduler::plan_fragmenter::QueryId; @@ -35,15 +34,6 @@ pub enum SchedulerError { RpcError, ), - #[error("Empty workers found")] - EmptyWorkerNodes, - - #[error("Serving vnode mapping not found for fragment {0}")] - ServingVnodeMappingNotFound(FragmentId), - - #[error("Streaming vnode mapping not found for fragment {0}")] - StreamingVnodeMappingNotFound(FragmentId), - #[error("{0}")] TaskExecutionError(String), diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index cf9e1646ec515..ede32d2033353 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -24,8 +24,10 @@ use futures::{FutureExt, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use pgwire::pg_server::BoxedError; +use risingwave_batch::error::BatchError; use risingwave_batch::executor::ExecutorBuilder; use risingwave_batch::task::{ShutdownToken, TaskId}; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_common::bail; use risingwave_common::hash::ParallelUnitMapping; @@ -50,7 +52,6 @@ use crate::error::RwError; use crate::optimizer::plan_node::PlanNodeType; use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId}; use 
crate::scheduler::task_context::FrontendBatchTaskContext; -use crate::scheduler::worker_node_manager::WorkerNodeSelector; use crate::scheduler::{ReadSnapshot, SchedulerError, SchedulerResult}; use crate::session::{FrontendEnv, SessionImpl}; @@ -578,6 +579,7 @@ impl LocalQueryExecution { self.worker_node_manager .manager .get_streaming_fragment_mapping(fragment_id) + .map_err(|e| e.into()) } fn choose_worker(&self, stage: &Arc<QueryStage>) -> SchedulerResult<Option<WorkerNode>> { @@ -591,7 +593,7 @@ .manager .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?; if candidates.is_empty() { - return Err(SchedulerError::EmptyWorkerNodes); + return Err(BatchError::EmptyWorkerNodes.into()); } candidates[stage.session_id.0 as usize % candidates.len()].clone() }; diff --git a/src/frontend/src/scheduler/mod.rs b/src/frontend/src/scheduler/mod.rs index 225f15d5b66ca..046b3cd54a715 100644 --- a/src/frontend/src/scheduler/mod.rs +++ b/src/frontend/src/scheduler/mod.rs @@ -37,7 +37,6 @@ use crate::scheduler::task_context::FrontendBatchTaskContext; mod error; pub mod streaming_manager; mod task_context; -pub mod worker_node_manager; pub use self::error::SchedulerError; pub type SchedulerResult<T> = std::result::Result<T, SchedulerError>; @@ -37,7 +36,6 @@ use crate::scheduler::plan_fragmenter::QueryId; diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 16ccbb8164bf4..c9a9ba9d62151 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -24,6 +24,8 @@ use enum_as_inner::EnumAsInner; use futures::TryStreamExt; use itertools::Itertools; use pgwire::pg_server::SessionId; +use risingwave_batch::error::BatchError; +use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableDesc; @@ -55,7 +57,6 @@ use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef}; use crate::optimizer::plan_node::{BatchSource,
PlanNodeId, PlanNodeType}; use crate::optimizer::property::Distribution; use crate::optimizer::PlanRef; -use crate::scheduler::worker_node_manager::WorkerNodeSelector; use crate::scheduler::SchedulerResult; #[derive(Clone, Debug, Hash, Eq, PartialEq)] @@ -898,7 +899,7 @@ impl BatchPlanFragmenter { } }; if source_info.is_none() && parallelism == 0 { - return Err(SchedulerError::EmptyWorkerNodes); + return Err(BatchError::EmptyWorkerNodes.into()); } let parallelism = if parallelism == 0 { None diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 47a8ede81884a..98349ec7bfde5 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -35,6 +35,9 @@ use pgwire::pg_server::{ use pgwire::types::{Format, FormatIterator}; use rand::RngCore; use risingwave_batch::task::{ShutdownSender, ShutdownToken}; +use risingwave_batch::worker_manager::worker_node_manager::{ + WorkerNodeManager, WorkerNodeManagerRef, +}; use risingwave_common::acl::AclMode; use risingwave_common::catalog::DEFAULT_SCHEMA_NAME; #[cfg(test)] @@ -97,7 +100,6 @@ use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl}; use crate::monitor::{FrontendMetrics, GLOBAL_FRONTEND_METRICS}; use crate::observer::FrontendObserverNode; use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef}; -use crate::scheduler::worker_node_manager::{WorkerNodeManager, WorkerNodeManagerRef}; use crate::scheduler::{ DistributedQueryMetrics, HummockSnapshotManager, HummockSnapshotManagerRef, QueryManager, GLOBAL_DISTRIBUTED_QUERY_METRICS,