Commit ead3e53

refactor(stream): refactor trait Executor to get rid of info() (#15167)

Signed-off-by: Richard Chien <[email protected]>
stdrc authored Feb 26, 2024
1 parent 8c1ecae commit ead3e53
Showing 107 changed files with 1,647 additions and 2,757 deletions.
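
Every file in this commit follows one mechanical pattern: the old `Executor` trait, which bundled `execute()` with `schema()`/`pk_indices()`/`identity()` accessors, is split into a logic-only `Execute` trait plus a concrete `Executor` struct that pairs the boxed logic with its `ExecutorInfo`. Below is a minimal, self-contained sketch of that shape, reconstructed from the diffs that follow; the message, schema, and stream types here are stand-ins, not the real RisingWave definitions.

use futures::stream::BoxStream;

// Stand-ins for the real RisingWave types; illustrative only.
pub struct Message;
pub struct Schema;
pub type BoxedMessageStream = BoxStream<'static, Message>;

/// After the refactor, the trait describes only how to run.
pub trait Execute: Send + 'static {
    fn execute(self: Box<Self>) -> BoxedMessageStream;

    fn boxed(self) -> Box<dyn Execute>
    where
        Self: Sized,
    {
        Box::new(self)
    }
}

/// The metadata the old trait exposed through `schema()`, `pk_indices()`,
/// and `identity()` methods (the `info()` this commit removes).
pub struct ExecutorInfo {
    pub schema: Schema,
    pub pk_indices: Vec<usize>,
    pub identity: String,
}

/// The new `Executor` is a plain struct: info plus boxed logic.
pub struct Executor {
    info: ExecutorInfo,
    inner: Box<dyn Execute>,
}

impl Executor {
    pub fn new(info: ExecutorInfo, inner: Box<dyn Execute>) -> Self {
        Self { info, inner }
    }

    pub fn schema(&self) -> &Schema {
        &self.info.schema
    }

    pub fn execute(self) -> BoxedMessageStream {
        self.inner.execute()
    }
}

Implementors consequently drop their own `schema`/`pk_indices`/`identity` fields, and call sites wrap the inner logic with `Executor::new(info, inner.boxed())`, exactly as in the diffs below.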
117 changes: 53 additions & 64 deletions src/compute/tests/cdc_tests.rs
@@ -44,31 +44,22 @@ use risingwave_stream::error::StreamResult;
 use risingwave_stream::executor::monitor::StreamingMetrics;
 use risingwave_stream::executor::test_utils::MockSource;
 use risingwave_stream::executor::{
-    expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedExecutor as StreamBoxedExecutor,
-    BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, ExternalStorageTable,
-    MaterializeExecutor, Message, Mutation, PkIndices, PkIndicesRef, StreamExecutorError,
+    expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedMessageStream,
+    CdcBackfillExecutor, Execute, Executor as StreamExecutor, ExecutorInfo, ExternalStorageTable,
+    MaterializeExecutor, Message, Mutation, StreamExecutorError,
 };
 
 // mock upstream binlog offset starting from "1.binlog, pos=0"
 pub struct MockOffsetGenExecutor {
-    upstream: Option<StreamBoxedExecutor>,
-
-    schema: Schema,
-
-    pk_indices: PkIndices,
-
-    identity: String,
+    upstream: Option<StreamExecutor>,
 
     start_offset: u32,
 }
 
 impl MockOffsetGenExecutor {
-    pub fn new(upstream: StreamBoxedExecutor, schema: Schema, pk_indices: PkIndices) -> Self {
+    pub fn new(upstream: StreamExecutor) -> Self {
         Self {
             upstream: Some(upstream),
-            schema,
-            pk_indices,
-            identity: "MockOffsetGenExecutor".to_string(),
             start_offset: 0,
         }
     }
@@ -131,44 +122,37 @@ impl MockOffsetGenExecutor {
     }
 }
 
-impl Executor for MockOffsetGenExecutor {
+impl Execute for MockOffsetGenExecutor {
     fn execute(self: Box<Self>) -> BoxedMessageStream {
         self.execute_inner().boxed()
     }
-
-    fn schema(&self) -> &Schema {
-        &self.schema
-    }
-
-    fn pk_indices(&self) -> PkIndicesRef<'_> {
-        &self.pk_indices
-    }
-
-    fn identity(&self) -> &str {
-        &self.identity
-    }
 }
 
 #[tokio::test]
 async fn test_cdc_backfill() -> StreamResult<()> {
     use risingwave_common::types::DataType;
     let memory_state_store = MemoryStateStore::new();
 
-    let table_id = TableId::new(1002);
-    let schema = Schema::new(vec![
-        Field::unnamed(DataType::Jsonb),   // payload
-        Field::unnamed(DataType::Varchar), // _rw_offset
-    ]);
-    let column_ids = vec![0.into(), 1.into()];
-
-    let pk_indices = vec![0];
-
-    let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone());
     let _actor_ctx = ActorContext::for_test(0x3a3a3a);
+    let (mut tx, source) = MockSource::channel();
+    let source = source.into_executor(
+        Schema::new(vec![
+            Field::unnamed(DataType::Jsonb), // payload
+        ]),
+        vec![0],
+    );
 
     // mock upstream offset (start from "1.binlog, pos=0") for ingested chunks
-    let mock_offset_executor =
-        MockOffsetGenExecutor::new(Box::new(source), schema.clone(), pk_indices.clone());
+    let mock_offset_executor = StreamExecutor::new(
+        ExecutorInfo {
+            schema: Schema::new(vec![
+                Field::unnamed(DataType::Jsonb),   // payload
+                Field::unnamed(DataType::Varchar), // _rw_offset
+            ]),
+            pk_indices: vec![0],
+            identity: "MockOffsetGenExecutor".to_string(),
+        },
+        MockOffsetGenExecutor::new(source).boxed(),
+    );
 
     let binlog_file = String::from("1.binlog");
     // mock binlog watermarks for backfill
@@ -188,13 +172,15 @@ async fn test_cdc_backfill() -> StreamResult<()> {
         Field::with_name(DataType::Int64, "id"), // primary key
         Field::with_name(DataType::Float64, "price"),
     ]);
+    let table_pk_indices = vec![0];
+    let table_pk_order_types = vec![OrderType::ascending()];
     let external_table = ExternalStorageTable::new(
-        table_id,
+        TableId::new(1234),
         table_name,
         ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
         table_schema.clone(),
-        vec![OrderType::ascending()],
-        pk_indices,
+        table_pk_order_types,
+        table_pk_indices.clone(),
         vec![0, 1],
     );
 
@@ -224,32 +210,35 @@ async fn test_cdc_backfill() -> StreamResult<()> {
         vec![0_usize],
     )
     .await;
-    let info = ExecutorInfo {
-        schema: table_schema.clone(),
-        pk_indices: vec![0],
-        identity: "CdcBackfillExecutor".to_string(),
-    };
-    let cdc_backfill = CdcBackfillExecutor::new(
-        ActorContext::for_test(actor_id),
-        info,
-        external_table,
-        Box::new(mock_offset_executor),
-        vec![0, 1],
-        None,
-        Arc::new(StreamingMetrics::unused()),
-        state_table,
-        4, // 4 rows in a snapshot chunk
-        false,
+
+    let cdc_backfill = StreamExecutor::new(
+        ExecutorInfo {
+            schema: table_schema.clone(),
+            pk_indices: table_pk_indices,
+            identity: "CdcBackfillExecutor".to_string(),
+        },
+        CdcBackfillExecutor::new(
+            ActorContext::for_test(actor_id),
+            external_table,
+            mock_offset_executor,
+            vec![0, 1],
+            None,
+            Arc::new(StreamingMetrics::unused()),
+            state_table,
+            4, // 4 rows in a snapshot chunk
+            false,
+        )
+        .boxed(),
     );
 
     // Create a `MaterializeExecutor` to write the changes to storage.
+    let materialize_table_id = TableId::new(5678);
     let mut materialize = MaterializeExecutor::for_test(
-        Box::new(cdc_backfill),
+        cdc_backfill,
         memory_state_store.clone(),
-        table_id,
+        materialize_table_id,
         vec![ColumnOrder::new(0, OrderType::ascending())],
-        column_ids.clone(),
-        4,
+        vec![0.into(), 1.into()],
         Arc::new(AtomicU64::new(0)),
         ConflictBehavior::Overwrite,
     )
@@ -354,7 +343,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
     // Since we have not polled `Materialize`, we cannot scan anything from this table
     let table = StorageTable::for_test(
         memory_state_store.clone(),
-        table_id,
+        materialize_table_id,
         column_descs.clone(),
         vec![OrderType::ascending()],
         vec![0],
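
One more pattern worth noting from this test: `MockSource::channel()` no longer takes a schema and pk indices up front; the raw source half is turned into a full executor afterwards via `into_executor(schema, pk_indices)` (the benchmark further below does the same). A self-contained sketch of that two-step construction, with all types as stand-ins and the signatures assumed from the diff:

// Stand-in types; only the construction shape mirrors the real test utils.
pub struct Schema(pub Vec<&'static str>); // field names only, for brevity

pub struct ExecutorInfo {
    pub schema: Schema,
    pub pk_indices: Vec<usize>,
    pub identity: String,
}

pub struct Executor {
    pub info: ExecutorInfo,
    // the boxed `Execute` logic would live alongside the info
}

pub struct MockSender; // sending half: pushes barriers/chunks in tests
pub struct MockSource; // receiving half: becomes the executor's stream

impl MockSource {
    /// The channel itself is now info-less.
    pub fn channel() -> (MockSender, MockSource) {
        (MockSender, MockSource)
    }

    /// Schema and pk indices are attached only at conversion time
    /// (previously they were arguments of `MockSource::channel`).
    pub fn into_executor(self, schema: Schema, pk_indices: Vec<usize>) -> Executor {
        Executor {
            info: ExecutorInfo {
                schema,
                pk_indices,
                identity: "MockSource".to_string(),
            },
        }
    }
}

fn main() {
    let (_tx, source) = MockSource::channel();
    let executor = source.into_executor(Schema(vec!["payload"]), vec![0]);
    assert_eq!(executor.info.pk_indices, vec![0]);
}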
48 changes: 25 additions & 23 deletions src/compute/tests/integration_tests.rs
@@ -56,7 +56,7 @@ use risingwave_stream::executor::monitor::StreamingMetrics;
 use risingwave_stream::executor::row_id_gen::RowIdGenExecutor;
 use risingwave_stream::executor::source_executor::SourceExecutor;
 use risingwave_stream::executor::{
-    ActorContext, Barrier, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices,
+    ActorContext, Barrier, Execute, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices,
 };
 use tokio::sync::mpsc::unbounded_channel;
 
@@ -162,56 +162,58 @@ async fn test_table_materialize() -> StreamResult<()> {
     let system_params_manager = LocalSystemParamsManager::for_test();
 
     // Create a `SourceExecutor` to read the changes.
-    let source_executor = SourceExecutor::<PanicStateStore>::new(
-        actor_ctx.clone(),
+    let source_executor = Executor::new(
         ExecutorInfo {
             schema: all_schema.clone(),
             pk_indices: pk_indices.clone(),
             identity: format!("SourceExecutor {:X}", 1),
         },
-        None, // There is no external stream source.
-        Arc::new(StreamingMetrics::unused()),
-        barrier_rx,
-        system_params_manager.get_params(),
-        SourceCtrlOpts::default(),
-        ConnectorParams::default(),
+        SourceExecutor::<PanicStateStore>::new(
+            actor_ctx.clone(),
+            None, // There is no external stream source.
+            Arc::new(StreamingMetrics::unused()),
+            barrier_rx,
+            system_params_manager.get_params(),
+            SourceCtrlOpts::default(),
+            ConnectorParams::default(),
+        )
+        .boxed(),
     );
 
     // Create a `DmlExecutor` to accept data change from users.
-    let dml_executor = DmlExecutor::new(
+    let dml_executor = Executor::new(
         ExecutorInfo {
             schema: all_schema.clone(),
             pk_indices: pk_indices.clone(),
             identity: format!("DmlExecutor {:X}", 2),
         },
-        Box::new(source_executor),
-        dml_manager.clone(),
-        table_id,
-        INITIAL_TABLE_VERSION_ID,
-        column_descs.clone(),
-        1024,
+        DmlExecutor::new(
+            source_executor,
+            dml_manager.clone(),
+            table_id,
+            INITIAL_TABLE_VERSION_ID,
+            column_descs.clone(),
+            1024,
+        )
+        .boxed(),
     );
 
-    let row_id_gen_executor = RowIdGenExecutor::new(
-        actor_ctx,
+    let row_id_gen_executor = Executor::new(
         ExecutorInfo {
             schema: all_schema.clone(),
             pk_indices: pk_indices.clone(),
             identity: format!("RowIdGenExecutor {:X}", 3),
         },
-        Box::new(dml_executor),
-        row_id_index,
-        vnodes,
+        RowIdGenExecutor::new(actor_ctx, dml_executor, row_id_index, vnodes).boxed(),
     );
 
     // Create a `MaterializeExecutor` to write the changes to storage.
     let mut materialize = MaterializeExecutor::for_test(
-        Box::new(row_id_gen_executor),
+        row_id_gen_executor,
         memory_state_store.clone(),
         table_id,
         vec![ColumnOrder::new(0, OrderType::ascending())],
         all_column_ids.clone(),
-        4,
         Arc::new(AtomicU64::new(0)),
         ConflictBehavior::NoCheck,
     )
11 changes: 6 additions & 5 deletions src/stream/benches/stream_hash_agg.rs
@@ -26,7 +26,7 @@ use risingwave_storage::memory::MemoryStateStore;
 use risingwave_storage::StateStore;
 use risingwave_stream::executor::test_utils::agg_executor::new_boxed_hash_agg_executor;
 use risingwave_stream::executor::test_utils::*;
-use risingwave_stream::executor::{BoxedExecutor, PkIndices};
+use risingwave_stream::executor::{Executor, PkIndices};
 use tokio::runtime::Runtime;
 
 risingwave_expr_impl::enable!();
@@ -47,7 +47,7 @@ fn bench_hash_agg(c: &mut Criterion) {

 /// This aims to mirror `q17`'s aggregator.
 /// We can include more executor patterns as needed.
-fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {
+fn setup_bench_hash_agg<S: StateStore>(store: S) -> Executor {
     // ---- Define hash agg executor parameters ----
     let input_data_types = vec![
         // to_char(date_time)
@@ -119,7 +119,8 @@ fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {
     );
 
     // ---- Create MockSourceExecutor ----
-    let (mut tx, source) = MockSource::channel(schema, PkIndices::new());
+    let (mut tx, source) = MockSource::channel();
+    let source = source.into_executor(schema, PkIndices::new());
     tx.push_barrier(1, false);
     for chunk in chunks {
         tx.push_chunk(chunk);
@@ -134,7 +135,7 @@ fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {

     block_on(new_boxed_hash_agg_executor(
         store,
-        Box::new(source),
+        source,
         false,
         agg_calls,
         row_count_index,
@@ -146,7 +147,7 @@ fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {
     ))
 }
 
-pub async fn execute_executor(executor: BoxedExecutor) {
+pub async fn execute_executor(executor: Executor) {
     let mut stream = executor.execute();
     while let Some(ret) = stream.next().await {
         _ = black_box(ret.unwrap());
2 changes: 1 addition & 1 deletion src/stream/src/executor/agg_common.rs
@@ -29,7 +29,7 @@ pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
     pub version: PbAggNodeVersion,
 
     // basic
-    pub input: Box<dyn Executor>,
+    pub input: Executor,
     pub actor_ctx: ActorContextRef,
     pub info: ExecutorInfo,

