Skip to content

Commit

Permalink
refactor(stream): add prelude for streaming executor modules (#16404)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Apr 19, 2024
1 parent efa1fda commit edc65a0
Show file tree
Hide file tree
Showing 67 changed files with 246 additions and 514 deletions.
1 change: 1 addition & 0 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayI
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::data::PbArray;
pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt};
pub use stream_chunk_builder::StreamChunkBuilder;
pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue};
pub use utf8_array::*;

Expand Down
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use risingwave_stream::error::StreamResult;
use risingwave_stream::executor::dml::DmlExecutor;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::row_id_gen::RowIdGenExecutor;
use risingwave_stream::executor::source_executor::SourceExecutor;
use risingwave_stream::executor::source::SourceExecutor;
use risingwave_stream::executor::{
ActorContext, Barrier, Execute, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices,
};
Expand Down
3 changes: 1 addition & 2 deletions src/stream/src/common/log_store_impl/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,14 @@ mod tests {
use std::task::Poll;

use futures::FutureExt;
use risingwave_common::array::Op;
use risingwave_common::array::{Op, StreamChunkBuilder};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::epoch::{test_epoch, EpochPair};
use risingwave_connector::sink::log_store::{
LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
};

use crate::common::log_store_impl::in_mem::BoundedInMemLogStoreFactory;
use crate::common::StreamChunkBuilder;

#[tokio::test]
async fn test_in_memory_log_store() {
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

pub use column_mapping::*;
pub use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;

pub mod cache;
mod column_mapping;
Expand Down
6 changes: 1 addition & 5 deletions src/stream/src/executor/agg_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ use std::collections::HashMap;

use risingwave_expr::aggregate::AggCall;
use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::aggregation::AggStateStorage;
use super::{Executor, ExecutorInfo};
use crate::common::table::state_table::StateTable;
use crate::executor::ActorContextRef;
use crate::task::AtomicU64Ref;
use crate::executor::prelude::*;

/// Arguments needed to construct an `XxxAggExecutor`.
pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
Expand Down
20 changes: 6 additions & 14 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::pin;
use std::sync::Arc;
use std::collections::HashMap;

use either::Either;
use futures::stream::{select_all, select_with_strategy};
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use futures::{stream, TryStreamExt};
use itertools::Itertools;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bail;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::OwnedRow;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use crate::common::table::state_table::{ReplicatedStateTable, StateTable};
use crate::common::table::state_table::ReplicatedStateTable;
#[cfg(debug_assertions)]
use crate::executor::backfill::utils::METADATA_STATE_LEN;
use crate::executor::backfill::utils::{
compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk,
mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, Barrier, BoxedMessageStream, Execute, Executor, HashMap, Message,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::{ActorId, CreateMviewProgress};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;

type Builders = HashMap<VirtualNode, DataChunkBuilder>;

Expand Down
21 changes: 6 additions & 15 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::{pin, Pin};
use std::sync::Arc;
use std::pin::Pin;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use futures::{pin_mut, stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::array::DataChunk;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarRefImpl};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::parser::{
DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_connector::source::{SourceColumnDesc, SourceContext};
use risingwave_storage::StateStore;
use rw_futures_util::pausable;

use crate::common::table::state_table::StateTable;
use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
use crate::executor::backfill::cdc::upstream_table::snapshot::{
Expand All @@ -44,11 +39,7 @@ use crate::executor::backfill::cdc::upstream_table::snapshot::{
use crate::executor::backfill::utils::{
get_cdc_chunk_last_offset, get_new_pos, mapping_chunk, mapping_message, mark_cdc_chunk,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message,
StreamExecutorError, StreamExecutorResult,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;

/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
Expand Down
20 changes: 4 additions & 16 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::pin;
use std::sync::Arc;

use either::Either;
use futures::stream;
use futures::stream::select_with_strategy;
use futures::{stream, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::Datum;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::{bail, row};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
use crate::executor::backfill::utils;
use crate::executor::backfill::utils::{
compute_bounds, construct_initial_finished_state, create_builder, create_limiter, get_new_pos,
mapping_chunk, mapping_message, mark_chunk, owned_row_iter, BackfillRateLimiter,
METADATA_STATE_LEN,
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, Barrier, BoxedMessageStream, Execute, Executor, Message, Mutation,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::{ActorId, CreateMviewProgress};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;

/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
Expand Down
6 changes: 1 addition & 5 deletions src/stream/src/executor/barrier_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::StreamExt;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::{
ActorContext, ActorContextRef, Barrier, BoxedMessageStream, Execute, Message,
StreamExecutorError,
};
use crate::executor::prelude::*;

/// An executor used only for receiving barriers from the meta service. It always resides in the leaves
/// of the streaming graph.
Expand Down
11 changes: 2 additions & 9 deletions src/stream/src/executor/batch_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use await_tree::InstrumentAwait;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::array::Op;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
use super::{Execute, Message};
use crate::executor::BoxedMessageStream;
use crate::executor::prelude::*;

pub struct BatchQueryExecutor<S: StateStore> {
/// The [`StorageTable`] that needs to be queried
Expand Down
8 changes: 2 additions & 6 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::StreamExt;
use futures_async_stream::try_stream;

use super::error::StreamExecutorError;
use super::{expect_first_barrier, Execute, Executor, Message};
use crate::task::{ActorId, CreateMviewProgress};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;

/// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, [`ChainExecutor`] is mainly used to implement MV on MV
Expand Down
19 changes: 4 additions & 15 deletions src/stream/src/executor/dedup/append_only_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::{stream, StreamExt};
use futures_async_stream::try_stream;
use futures::stream;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::Op;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_storage::StateStore;
use risingwave_common::row::RowExt;

use super::cache::DedupCache;
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message,
StreamExecutorResult,
};
use crate::task::AtomicU64Ref;
use crate::executor::prelude::*;

/// [`AppendOnlyDedupExecutor`] drops any message that has duplicate pk columns with previous
/// messages. It only accepts append-only input, and its output will be append-only as well.
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/dedup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common_estimate_size::EstimateSize;

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::task::AtomicU64Ref;
use crate::executor::prelude::*;

/// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only
/// accepts a key without a value. This could be refined in the future to support k-v pairs.
Expand Down
17 changes: 6 additions & 11 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use await_tree::InstrumentAwait;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::Op;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
Expand All @@ -35,11 +31,10 @@ use tokio::time::Instant;
use tracing::{event, Instrument};

use super::exchange::output::{new_output, BoxedOutput};
use super::{AddMutation, Executor, TroublemakerExecutor, UpdateMutation, Watermark};
use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{Barrier, Message, Mutation, StreamConsumer};
use crate::task::{ActorId, DispatcherId, SharedContext};
use super::{AddMutation, TroublemakerExecutor, UpdateMutation};
use crate::executor::prelude::*;
use crate::executor::StreamConsumer;
use crate::task::{DispatcherId, SharedContext};

/// [`DispatchExecutor`] consumes messages and sends them into downstream actors. Usually,
/// data chunks will be dispatched with some specified policy, while control message
Expand Down
8 changes: 2 additions & 6 deletions src/stream/src/executor/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@ use std::collections::BTreeMap;
use std::mem;

use either::Either;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use futures::TryStreamExt;
use risingwave_common::catalog::{ColumnDesc, TableId, TableVersionId};
use risingwave_common::transaction::transaction_id::TxnId;
use risingwave_common::transaction::transaction_message::TxnMsg;
use risingwave_dml::dml_manager::DmlManagerRef;

use super::error::StreamExecutorError;
use super::{expect_first_barrier, BoxedMessageStream, Execute, Executor, Message, Mutation};
use crate::common::StreamChunkBuilder;
use crate::executor::prelude::*;
use crate::executor::stream_reader::StreamReaderWithPause;

/// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific
Expand Down
17 changes: 5 additions & 12 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
use std::ops::Bound::{self, *};
use std::sync::Arc;

use futures::{pin_mut, stream, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{Array, ArrayImpl, Op, StreamChunk};
use futures::stream;
use risingwave_common::array::{Array, ArrayImpl, Op};
use risingwave_common::bail;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{self, once, OwnedRow, OwnedRow as RowData, Row};
use risingwave_common::row::{self, once, OwnedRow as RowData};
use risingwave_common::types::{DataType, Datum, DefaultOrd, ScalarImpl, ToDatumRef, ToOwnedDatum};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::{
Expand All @@ -33,16 +31,11 @@ use risingwave_pb::expr::expr_node::Type::{
GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual,
};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use super::barrier_align::*;
use super::error::StreamExecutorError;
use super::monitor::StreamingMetrics;
use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message};
use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable};
use crate::common::StreamChunkBuilder;
use crate::common::table::state_table::WatermarkCacheParameterizedStateTable;
use crate::consistency::consistency_panic;
use crate::executor::expect_first_barrier_from_aligned_stream;
use crate::executor::prelude::*;
use crate::task::ActorEvalErrorReport;

pub struct DynamicFilterExecutor<S: StateStore, const USE_WATERMARK_CACHE: bool> {
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/exchange/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use super::permit::Receiver;
use crate::error::StreamResult;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::*;
use crate::executor::prelude::*;
use crate::task::{
FragmentId, LocalBarrierManager, SharedContext, UpDownActorIds, UpDownFragmentIds,
};

/// `Input` provides an interface for [`MergeExecutor`] and [`ReceiverExecutor`] to receive data
/// from upstream actors.
/// `Input` provides an interface for [`MergeExecutor`](crate::executor::MergeExecutor) and
/// [`ReceiverExecutor`](crate::executor::ReceiverExecutor) to receive data from upstream actors.
pub trait Input: MessageStream {
/// The upstream actor id.
fn actor_id(&self) -> ActorId;
Expand Down
Loading

0 comments on commit edc65a0

Please sign in to comment.