From edc65a063aed8ffabdc71161982cb2732b34056d Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 19 Apr 2024 20:53:05 +0800 Subject: [PATCH] refactor(stream): add prelude for streaming executor modules (#16404) Signed-off-by: Richard Chien --- src/common/src/array/mod.rs | 1 + src/compute/tests/integration_tests.rs | 2 +- .../src/common/log_store_impl/in_mem.rs | 3 +- src/stream/src/common/mod.rs | 1 - src/stream/src/executor/agg_common.rs | 6 +-- .../executor/backfill/arrangement_backfill.rs | 20 +++------- .../src/executor/backfill/cdc/cdc_backfill.rs | 21 +++------- .../executor/backfill/no_shuffle_backfill.rs | 20 ++-------- src/stream/src/executor/barrier_recv.rs | 6 +-- src/stream/src/executor/batch_query.rs | 11 +---- src/stream/src/executor/chain.rs | 8 +--- .../src/executor/dedup/append_only_dedup.rs | 19 ++------- src/stream/src/executor/dedup/cache.rs | 2 +- src/stream/src/executor/dispatch.rs | 17 +++----- src/stream/src/executor/dml.rs | 8 +--- src/stream/src/executor/dynamic_filter.rs | 17 +++----- src/stream/src/executor/exchange/input.rs | 6 +-- src/stream/src/executor/expand.rs | 9 +---- src/stream/src/executor/filter.rs | 12 +----- src/stream/src/executor/hash_agg.rs | 18 +-------- src/stream/src/executor/hash_join.rs | 19 ++------- src/stream/src/executor/hop_window.rs | 6 +-- src/stream/src/executor/lookup.rs | 6 +-- src/stream/src/executor/lookup/cache.rs | 6 +-- src/stream/src/executor/lookup/impl_.rs | 12 ++---- src/stream/src/executor/lookup_union.rs | 4 +- src/stream/src/executor/merge.rs | 6 +-- src/stream/src/executor/mod.rs | 3 +- src/stream/src/executor/mview/materialize.rs | 22 ++++------ src/stream/src/executor/no_op.rs | 2 +- src/stream/src/executor/now.rs | 13 ++---- src/stream/src/executor/over_window/eowc.rs | 15 ++----- .../src/executor/over_window/general.rs | 17 ++------ src/stream/src/executor/prelude.rs | 40 +++++++++++++++++++ src/stream/src/executor/project.rs | 14 ++----- src/stream/src/executor/project_set.rs | 14 ++----- src/stream/src/executor/rearranged_chain.rs | 11 ++--- src/stream/src/executor/receiver.rs | 9 +---- src/stream/src/executor/row_id_gen.rs | 9 +---- src/stream/src/executor/simple_agg.rs | 17 +++----- src/stream/src/executor/sink.rs | 15 ++----- src/stream/src/executor/sort.rs | 11 +---- .../src/executor/source/fetch_executor.rs | 23 ++++------- .../src/executor/source/fs_source_executor.rs | 20 +++++----- .../src/executor/source/list_executor.rs | 11 ++--- src/stream/src/executor/source/mod.rs | 26 +++++++----- .../source/source_backfill_executor.rs | 17 ++++---- .../src/executor/source/source_executor.rs | 21 ++++++---- .../src/executor/stateless_simple_agg.rs | 8 +--- src/stream/src/executor/stream_reader.rs | 3 +- src/stream/src/executor/subscription.rs | 7 +--- src/stream/src/executor/subtask.rs | 6 +-- src/stream/src/executor/temporal_join.rs | 20 ++-------- src/stream/src/executor/top_n/group_top_n.rs | 11 +---- .../executor/top_n/group_top_n_appendonly.rs | 12 +----- .../src/executor/top_n/top_n_appendonly.rs | 9 +---- src/stream/src/executor/top_n/top_n_plain.rs | 9 +---- src/stream/src/executor/top_n/utils.rs | 14 ++----- src/stream/src/executor/troublemaker.rs | 6 +-- src/stream/src/executor/union.rs | 6 +-- src/stream/src/executor/utils.rs | 6 +-- src/stream/src/executor/values.rs | 8 +--- src/stream/src/executor/watermark_filter.rs | 12 +----- src/stream/src/executor/wrapper.rs | 6 +-- src/stream/src/from_proto/source/fs_fetch.rs | 5 +-- .../src/from_proto/source/trad_source.rs | 8 ++-- src/stream/src/from_proto/source_backfill.rs | 8 ++-- 67 files changed, 246 insertions(+), 514 deletions(-) create mode 100644 src/stream/src/executor/prelude.rs diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs index 0fcb1222eef74..93f6255038e9e 100644 --- a/src/common/src/array/mod.rs +++ b/src/common/src/array/mod.rs @@ -63,6 +63,7 @@ pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayI use risingwave_common_estimate_size::EstimateSize; use risingwave_pb::data::PbArray; pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt}; +pub use stream_chunk_builder::StreamChunkBuilder; pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue}; pub use utf8_array::*; diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 49f54c18a5ff6..f34a2940e9ce5 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -53,7 +53,7 @@ use risingwave_stream::error::StreamResult; use risingwave_stream::executor::dml::DmlExecutor; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::executor::row_id_gen::RowIdGenExecutor; -use risingwave_stream::executor::source_executor::SourceExecutor; +use risingwave_stream::executor::source::SourceExecutor; use risingwave_stream::executor::{ ActorContext, Barrier, Execute, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices, }; diff --git a/src/stream/src/common/log_store_impl/in_mem.rs b/src/stream/src/common/log_store_impl/in_mem.rs index 4b1f003ce6562..ccb2eb878ede9 100644 --- a/src/stream/src/common/log_store_impl/in_mem.rs +++ b/src/stream/src/common/log_store_impl/in_mem.rs @@ -319,7 +319,7 @@ mod tests { use std::task::Poll; use futures::FutureExt; - use risingwave_common::array::Op; + use risingwave_common::array::{Op, StreamChunkBuilder}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::epoch::{test_epoch, EpochPair}; use risingwave_connector::sink::log_store::{ @@ -327,7 +327,6 @@ mod tests { }; use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory; - use crate::common::StreamChunkBuilder; #[tokio::test] async fn test_in_memory_log_store() { diff --git a/src/stream/src/common/mod.rs b/src/stream/src/common/mod.rs index e7d6fda232ac0..23dc006bf8d63 100644 --- a/src/stream/src/common/mod.rs +++ b/src/stream/src/common/mod.rs @@ -13,7 +13,6 @@ // limitations under the License. pub use column_mapping::*; -pub use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; pub mod cache; mod column_mapping; diff --git a/src/stream/src/executor/agg_common.rs b/src/stream/src/executor/agg_common.rs index 91c414877a76b..2cb3cad8fb2d8 100644 --- a/src/stream/src/executor/agg_common.rs +++ b/src/stream/src/executor/agg_common.rs @@ -16,13 +16,9 @@ use std::collections::HashMap; use risingwave_expr::aggregate::AggCall; use risingwave_pb::stream_plan::PbAggNodeVersion; -use risingwave_storage::StateStore; use super::aggregation::AggStateStorage; -use super::{Executor, ExecutorInfo}; -use crate::common::table::state_table::StateTable; -use crate::executor::ActorContextRef; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// Arguments needed to construct an `XxxAggExecutor`. pub struct AggExecutorArgs { diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index b4adf038e5b78..f43d7767b45ce 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -12,24 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::pin; -use std::sync::Arc; +use std::collections::HashMap; use either::Either; use futures::stream::{select_all, select_with_strategy}; -use futures::{stream, StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::{stream, TryStreamExt}; use itertools::Itertools; -use risingwave_common::array::{DataChunk, Op, StreamChunk}; +use risingwave_common::array::{DataChunk, Op}; use risingwave_common::bail; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; -use risingwave_common::row::OwnedRow; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_storage::row_serde::value_serde::ValueRowSerde; use risingwave_storage::store::PrefetchOptions; -use risingwave_storage::StateStore; -use crate::common::table::state_table::{ReplicatedStateTable, StateTable}; +use crate::common::table::state_table::ReplicatedStateTable; #[cfg(debug_assertions)] use crate::executor::backfill::utils::METADATA_STATE_LEN; use crate::executor::backfill::utils::{ @@ -37,12 +33,8 @@ use crate::executor::backfill::utils::{ mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode, update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState, }; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ - expect_first_barrier, Barrier, BoxedMessageStream, Execute, Executor, HashMap, Message, - StreamExecutorError, StreamExecutorResult, -}; -use crate::task::{ActorId, CreateMviewProgress}; +use crate::executor::prelude::*; +use crate::task::CreateMviewProgress; type Builders = HashMap; diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 96a53ace0b25b..dc8e6bc67ddaa 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -12,19 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::{pin, Pin}; -use std::sync::Arc; +use std::pin::Pin; use either::Either; +use futures::stream; use futures::stream::select_with_strategy; -use futures::{pin_mut, stream, StreamExt}; -use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::{DataChunk, StreamChunk}; +use risingwave_common::array::DataChunk; use risingwave_common::bail; -use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; -use risingwave_common::row::{OwnedRow, Row, RowExt}; -use risingwave_common::types::{DataType, ScalarRefImpl}; +use risingwave_common::catalog::{ColumnDesc, ColumnId}; +use risingwave_common::row::RowExt; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::parser::{ DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, @@ -32,10 +29,8 @@ use risingwave_connector::parser::{ }; use risingwave_connector::source::cdc::external::CdcOffset; use risingwave_connector::source::{SourceColumnDesc, SourceContext}; -use risingwave_storage::StateStore; use rw_futures_util::pausable; -use crate::common::table::state_table::StateTable; use crate::executor::backfill::cdc::state::CdcBackfillState; use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable; use crate::executor::backfill::cdc::upstream_table::snapshot::{ @@ -44,11 +39,7 @@ use crate::executor::backfill::cdc::upstream_table::snapshot::{ use crate::executor::backfill::utils::{ get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk, }; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, - StreamExecutorError, StreamExecutorResult, -}; +use crate::executor::prelude::*; use crate::task::CreateMviewProgress; /// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each. diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index ca14f904e2dd0..fdf955fbbdfc7 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -12,37 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::pin; -use std::sync::Arc; - use either::Either; +use futures::stream; use futures::stream::select_with_strategy; -use futures::{stream, StreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::{DataChunk, Op, StreamChunk}; +use risingwave_common::array::{DataChunk, Op}; use risingwave_common::hash::VnodeBitmapExt; -use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::Datum; use risingwave_common::util::epoch::EpochPair; use risingwave_common::{bail, row}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::StateStore; -use crate::common::table::state_table::StateTable; use crate::executor::backfill::utils; use crate::executor::backfill::utils::{ compute_bounds, construct_initial_finished_state, create_builder, create_limiter, get_new_pos, mapping_chunk, mapping_message, mark_chunk, owned_row_iter, BackfillRateLimiter, METADATA_STATE_LEN, }; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ - expect_first_barrier, Barrier, BoxedMessageStream, Execute, Executor, Message, Mutation, - StreamExecutorError, StreamExecutorResult, -}; -use crate::task::{ActorId, CreateMviewProgress}; +use crate::executor::prelude::*; +use crate::task::CreateMviewProgress; /// Schema: | vnode | pk ... | `backfill_finished` | `row_count` | /// We can decode that into `BackfillState` on recovery. diff --git a/src/stream/src/executor/barrier_recv.rs b/src/stream/src/executor/barrier_recv.rs index 79f6157ab6bcd..a7c02ad6e24f5 100644 --- a/src/stream/src/executor/barrier_recv.rs +++ b/src/stream/src/executor/barrier_recv.rs @@ -12,14 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; -use super::{ - ActorContext, ActorContextRef, Barrier, BoxedMessageStream, Execute, Message, - StreamExecutorError, -}; +use crate::executor::prelude::*; /// The executor only for receiving barrier from the meta service. It always resides in the leaves /// of the streaming graph. diff --git a/src/stream/src/executor/batch_query.rs b/src/stream/src/executor/batch_query.rs index 7c92bcd732423..d7c7f38d99504 100644 --- a/src/stream/src/executor/batch_query.rs +++ b/src/stream/src/executor/batch_query.rs @@ -12,20 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use await_tree::InstrumentAwait; -use futures::{pin_mut, StreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::catalog::Schema; +use risingwave_common::array::Op; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::collect_data_chunk; -use risingwave_storage::StateStore; -use super::error::StreamExecutorError; -use super::{Execute, Message}; -use crate::executor::BoxedMessageStream; +use crate::executor::prelude::*; pub struct BatchQueryExecutor { /// The [`StorageTable`] that needs to be queried diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs index c8b7b25852e74..ebcbe1e4e49bb 100644 --- a/src/stream/src/executor/chain.rs +++ b/src/stream/src/executor/chain.rs @@ -12,12 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; -use futures_async_stream::try_stream; - -use super::error::StreamExecutorError; -use super::{expect_first_barrier, Execute, Executor, Message}; -use crate::task::{ActorId, CreateMviewProgress}; +use crate::executor::prelude::*; +use crate::task::CreateMviewProgress; /// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and /// newly appended executors. Currently, [`ChainExecutor`] is mainly used to implement MV on MV diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index 7b171a1ac844d..f73e196815400 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -12,26 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use futures::{stream, StreamExt}; -use futures_async_stream::try_stream; +use futures::stream; use itertools::Itertools; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::buffer::BitmapBuilder; -use risingwave_common::row::{OwnedRow, Row, RowExt}; -use risingwave_storage::StateStore; +use risingwave_common::row::RowExt; use super::cache::DedupCache; use crate::common::metrics::MetricsInfo; -use crate::common::table::state_table::StateTable; -use crate::executor::error::StreamExecutorError; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, - StreamExecutorResult, -}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// [`AppendOnlyDedupExecutor`] drops any message that has duplicate pk columns with previous /// messages. It only accepts append-only input, and its output will be append-only as well. diff --git a/src/stream/src/executor/dedup/cache.rs b/src/stream/src/executor/dedup/cache.rs index 5a9d876c356e3..245e7b7dbae95 100644 --- a/src/stream/src/executor/dedup/cache.rs +++ b/src/stream/src/executor/dedup/cache.rs @@ -18,7 +18,7 @@ use risingwave_common_estimate_size::EstimateSize; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only /// accepts a key without a value. This could be refined in the future to support k-v pairs. diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 28c5edd57ead8..6c644466f9683 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -13,17 +13,13 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; -use std::fmt::{Debug, Formatter}; use std::future::Future; use std::iter::repeat_with; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; -use await_tree::InstrumentAwait; -use futures::{Stream, StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::TryStreamExt; use itertools::Itertools; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode}; use risingwave_common::metrics::LabelGuardedIntCounter; @@ -35,11 +31,10 @@ use tokio::time::Instant; use tracing::{event, Instrument}; use super::exchange::output::{new_output, BoxedOutput}; -use super::{AddMutation, Executor, TroublemakerExecutor, UpdateMutation, Watermark}; -use crate::error::StreamResult; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{Barrier, Message, Mutation, StreamConsumer}; -use crate::task::{ActorId, DispatcherId, SharedContext}; +use super::{AddMutation, TroublemakerExecutor, UpdateMutation}; +use crate::executor::prelude::*; +use crate::executor::StreamConsumer; +use crate::task::{DispatcherId, SharedContext}; /// [`DispatchExecutor`] consumes messages and send them into downstream actors. Usually, /// data chunks will be dispatched with some specified policy, while control message diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index 69230ae60735b..b8839d76000c1 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -16,17 +16,13 @@ use std::collections::BTreeMap; use std::mem; use either::Either; -use futures::{StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::StreamChunk; +use futures::TryStreamExt; use risingwave_common::catalog::{ColumnDesc, TableId, TableVersionId}; use risingwave_common::transaction::transaction_id::TxnId; use risingwave_common::transaction::transaction_message::TxnMsg; use risingwave_dml::dml_manager::DmlManagerRef; -use super::error::StreamExecutorError; -use super::{expect_first_barrier, BoxedMessageStream, Execute, Executor, Message, Mutation}; -use crate::common::StreamChunkBuilder; +use crate::executor::prelude::*; use crate::executor::stream_reader::StreamReaderWithPause; /// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index fe94c6449ac49..ddbe8352b2e8a 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -15,14 +15,12 @@ use std::ops::Bound::{self, *}; use std::sync::Arc; -use futures::{pin_mut, stream, StreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::{Array, ArrayImpl, Op, StreamChunk}; +use futures::stream; +use risingwave_common::array::{Array, ArrayImpl, Op}; use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; -use risingwave_common::catalog::Schema; use risingwave_common::hash::VnodeBitmapExt; -use risingwave_common::row::{self, once, OwnedRow, OwnedRow as RowData, Row}; +use risingwave_common::row::{self, once, OwnedRow as RowData}; use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl, ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::{ @@ -33,16 +31,11 @@ use risingwave_pb::expr::expr_node::Type::{ GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, }; use risingwave_storage::store::PrefetchOptions; -use risingwave_storage::StateStore; use super::barrier_align::*; -use super::error::StreamExecutorError; -use super::monitor::StreamingMetrics; -use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message}; -use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable}; -use crate::common::StreamChunkBuilder; +use crate::common::table::state_table::WatermarkCacheParameterizedStateTable; use crate::consistency::consistency_panic; -use crate::executor::expect_first_barrier_from_aligned_stream; +use crate::executor::prelude::*; use crate::task::ActorEvalErrorReport; pub struct DynamicFilterExecutor { diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 7b7cc151f46cd..98ac3d278f0a3 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -29,13 +29,13 @@ use super::permit::Receiver; use crate::error::StreamResult; use crate::executor::error::StreamExecutorError; use crate::executor::monitor::StreamingMetrics; -use crate::executor::*; +use crate::executor::prelude::*; use crate::task::{ FragmentId, LocalBarrierManager, SharedContext, UpDownActorIds, UpDownFragmentIds, }; -/// `Input` provides an interface for [`MergeExecutor`] and [`ReceiverExecutor`] to receive data -/// from upstream actors. +/// `Input` provides an interface for [`MergeExecutor`](crate::executor::MergeExecutor) and +/// [`ReceiverExecutor`](crate::executor::ReceiverExecutor) to receive data from upstream actors. pub trait Input: MessageStream { /// The upstream actor id. fn actor_id(&self) -> ActorId; diff --git a/src/stream/src/executor/expand.rs b/src/stream/src/executor/expand.rs index 2375ebca3d0df..46e1d5765ee0e 100644 --- a/src/stream/src/executor/expand.rs +++ b/src/stream/src/executor/expand.rs @@ -12,14 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Debug; +use risingwave_common::array::{Array, I64Array}; -use futures::StreamExt; -use futures_async_stream::try_stream; -use risingwave_common::array::{Array, I64Array, StreamChunk}; - -use super::error::StreamExecutorError; -use super::{BoxedMessageStream, Execute, Executor, Message}; +use crate::executor::prelude::*; pub struct ExpandExecutor { input: Executor, diff --git a/src/stream/src/executor/filter.rs b/src/stream/src/executor/filter.rs index ffc8847c42abf..4d1ecb098bd8f 100644 --- a/src/stream/src/executor/filter.rs +++ b/src/stream/src/executor/filter.rs @@ -12,20 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; - -use futures::StreamExt; -use futures_async_stream::try_stream; -use risingwave_common::array::{Array, ArrayImpl, Op, StreamChunk}; +use risingwave_common::array::{Array, ArrayImpl, Op}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::expr::NonStrictExpression; -use super::{ - ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, - StreamExecutorResult, -}; +use crate::executor::prelude::*; /// `FilterExecutor` filters data with the `expr`. The `expr` takes a chunk of data, /// and returns a boolean array on whether each item should be retained. And then, diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 685b2e65e0831..02ad790e53983 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -14,23 +14,17 @@ use std::collections::HashMap; use std::marker::PhantomData; -use std::sync::Arc; -use futures::{stream, StreamExt}; -use futures_async_stream::try_stream; +use futures::stream; use itertools::Itertools; -use risingwave_common::array::StreamChunk; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; -use risingwave_common::catalog::Schema; use risingwave_common::hash::{HashKey, PrecomputedBuildHasher}; -use risingwave_common::types::ScalarImpl; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common_estimate_size::collections::EstimatedHashMap; use risingwave_common_estimate_size::EstimateSize; use risingwave_expr::aggregate::{build_retractable, AggCall, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; -use risingwave_storage::StateStore; use super::agg_common::{AggExecutorArgs, HashAggExecutorExtraArgs}; use super::aggregation::{ @@ -38,18 +32,10 @@ use super::aggregation::{ OnlyOutputIfHasInput, }; use super::sort_buffer::SortBuffer; -use super::{ - expect_first_barrier, ActorContextRef, Executor, ExecutorInfo, StreamExecutorResult, Watermark, -}; use crate::cache::{cache_may_stale, new_with_hasher, ManagedLruCache}; use crate::common::metrics::MetricsInfo; -use crate::common::table::state_table::StateTable; -use crate::common::StreamChunkBuilder; -use crate::error::StreamResult; use crate::executor::aggregation::AggGroup as GenericAggGroup; -use crate::executor::error::StreamExecutorError; -use crate::executor::{BoxedMessageStream, Execute, Message}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; type AggGroup = GenericAggGroup; type BoxedAggGroup = Box>; diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index b3f6b8102b4f4..ea0253c9cbbc1 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -13,40 +13,27 @@ // limitations under the License. use std::collections::{BTreeMap, HashSet}; -use std::sync::Arc; use std::time::Duration; -use await_tree::InstrumentAwait; -use futures::{pin_mut, Stream, StreamExt}; -use futures_async_stream::try_stream; use itertools::Itertools; use multimap::MultiMap; -use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::array::{Op, RowRef}; use risingwave_common::hash::{HashKey, NullBitmap}; -use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::{DataType, DefaultOrd, ToOwnedDatum}; +use risingwave_common::types::{DefaultOrd, ToOwnedDatum}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::NonStrictExpression; use risingwave_expr::ExprError; -use risingwave_storage::StateStore; use tokio::time::Instant; use self::builder::JoinChunkBuilder; use super::barrier_align::*; -use super::error::{StreamExecutorError, StreamExecutorResult}; use super::join::hash_join::*; use super::join::row::JoinRow; use super::join::{JoinTypePrimitive, SideTypePrimitive, *}; -use super::monitor::StreamingMetrics; use super::watermark::*; -use super::{ - ActorContextRef, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message, Watermark, -}; -use crate::common::table::state_table::StateTable; -use crate::executor::expect_first_barrier_from_aligned_stream; use crate::executor::join::builder::JoinStreamChunkBuilder; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// Evict the cache every n rows. const EVICT_EVERY_N_ROWS: u32 = 16; diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 801f3daa9f53a..28408ad046a4e 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -14,17 +14,13 @@ use std::num::NonZeroUsize; -use futures::StreamExt; -use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{DataChunk, Op}; use risingwave_common::types::Interval; use risingwave_expr::expr::NonStrictExpression; use risingwave_expr::ExprError; -use super::error::StreamExecutorError; -use super::{ActorContextRef, Execute, Executor, Message}; -use crate::common::StreamChunkBuilder; +use crate::executor::prelude::*; pub struct HopWindowExecutor { _ctx: ActorContextRef, diff --git a/src/stream/src/executor/lookup.rs b/src/stream/src/executor/lookup.rs index 2c1de3170a801..8d62d27e37f06 100644 --- a/src/stream/src/executor/lookup.rs +++ b/src/stream/src/executor/lookup.rs @@ -14,10 +14,6 @@ use async_trait::async_trait; use futures::StreamExt; -use risingwave_common::types::DataType; -use risingwave_storage::StateStore; - -use crate::executor::{Barrier, BoxedMessageStream, Execute}; mod cache; mod sides; @@ -27,7 +23,7 @@ mod impl_; pub use impl_::LookupExecutorParams; -use super::{ActorContextRef, Executor}; +use crate::executor::prelude::*; #[cfg(test)] mod tests; diff --git a/src/stream/src/executor/lookup/cache.rs b/src/stream/src/executor/lookup/cache.rs index 610551352f371..287f11c2af608 100644 --- a/src/stream/src/executor/lookup/cache.rs +++ b/src/stream/src/executor/lookup/cache.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::array::Op; +use risingwave_common::row::RowExt; use risingwave_common_estimate_size::collections::{EstimatedHashSet, EstimatedVec}; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::consistency::consistency_panic; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; pub type LookupEntryState = EstimatedHashSet; diff --git a/src/stream/src/executor/lookup/impl_.rs b/src/stream/src/executor/lookup/impl_.rs index 20873a8c7338f..8ce6688540e3f 100644 --- a/src/stream/src/executor/lookup/impl_.rs +++ b/src/stream/src/executor/lookup/impl_.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::{pin_mut, StreamExt}; -use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::RowRef; -use risingwave_common::catalog::{ColumnDesc, Schema}; -use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::catalog::ColumnDesc; +use risingwave_common::row::RowExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::sort_util::ColumnOrder; @@ -26,18 +23,15 @@ use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::TableIter; -use risingwave_storage::StateStore; use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch}; use crate::cache::cache_may_stale; use crate::common::metrics::MetricsInfo; -use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; use crate::executor::join::builder::JoinStreamChunkBuilder; use crate::executor::lookup::cache::LookupCache; use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide}; use crate::executor::lookup::LookupExecutor; -use crate::executor::{ActorContextRef, Barrier, Executor, ExecutorInfo, Message}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// Parameters for [`LookupExecutor`]. pub struct LookupExecutorParams { diff --git a/src/stream/src/executor/lookup_union.rs b/src/stream/src/executor/lookup_union.rs index 1a8cff3ba3aba..2189a821fab11 100644 --- a/src/stream/src/executor/lookup_union.rs +++ b/src/stream/src/executor/lookup_union.rs @@ -16,11 +16,9 @@ use async_trait::async_trait; use futures::channel::mpsc; use futures::future::{join_all, select, Either}; use futures::{FutureExt, SinkExt, StreamExt}; -use futures_async_stream::try_stream; use itertools::Itertools; -use super::error::StreamExecutorError; -use super::{Barrier, BoxedMessageStream, Execute, Executor, Message}; +use crate::executor::prelude::*; /// Merges data from multiple inputs with order. If `order = [2, 1, 0]`, then /// it will first pipe data from the third input; after the third input gets a barrier, it will then diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 44e5f9c93f910..0f27386ffc836 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -18,16 +18,14 @@ use std::task::{Context, Poll}; use anyhow::Context as _; use futures::stream::{FusedStream, FuturesUnordered, StreamFuture}; -use futures::{pin_mut, Stream, StreamExt}; -use futures_async_stream::try_stream; +use futures::StreamExt; use tokio::time::Instant; -use super::error::StreamExecutorError; use super::exchange::input::BoxedInput; use super::watermark::*; use super::*; use crate::executor::exchange::input::new_input; -use crate::executor::monitor::StreamingMetrics; +use crate::executor::prelude::*; use crate::executor::utils::ActorInputMetrics; use crate::task::{FragmentId, SharedContext}; diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index ab53b02ef7ce9..0748fac0a6569 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod prelude; + use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; @@ -137,7 +139,6 @@ use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; pub use simple_agg::SimpleAggExecutor; pub use sink::SinkExecutor; pub use sort::*; -pub use source::*; pub use stateless_simple_agg::StatelessSimpleAggExecutor; pub use subscription::SubscriptionExecutor; pub use temporal_join::*; diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 3215fdd0fc9c9..29a3e131fd2f9 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -17,17 +17,15 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::ops::{Deref, Index}; -use std::sync::Arc; use bytes::Bytes; -use futures::{stream, StreamExt}; -use futures_async_stream::try_stream; +use futures::stream; use itertools::Itertools; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Schema, TableId}; +use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, TableId}; use risingwave_common::row::{CompactedRow, OwnedRow, RowDeserializer}; -use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl}; +use risingwave_common::types::DefaultOrd; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; use risingwave_common::util::sort_util::ColumnOrder; @@ -35,18 +33,12 @@ use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer}; use risingwave_pb::catalog::Table; use risingwave_storage::mem_table::KeyOp; use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; -use risingwave_storage::StateStore; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTableInner; -use crate::executor::error::StreamExecutorError; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ - expect_first_barrier, ActorContext, ActorContextRef, AddMutation, BoxedMessageStream, Execute, - Executor, Message, Mutation, StreamExecutorResult, UpdateMutation, -}; -use crate::task::{ActorId, AtomicU64Ref}; +use crate::executor::prelude::*; +use crate::executor::{AddMutation, UpdateMutation}; /// `MaterializeExecutor` materializes changes in stream into a materialized view on storage. pub struct MaterializeExecutor { @@ -790,9 +782,9 @@ mod tests { use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::table::batch_table::storage_table::StorageTable; + use super::*; use crate::executor::test_utils::prelude::StateTable; use crate::executor::test_utils::*; - use crate::executor::*; #[tokio::test] async fn test_materialize_executor() { diff --git a/src/stream/src/executor/no_op.rs b/src/stream/src/executor/no_op.rs index ac12bf99d5d7f..d3957e0b02765 100644 --- a/src/stream/src/executor/no_op.rs +++ b/src/stream/src/executor/no_op.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::{ActorContextRef, BoxedMessageStream, Execute, Executor}; +use crate::executor::prelude::*; /// No-op executor directly forwards the input stream. Currently used to break the multiple edges in /// the fragment graph. diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 755e48f325965..43316fef71094 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -15,19 +15,12 @@ use std::ops::Bound; use std::ops::Bound::Unbounded; -use futures::{pin_mut, StreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::row::{self, OwnedRow}; -use risingwave_common::types::{DataType, Datum}; -use risingwave_storage::StateStore; +use risingwave_common::array::Op; +use risingwave_common::row; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; -use super::{ - Barrier, BoxedMessageStream, Execute, Message, Mutation, StreamExecutorError, Watermark, -}; -use crate::common::table::state_table::StateTable; +use crate::executor::prelude::*; pub struct NowExecutor { data_types: Vec, diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 16fe77cb64ebc..1691f0f0c1b60 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -15,13 +15,10 @@ use std::marker::PhantomData; use std::ops::Bound; -use futures::StreamExt; -use futures_async_stream::{for_await, try_stream}; use itertools::Itertools; use risingwave_common::array::stream_record::Record; -use risingwave_common::array::{ArrayRef, Op, StreamChunk}; -use risingwave_common::catalog::Schema; -use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::array::{ArrayRef, Op}; +use risingwave_common::row::RowExt; use risingwave_common::types::{ToDatumRef, ToOwnedDatum}; use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded}; @@ -33,16 +30,10 @@ use risingwave_expr::window_function::{ create_window_state, StateEvictHint, StateKey, WindowFuncCall, WindowStates, }; use risingwave_storage::store::PrefetchOptions; -use risingwave_storage::StateStore; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; -use crate::common::table::state_table::StateTable; -use crate::executor::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, - StreamExecutorError, StreamExecutorResult, -}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; struct Partition { states: WindowStates, diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index a7245c57f368c..7cb29c39f7e37 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -15,18 +15,14 @@ use std::collections::{btree_map, BTreeMap, HashSet}; use std::marker::PhantomData; use std::ops::RangeInclusive; -use std::sync::Arc; use delta_btree_map::{Change, PositionType}; -use futures::StreamExt; -use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::stream_record::Record; -use risingwave_common::array::{Op, RowRef, StreamChunk}; -use risingwave_common::catalog::Schema; -use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::array::Op; +use risingwave_common::row::RowExt; use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy; -use risingwave_common::types::{DataType, DefaultOrdered}; +use risingwave_common::types::DefaultOrdered; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded}; use risingwave_common::util::sort_util::OrderType; @@ -43,14 +39,9 @@ use super::over_partition::{ use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; -use crate::common::StreamChunkBuilder; use crate::executor::monitor::StreamingMetrics; use crate::executor::over_window::over_partition::AffectedRange; -use crate::executor::{ - expect_first_barrier, ActorContextRef, Execute, Executor, Message, StreamExecutorError, - StreamExecutorResult, -}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// [`OverWindowExecutor`] consumes retractable input stream and produces window function outputs. /// One [`OverWindowExecutor`] can handle one combination of partition key and order key. diff --git a/src/stream/src/executor/prelude.rs b/src/stream/src/executor/prelude.rs new file mode 100644 index 0000000000000..446efa2705810 --- /dev/null +++ b/src/stream/src/executor/prelude.rs @@ -0,0 +1,40 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(unused_imports)] + +pub use std::fmt::{Debug, Formatter}; +pub use std::pin::pin; +pub use std::sync::Arc; + +pub use await_tree::InstrumentAwait; +pub use futures::{pin_mut, Stream, StreamExt}; +pub use futures_async_stream::{for_await, try_stream}; +pub use risingwave_common::array::{RowRef, StreamChunk, StreamChunkBuilder}; +pub use risingwave_common::catalog::Schema; +pub use risingwave_common::row::{OwnedRow, Row}; +pub use risingwave_common::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl}; +pub use risingwave_storage::StateStore; + +pub use crate::common::table::state_table::StateTable; +pub use crate::error::StreamResult; +pub use crate::executor::actor::{ActorContext, ActorContextRef}; +pub use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; +pub use crate::executor::monitor::streaming_stats::StreamingMetrics; +pub use crate::executor::{ + expect_first_barrier, expect_first_barrier_from_aligned_stream, Barrier, BoxedMessageStream, + Execute, Executor, ExecutorInfo, Message, MessageStream, MessageStreamItem, Mutation, + PkDataTypes, PkIndices, PkIndicesRef, Watermark, +}; +pub use crate::task::{ActorId, AtomicU64Ref}; diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 12b8dff2d7742..e78238585c9fa 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -12,21 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Formatter}; - -use futures::StreamExt; -use futures_async_stream::try_stream; use multimap::MultiMap; -use risingwave_common::array::StreamChunk; -use risingwave_common::row::{Row, RowExt}; -use risingwave_common::types::{ScalarImpl, ToOwnedDatum}; +use risingwave_common::row::RowExt; +use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::expr::NonStrictExpression; -use super::{ - ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError, - StreamExecutorResult, Watermark, -}; +use crate::executor::prelude::*; /// `ProjectExecutor` project data with the `expr`. The `expr` takes a chunk of data, /// and returns a new data chunk. And then, `ProjectExecutor` will insert, delete diff --git a/src/stream/src/executor/project_set.rs b/src/stream/src/executor/project_set.rs index dc2a3ea3e15e7..43a2d65cbfb11 100644 --- a/src/stream/src/executor/project_set.rs +++ b/src/stream/src/executor/project_set.rs @@ -12,23 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Formatter}; - use either::Either; -use futures::StreamExt; -use futures_async_stream::try_stream; use multimap::MultiMap; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::bail; -use risingwave_common::row::{Row, RowExt}; -use risingwave_common::types::{DataType, Datum, DatumRef, ToOwnedDatum}; +use risingwave_common::row::RowExt; +use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::expr::{LogReport, NonStrictExpression}; use risingwave_expr::table_function::ProjectSetSelectItem; -use super::error::StreamExecutorError; -use super::{ActorContextRef, Execute, Executor, Message, StreamExecutorResult, Watermark}; -use crate::common::StreamChunkBuilder; +use crate::executor::prelude::*; const PROJ_ROW_ID_OFFSET: usize = 1; diff --git a/src/stream/src/executor/rearranged_chain.rs b/src/stream/src/executor/rearranged_chain.rs index 0f96c07a1e95c..19ebfeabc2988 100644 --- a/src/stream/src/executor/rearranged_chain.rs +++ b/src/stream/src/executor/rearranged_chain.rs @@ -12,17 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::pin; - use futures::channel::{mpsc, oneshot}; +use futures::stream; use futures::stream::select_with_strategy; -use futures::{stream, StreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::StreamChunk; -use super::error::StreamExecutorError; -use super::{expect_first_barrier, Barrier, Execute, Executor, Message, MessageStream}; -use crate::task::{ActorId, CreateMviewProgress}; +use crate::executor::prelude::*; +use crate::task::CreateMviewProgress; /// `ChainExecutor` is an executor that enables synchronization between the existing stream and /// newly appended executors. Currently, `ChainExecutor` is mainly used to implement MV on MV diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 436ee8768cb80..3966b25febf5d 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -11,21 +11,17 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; use anyhow::Context; -use futures::StreamExt; -use futures_async_stream::try_stream; use itertools::Itertools; use tokio::time::Instant; use super::exchange::input::BoxedInput; -use super::ActorContextRef; use crate::executor::exchange::input::new_input; -use crate::executor::monitor::StreamingMetrics; +use crate::executor::prelude::*; use crate::executor::utils::ActorInputMetrics; -use crate::executor::{expect_first_barrier, BoxedMessageStream, Execute, Message}; use crate::task::{FragmentId, SharedContext}; + /// `ReceiverExecutor` is used along with a channel. After creating a mpsc channel, /// there should be a `ReceiverExecutor` running in the background, so as to push /// messages down to the executors. @@ -80,7 +76,6 @@ impl ReceiverExecutor { pub fn for_test(input: super::exchange::permit::Receiver) -> Self { use super::exchange::input::LocalInput; use crate::executor::exchange::input::Input; - use crate::executor::ActorContext; Self::new( ActorContext::for_test(114), diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 12de3e0b88a36..c779092a307fb 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -12,20 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; -use futures_async_stream::try_stream; use risingwave_common::array::stream_chunk::Ops; -use risingwave_common::array::{ - Array, ArrayBuilder, ArrayRef, Op, SerialArrayBuilder, StreamChunk, -}; +use risingwave_common::array::{Array, ArrayBuilder, ArrayRef, Op, SerialArrayBuilder}; use risingwave_common::buffer::Bitmap; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::types::Serial; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::row_id::RowIdGenerator; -use super::{expect_first_barrier, ActorContextRef, Execute, Executor}; -use crate::executor::{Message, StreamExecutorError}; +use crate::executor::prelude::*; /// [`RowIdGenExecutor`] generates row id for data, where the user has not specified a pk. pub struct RowIdGenExecutor { diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index 890754f33462f..9c35627857828 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -12,26 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; -use futures_async_stream::try_stream; -use risingwave_common::array::StreamChunk; -use risingwave_common::catalog::Schema; +use std::collections::HashMap; + +use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::{build_retractable, AggCall, BoxedAggregateFunction}; use risingwave_pb::stream_plan::PbAggNodeVersion; -use risingwave_storage::StateStore; use super::agg_common::{AggExecutorArgs, SimpleAggExecutorExtraArgs}; use super::aggregation::{ agg_call_filter_res, iter_table_storage, AggStateStorage, AlwaysOutput, DistinctDeduplicater, }; -use super::*; -use crate::common::table::state_table::StateTable; -use crate::error::StreamResult; use crate::executor::aggregation::AggGroup; -use crate::executor::error::StreamExecutorError; -use crate::executor::{BoxedMessageStream, Message}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// `SimpleAggExecutor` is the aggregation operator for streaming system. /// To create an aggregation operator, states and expressions should be passed along the @@ -306,9 +299,9 @@ mod tests { use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::StateStore; + use super::*; use crate::executor::test_utils::agg_executor::new_boxed_simple_agg_executor; use crate::executor::test_utils::*; - use crate::executor::*; #[tokio::test] async fn test_simple_aggregation_in_memory() { diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index f552f8ba8e184..858fe39bd1b8e 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -16,14 +16,12 @@ use std::mem; use anyhow::anyhow; use futures::stream::select; -use futures::{pin_mut, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::{FutureExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use risingwave_common::array::stream_chunk::StreamChunkMut; -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; +use risingwave_common::array::Op; +use risingwave_common::catalog::{ColumnCatalog, Field}; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; -use risingwave_common::types::DataType; use risingwave_connector::dispatch_sink; use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ @@ -34,13 +32,8 @@ use risingwave_connector::sink::{ }; use thiserror_ext::AsReport; -use super::error::{StreamExecutorError, StreamExecutorResult}; -use super::{Execute, Executor, ExecutorInfo, Message, PkIndices}; use crate::common::compact_chunk::{merge_chunk_row, StreamChunkCompactor}; -use crate::executor::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, MessageStream, Mutation, -}; -use crate::task::ActorId; +use crate::executor::prelude::*; pub struct SinkExecutor { actor_context: ActorContextRef, diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index dc38d4dcfbeba..37d08c746bba8 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -12,19 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; -use futures_async_stream::try_stream; use risingwave_common::array::Op; -use risingwave_common::catalog::Schema; -use risingwave_storage::StateStore; use super::sort_buffer::SortBuffer; -use super::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, - StreamExecutorError, Watermark, -}; -use crate::common::table::state_table::StateTable; -use crate::common::StreamChunkBuilder; +use crate::executor::prelude::*; pub struct SortExecutor { input: Executor, diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index c555fb036d94c..6399b2bc930ce 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -12,20 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::ops::Bound; -use std::sync::Arc; use either::Either; -use futures::stream::{self, StreamExt}; -use futures::{pin_mut, TryStreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::StreamChunk; +use futures::{stream, TryStreamExt}; use risingwave_common::catalog::{ColumnId, TableId}; use risingwave_common::hash::VnodeBitmapExt; -use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::{ScalarRef, ScalarRefImpl}; +use risingwave_common::types::ScalarRef; use risingwave_connector::source::filesystem::opendal_source::{ OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, }; @@ -35,17 +29,14 @@ use risingwave_connector::source::{ BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitImpl, SplitMetaData, }; use risingwave_storage::store::PrefetchOptions; -use risingwave_storage::StateStore; use thiserror_ext::AsReport; -use super::{get_split_offset_col_idx, SourceStateTableHandler}; -use crate::executor::source_executor::apply_rate_limit; -use crate::executor::stream_reader::StreamReaderWithPause; -use crate::executor::{ - expect_first_barrier, get_split_offset_mapping_from_chunk, prune_additional_cols, - ActorContextRef, BoxedMessageStream, Execute, Executor, Message, Mutation, StreamExecutorError, - StreamExecutorResult, StreamSourceCore, +use super::{get_split_offset_col_idx, SourceStateTableHandler, StreamSourceCore}; +use crate::executor::prelude::*; +use crate::executor::source::{ + apply_rate_limit, get_split_offset_mapping_from_chunk, prune_additional_cols, }; +use crate::executor::stream_reader::StreamReaderWithPause; const SPLIT_BATCH_SIZE: usize = 1000; diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 36b7783950232..77e93e086c8fd 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -15,32 +15,32 @@ #![deprecated = "will be replaced by new fs source (list + fetch)"] #![expect(deprecated)] -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; +use std::collections::{HashMap, HashSet}; use anyhow::anyhow; use either::Either; -use futures::{StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::TryStreamExt; +use itertools::Itertools; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; +use risingwave_common::util::epoch::EpochPair; use risingwave_connector::error::ConnectorError; use risingwave_connector::source::reader::desc::{FsSourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData, }; -use risingwave_storage::StateStore; use tokio::sync::mpsc::UnboundedReceiver; use tokio::time::Instant; use super::executor_core::StreamSourceCore; -use crate::error::StreamResult; -use crate::executor::error::StreamExecutorError; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::source_executor::apply_rate_limit; +use crate::executor::prelude::*; +use crate::executor::source::{ + apply_rate_limit, barrier_to_message_stream, get_split_offset_col_idx, + get_split_offset_mapping_from_chunk, prune_additional_cols, +}; use crate::executor::stream_reader::StreamReaderWithPause; -use crate::executor::*; +use crate::executor::{AddMutation, UpdateMutation}; /// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to /// some latencies in network and cost in meta. diff --git a/src/stream/src/executor/source/list_executor.rs b/src/stream/src/executor/source/list_executor.rs index 6d5fde8ee86bb..a5c2be3cb10b6 100644 --- a/src/stream/src/executor/source/list_executor.rs +++ b/src/stream/src/executor/source/list_executor.rs @@ -12,25 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Formatter; -use std::sync::Arc; - use anyhow::anyhow; use either::Either; -use futures::{StreamExt, TryStreamExt}; +use futures::TryStreamExt; use futures_async_stream::try_stream; use risingwave_common::array::Op; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::SourceCtrlOpts; -use risingwave_storage::StateStore; use thiserror_ext::AsReport; use tokio::sync::mpsc::UnboundedReceiver; -use crate::executor::error::StreamExecutorError; -use crate::executor::monitor::StreamingMetrics; +use super::{barrier_to_message_stream, StreamSourceCore}; +use crate::executor::prelude::*; use crate::executor::stream_reader::StreamReaderWithPause; -use crate::executor::*; const CHUNK_SIZE: usize = 1024; diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index eae70cfe657a5..5777926a6a34f 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -12,14 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod executor_core; use std::collections::HashMap; use await_tree::InstrumentAwait; -pub use executor_core::StreamSourceCore; -mod fs_source_executor; -#[expect(deprecated)] -pub use fs_source_executor::*; use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::bail; @@ -28,18 +23,27 @@ use risingwave_connector::source::{SourceColumnDesc, SplitId}; use risingwave_pb::plan_common::additional_column::ColumnType; use risingwave_pb::plan_common::AdditionalColumn; pub use state_table_handler::*; -pub mod fetch_executor; + +mod executor_core; +pub use executor_core::StreamSourceCore; + +mod fs_source_executor; +#[expect(deprecated)] +pub use fs_source_executor::*; +mod source_executor; +pub use source_executor::*; +mod source_backfill_executor; +pub use source_backfill_executor::*; +mod list_executor; +pub use list_executor::*; +mod fetch_executor; pub use fetch_executor::*; -pub mod source_backfill_executor; -pub mod source_backfill_state_table; +mod source_backfill_state_table; pub use source_backfill_state_table::BackfillStateTableHandler; -pub mod source_executor; -pub mod list_executor; pub mod state_table_handler; use futures_async_stream::try_stream; -pub use list_executor::*; use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::error::StreamExecutorError; diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 7b3c594cab5af..f399fdb8d5a55 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -14,35 +14,32 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; -use std::fmt::Formatter; +use std::collections::{HashMap, HashSet}; use std::time::Instant; use anyhow::anyhow; use either::Either; use futures::stream::{select_with_strategy, PollNext}; -use futures::StreamExt; -use futures_async_stream::try_stream; +use itertools::Itertools; use prometheus::IntCounter; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; -use risingwave_common::row::Row; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::types::JsonbVal; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ - BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData, + BoxChunkSourceStream, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData, }; -use risingwave_storage::StateStore; use serde::{Deserialize, Serialize}; -use source_backfill_executor::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; use thiserror_ext::AsReport; use super::executor_core::StreamSourceCore; use super::source_backfill_state_table::BackfillStateTableHandler; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::source_executor::apply_rate_limit; -use crate::executor::*; +use crate::executor::prelude::*; +use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES; +use crate::executor::source::{apply_rate_limit, get_split_offset_col_idx}; +use crate::executor::{AddMutation, UpdateMutation}; #[derive(Clone, Debug, Deserialize, Serialize)] pub enum BackfillState { diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index cf54166afd396..a752fa234cad5 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -12,37 +12,42 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Formatter; +use std::collections::HashMap; use std::num::NonZeroU32; use std::str::FromStr; use std::time::Duration; use anyhow::anyhow; use either::Either; -use futures::{StreamExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::TryStreamExt; use governor::clock::MonotonicClock; use governor::{Quota, RateLimiter}; +use itertools::Itertools; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; +use risingwave_common::util::epoch::{Epoch, EpochPair}; use risingwave_connector::error::ConnectorError; use risingwave_connector::source::cdc::jni_source; use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder}; use risingwave_connector::source::{ - BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData, + BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, + SplitMetaData, }; use risingwave_hummock_sdk::HummockReadEpoch; -use risingwave_storage::StateStore; use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::time::Instant; use super::executor_core::StreamSourceCore; -use crate::executor::monitor::StreamingMetrics; +use crate::executor::prelude::*; +use crate::executor::source::{ + barrier_to_message_stream, get_split_offset_col_idx, get_split_offset_mapping_from_chunk, + prune_additional_cols, +}; use crate::executor::stream_reader::StreamReaderWithPause; -use crate::executor::*; +use crate::executor::{AddMutation, UpdateMutation}; /// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to /// some latencies in network and cost in meta. @@ -769,6 +774,7 @@ impl WaitEpochWorker { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::time::Duration; use futures::StreamExt; @@ -788,6 +794,7 @@ mod tests { use tracing_test::traced_test; use super::*; + use crate::executor::source::{default_source_internal_table, SourceStateTableHandler}; use crate::executor::ActorContext; const MOCK_SOURCE_NAME: &str = "mock_source"; diff --git a/src/stream/src/executor/stateless_simple_agg.rs b/src/stream/src/executor/stateless_simple_agg.rs index c6bca07df8cad..1360f4166008e 100644 --- a/src/stream/src/executor/stateless_simple_agg.rs +++ b/src/stream/src/executor/stateless_simple_agg.rs @@ -12,19 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; -use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::{ build_retractable, AggCall, AggregateState, BoxedAggregateFunction, }; use super::aggregation::agg_call_filter_res; -use super::error::StreamExecutorError; -use super::*; -use crate::error::StreamResult; +use crate::executor::prelude::*; pub struct StatelessSimpleAggExecutor { _ctx: ActorContextRef, diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index 6bbf76d0bf85c..7fbf43a5794b0 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -137,7 +137,8 @@ mod tests { use tokio::sync::mpsc; use super::*; - use crate::executor::{barrier_to_message_stream, Barrier, StreamExecutorError}; + use crate::executor::source::barrier_to_message_stream; + use crate::executor::{Barrier, StreamExecutorError}; const TEST_TRANSACTION_ID1: TxnId = 0; const TEST_TRANSACTION_ID2: TxnId = 1; diff --git a/src/stream/src/executor/subscription.rs b/src/stream/src/executor/subscription.rs index cd752dfdf28cd..4bacb2dc75f05 100644 --- a/src/stream/src/executor/subscription.rs +++ b/src/stream/src/executor/subscription.rs @@ -16,19 +16,14 @@ use core::str::FromStr; use core::time::Duration; use std::collections::HashMap; -use futures::prelude::stream::StreamExt; -use futures_async_stream::try_stream; use risingwave_common::types::{Interval, Timestamptz}; use risingwave_common::util::epoch::Epoch; use risingwave_storage::store::LocalStateStore; use tokio::time::Instant; -use super::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, - StreamExecutorError, StreamExecutorResult, -}; use crate::common::log_store_impl::kv_log_store::ReaderTruncationOffsetType; use crate::common::log_store_impl::subscription_log_store::SubscriptionLogStoreWriter; +use crate::executor::prelude::*; const EXECUTE_GC_INTERVAL: u64 = 3600; const MAX_RETENTION_DAYS: i32 = 365; diff --git a/src/stream/src/executor/subtask.rs b/src/stream/src/executor/subtask.rs index fea9644b151f3..45e02932d1908 100644 --- a/src/stream/src/executor/subtask.rs +++ b/src/stream/src/executor/subtask.rs @@ -12,16 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use await_tree::InstrumentAwait; -use futures::{Future, StreamExt}; +use futures::Future; use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::mpsc::error::SendError; use tokio_stream::wrappers::ReceiverStream; use super::actor::spawn_blocking_drop_stream; -use super::{Execute, Executor, Message, MessageStreamItem}; -use crate::task::ActorId; +use crate::executor::prelude::*; /// Handle used to drive the subtask. pub type SubtaskHandle = impl Future + Send + 'static; diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index f365706317a90..b744fb245a3b4 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -15,21 +15,17 @@ use std::alloc::Global; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::pin::pin; -use std::sync::Arc; use either::Either; use futures::stream::{self, PollNext}; -use futures::{pin_mut, StreamExt, TryStreamExt}; -use futures_async_stream::{for_await, try_stream}; +use futures::TryStreamExt; use itertools::Itertools; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::hash::{HashKey, NullBitmap}; -use risingwave_common::row::{OwnedRow, Row, RowExt}; -use risingwave_common::types::DataType; +use risingwave_common::row::RowExt; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common_estimate_size::{EstimateSize, KvSize}; use risingwave_expr::expr::NonStrictExpression; @@ -37,20 +33,12 @@ use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::TableIter; -use risingwave_storage::StateStore; use super::join::{JoinType, JoinTypePrimitive}; -use super::{ - Barrier, Execute, ExecutorInfo, Message, MessageStream, StreamExecutorError, - StreamExecutorResult, -}; use crate::cache::{cache_may_stale, new_with_hasher_in, ManagedLruCache}; use crate::common::metrics::MetricsInfo; -use crate::common::table::state_table::StateTable; use crate::executor::join::builder::JoinStreamChunkBuilder; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ActorContextRef, Executor, Watermark}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; pub struct TemporalJoinExecutor< K: HashKey, diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 4f8e3da165994..3a8a618e3c836 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -13,28 +13,21 @@ // limitations under the License. use std::ops::{Deref, DerefMut}; -use std::sync::Arc; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::Schema; use risingwave_common::hash::HashKey; use risingwave_common::row::RowExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_storage::StateStore; use super::top_n_cache::TopNCacheTrait; use super::utils::*; use super::{ManagedTopNState, TopNCache}; use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; -use crate::common::table::state_table::StateTable; -use crate::error::StreamResult; -use crate::executor::error::StreamExecutorResult; -use crate::executor::{ActorContextRef, Executor, PkIndices, Watermark}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; pub type GroupTopNExecutor = TopNExecutorWrapper>; diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index 346d73fed0196..cbd3f8215c87b 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -12,28 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::Schema; use risingwave_common::hash::HashKey; use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_storage::StateStore; use super::group_top_n::GroupTopNCache; use super::top_n_cache::AppendOnlyTopNCacheTrait; use super::utils::*; use super::{ManagedTopNState, TopNCache}; use crate::common::metrics::MetricsInfo; -use crate::common::table::state_table::StateTable; -use crate::error::StreamResult; -use crate::executor::error::StreamExecutorResult; -use crate::executor::{ActorContextRef, Executor, PkIndices, Watermark}; -use crate::task::AtomicU64Ref; +use crate::executor::prelude::*; /// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need /// to keep all the rows seen. As long as a record diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs index c88d9af3c0f7f..c99b911b951ef 100644 --- a/src/stream/src/executor/top_n/top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/top_n_appendonly.rs @@ -12,20 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::catalog::Schema; +use risingwave_common::array::Op; use risingwave_common::row::{RowDeserializer, RowExt}; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_storage::StateStore; use super::top_n_cache::AppendOnlyTopNCacheTrait; use super::utils::*; use super::{ManagedTopNState, TopNCache, NO_GROUP_KEY}; -use crate::common::table::state_table::StateTable; -use crate::error::StreamResult; -use crate::executor::error::StreamExecutorResult; -use crate::executor::{ActorContextRef, Executor, PkIndices, Watermark}; +use crate::executor::prelude::*; /// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need /// to keep all the rows seen. As long as a record diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index 5a70efc3f3f8e..30aee860a8fc8 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -12,19 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::catalog::Schema; +use risingwave_common::array::Op; use risingwave_common::row::RowExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_storage::StateStore; use super::utils::*; use super::{ManagedTopNState, TopNCache, TopNCacheTrait}; -use crate::common::table::state_table::StateTable; -use crate::error::StreamResult; -use crate::executor::error::StreamExecutorResult; -use crate::executor::{ActorContextRef, Executor, PkIndices, Watermark}; +use crate::executor::prelude::*; /// `TopNExecutor` works with input with modification, it keeps all the data /// records/rows that have been seen, and returns topN records overall. diff --git a/src/stream/src/executor/top_n/utils.rs b/src/stream/src/executor/top_n/utils.rs index bbd956cde2168..4700a8900221e 100644 --- a/src/stream/src/executor/top_n/utils.rs +++ b/src/stream/src/executor/top_n/utils.rs @@ -13,26 +13,18 @@ // limitations under the License. use std::future::Future; -use std::sync::Arc; -use futures::StreamExt; -use futures_async_stream::try_stream; use itertools::Itertools; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::Op; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::Schema; -use risingwave_common::row::{CompactedRow, Row, RowDeserializer}; +use risingwave_common::row::{CompactedRow, RowDeserializer}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::row_serde::OrderedRowSerde; use risingwave_common::util::sort_util::ColumnOrder; use super::CacheKey; -use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; -use crate::executor::{ - expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message, - Watermark, -}; +use crate::executor::prelude::*; pub trait TopNExecutorBase: Send + 'static { /// Apply the chunk to the dirty state and get the diffs. diff --git a/src/stream/src/executor/troublemaker.rs b/src/stream/src/executor/troublemaker.rs index 72a257f331450..7b93392fef419 100644 --- a/src/stream/src/executor/troublemaker.rs +++ b/src/stream/src/executor/troublemaker.rs @@ -12,20 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; -use futures_async_stream::try_stream; use rand::Rng; use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::stream_record::{Record, RecordType}; use risingwave_common::array::Op; use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty}; -use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use smallvec::SmallVec; -use super::{BoxedMessageStream, Execute, Executor, Message, StreamExecutorError}; use crate::consistency::insane; +use crate::executor::prelude::*; /// [`TroublemakerExecutor`] is used to make some trouble in the stream graph. Specifically, /// it is attached to `StreamScan` and `Source` executors in **insane mode**. It randomly diff --git a/src/stream/src/executor/union.rs b/src/stream/src/executor/union.rs index 5881474d90197..ac8f3581dda18 100644 --- a/src/stream/src/executor/union.rs +++ b/src/stream/src/executor/union.rs @@ -17,14 +17,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; use futures::stream::{FusedStream, FuturesUnordered}; -use futures::{Stream, StreamExt}; -use futures_async_stream::try_stream; use pin_project::pin_project; use super::watermark::BufferedWatermarks; -use super::{ - Barrier, BoxedMessageStream, Execute, Executor, Message, MessageStreamItem, StreamExecutorError, -}; +use crate::executor::prelude::*; /// `UnionExecutor` merges data from multiple inputs. pub struct UnionExecutor { diff --git a/src/stream/src/executor/utils.rs b/src/stream/src/executor/utils.rs index 18b64633a3e12..565866e0d9e95 100644 --- a/src/stream/src/executor/utils.rs +++ b/src/stream/src/executor/utils.rs @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::StreamExt; use risingwave_common::metrics::LabelGuardedIntCounter; -use crate::executor::monitor::StreamingMetrics; -use crate::executor::{BoxedMessageStream, Execute}; -use crate::task::{ActorId, FragmentId}; +use crate::executor::prelude::*; +use crate::task::FragmentId; #[derive(Default)] pub struct DummyExecutor; diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index 79fdf890cc7e7..dfa5579d66b49 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -14,17 +14,13 @@ use std::vec; -use await_tree::InstrumentAwait; -use futures::StreamExt; -use futures_async_stream::try_stream; -use risingwave_common::array::{DataChunk, Op, StreamChunk}; -use risingwave_common::catalog::Schema; +use risingwave_common::array::{DataChunk, Op}; use risingwave_common::ensure; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::expr::NonStrictExpression; use tokio::sync::mpsc::UnboundedReceiver; -use super::{ActorContextRef, Barrier, BoxedMessageStream, Execute, Message, StreamExecutorError}; +use crate::executor::prelude::*; use crate::task::CreateMviewProgress; const DEFAULT_CHUNK_SIZE: usize = 1024; diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index fc2f922371155..813ccbef28920 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -14,14 +14,10 @@ use std::cmp; use std::ops::Deref; -use std::sync::Arc; use futures::future::{try_join, try_join_all}; -use futures::StreamExt; -use futures_async_stream::try_stream; use risingwave_common::hash::VnodeBitmapExt; -use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl}; +use risingwave_common::types::{DefaultOrd, ScalarImpl}; use risingwave_common::{bail, row}; use risingwave_expr::expr::{ build_func_non_strict, ExpressionBoxExt, InputRefExpression, LiteralExpression, @@ -32,13 +28,9 @@ use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::expr::expr_node::Type; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::TableDistribution; -use risingwave_storage::StateStore; -use super::error::StreamExecutorError; use super::filter::FilterExecutor; -use super::{ActorContextRef, Execute, Executor, Message, StreamExecutorResult}; -use crate::common::table::state_table::StateTable; -use crate::executor::{expect_first_barrier, Watermark}; +use crate::executor::prelude::*; use crate::task::ActorEvalErrorReport; /// The executor will generate a `Watermark` after each chunk. diff --git a/src/stream/src/executor/wrapper.rs b/src/stream/src/executor/wrapper.rs index 74923928eaf6d..3da1a9f0127d3 100644 --- a/src/stream/src/executor/wrapper.rs +++ b/src/stream/src/executor/wrapper.rs @@ -12,11 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use futures::StreamExt; - -use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, ExecutorInfo, MessageStream}; +use crate::executor::prelude::*; mod epoch_check; mod epoch_provide; diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index e07670fb97a6d..04baf944b552f 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -24,9 +24,8 @@ use risingwave_pb::stream_plan::StreamFsFetchNode; use risingwave_storage::StateStore; use crate::error::StreamResult; -use crate::executor::{ - Execute, Executor, FsFetchExecutor, SourceStateTableHandler, StreamSourceCore, -}; +use crate::executor::source::{FsFetchExecutor, SourceStateTableHandler, StreamSourceCore}; +use crate::executor::{Execute, Executor}; use crate::from_proto::ExecutorBuilder; use crate::task::ExecutorParams; diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 3610c8a95caf0..ac305e53d386a 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -35,9 +35,9 @@ use risingwave_storage::panic_store::PanicStateStore; use tokio::sync::mpsc::unbounded_channel; use super::*; -use crate::executor::source::{FsListExecutor, StreamSourceCore}; -use crate::executor::source_executor::SourceExecutor; -use crate::executor::state_table_handler::SourceStateTableHandler; +use crate::executor::source::{ + FsListExecutor, SourceExecutor, SourceStateTableHandler, StreamSourceCore, +}; use crate::executor::TroublemakerExecutor; const FS_CONNECTORS: &[&str] = &["s3"]; @@ -213,7 +213,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { if is_fs_connector { #[expect(deprecated)] - crate::executor::FsSourceExecutor::new( + crate::executor::source::FsSourceExecutor::new( params.actor_context.clone(), stream_source_core, params.executor_stats, diff --git a/src/stream/src/from_proto/source_backfill.rs b/src/stream/src/from_proto/source_backfill.rs index eadb949217070..12b159b6fdd7c 100644 --- a/src/stream/src/from_proto/source_backfill.rs +++ b/src/stream/src/from_proto/source_backfill.rs @@ -17,12 +17,10 @@ use risingwave_connector::source::SourceCtrlOpts; use risingwave_pb::stream_plan::SourceBackfillNode; use super::*; -use crate::executor::source::StreamSourceCore; -use crate::executor::source_backfill_executor::{ - SourceBackfillExecutor, SourceBackfillExecutorInner, +use crate::executor::source::{ + BackfillStateTableHandler, SourceBackfillExecutor, SourceBackfillExecutorInner, + SourceStateTableHandler, StreamSourceCore, }; -use crate::executor::state_table_handler::SourceStateTableHandler; -use crate::executor::BackfillStateTableHandler; pub struct SourceBackfillExecutorBuilder;