From 09dea549f7ceb3bf311548589cd834a63f2fa257 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 6 Nov 2023 23:20:41 +0800 Subject: [PATCH] refactor(stream): remove `async_trait` macro for traits (#13273) Signed-off-by: TennyZhuang --- src/stream/src/executor/top_n/group_top_n.rs | 2 -- .../executor/top_n/group_top_n_appendonly.rs | 3 +-- .../src/executor/top_n/top_n_appendonly.rs | 2 -- src/stream/src/executor/top_n/top_n_cache.rs | 12 +++-------- src/stream/src/executor/top_n/top_n_plain.rs | 2 -- src/stream/src/executor/top_n/utils.rs | 20 +++++++++++++------ .../src/from_proto/append_only_dedup.rs | 1 - src/stream/src/from_proto/barrier_recv.rs | 1 - src/stream/src/from_proto/batch_query.rs | 1 - src/stream/src/from_proto/chain.rs | 1 - src/stream/src/from_proto/dml.rs | 1 - src/stream/src/from_proto/dynamic_filter.rs | 1 - src/stream/src/from_proto/eowc_over_window.rs | 1 - src/stream/src/from_proto/expand.rs | 1 - src/stream/src/from_proto/filter.rs | 1 - src/stream/src/from_proto/group_top_n.rs | 1 - src/stream/src/from_proto/hash_agg.rs | 1 - src/stream/src/from_proto/hash_join.rs | 1 - src/stream/src/from_proto/hop_window.rs | 1 - src/stream/src/from_proto/lookup.rs | 1 - src/stream/src/from_proto/lookup_union.rs | 1 - src/stream/src/from_proto/merge.rs | 1 - src/stream/src/from_proto/mod.rs | 5 ++--- src/stream/src/from_proto/mview.rs | 2 -- src/stream/src/from_proto/no_op.rs | 1 - src/stream/src/from_proto/now.rs | 1 - src/stream/src/from_proto/over_window.rs | 1 - src/stream/src/from_proto/project.rs | 1 - src/stream/src/from_proto/project_set.rs | 1 - src/stream/src/from_proto/row_id_gen.rs | 1 - src/stream/src/from_proto/simple_agg.rs | 1 - src/stream/src/from_proto/sink.rs | 1 - src/stream/src/from_proto/sort.rs | 1 - src/stream/src/from_proto/source/fs_fetch.rs | 1 - .../src/from_proto/source/trad_source.rs | 1 - .../src/from_proto/stateless_simple_agg.rs | 1 - src/stream/src/from_proto/temporal_join.rs | 1 - src/stream/src/from_proto/top_n.rs | 1 - src/stream/src/from_proto/union.rs | 1 - src/stream/src/from_proto/values.rs | 1 - src/stream/src/from_proto/watermark_filter.rs | 1 - .../tests/integration_tests/snapshot.rs | 2 +- 42 files changed, 21 insertions(+), 62 deletions(-) diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 92681e3c31426..a162527bcc830 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -15,7 +15,6 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use async_trait::async_trait; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::hash::HashKey; @@ -157,7 +156,6 @@ impl DerefMut for GroupTopNCache TopNExecutorBase for InnerGroupTopNExecutor where diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index f8f3d4887b970..7204acae0d3af 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -28,7 +28,6 @@ use std::sync::Arc; -use async_trait::async_trait; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::hash::HashKey; @@ -151,7 +150,7 @@ impl }) } } -#[async_trait] + impl TopNExecutorBase for InnerAppendOnlyGroupTopNExecutor where diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs index a56b43e4c5903..499ba97ee3084 100644 --- a/src/stream/src/executor/top_n/top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/top_n_appendonly.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use async_trait::async_trait; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; @@ -104,7 +103,6 @@ impl InnerAppendOnlyTopNExecutor TopNExecutorBase for InnerAppendOnlyTopNExecutor where diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index a1b7e26e8ae3a..aed23760c332f 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -14,8 +14,8 @@ use std::cmp::Ordering; use std::fmt::Debug; +use std::future::Future; -use async_trait::async_trait; use itertools::Itertools; use risingwave_common::array::{Op, RowRef}; use risingwave_common::estimate_size::EstimateSize; @@ -116,7 +116,6 @@ impl Debug for TopNCache { /// This trait is used as a bound. It is needed since /// `TopNCache::::f` and `TopNCache::::f` /// don't imply `TopNCache::::f`. -#[async_trait] pub trait TopNCacheTrait { /// Insert input row to corresponding cache range according to its order key. /// @@ -140,7 +139,7 @@ pub trait TopNCacheTrait { /// operation, we need to pass in `group_key`, `epoch` and `managed_state` to do a prefix /// scan of the state table. #[allow(clippy::too_many_arguments)] - async fn delete( + fn delete( &mut self, group_key: Option, managed_state: &mut ManagedTopNState, @@ -148,7 +147,7 @@ pub trait TopNCacheTrait { row: impl Row + Send, res_ops: &mut Vec, res_rows: &mut Vec, - ) -> StreamExecutorResult<()>; + ) -> impl Future> + Send; } impl TopNCache { @@ -249,7 +248,6 @@ impl TopNCache { } } -#[async_trait] impl TopNCacheTrait for TopNCache { fn insert( &mut self, @@ -387,7 +385,6 @@ impl TopNCacheTrait for TopNCache { } } -#[async_trait] impl TopNCacheTrait for TopNCache { fn insert( &mut self, @@ -547,7 +544,6 @@ impl TopNCacheTrait for TopNCache { } /// Similar to [`TopNCacheTrait`], but for append-only TopN. -#[async_trait] pub trait AppendOnlyTopNCacheTrait { /// Insert input row to corresponding cache range according to its order key. /// @@ -567,7 +563,6 @@ pub trait AppendOnlyTopNCacheTrait { ) -> StreamExecutorResult<()>; } -#[async_trait] impl AppendOnlyTopNCacheTrait for TopNCache { fn insert( &mut self, @@ -630,7 +625,6 @@ impl AppendOnlyTopNCacheTrait for TopNCache { } } -#[async_trait] impl AppendOnlyTopNCacheTrait for TopNCache { fn insert( &mut self, diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index e3cc70b9fc0b7..933cc9a31d7c2 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use async_trait::async_trait; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::row::RowExt; use risingwave_common::util::epoch::EpochPair; @@ -127,7 +126,6 @@ impl InnerTopNExecutor { } } -#[async_trait] impl TopNExecutorBase for InnerTopNExecutor where TopNCache: TopNCacheTrait, diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index cd235e9a26e00..7c04485673c5e 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::Future; use std::sync::Arc; -use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; @@ -34,13 +34,18 @@ use crate::executor::{ ExecutorInfo, Message, PkIndicesRef, Watermark, }; -#[async_trait] pub trait TopNExecutorBase: Send + 'static { /// Apply the chunk to the dirty state and get the diffs. - async fn apply_chunk(&mut self, chunk: StreamChunk) -> StreamExecutorResult; + fn apply_chunk( + &mut self, + chunk: StreamChunk, + ) -> impl Future> + Send; /// Flush the buffered chunk to the storage backend. - async fn flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>; + fn flush_data( + &mut self, + epoch: EpochPair, + ) -> impl Future> + Send; fn info(&self) -> &ExecutorInfo; @@ -68,10 +73,13 @@ pub trait TopNExecutorBase: Send + 'static { fn evict(&mut self) {} fn update_epoch(&mut self, _epoch: u64) {} - async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()>; + fn init(&mut self, epoch: EpochPair) -> impl Future> + Send; /// Handle incoming watermarks - async fn handle_watermark(&mut self, watermark: Watermark) -> Option; + fn handle_watermark( + &mut self, + watermark: Watermark, + ) -> impl Future> + Send; } /// The struct wraps a [`TopNExecutorBase`] diff --git a/src/stream/src/from_proto/append_only_dedup.rs b/src/stream/src/from_proto/append_only_dedup.rs index f8c78532dc661..ac44fcb87ed9e 100644 --- a/src/stream/src/from_proto/append_only_dedup.rs +++ b/src/stream/src/from_proto/append_only_dedup.rs @@ -26,7 +26,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct AppendOnlyDedupExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for AppendOnlyDedupExecutorBuilder { type Node = DedupNode; diff --git a/src/stream/src/from_proto/barrier_recv.rs b/src/stream/src/from_proto/barrier_recv.rs index d4e164a38e456..42efc234ebb8c 100644 --- a/src/stream/src/from_proto/barrier_recv.rs +++ b/src/stream/src/from_proto/barrier_recv.rs @@ -20,7 +20,6 @@ use crate::executor::BarrierRecvExecutor; pub struct BarrierRecvExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for BarrierRecvExecutorBuilder { type Node = BarrierRecvNode; diff --git a/src/stream/src/from_proto/batch_query.rs b/src/stream/src/from_proto/batch_query.rs index 8b9054b9a776e..2913ae3cd7262 100644 --- a/src/stream/src/from_proto/batch_query.rs +++ b/src/stream/src/from_proto/batch_query.rs @@ -27,7 +27,6 @@ use crate::executor::{BatchQueryExecutor, DummyExecutor}; pub struct BatchQueryExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for BatchQueryExecutorBuilder { type Node = BatchPlanNode; diff --git a/src/stream/src/from_proto/chain.rs b/src/stream/src/from_proto/chain.rs index e119007199068..5e41328f5ff81 100644 --- a/src/stream/src/from_proto/chain.rs +++ b/src/stream/src/from_proto/chain.rs @@ -34,7 +34,6 @@ use crate::executor::{ pub struct ChainExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for ChainExecutorBuilder { type Node = ChainNode; diff --git a/src/stream/src/from_proto/dml.rs b/src/stream/src/from_proto/dml.rs index 97d565f47aa0c..e9242edf98f9f 100644 --- a/src/stream/src/from_proto/dml.rs +++ b/src/stream/src/from_proto/dml.rs @@ -25,7 +25,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct DmlExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for DmlExecutorBuilder { type Node = DmlNode; diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index 23341c157524a..81c268530789a 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -25,7 +25,6 @@ use crate::executor::DynamicFilterExecutor; pub struct DynamicFilterExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for DynamicFilterExecutorBuilder { type Node = DynamicFilterNode; diff --git a/src/stream/src/from_proto/eowc_over_window.rs b/src/stream/src/from_proto/eowc_over_window.rs index bcee0736ae30f..a6f9615d1a2a5 100644 --- a/src/stream/src/from_proto/eowc_over_window.rs +++ b/src/stream/src/from_proto/eowc_over_window.rs @@ -28,7 +28,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct EowcOverWindowExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for EowcOverWindowExecutorBuilder { type Node = PbEowcOverWindowNode; diff --git a/src/stream/src/from_proto/expand.rs b/src/stream/src/from_proto/expand.rs index b4897684a04c4..e88e944396203 100644 --- a/src/stream/src/from_proto/expand.rs +++ b/src/stream/src/from_proto/expand.rs @@ -19,7 +19,6 @@ use crate::executor::ExpandExecutor; pub struct ExpandExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for ExpandExecutorBuilder { type Node = ExpandNode; diff --git a/src/stream/src/from_proto/filter.rs b/src/stream/src/from_proto/filter.rs index 47661e105c506..f703153fa4edd 100644 --- a/src/stream/src/from_proto/filter.rs +++ b/src/stream/src/from_proto/filter.rs @@ -20,7 +20,6 @@ use crate::executor::FilterExecutor; pub struct FilterExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for FilterExecutorBuilder { type Node = FilterNode; diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs index a7fc0d741206e..aed722e41132d 100644 --- a/src/stream/src/from_proto/group_top_n.rs +++ b/src/stream/src/from_proto/group_top_n.rs @@ -26,7 +26,6 @@ use crate::task::AtomicU64Ref; pub struct GroupTopNExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for GroupTopNExecutorBuilder { type Node = GroupTopNNode; diff --git a/src/stream/src/from_proto/hash_agg.rs b/src/stream/src/from_proto/hash_agg.rs index faf8a1f7fdad1..8160b1eb63658 100644 --- a/src/stream/src/from_proto/hash_agg.rs +++ b/src/stream/src/from_proto/hash_agg.rs @@ -48,7 +48,6 @@ impl HashKeyDispatcher for HashAggExecutorDispatcherArgs { pub struct HashAggExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for HashAggExecutorBuilder { type Node = HashAggNode; diff --git a/src/stream/src/from_proto/hash_join.rs b/src/stream/src/from_proto/hash_join.rs index 87174282e517a..19b2760f2d91e 100644 --- a/src/stream/src/from_proto/hash_join.rs +++ b/src/stream/src/from_proto/hash_join.rs @@ -33,7 +33,6 @@ use crate::task::AtomicU64Ref; pub struct HashJoinExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for HashJoinExecutorBuilder { type Node = HashJoinNode; diff --git a/src/stream/src/from_proto/hop_window.rs b/src/stream/src/from_proto/hop_window.rs index 5bf0240155fc2..8cc61888c3880 100644 --- a/src/stream/src/from_proto/hop_window.rs +++ b/src/stream/src/from_proto/hop_window.rs @@ -22,7 +22,6 @@ use crate::executor::HopWindowExecutor; pub struct HopWindowExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for HopWindowExecutorBuilder { type Node = HopWindowNode; diff --git a/src/stream/src/from_proto/lookup.rs b/src/stream/src/from_proto/lookup.rs index c6e481fdfd77b..edf67e57303d7 100644 --- a/src/stream/src/from_proto/lookup.rs +++ b/src/stream/src/from_proto/lookup.rs @@ -24,7 +24,6 @@ use crate::executor::{LookupExecutor, LookupExecutorParams}; pub struct LookupExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for LookupExecutorBuilder { type Node = LookupNode; diff --git a/src/stream/src/from_proto/lookup_union.rs b/src/stream/src/from_proto/lookup_union.rs index 38d2fbf5bf036..582b5fb745996 100644 --- a/src/stream/src/from_proto/lookup_union.rs +++ b/src/stream/src/from_proto/lookup_union.rs @@ -19,7 +19,6 @@ use crate::executor::LookupUnionExecutor; pub struct LookupUnionExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for LookupUnionExecutorBuilder { type Node = LookupUnionNode; diff --git a/src/stream/src/from_proto/merge.rs b/src/stream/src/from_proto/merge.rs index 450d55d9b8641..bc963f8d2351f 100644 --- a/src/stream/src/from_proto/merge.rs +++ b/src/stream/src/from_proto/merge.rs @@ -21,7 +21,6 @@ use crate::executor::{MergeExecutor, ReceiverExecutor}; pub struct MergeExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for MergeExecutorBuilder { type Node = MergeNode; diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 2ec9476d0e904..11f428a09b71d 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -92,17 +92,16 @@ use crate::executor::{BoxedExecutor, Executor, ExecutorInfo}; use crate::from_proto::values::ValuesExecutorBuilder; use crate::task::{ExecutorParams, LocalStreamManagerCore}; -#[async_trait::async_trait] trait ExecutorBuilder { type Node; /// Create a [`BoxedExecutor`] from [`StreamNode`]. - async fn new_boxed_executor( + fn new_boxed_executor( params: ExecutorParams, node: &Self::Node, store: impl StateStore, stream: &mut LocalStreamManagerCore, - ) -> StreamResult; + ) -> impl std::future::Future> + Send; } macro_rules! build_executor { diff --git a/src/stream/src/from_proto/mview.rs b/src/stream/src/from_proto/mview.rs index d64490b29b84a..e960beedc3691 100644 --- a/src/stream/src/from_proto/mview.rs +++ b/src/stream/src/from_proto/mview.rs @@ -25,7 +25,6 @@ use crate::executor::MaterializeExecutor; pub struct MaterializeExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for MaterializeExecutorBuilder { type Node = MaterializeNode; @@ -85,7 +84,6 @@ impl ExecutorBuilder for MaterializeExecutorBuilder { pub struct ArrangeExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for ArrangeExecutorBuilder { type Node = ArrangeNode; diff --git a/src/stream/src/from_proto/no_op.rs b/src/stream/src/from_proto/no_op.rs index b9bae5f906ffc..07afc3fba9d6a 100644 --- a/src/stream/src/from_proto/no_op.rs +++ b/src/stream/src/from_proto/no_op.rs @@ -22,7 +22,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct NoOpExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for NoOpExecutorBuilder { type Node = NoOpNode; diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index f34f2ab07abe3..c763ff952f3d9 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -24,7 +24,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct NowExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for NowExecutorBuilder { type Node = NowNode; diff --git a/src/stream/src/from_proto/over_window.rs b/src/stream/src/from_proto/over_window.rs index e18e753caf126..9616ab959d30e 100644 --- a/src/stream/src/from_proto/over_window.rs +++ b/src/stream/src/from_proto/over_window.rs @@ -28,7 +28,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct OverWindowExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for OverWindowExecutorBuilder { type Node = PbOverWindowNode; diff --git a/src/stream/src/from_proto/project.rs b/src/stream/src/from_proto/project.rs index ea01fd5c129c8..e6656f357f6bd 100644 --- a/src/stream/src/from_proto/project.rs +++ b/src/stream/src/from_proto/project.rs @@ -23,7 +23,6 @@ use crate::executor::ProjectExecutor; pub struct ProjectExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for ProjectExecutorBuilder { type Node = ProjectNode; diff --git a/src/stream/src/from_proto/project_set.rs b/src/stream/src/from_proto/project_set.rs index 57c422169e54f..33a7d0c3565d1 100644 --- a/src/stream/src/from_proto/project_set.rs +++ b/src/stream/src/from_proto/project_set.rs @@ -22,7 +22,6 @@ use crate::executor::ProjectSetExecutor; pub struct ProjectSetExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for ProjectSetExecutorBuilder { type Node = ProjectSetNode; diff --git a/src/stream/src/from_proto/row_id_gen.rs b/src/stream/src/from_proto/row_id_gen.rs index dca80cfa89449..a54dad43acacf 100644 --- a/src/stream/src/from_proto/row_id_gen.rs +++ b/src/stream/src/from_proto/row_id_gen.rs @@ -23,7 +23,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct RowIdGenExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for RowIdGenExecutorBuilder { type Node = RowIdGenNode; diff --git a/src/stream/src/from_proto/simple_agg.rs b/src/stream/src/from_proto/simple_agg.rs index fdd6d877b99ed..acd618720c9f2 100644 --- a/src/stream/src/from_proto/simple_agg.rs +++ b/src/stream/src/from_proto/simple_agg.rs @@ -27,7 +27,6 @@ use crate::executor::SimpleAggExecutor; pub struct SimpleAggExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for SimpleAggExecutorBuilder { type Node = SimpleAggNode; diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 47f21c0a223cf..9a2ca590b1c5a 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -31,7 +31,6 @@ use crate::executor::SinkExecutor; pub struct SinkExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for SinkExecutorBuilder { type Node = SinkNode; diff --git a/src/stream/src/from_proto/sort.rs b/src/stream/src/from_proto/sort.rs index 8399bcb39866e..2e15e041842cb 100644 --- a/src/stream/src/from_proto/sort.rs +++ b/src/stream/src/from_proto/sort.rs @@ -22,7 +22,6 @@ use crate::executor::{SortExecutor, SortExecutorArgs}; pub struct SortExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for SortExecutorBuilder { type Node = SortNode; diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index 65923d2dced3a..5519428d0e46b 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -31,7 +31,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; pub struct FsFetchExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for FsFetchExecutorBuilder { type Node = StreamFsFetchNode; diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index ec0c2ffcb5d13..7557d9ed72871 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -32,7 +32,6 @@ use crate::executor::{CdcBackfillExecutor, FlowControlExecutor, FsSourceExecutor const FS_CONNECTORS: &[&str] = &["s3"]; pub struct SourceExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for SourceExecutorBuilder { type Node = SourceNode; diff --git a/src/stream/src/from_proto/stateless_simple_agg.rs b/src/stream/src/from_proto/stateless_simple_agg.rs index f26316c86e6d0..0654aea6da41f 100644 --- a/src/stream/src/from_proto/stateless_simple_agg.rs +++ b/src/stream/src/from_proto/stateless_simple_agg.rs @@ -20,7 +20,6 @@ use crate::executor::StatelessSimpleAggExecutor; pub struct StatelessSimpleAggExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for StatelessSimpleAggExecutorBuilder { type Node = SimpleAggNode; diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs index 58699089e8c27..32d89af3926de 100644 --- a/src/stream/src/from_proto/temporal_join.rs +++ b/src/stream/src/from_proto/temporal_join.rs @@ -30,7 +30,6 @@ use crate::task::AtomicU64Ref; pub struct TemporalJoinExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for TemporalJoinExecutorBuilder { type Node = TemporalJoinNode; diff --git a/src/stream/src/from_proto/top_n.rs b/src/stream/src/from_proto/top_n.rs index f0aa967aae143..6c3f303c66e15 100644 --- a/src/stream/src/from_proto/top_n.rs +++ b/src/stream/src/from_proto/top_n.rs @@ -23,7 +23,6 @@ use crate::executor::{AppendOnlyTopNExecutor, TopNExecutor}; pub struct TopNExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for TopNExecutorBuilder { type Node = TopNNode; diff --git a/src/stream/src/from_proto/union.rs b/src/stream/src/from_proto/union.rs index 4a4b86e319682..077ee5a0c41a0 100644 --- a/src/stream/src/from_proto/union.rs +++ b/src/stream/src/from_proto/union.rs @@ -19,7 +19,6 @@ use crate::executor::UnionExecutor; pub struct UnionExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for UnionExecutorBuilder { type Node = UnionNode; diff --git a/src/stream/src/from_proto/values.rs b/src/stream/src/from_proto/values.rs index 077eea3511108..75e49741ddc0c 100644 --- a/src/stream/src/from_proto/values.rs +++ b/src/stream/src/from_proto/values.rs @@ -28,7 +28,6 @@ use crate::task::{ExecutorParams, LocalStreamManagerCore}; /// this executor. May refractor with `BarrierRecvExecutor` in the near future. pub struct ValuesExecutorBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for ValuesExecutorBuilder { type Node = ValuesNode; diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs index 52c452115a4ce..fa7856cb3c9a9 100644 --- a/src/stream/src/from_proto/watermark_filter.rs +++ b/src/stream/src/from_proto/watermark_filter.rs @@ -23,7 +23,6 @@ use crate::executor::WatermarkFilterExecutor; pub struct WatermarkFilterBuilder; -#[async_trait::async_trait] impl ExecutorBuilder for WatermarkFilterBuilder { type Node = WatermarkFilterNode; diff --git a/src/stream/tests/integration_tests/snapshot.rs b/src/stream/tests/integration_tests/snapshot.rs index 272a0e20bcf47..123654ecb2aa6 100644 --- a/src/stream/tests/integration_tests/snapshot.rs +++ b/src/stream/tests/integration_tests/snapshot.rs @@ -100,7 +100,7 @@ pub async fn check_with_script( /// This is a DSL for the input and output of executor snapshot tests. /// -/// It immitates [`Message`], but more ser/de friendly. +/// It imitates [`Message`], but more ser/de friendly. #[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] enum SnapshotEvent {