diff --git a/Cargo.toml b/Cargo.toml index e615efa3987f5..233e6644c39a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,6 +158,7 @@ clap = { version = "4", features = ["cargo", "derive", "env"] } deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] } itertools = "0.13.0" jsonbb = "0.1.4" +linkme = { version = "0.3", features = ["used_linker"] } lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" } parquet = { version = "53.2", features = ["async"] } mysql_async = { version = "0.34", default-features = false, features = [ diff --git a/src/batch/src/execution/local_exchange.rs b/src/batch/src/execution/local_exchange.rs index 2f9907e8151a3..714c1d9fd355a 100644 --- a/src/batch/src/execution/local_exchange.rs +++ b/src/batch/src/execution/local_exchange.rs @@ -31,7 +31,7 @@ pub struct LocalExchangeSource { impl LocalExchangeSource { pub fn create( output_id: TaskOutputId, - context: impl BatchTaskContext, + context: &dyn BatchTaskContext, task_id: TaskId, ) -> Result { let task_output = context.get_task_output(output_id)?; diff --git a/src/batch/src/executor/delete.rs b/src/batch/src/executor/delete.rs index f85fad254a750..6ffa6cb13dc13 100644 --- a/src/batch/src/executor/delete.rs +++ b/src/batch/src/executor/delete.rs @@ -28,7 +28,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; /// [`DeleteExecutor`] implements table deletion with values from its child executor. // Note: multiple `DELETE`s in a single epoch, or concurrent `DELETE`s may lead to conflicting @@ -164,10 +163,9 @@ impl DeleteExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for DeleteExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/expand.rs b/src/batch/src/executor/expand.rs index e06b2cea2e99e..166dbe90e507a 100644 --- a/src/batch/src/executor/expand.rs +++ b/src/batch/src/executor/expand.rs @@ -22,7 +22,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; use crate::error::{BatchError, Result}; -use crate::task::BatchTaskContext; pub struct ExpandExecutor { column_subsets: Vec>, @@ -90,10 +89,9 @@ impl ExpandExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for ExpandExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let expand_node = try_match_expand!( diff --git a/src/batch/src/executor/filter.rs b/src/batch/src/executor/filter.rs index 0291582b95b59..9463e85993549 100644 --- a/src/batch/src/executor/filter.rs +++ b/src/batch/src/executor/filter.rs @@ -24,7 +24,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; pub struct FilterExecutor { expr: BoxedExpression, @@ -76,10 +75,9 @@ impl FilterExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for FilterExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [input]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index c1cd2ae41b80c..2c51c24cb653b 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use std::time::Duration; use futures::StreamExt; @@ -33,16 +34,16 @@ use crate::execution::local_exchange::LocalExchangeSource; use crate::executor::ExecutorBuilder; use crate::task::{BatchTaskContext, TaskId}; -pub type ExchangeExecutor = GenericExchangeExecutor; +pub type ExchangeExecutor = GenericExchangeExecutor; use crate::executor::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor}; use crate::monitor::BatchMetrics; -pub struct GenericExchangeExecutor { +pub struct GenericExchangeExecutor { proto_sources: Vec, /// Mock-able `CreateSource`. source_creators: Vec, sequential: bool, - context: C, + context: Arc, schema: Schema, #[expect(dead_code)] @@ -59,7 +60,7 @@ pub struct GenericExchangeExecutor { pub trait CreateSource: Send { async fn create_source( &self, - context: impl BatchTaskContext, + context: &dyn BatchTaskContext, prost_source: &PbExchangeSource, ) -> Result; } @@ -79,7 +80,7 @@ impl DefaultCreateSource { impl CreateSource for DefaultCreateSource { async fn create_source( &self, - context: impl BatchTaskContext, + context: &dyn BatchTaskContext, prost_source: &PbExchangeSource, ) -> Result { let peer_addr = prost_source.get_host()?.into(); @@ -146,10 +147,9 @@ impl CreateSource for DefaultCreateSource { pub struct GenericExchangeExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!( @@ -170,7 +170,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder { let input_schema: Vec = node.get_input_schema().to_vec(); let fields = input_schema.iter().map(Field::from).collect::>(); - Ok(Box::new(ExchangeExecutor:: { + Ok(Box::new(ExchangeExecutor { proto_sources, source_creators, sequential, @@ -183,9 +183,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder { } } -impl Executor - for GenericExchangeExecutor -{ +impl Executor for GenericExchangeExecutor { fn schema(&self) -> &Schema { &self.schema } @@ -199,7 +197,7 @@ impl Executor } } -impl GenericExchangeExecutor { +impl GenericExchangeExecutor { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { let streams = self @@ -210,7 +208,7 @@ impl GenericExchangeExec Self::data_chunk_stream( prost_source, source_creator, - self.context.clone(), + &*self.context, self.metrics.clone(), ) }); @@ -235,12 +233,10 @@ impl GenericExchangeExec async fn data_chunk_stream( prost_source: PbExchangeSource, source_creator: CS, - context: C, + context: &dyn BatchTaskContext, metrics: Option, ) { - let mut source = source_creator - .create_source(context.clone(), &prost_source) - .await?; + let mut source = source_creator.create_source(context, &prost_source).await?; // create the collector let counter = metrics .as_ref() @@ -290,20 +286,18 @@ mod tests { source_creators.push(fake_create_source); } - let executor = Box::new( - GenericExchangeExecutor:: { - metrics: None, - proto_sources, - source_creators, - sequential: false, - context, - schema: Schema { - fields: vec![Field::unnamed(DataType::Int32)], - }, - task_id: TaskId::default(), - identity: "GenericExchangeExecutor2".to_string(), + let executor = Box::new(GenericExchangeExecutor:: { + metrics: None, + proto_sources, + source_creators, + sequential: false, + context, + schema: Schema { + fields: vec![Field::unnamed(DataType::Int32)], }, - ); + task_id: TaskId::default(), + identity: "GenericExchangeExecutor2".to_string(), + }); let mut stream = executor.execute(); let mut chunks: Vec = vec![]; diff --git a/src/batch/src/executor/group_top_n.rs b/src/batch/src/executor/group_top_n.rs index 7dda468066e93..fb21152eb18e7 100644 --- a/src/batch/src/executor/group_top_n.rs +++ b/src/batch/src/executor/group_top_n.rs @@ -36,7 +36,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; /// Group Top-N Executor /// @@ -90,10 +89,9 @@ impl HashKeyDispatcher for GroupTopNExecutorBuilder { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for GroupTopNExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/hash_agg.rs b/src/batch/src/executor/hash_agg.rs index ea780d7bf83b3..db2fb8515c44f 100644 --- a/src/batch/src/executor/hash_agg.rs +++ b/src/batch/src/executor/hash_agg.rs @@ -47,7 +47,7 @@ use crate::spill::spill_op::SpillBackend::Disk; use crate::spill::spill_op::{ SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, }; -use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; +use crate::task::{ShutdownToken, TaskId}; type AggHashMap = hashbrown::HashMap, PrecomputedBuildHasher, A>; @@ -149,10 +149,9 @@ impl HashAggExecutorBuilder { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for HashAggExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/hop_window.rs b/src/batch/src/executor/hop_window.rs index 5345aeaa90552..3cd99dd0e5b8e 100644 --- a/src/batch/src/executor/hop_window.rs +++ b/src/batch/src/executor/hop_window.rs @@ -27,7 +27,7 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; + pub struct HopWindowExecutor { child: BoxedExecutor, identity: String, @@ -39,10 +39,9 @@ pub struct HopWindowExecutor { output_indices: Vec, } -#[async_trait::async_trait] impl BoxedExecutorBuilder for HopWindowExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 4318fa2cd682d..d17cb0083e39c 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -40,7 +40,6 @@ use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder}; use crate::error::BatchError; use crate::executor::{DataChunk, Executor}; use crate::monitor::BatchMetrics; -use crate::task::BatchTaskContext; static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0; static POSITION_DELETE_FILE_POS: usize = 1; @@ -225,10 +224,9 @@ impl IcebergScanExecutor { pub struct IcebergScanExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> crate::error::Result { ensure!( diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index c007deaa9a8e0..6b7fe19228760 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -34,7 +34,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; /// [`InsertExecutor`] implements table insertion with values from its child executor. pub struct InsertExecutor { @@ -208,10 +207,9 @@ impl InsertExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for InsertExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index 0a328e2f985f1..c124d4e224356 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -40,7 +40,7 @@ use crate::executor::{ unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor, ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase, }; -use crate::task::{BatchTaskContext, ShutdownToken}; +use crate::task::ShutdownToken; /// Distributed Lookup Join Executor. /// High level Execution flow: @@ -81,10 +81,9 @@ impl DistributedLookupJoinExecutor { pub struct DistributedLookupJoinExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [outer_side_input]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/join/hash_join.rs b/src/batch/src/executor/join/hash_join.rs index 5ec115e0990a2..f4e4b2f270a42 100644 --- a/src/batch/src/executor/join/hash_join.rs +++ b/src/batch/src/executor/join/hash_join.rs @@ -47,7 +47,7 @@ use crate::spill::spill_op::SpillBackend::Disk; use crate::spill::spill_op::{ SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, }; -use crate::task::{BatchTaskContext, ShutdownToken}; +use crate::task::ShutdownToken; /// Hash Join Executor /// @@ -2144,10 +2144,9 @@ impl DataChunkMutator { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for HashJoinExecutor<()> { - async fn new_boxed_executor( - context: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + context: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 7f6dfb1d99a7c..32c139c6929fe 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::marker::PhantomData; +use std::sync::Arc; use anyhow::Context; use itertools::Itertools; @@ -49,7 +50,7 @@ use crate::executor::{ use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; /// Inner side executor builder for the `LocalLookupJoinExecutor` -struct InnerSideExecutorBuilder { +struct InnerSideExecutorBuilder { table_desc: StorageTableDesc, table_distribution: TableDistribution, vnode_mapping: ExpandedWorkerSlotMapping, @@ -58,7 +59,7 @@ struct InnerSideExecutorBuilder { inner_side_column_ids: Vec, inner_side_key_types: Vec, lookup_prefix_len: usize, - context: C, + context: Arc, task_id: TaskId, epoch: BatchQueryEpoch, worker_slot_mapping: HashMap, @@ -82,7 +83,7 @@ pub trait LookupExecutorBuilder: Send { pub type BoxedLookupExecutorBuilder = Box; -impl InnerSideExecutorBuilder { +impl InnerSideExecutorBuilder { /// Gets the virtual node based on the given `scan_range` fn get_virtual_node(&self, scan_range: &ScanRange) -> Result { let virtual_node = scan_range @@ -161,7 +162,7 @@ impl InnerSideExecutorBuilder { } #[async_trait::async_trait] -impl LookupExecutorBuilder for InnerSideExecutorBuilder { +impl LookupExecutorBuilder for InnerSideExecutorBuilder { fn reset(&mut self) { self.worker_slot_to_scan_range_mapping = HashMap::new(); } @@ -287,10 +288,9 @@ impl LocalLookupJoinExecutor { pub struct LocalLookupJoinExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [outer_side_input]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/join/nested_loop_join.rs b/src/batch/src/executor/join/nested_loop_join.rs index ae3e5b8fb4b35..1c1991ed548b2 100644 --- a/src/batch/src/executor/join/nested_loop_join.rs +++ b/src/batch/src/executor/join/nested_loop_join.rs @@ -33,7 +33,7 @@ use crate::executor::join::{concatenate, convert_row_to_chunk, JoinType}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::{BatchTaskContext, ShutdownToken}; +use crate::task::ShutdownToken; /// Nested loop join executor. /// @@ -153,10 +153,9 @@ impl NestedLoopJoinExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for NestedLoopJoinExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/limit.rs b/src/batch/src/executor/limit.rs index 35e7a096714e2..08719b9e4efea 100644 --- a/src/batch/src/executor/limit.rs +++ b/src/batch/src/executor/limit.rs @@ -25,7 +25,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; /// Limit executor. pub struct LimitExecutor { @@ -38,10 +37,9 @@ pub struct LimitExecutor { identity: String, } -#[async_trait::async_trait] impl BoxedExecutorBuilder for LimitExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 6f40f42fbba8c..8ea36e71cb73e 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -36,7 +36,6 @@ use risingwave_storage::{dispatch_state_store, StateStore}; use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; use crate::error::{BatchError, Result}; use crate::monitor::BatchMetrics; -use crate::task::BatchTaskContext; pub struct LogRowSeqScanExecutor { chunk_size: usize, @@ -87,10 +86,9 @@ impl LogRowSeqScanExecutor { pub struct LogStoreRowSeqScanExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!( diff --git a/src/batch/src/executor/max_one_row.rs b/src/batch/src/executor/max_one_row.rs index 7ddc95e08cf49..a86b872ce1f83 100644 --- a/src/batch/src/executor/max_one_row.rs +++ b/src/batch/src/executor/max_one_row.rs @@ -19,7 +19,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; -use crate::task::BatchTaskContext; pub struct MaxOneRowExecutor { child: BoxedExecutor, @@ -28,10 +27,9 @@ pub struct MaxOneRowExecutor { identity: String, } -#[async_trait::async_trait] impl BoxedExecutorBuilder for MaxOneRowExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/merge_sort_exchange.rs b/src/batch/src/executor/merge_sort_exchange.rs index 3b5647729db25..1d6f5fc801e57 100644 --- a/src/batch/src/executor/merge_sort_exchange.rs +++ b/src/batch/src/executor/merge_sort_exchange.rs @@ -29,12 +29,12 @@ use crate::executor::{ }; use crate::task::{BatchTaskContext, TaskId}; -pub type MergeSortExchangeExecutor = MergeSortExchangeExecutorImpl; +pub type MergeSortExchangeExecutor = MergeSortExchangeExecutorImpl; /// `MergeSortExchangeExecutor2` takes inputs from multiple sources and /// The outputs of all the sources have been sorted in the same way. -pub struct MergeSortExchangeExecutorImpl { - context: C, +pub struct MergeSortExchangeExecutorImpl { + context: Arc, column_orders: Arc>, proto_sources: Vec, /// Mock-able `CreateSource`. @@ -47,10 +47,10 @@ pub struct MergeSortExchangeExecutorImpl { mem_ctx: MemoryContext, } -impl MergeSortExchangeExecutorImpl { +impl MergeSortExchangeExecutorImpl { #[allow(clippy::too_many_arguments)] pub fn new( - context: C, + context: Arc, column_orders: Arc>, proto_sources: Vec, source_creators: Vec, @@ -75,9 +75,7 @@ impl MergeSortExchangeEx } } -impl Executor - for MergeSortExchangeExecutorImpl -{ +impl Executor for MergeSortExchangeExecutorImpl { fn schema(&self) -> &Schema { &self.schema } @@ -93,13 +91,13 @@ impl Executor /// Everytime `execute` is called, it tries to produce a chunk of size /// `self.chunk_size`. It is possible that the chunk's size is smaller than the /// `self.chunk_size` as the executor runs out of input from `sources`. -impl MergeSortExchangeExecutorImpl { +impl MergeSortExchangeExecutorImpl { #[try_stream(boxed, ok = DataChunk, error = BatchError)] async fn do_execute(self: Box) { let mut sources: Vec = vec![]; for source_idx in 0..self.proto_sources.len() { let new_source = self.source_creators[source_idx] - .create_source(self.context.clone(), &self.proto_sources[source_idx]) + .create_source(&*self.context, &self.proto_sources[source_idx]) .await?; sources.push(Box::new(WrapStreamExecutor::new( @@ -126,10 +124,9 @@ impl MergeSortExchangeEx pub struct MergeSortExchangeExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!( @@ -159,7 +156,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder { .map(Field::from) .collect::>(); - Ok(Box::new(MergeSortExchangeExecutor::::new( + Ok(Box::new(MergeSortExchangeExecutor::new( source.context().clone(), column_orders, proto_sources, @@ -209,10 +206,7 @@ mod tests { order_type: OrderType::ascending(), }]); - let executor = Box::new(MergeSortExchangeExecutorImpl::< - FakeCreateSource, - ComputeNodeContext, - >::new( + let executor = Box::new(MergeSortExchangeExecutorImpl::::new( ComputeNodeContext::for_test(), column_orders, proto_sources, diff --git a/src/batch/src/executor/mod.rs b/src/batch/src/executor/mod.rs index ce84065d9d41c..f144576fb5808 100644 --- a/src/batch/src/executor/mod.rs +++ b/src/batch/src/executor/mod.rs @@ -48,6 +48,9 @@ mod update; mod utils; mod values; +use std::future::Future; +use std::sync::Arc; + use anyhow::Context; use async_recursion::async_recursion; pub use delete::*; @@ -127,18 +130,17 @@ impl std::fmt::Debug for BoxedExecutor { /// Every Executor should impl this trait to provide a static method to build a `BoxedExecutor` /// from proto and global environment. -#[async_trait::async_trait] pub trait BoxedExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, - ) -> Result; + ) -> impl Future> + Send; } -pub struct ExecutorBuilder<'a, C> { +pub struct ExecutorBuilder<'a> { pub plan_node: &'a PlanNode, pub task_id: &'a TaskId, - context: C, + context: Arc, epoch: BatchQueryEpoch, shutdown_rx: ShutdownToken, } @@ -148,18 +150,18 @@ macro_rules! build_executor { match $source.plan_node().get_node_body().unwrap() { $( $proto_type_name(..) => { - <$data_type>::new_boxed_executor($source, $inputs) + <$data_type>::new_boxed_executor($source, $inputs).await? }, )* } } } -impl<'a, C: Clone> ExecutorBuilder<'a, C> { +impl<'a> ExecutorBuilder<'a> { pub fn new( plan_node: &'a PlanNode, task_id: &'a TaskId, - context: C, + context: Arc, epoch: BatchQueryEpoch, shutdown_rx: ShutdownToken, ) -> Self { @@ -187,7 +189,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> { self.plan_node } - pub fn context(&self) -> &C { + pub fn context(&self) -> &Arc { &self.context } @@ -196,7 +198,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> { } } -impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { +impl<'a> ExecutorBuilder<'a> { pub async fn build(&self) -> Result { self.try_build() .await @@ -254,8 +256,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> { NodeBody::BlockExecutor => BlockExecutorBuilder, NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder, NodeBody::LogRowSeqScan => LogStoreRowSeqScanExecutorBuilder, - } - .await?; + }; Ok(Box::new(ManagedExecutor::new( real_executor, diff --git a/src/batch/src/executor/mysql_query.rs b/src/batch/src/executor/mysql_query.rs index 721c9c5e55bf1..52c271e330ce8 100644 --- a/src/batch/src/executor/mysql_query.rs +++ b/src/batch/src/executor/mysql_query.rs @@ -25,7 +25,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, BatchExternalSystemError}; use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder}; -use crate::task::BatchTaskContext; /// `MySqlQuery` executor. Runs a query against a `MySql` database. pub struct MySqlQueryExecutor { @@ -144,10 +143,9 @@ impl MySqlQueryExecutor { pub struct MySqlQueryExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for MySqlQueryExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, _inputs: Vec, ) -> crate::error::Result { let mysql_query_node = try_match_expand!( diff --git a/src/batch/src/executor/order_by.rs b/src/batch/src/executor/order_by.rs index 95019c226ffda..042073fc51ff6 100644 --- a/src/batch/src/executor/order_by.rs +++ b/src/batch/src/executor/order_by.rs @@ -40,7 +40,6 @@ use crate::spill::spill_op::SpillBackend::Disk; use crate::spill::spill_op::{ SpillBackend, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, }; -use crate::task::BatchTaskContext; /// Sort Executor /// @@ -76,10 +75,9 @@ impl Executor for SortExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for SortExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/postgres_query.rs b/src/batch/src/executor/postgres_query.rs index 4ae1fcba65da9..cdaab09004fed 100644 --- a/src/batch/src/executor/postgres_query.rs +++ b/src/batch/src/executor/postgres_query.rs @@ -25,7 +25,6 @@ use tokio_postgres; use crate::error::BatchError; use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder}; -use crate::task::BatchTaskContext; /// `PostgresQuery` executor. Runs a query against a Postgres database. pub struct PostgresQueryExecutor { @@ -174,10 +173,9 @@ impl PostgresQueryExecutor { pub struct PostgresQueryExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for PostgresQueryExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, _inputs: Vec, ) -> crate::error::Result { let postgres_query_node = try_match_expand!( diff --git a/src/batch/src/executor/project.rs b/src/batch/src/executor/project.rs index 4e23aaa587985..db29319cccc72 100644 --- a/src/batch/src/executor/project.rs +++ b/src/batch/src/executor/project.rs @@ -25,7 +25,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; pub struct ProjectExecutor { expr: Vec, @@ -73,10 +72,9 @@ impl ProjectExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for ProjectExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/project_set.rs b/src/batch/src/executor/project_set.rs index 17eaadab76da2..c1ecd2489aef5 100644 --- a/src/batch/src/executor/project_set.rs +++ b/src/batch/src/executor/project_set.rs @@ -30,7 +30,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; pub struct ProjectSetExecutor { select_list: Vec, @@ -132,10 +131,9 @@ impl ProjectSetExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for ProjectSetExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 6f65d3fc16c78..763ed8e333ab9 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -40,7 +40,6 @@ use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; use crate::monitor::BatchMetrics; -use crate::task::BatchTaskContext; /// Executor that scans data from row table pub struct RowSeqScanExecutor { @@ -192,10 +191,9 @@ impl RowSeqScanExecutor { pub struct RowSeqScanExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!( diff --git a/src/batch/src/executor/s3_file_scan.rs b/src/batch/src/executor/s3_file_scan.rs index 38907c63f8416..5418e58aa0987 100644 --- a/src/batch/src/executor/s3_file_scan.rs +++ b/src/batch/src/executor/s3_file_scan.rs @@ -22,7 +22,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::BatchError; use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, DataChunk, Executor, ExecutorBuilder}; -use crate::task::BatchTaskContext; #[derive(PartialEq, Debug)] pub enum FileFormat { @@ -101,10 +100,9 @@ impl S3FileScanExecutor { pub struct FileScanExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for FileScanExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, _inputs: Vec, ) -> crate::error::Result { let file_scan_node = try_match_expand!( diff --git a/src/batch/src/executor/sort_agg.rs b/src/batch/src/executor/sort_agg.rs index 1cb5968045fa6..ad0000fc60980 100644 --- a/src/batch/src/executor/sort_agg.rs +++ b/src/batch/src/executor/sort_agg.rs @@ -28,7 +28,7 @@ use crate::executor::aggregation::build as build_agg; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::{BatchTaskContext, ShutdownToken}; +use crate::task::ShutdownToken; /// `SortAggExecutor` implements the sort aggregate algorithm, which assumes /// that the input chunks has already been sorted by group columns. @@ -47,10 +47,9 @@ pub struct SortAggExecutor { shutdown_rx: ShutdownToken, } -#[async_trait::async_trait] impl BoxedExecutorBuilder for SortAggExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/sort_over_window.rs b/src/batch/src/executor/sort_over_window.rs index 86fe3670b7f1e..237d611c75741 100644 --- a/src/batch/src/executor/sort_over_window.rs +++ b/src/batch/src/executor/sort_over_window.rs @@ -27,7 +27,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; use crate::error::{BatchError, Result}; -use crate::task::{BatchTaskContext, ShutdownToken}; +use crate::task::ShutdownToken; /// [`SortOverWindowExecutor`] accepts input chunks sorted by partition key and order key, and /// outputs chunks with window function result columns. @@ -48,10 +48,9 @@ struct ExecutorInner { chunk_size: usize, } -#[async_trait::async_trait] impl BoxedExecutorBuilder for SortOverWindowExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 1ba28404a4d8b..d5cb717df92a2 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -32,7 +32,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use super::Executor; use crate::error::{BatchError, Result}; use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder}; -use crate::task::BatchTaskContext; pub struct SourceExecutor { source: SourceReader, @@ -49,10 +48,9 @@ pub struct SourceExecutor { chunk_size: usize, } -#[async_trait::async_trait] impl BoxedExecutorBuilder for SourceExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!(inputs.is_empty(), "Source should not have input executor!"); diff --git a/src/batch/src/executor/sys_row_seq_scan.rs b/src/batch/src/executor/sys_row_seq_scan.rs index 07521f73d74f0..3a60039d9107f 100644 --- a/src/batch/src/executor/sys_row_seq_scan.rs +++ b/src/batch/src/executor/sys_row_seq_scan.rs @@ -22,7 +22,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; pub struct SysRowSeqScanExecutor { table_id: TableId, @@ -53,10 +52,9 @@ impl SysRowSeqScanExecutor { pub struct SysRowSeqScanExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!( diff --git a/src/batch/src/executor/table_function.rs b/src/batch/src/executor/table_function.rs index 8ced12ebbf2ca..aa7235c98eba6 100644 --- a/src/batch/src/executor/table_function.rs +++ b/src/batch/src/executor/table_function.rs @@ -22,7 +22,6 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use super::{BoxedExecutor, BoxedExecutorBuilder}; use crate::error::{BatchError, Result}; use crate::executor::{BoxedDataChunkStream, Executor, ExecutorBuilder}; -use crate::task::BatchTaskContext; pub struct TableFunctionExecutor { schema: Schema, @@ -68,10 +67,9 @@ pub struct TableFunctionExecutorBuilder {} impl TableFunctionExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for TableFunctionExecutorBuilder { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!( diff --git a/src/batch/src/executor/test_utils.rs b/src/batch/src/executor/test_utils.rs index 1d949d6e0e06a..ac0c829d60aa5 100644 --- a/src/batch/src/executor/test_utils.rs +++ b/src/batch/src/executor/test_utils.rs @@ -277,7 +277,7 @@ impl FakeCreateSource { impl CreateSource for FakeCreateSource { async fn create_source( &self, - _: impl BatchTaskContext, + _: &dyn BatchTaskContext, _: &PbExchangeSource, ) -> Result { Ok(ExchangeSourceImpl::Fake(self.fake_exchange_source.clone())) @@ -341,10 +341,9 @@ impl LookupExecutorBuilder for FakeInnerSideExecutorBuilder { pub struct BlockExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for BlockExecutorBuilder { - async fn new_boxed_executor( - _source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + _source: &ExecutorBuilder<'_>, _inputs: Vec, ) -> Result { Ok(Box::new(BlockExecutor {})) @@ -378,10 +377,9 @@ impl BlockExecutor { pub struct BusyLoopExecutorBuilder {} -#[async_trait::async_trait] impl BoxedExecutorBuilder for BusyLoopExecutorBuilder { - async fn new_boxed_executor( - _source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + _source: &ExecutorBuilder<'_>, _inputs: Vec, ) -> Result { Ok(Box::new(BusyLoopExecutor {})) diff --git a/src/batch/src/executor/top_n.rs b/src/batch/src/executor/top_n.rs index a4450a76c17a8..ca60cec0e9351 100644 --- a/src/batch/src/executor/top_n.rs +++ b/src/batch/src/executor/top_n.rs @@ -30,7 +30,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; /// Top-N Executor /// @@ -47,10 +46,9 @@ pub struct TopNExecutor { mem_ctx: MemoryContext, } -#[async_trait::async_trait] impl BoxedExecutorBuilder for TopNExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/union.rs b/src/batch/src/executor/union.rs index e37baed08debc..343368e80f026 100644 --- a/src/batch/src/executor/union.rs +++ b/src/batch/src/executor/union.rs @@ -24,7 +24,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; pub struct UnionExecutor { inputs: Vec, @@ -63,10 +62,9 @@ impl UnionExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for UnionExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let _union_node = diff --git a/src/batch/src/executor/update.rs b/src/batch/src/executor/update.rs index 95f1963cf582e..b665e3597aa63 100644 --- a/src/batch/src/executor/update.rs +++ b/src/batch/src/executor/update.rs @@ -30,7 +30,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; /// [`UpdateExecutor`] implements table update with values from its child executor and given /// expressions. @@ -217,10 +216,9 @@ impl UpdateExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for UpdateExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { let [child]: [_; 1] = inputs.try_into().unwrap(); diff --git a/src/batch/src/executor/values.rs b/src/batch/src/executor/values.rs index 3d506ff8b6fb0..c3239b0e93768 100644 --- a/src/batch/src/executor/values.rs +++ b/src/batch/src/executor/values.rs @@ -26,7 +26,6 @@ use crate::error::{BatchError, Result}; use crate::executor::{ BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, }; -use crate::task::BatchTaskContext; /// [`ValuesExecutor`] implements Values executor. pub struct ValuesExecutor { @@ -102,10 +101,9 @@ impl ValuesExecutor { } } -#[async_trait::async_trait] impl BoxedExecutorBuilder for ValuesExecutor { - async fn new_boxed_executor( - source: &ExecutorBuilder<'_, C>, + async fn new_boxed_executor( + source: &ExecutorBuilder<'_>, inputs: Vec, ) -> Result { ensure!(inputs.is_empty(), "ValuesExecutor should have no child!"); diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 38eb0efd80553..2f8bfd26fb873 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -29,6 +29,7 @@ #![feature(error_generic_member_access)] #![feature(map_try_insert)] #![feature(iter_from_coroutine)] +#![feature(used_with_arg)] pub mod error; pub mod exchange_source; diff --git a/src/batch/src/rpc/service/task_service.rs b/src/batch/src/rpc/service/task_service.rs index e61585f2751e8..e81f21cba15d1 100644 --- a/src/batch/src/rpc/service/task_service.rs +++ b/src/batch/src/rpc/service/task_service.rs @@ -74,7 +74,7 @@ impl TaskService for BatchServiceImpl { task_id.as_ref().expect("no task id found"), plan.expect("no plan found").clone(), epoch.expect("no epoch found"), - ComputeNodeContext::new(self.env.clone()), + ComputeNodeContext::create(self.env.clone()), state_reporter, TracingContext::from_protobuf(&tracing_context), expr_context.expect("no expression context found"), @@ -140,7 +140,7 @@ impl BatchServiceImpl { let tracing_context = TracingContext::from_protobuf(&tracing_context); let expr_context = expr_context.expect("no expression context found"); - let context = ComputeNodeContext::new(env.clone()); + let context = ComputeNodeContext::create(env.clone()); trace!( "local execute request: plan:{:?} with task id:{:?}", plan, diff --git a/src/batch/src/task/context.rs b/src/batch/src/task/context.rs index 0f796d53b4b8b..8c8289548f47c 100644 --- a/src/batch/src/task/context.rs +++ b/src/batch/src/task/context.rs @@ -32,7 +32,7 @@ use crate::worker_manager::worker_node_manager::WorkerNodeManagerRef; /// Context for batch task execution. /// /// This context is specific to one task execution, and should *not* be shared by different tasks. -pub trait BatchTaskContext: Clone + Send + Sync + 'static { +pub trait BatchTaskContext: Send + Sync + 'static { /// Get task output identified by `task_output_id`. /// /// Returns error if the task of `task_output_id` doesn't run in same worker as current task. @@ -133,25 +133,25 @@ impl BatchTaskContext for ComputeNodeContext { impl ComputeNodeContext { #[cfg(test)] - pub fn for_test() -> Self { - Self { + pub fn for_test() -> Arc { + Arc::new(Self { env: BatchEnvironment::for_test(), batch_metrics: BatchMetricsInner::for_test(), mem_context: MemoryContext::none(), - } + }) } - pub fn new(env: BatchEnvironment) -> Self { + pub fn create(env: BatchEnvironment) -> Arc { let mem_context = env.task_manager().memory_context_ref(); let batch_metrics = Arc::new(BatchMetricsInner::new( env.task_manager().metrics(), env.executor_metrics(), env.iceberg_scan_metrics(), )); - Self { + Arc::new(Self { env, batch_metrics, mem_context, - } + }) } } diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index da9f38e030c61..c50eb7f593692 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -296,7 +296,7 @@ impl ShutdownToken { } /// `BatchTaskExecution` represents a single task execution. -pub struct BatchTaskExecution { +pub struct BatchTaskExecution { /// Task id. task_id: TaskId, @@ -313,7 +313,7 @@ pub struct BatchTaskExecution { sender: ChanSenderImpl, /// Context for task execution - context: C, + context: Arc, /// The execution failure. failure: Arc>>>, @@ -328,11 +328,11 @@ pub struct BatchTaskExecution { heartbeat_join_handle: Mutex>>, } -impl BatchTaskExecution { +impl BatchTaskExecution { pub fn new( prost_tid: &PbTaskId, plan: PlanFragment, - context: C, + context: Arc, epoch: BatchQueryEpoch, runtime: Arc, ) -> Result { @@ -412,10 +412,7 @@ impl BatchTaskExecution { // Clone `self` to make compiler happy because of the move block. let t_1 = self.clone(); let this = self.clone(); - async fn notify_panic( - this: &BatchTaskExecution, - state_tx: Option<&mut StateReporter>, - ) { + async fn notify_panic(this: &BatchTaskExecution, state_tx: Option<&mut StateReporter>) { let err_str = "execution panic".into(); if let Err(e) = this .change_state_notify(TaskStatus::Failed, state_tx, Some(err_str)) @@ -669,7 +666,7 @@ impl BatchTaskExecution { } } -impl BatchTaskExecution { +impl BatchTaskExecution { pub(crate) fn set_heartbeat_join_handle(&self, join_handle: JoinHandle<()>) { *self.heartbeat_join_handle.lock() = Some(join_handle); } diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index c9fbde6369ef5..81c53a9844981 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -30,18 +30,17 @@ use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse}; use tokio::sync::mpsc::Sender; use tonic::Status; +use super::BatchTaskContext; use crate::error::Result; use crate::monitor::BatchManagerMetrics; use crate::rpc::service::exchange::GrpcExchangeWriter; -use crate::task::{ - BatchTaskExecution, ComputeNodeContext, StateReporter, TaskId, TaskOutput, TaskOutputId, -}; +use crate::task::{BatchTaskExecution, StateReporter, TaskId, TaskOutput, TaskOutputId}; /// `BatchManager` is responsible for managing all batch tasks. #[derive(Clone)] pub struct BatchManager { /// Every task id has a corresponding task execution. - tasks: Arc>>>>, + tasks: Arc>>>, /// Runtime for the batch manager. runtime: Arc, @@ -93,7 +92,7 @@ impl BatchManager { tid: &PbTaskId, plan: PlanFragment, epoch: BatchQueryEpoch, - context: ComputeNodeContext, + context: Arc, // ComputeNodeContext state_reporter: StateReporter, tracing_context: TracingContext, expr_context: ExprContext, @@ -139,6 +138,8 @@ impl BatchManager { ) -> Result<()> { use risingwave_hummock_sdk::test_batch_query_epoch; + use crate::task::ComputeNodeContext; + self.fire_task( tid, plan, diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 6a8a82f414ee4..9851186260e5a 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -35,7 +35,7 @@ futures = "0.3" futures-async-stream = { workspace = true } futures-util = "0.3" itertools = { workspace = true } -linkme = { version = "0.3", features = ["used_linker"] } +linkme = { workspace = true } num-traits = "0.2" parse-display = "0.10" paste = "1" diff --git a/src/expr/impl/Cargo.toml b/src/expr/impl/Cargo.toml index b8e2448a76397..87b27a9e670de 100644 --- a/src/expr/impl/Cargo.toml +++ b/src/expr/impl/Cargo.toml @@ -48,7 +48,7 @@ hmac = "0.12" icelake = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } -linkme = { version = "0.3", features = ["used_linker"] } +linkme = { workspace = true } md5 = "0.7" moka = { version = "0.12.0", features = ["sync"] } num-traits = "0.2" diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a0039a849c234..116ac1dacc07c 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -42,7 +42,7 @@ iceberg = { workspace = true } icelake = { workspace = true } itertools = { workspace = true } jsonbb = { workspace = true } -linkme = { version = "0.3", features = ["used_linker"] } +linkme = { workspace = true } maplit = "1" md5 = "0.7.0" memcomparable = "0.2" diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index 633dac2d2d562..d447df55d453e 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -100,7 +100,7 @@ impl LocalQueryExecution { pub async fn run_inner(self) { debug!(%self.query.query_id, self.sql, "Starting to run query"); - let context = FrontendBatchTaskContext::new(self.session.clone()); + let context = FrontendBatchTaskContext::create(self.session.clone()); let task_id = TaskId { query_id: self.query.query_id.id.clone(), diff --git a/src/frontend/src/scheduler/mod.rs b/src/frontend/src/scheduler/mod.rs index 046b3cd54a715..81ee15ef79fd9 100644 --- a/src/frontend/src/scheduler/mod.rs +++ b/src/frontend/src/scheduler/mod.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use futures::Stream; +use risingwave_batch::task::BatchTaskContext; use risingwave_common::array::DataChunk; use crate::error::Result; @@ -64,7 +65,7 @@ impl ExecutionContext { self.timeout } - pub fn to_batch_task_context(&self) -> FrontendBatchTaskContext { - FrontendBatchTaskContext::new(self.session.clone()) + pub fn to_batch_task_context(&self) -> Arc { + FrontendBatchTaskContext::create(self.session.clone()) } } diff --git a/src/frontend/src/scheduler/task_context.rs b/src/frontend/src/scheduler/task_context.rs index ede057c9a9c24..14d7a493b75f2 100644 --- a/src/frontend/src/scheduler/task_context.rs +++ b/src/frontend/src/scheduler/task_context.rs @@ -39,13 +39,13 @@ pub struct FrontendBatchTaskContext { } impl FrontendBatchTaskContext { - pub fn new(session: Arc) -> Self { + pub fn create(session: Arc) -> Arc { let mem_context = MemoryContext::new(Some(session.env().mem_context()), TrAdderAtomic::new(0)); - Self { + Arc::new(Self { session, mem_context, - } + }) } } diff --git a/src/prost/build.rs b/src/prost/build.rs index 1dc7cd66c3456..3325246d5101b 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -100,7 +100,7 @@ fn main() -> Result<(), Box> { .type_attribute(".", "#[derive(prost_helpers::AnyPB)]") .type_attribute( "node_body", - "#[derive(::enum_as_inner::EnumAsInner, ::strum::Display)]", + "#[derive(::enum_as_inner::EnumAsInner, ::strum::Display, ::strum::EnumDiscriminants)]", ) .type_attribute("rex_node", "#[derive(::enum_as_inner::EnumAsInner)]") .type_attribute(