Skip to content

Commit

Permalink
introduce struct Executor
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Feb 20, 2024
1 parent 501672a commit f4bd48d
Show file tree
Hide file tree
Showing 95 changed files with 703 additions and 981 deletions.
4 changes: 2 additions & 2 deletions src/stream/src/executor/agg_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_pb::stream_plan::PbAggNodeVersion;
use risingwave_storage::StateStore;

use super::aggregation::AggStateStorage;
use super::{Execute, ExecutorInfo};
use super::{Executor, ExecutorInfo};
use crate::common::table::state_table::StateTable;
use crate::executor::ActorContextRef;
use crate::task::AtomicU64Ref;
Expand All @@ -29,7 +29,7 @@ pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
pub version: PbAggNodeVersion,

// basic
pub input: Box<dyn Execute>,
pub input: Executor,
pub actor_ctx: ActorContextRef,
pub info: ExecutorInfo,

Expand Down
16 changes: 4 additions & 12 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use crate::executor::backfill::utils::{
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, Barrier, BoxedExecutor, BoxedMessageStream, Execute, ExecutorInfo,
HashMap, Message, StreamExecutorError, StreamExecutorResult,
expect_first_barrier, Barrier, BoxedMessageStream, Execute, Executor, ExecutorInfo, HashMap,
Message, StreamExecutorError, StreamExecutorResult,
};
use crate::task::{ActorId, CreateMviewProgress};

Expand All @@ -56,7 +56,7 @@ pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
upstream_table: ReplicatedStateTable<S, SD>,

/// Upstream with the same schema with the upstream table.
upstream: BoxedExecutor,
upstream: Executor,

/// Internal state table for persisting state of backfill state.
state_table: StateTable<S>,
Expand All @@ -68,8 +68,6 @@ pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {

actor_id: ActorId,

info: ExecutorInfo,

metrics: Arc<StreamingMetrics>,

chunk_size: usize,
Expand All @@ -85,9 +83,8 @@ where
#[allow(clippy::too_many_arguments)]
#[allow(dead_code)]
pub fn new(
info: ExecutorInfo,
upstream_table: ReplicatedStateTable<S, SD>,
upstream: BoxedExecutor,
upstream: Executor,
state_table: StateTable<S>,
output_indices: Vec<usize>,
progress: CreateMviewProgress,
Expand All @@ -96,7 +93,6 @@ where
rate_limit: Option<usize>,
) -> Self {
Self {
info,
upstream_table,
upstream,
state_table,
Expand Down Expand Up @@ -714,10 +710,6 @@ where
S: StateStore,
SD: ValueRowSerde,
{
fn info(&self) -> &ExecutorInfo {
&self.info
}

fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
}
Expand Down
15 changes: 4 additions & 11 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use crate::executor::backfill::utils::{
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Execute,
ExecutorInfo, Message, StreamExecutorError, StreamExecutorResult,
expect_first_barrier, ActorContextRef, BoxedMessageStream, Execute, Executor, Message,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgress;

Expand All @@ -55,13 +55,12 @@ const METADATA_STATE_LEN: usize = 4;

pub struct CdcBackfillExecutor<S: StateStore> {
actor_ctx: ActorContextRef,
info: ExecutorInfo,

/// The external table to be backfilled
external_table: ExternalStorageTable,

/// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset`
upstream: BoxedExecutor,
upstream: Executor,

/// The column indices need to be forwarded to the downstream from the upstream and table scan.
/// User may select a subset of columns from the upstream table.
Expand All @@ -83,9 +82,8 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
actor_ctx: ActorContextRef,
info: ExecutorInfo,
external_table: ExternalStorageTable,
upstream: BoxedExecutor,
upstream: Executor,
output_indices: Vec<usize>,
progress: Option<CreateMviewProgress>,
metrics: Arc<StreamingMetrics>,
Expand All @@ -95,7 +93,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
) -> Self {
Self {
actor_ctx,
info,
external_table,
upstream,
output_indices,
Expand Down Expand Up @@ -610,10 +607,6 @@ fn get_rw_columns(schema: &Schema) -> Vec<SourceColumnDesc> {
}

impl<S: StateStore> Execute for CdcBackfillExecutor<S> {
fn info(&self) -> &ExecutorInfo {
&self.info
}

fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
}
Expand Down
16 changes: 4 additions & 12 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use crate::executor::backfill::utils::{
};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, Barrier, BoxedExecutor, BoxedMessageStream, Execute, ExecutorInfo,
Message, Mutation, StreamExecutorError, StreamExecutorResult,
expect_first_barrier, Barrier, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message,
Mutation, StreamExecutorError, StreamExecutorResult,
};
use crate::task::{ActorId, CreateMviewProgress};

Expand Down Expand Up @@ -75,12 +75,10 @@ pub struct BackfillState {
/// in the same worker, so that we can read uncommitted data from the upstream table without
/// waiting.
pub struct BackfillExecutor<S: StateStore> {
info: ExecutorInfo,

/// Upstream table
upstream_table: StorageTable<S>,
/// Upstream with the same schema with the upstream table.
upstream: BoxedExecutor,
upstream: Executor,

/// Internal state table for persisting state of backfill state.
state_table: Option<StateTable<S>>,
Expand Down Expand Up @@ -109,9 +107,8 @@ where
{
#[allow(clippy::too_many_arguments)]
pub fn new(
info: ExecutorInfo,
upstream_table: StorageTable<S>,
upstream: BoxedExecutor,
upstream: Executor,
state_table: Option<StateTable<S>>,
output_indices: Vec<usize>,
progress: CreateMviewProgress,
Expand All @@ -121,7 +118,6 @@ where
) -> Self {
let actor_id = progress.actor_id();
Self {
info,
upstream_table,
upstream,
state_table,
Expand Down Expand Up @@ -745,10 +741,6 @@ impl<S> Execute for BackfillExecutor<S>
where
S: StateStore,
{
fn info(&self) -> &ExecutorInfo {
&self.info
}

fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
}
Expand Down
22 changes: 2 additions & 20 deletions src/stream/src/executor/barrier_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,43 +26,25 @@ use super::{
/// of the streaming graph.
pub struct BarrierRecvExecutor {
_ctx: ActorContextRef,
info: ExecutorInfo,

/// The barrier receiver registered in the local barrier manager.
barrier_receiver: UnboundedReceiver<Barrier>,
}

impl BarrierRecvExecutor {
pub fn new(
ctx: ActorContextRef,
info: ExecutorInfo,
barrier_receiver: UnboundedReceiver<Barrier>,
) -> Self {
pub fn new(ctx: ActorContextRef, barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
Self {
_ctx: ctx,
info,
barrier_receiver,
}
}

pub fn for_test(barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
Self::new(
ActorContext::for_test(0),
ExecutorInfo {
schema: Schema::empty().clone(),
pk_indices: PkIndices::new(),
identity: "BarrierRecvExecutor".to_string(),
},
barrier_receiver,
)
Self::new(ActorContext::for_test(0), barrier_receiver)
}
}

impl Execute for BarrierRecvExecutor {
fn info(&self) -> &ExecutorInfo {
&self.info
}

fn execute(self: Box<Self>) -> BoxedMessageStream {
UnboundedReceiverStream::new(self.barrier_receiver)
.map(|barrier| Ok(Message::Barrier(barrier)))
Expand Down
15 changes: 6 additions & 9 deletions src/stream/src/executor/batch_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ use await_tree::InstrumentAwait;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
use super::{Execute, ExecutorInfo, Message};
use super::{Execute, Message};
use crate::executor::BoxedMessageStream;

pub struct BatchQueryExecutor<S: StateStore> {
Expand All @@ -33,18 +34,18 @@ pub struct BatchQueryExecutor<S: StateStore> {
/// The number of tuples in one [`StreamChunk`]
batch_size: usize,

info: ExecutorInfo,
schema: Schema,
}

impl<S> BatchQueryExecutor<S>
where
S: StateStore,
{
pub fn new(table: StorageTable<S>, batch_size: usize, info: ExecutorInfo) -> Self {
pub fn new(table: StorageTable<S>, batch_size: usize, schema: Schema) -> Self {
Self {
table,
batch_size,
info,
schema,
}
}

Expand All @@ -61,7 +62,7 @@ where
pin_mut!(iter);

while let Some(data_chunk) =
collect_data_chunk(&mut iter, self.schema(), Some(self.batch_size))
collect_data_chunk(&mut iter, &self.schema, Some(self.batch_size))
.instrument_await("batch_query_executor_collect_chunk")
.await?
{
Expand All @@ -76,10 +77,6 @@ impl<S> Execute for BatchQueryExecutor<S>
where
S: StateStore,
{
fn info(&self) -> &ExecutorInfo {
&self.info
}

fn execute(self: Box<Self>) -> super::BoxedMessageStream {
unreachable!("should call `execute_with_epoch`")
}
Expand Down
18 changes: 5 additions & 13 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,17 @@ use futures::StreamExt;
use futures_async_stream::try_stream;

use super::error::StreamExecutorError;
use super::{expect_first_barrier, BoxedExecutor, Execute, ExecutorInfo, Message};
use super::{expect_first_barrier, Execute, Executor, Message};
use crate::task::{ActorId, CreateMviewProgress};

/// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, [`ChainExecutor`] is mainly used to implement MV on MV
/// feature. It pipes new data of existing MVs to newly created MV only all of the old data in the
/// existing MVs are dispatched.
pub struct ChainExecutor {
info: ExecutorInfo,
snapshot: Executor,

snapshot: BoxedExecutor,

upstream: BoxedExecutor,
upstream: Executor,

progress: CreateMviewProgress,

Expand All @@ -40,14 +38,12 @@ pub struct ChainExecutor {

impl ChainExecutor {
pub fn new(
info: ExecutorInfo,
snapshot: BoxedExecutor,
upstream: BoxedExecutor,
snapshot: Executor,
upstream: Executor,
progress: CreateMviewProgress,
upstream_only: bool,
) -> Self {
Self {
info,
snapshot,
upstream,
actor_id: progress.actor_id(),
Expand Down Expand Up @@ -104,10 +100,6 @@ impl ChainExecutor {
}

impl Execute for ChainExecutor {
fn info(&self) -> &ExecutorInfo {
&self.info
}

fn execute(self: Box<Self>) -> super::BoxedMessageStream {
self.execute_inner().boxed()
}
Expand Down
Loading

0 comments on commit f4bd48d

Please sign in to comment.