Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(stream): refactor trait Executor to get rid of info() #15167

Merged
merged 18 commits into the base branch
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 53 additions & 64 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,22 @@ use risingwave_stream::error::StreamResult;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::test_utils::MockSource;
use risingwave_stream::executor::{
expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedExecutor as StreamBoxedExecutor,
BoxedMessageStream, CdcBackfillExecutor, Executor, ExecutorInfo, ExternalStorageTable,
MaterializeExecutor, Message, Mutation, PkIndices, PkIndicesRef, StreamExecutorError,
expect_first_barrier, ActorContext, AddMutation, Barrier, BoxedMessageStream,
CdcBackfillExecutor, Execute, Executor as StreamExecutor, ExecutorInfo, ExternalStorageTable,
MaterializeExecutor, Message, Mutation, StreamExecutorError,
};

// mock upstream binlog offset starting from "1.binlog, pos=0"
pub struct MockOffsetGenExecutor {
upstream: Option<StreamBoxedExecutor>,

schema: Schema,

pk_indices: PkIndices,

identity: String,
upstream: Option<StreamExecutor>,

start_offset: u32,
}

impl MockOffsetGenExecutor {
pub fn new(upstream: StreamBoxedExecutor, schema: Schema, pk_indices: PkIndices) -> Self {
pub fn new(upstream: StreamExecutor) -> Self {
Self {
upstream: Some(upstream),
schema,
pk_indices,
identity: "MockOffsetGenExecutor".to_string(),
start_offset: 0,
}
}
Expand Down Expand Up @@ -131,44 +122,37 @@ impl MockOffsetGenExecutor {
}
}

impl Executor for MockOffsetGenExecutor {
impl Execute for MockOffsetGenExecutor {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
}

fn schema(&self) -> &Schema {
&self.schema
}

fn pk_indices(&self) -> PkIndicesRef<'_> {
&self.pk_indices
}

fn identity(&self) -> &str {
&self.identity
}
}

#[tokio::test]
async fn test_cdc_backfill() -> StreamResult<()> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@StrikeW Please help check whether this test behaves as expected after this change, thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed. I will add an assert to the test later.

use risingwave_common::types::DataType;
let memory_state_store = MemoryStateStore::new();

let table_id = TableId::new(1002);
let schema = Schema::new(vec![
Field::unnamed(DataType::Jsonb), // payload
Field::unnamed(DataType::Varchar), // _rw_offset
]);
let column_ids = vec![0.into(), 1.into()];

let pk_indices = vec![0];

let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone());
let _actor_ctx = ActorContext::for_test(0x3a3a3a);
let (mut tx, source) = MockSource::channel();
let source = source.into_executor(
Schema::new(vec![
Field::unnamed(DataType::Jsonb), // payload
]),
vec![0],
);

// mock upstream offset (start from "1.binlog, pos=0") for ingested chunks
let mock_offset_executor =
MockOffsetGenExecutor::new(Box::new(source), schema.clone(), pk_indices.clone());
let mock_offset_executor = StreamExecutor::new(
ExecutorInfo {
schema: Schema::new(vec![
Field::unnamed(DataType::Jsonb), // payload
Field::unnamed(DataType::Varchar), // _rw_offset
]),
pk_indices: vec![0],
identity: "MockOffsetGenExecutor".to_string(),
},
MockOffsetGenExecutor::new(source).boxed(),
);

let binlog_file = String::from("1.binlog");
// mock binlog watermarks for backfill
Expand All @@ -188,13 +172,15 @@ async fn test_cdc_backfill() -> StreamResult<()> {
Field::with_name(DataType::Int64, "id"), // primary key
Field::with_name(DataType::Float64, "price"),
]);
let table_pk_indices = vec![0];
let table_pk_order_types = vec![OrderType::ascending()];
let external_table = ExternalStorageTable::new(
table_id,
TableId::new(1234),
table_name,
ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
table_schema.clone(),
vec![OrderType::ascending()],
pk_indices,
table_pk_order_types,
table_pk_indices.clone(),
vec![0, 1],
);

Expand Down Expand Up @@ -224,32 +210,35 @@ async fn test_cdc_backfill() -> StreamResult<()> {
vec![0_usize],
)
.await;
let info = ExecutorInfo {
schema: table_schema.clone(),
pk_indices: vec![0],
identity: "CdcBackfillExecutor".to_string(),
};
let cdc_backfill = CdcBackfillExecutor::new(
ActorContext::for_test(actor_id),
info,
external_table,
Box::new(mock_offset_executor),
vec![0, 1],
None,
Arc::new(StreamingMetrics::unused()),
state_table,
4, // 4 rows in a snapshot chunk
false,

let cdc_backfill = StreamExecutor::new(
ExecutorInfo {
schema: table_schema.clone(),
pk_indices: table_pk_indices,
identity: "CdcBackfillExecutor".to_string(),
},
CdcBackfillExecutor::new(
ActorContext::for_test(actor_id),
external_table,
mock_offset_executor,
vec![0, 1],
None,
Arc::new(StreamingMetrics::unused()),
state_table,
4, // 4 rows in a snapshot chunk
false,
)
.boxed(),
);

// Create a `MaterializeExecutor` to write the changes to storage.
let materialize_table_id = TableId::new(5678);
let mut materialize = MaterializeExecutor::for_test(
Box::new(cdc_backfill),
cdc_backfill,
memory_state_store.clone(),
table_id,
materialize_table_id,
vec![ColumnOrder::new(0, OrderType::ascending())],
column_ids.clone(),
4,
vec![0.into(), 1.into()],
Arc::new(AtomicU64::new(0)),
ConflictBehavior::Overwrite,
)
Expand Down Expand Up @@ -354,7 +343,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
// Since we have not polled `Materialize`, we cannot scan anything from this table
let table = StorageTable::for_test(
memory_state_store.clone(),
table_id,
materialize_table_id,
column_descs.clone(),
vec![OrderType::ascending()],
vec![0],
Expand Down
48 changes: 25 additions & 23 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::executor::row_id_gen::RowIdGenExecutor;
use risingwave_stream::executor::source_executor::SourceExecutor;
use risingwave_stream::executor::{
ActorContext, Barrier, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices,
ActorContext, Barrier, Execute, Executor, ExecutorInfo, MaterializeExecutor, Message, PkIndices,
};
use tokio::sync::mpsc::unbounded_channel;

Expand Down Expand Up @@ -162,56 +162,58 @@ async fn test_table_materialize() -> StreamResult<()> {
let system_params_manager = LocalSystemParamsManager::for_test();

// Create a `SourceExecutor` to read the changes.
let source_executor = SourceExecutor::<PanicStateStore>::new(
actor_ctx.clone(),
let source_executor = Executor::new(
ExecutorInfo {
schema: all_schema.clone(),
pk_indices: pk_indices.clone(),
identity: format!("SourceExecutor {:X}", 1),
},
None, // There is no external stream source.
Arc::new(StreamingMetrics::unused()),
barrier_rx,
system_params_manager.get_params(),
SourceCtrlOpts::default(),
ConnectorParams::default(),
SourceExecutor::<PanicStateStore>::new(
actor_ctx.clone(),
None, // There is no external stream source.
Arc::new(StreamingMetrics::unused()),
barrier_rx,
system_params_manager.get_params(),
SourceCtrlOpts::default(),
ConnectorParams::default(),
)
.boxed(),
);

// Create a `DmlExecutor` to accept data change from users.
let dml_executor = DmlExecutor::new(
let dml_executor = Executor::new(
ExecutorInfo {
schema: all_schema.clone(),
pk_indices: pk_indices.clone(),
identity: format!("DmlExecutor {:X}", 2),
},
Box::new(source_executor),
dml_manager.clone(),
table_id,
INITIAL_TABLE_VERSION_ID,
column_descs.clone(),
1024,
DmlExecutor::new(
source_executor,
dml_manager.clone(),
table_id,
INITIAL_TABLE_VERSION_ID,
column_descs.clone(),
1024,
)
.boxed(),
);

let row_id_gen_executor = RowIdGenExecutor::new(
actor_ctx,
let row_id_gen_executor = Executor::new(
ExecutorInfo {
schema: all_schema.clone(),
pk_indices: pk_indices.clone(),
identity: format!("RowIdGenExecutor {:X}", 3),
},
Box::new(dml_executor),
row_id_index,
vnodes,
RowIdGenExecutor::new(actor_ctx, dml_executor, row_id_index, vnodes).boxed(),
);

// Create a `MaterializeExecutor` to write the changes to storage.
let mut materialize = MaterializeExecutor::for_test(
Box::new(row_id_gen_executor),
row_id_gen_executor,
memory_state_store.clone(),
table_id,
vec![ColumnOrder::new(0, OrderType::ascending())],
all_column_ids.clone(),
4,
Arc::new(AtomicU64::new(0)),
ConflictBehavior::NoCheck,
)
Expand Down
11 changes: 6 additions & 5 deletions src/stream/benches/stream_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::StateStore;
use risingwave_stream::executor::test_utils::agg_executor::new_boxed_hash_agg_executor;
use risingwave_stream::executor::test_utils::*;
use risingwave_stream::executor::{BoxedExecutor, PkIndices};
use risingwave_stream::executor::{Executor, PkIndices};
use tokio::runtime::Runtime;

risingwave_expr_impl::enable!();
Expand All @@ -47,7 +47,7 @@ fn bench_hash_agg(c: &mut Criterion) {

/// This aims to mirror `q17`'s aggregator.
/// We can include more executor patterns as needed.
fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {
fn setup_bench_hash_agg<S: StateStore>(store: S) -> Executor {
// ---- Define hash agg executor parameters ----
let input_data_types = vec![
// to_char(date_time)
Expand Down Expand Up @@ -119,7 +119,8 @@ fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {
);

// ---- Create MockSourceExecutor ----
let (mut tx, source) = MockSource::channel(schema, PkIndices::new());
let (mut tx, source) = MockSource::channel();
let source = source.into_executor(schema, PkIndices::new());
tx.push_barrier(1, false);
for chunk in chunks {
tx.push_chunk(chunk);
Expand All @@ -134,7 +135,7 @@ fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {

block_on(new_boxed_hash_agg_executor(
store,
Box::new(source),
source,
false,
agg_calls,
row_count_index,
Expand All @@ -146,7 +147,7 @@ fn setup_bench_hash_agg<S: StateStore>(store: S) -> BoxedExecutor {
))
}

pub async fn execute_executor(executor: BoxedExecutor) {
pub async fn execute_executor(executor: Executor) {
let mut stream = executor.execute();
while let Some(ret) = stream.next().await {
_ = black_box(ret.unwrap());
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/agg_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
pub version: PbAggNodeVersion,

// basic
pub input: Box<dyn Executor>,
pub input: Executor,
pub actor_ctx: ActorContextRef,
pub info: ExecutorInfo,

Expand Down
Loading
Loading