From 80d571dac9548c04a2e162cf30a2be5c0e4872c6 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 23 Jan 2024 17:30:35 +0800 Subject: [PATCH] rename --- src/compute/tests/cdc_tests.rs | 4 ++-- src/compute/tests/integration_tests.rs | 2 +- src/stream/src/executor/actor.rs | 4 ++-- .../src/executor/aggregation/distinct.rs | 6 +++--- src/stream/src/executor/barrier_recv.rs | 2 +- .../src/executor/dedup/append_only_dedup.rs | 2 +- src/stream/src/executor/dynamic_filter.rs | 2 +- src/stream/src/executor/filter.rs | 2 +- src/stream/src/executor/hash_join.rs | 4 ++-- src/stream/src/executor/hop_window.rs | 2 +- src/stream/src/executor/integration_tests.rs | 2 +- src/stream/src/executor/lookup/tests.rs | 4 ++-- src/stream/src/executor/merge.rs | 4 ++-- src/stream/src/executor/mview/materialize.rs | 2 +- src/stream/src/executor/project.rs | 4 ++-- src/stream/src/executor/receiver.rs | 4 ++-- src/stream/src/executor/row_id_gen.rs | 2 +- src/stream/src/executor/simple_agg.rs | 2 +- src/stream/src/executor/sink.rs | 6 +++--- src/stream/src/executor/sort.rs | 2 +- .../src/executor/source/source_executor.rs | 4 ++-- .../src/executor/stateless_simple_agg.rs | 4 ++-- src/stream/src/executor/test_utils.rs | 2 +- src/stream/src/executor/top_n/group_top_n.rs | 6 +++--- .../src/executor/top_n/top_n_appendonly.rs | 4 ++-- src/stream/src/executor/top_n/top_n_plain.rs | 20 +++++++++---------- src/stream/src/executor/values.rs | 2 +- src/stream/src/executor/watermark_filter.rs | 2 +- src/stream/src/task/stream_manager.rs | 2 +- .../integration_tests/eowc_over_window.rs | 2 +- .../tests/integration_tests/hop_window.rs | 2 +- .../tests/integration_tests/over_window.rs | 2 +- .../tests/integration_tests/project_set.rs | 2 +- 33 files changed, 58 insertions(+), 58 deletions(-) diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index a2bab33a6c92f..2f1feec23cd46 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -164,7 +164,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { let pk_indices = vec![0]; let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone()); - let _actor_ctx = ActorContext::create(0x3a3a3a); + let _actor_ctx = ActorContext::for_test(0x3a3a3a); // mock upstream offset (start from "1.binlog, pos=0") for ingested chunks let mock_offset_executor = @@ -230,7 +230,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { identity: "CdcBackfillExecutor".to_string(), }; let cdc_backfill = CdcBackfillExecutor::new( - ActorContext::create(actor_id), + ActorContext::for_test(actor_id), info, external_table, Box::new(mock_offset_executor), diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 490e90d174013..967c3a888398f 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -159,7 +159,7 @@ async fn test_table_materialize() -> StreamResult<()> { let barrier_tx = Arc::new(barrier_tx); let vnodes = Bitmap::from_bytes(&[0b11111111]); - let actor_ctx = ActorContext::create(0x3f3f3f); + let actor_ctx = ActorContext::for_test(0x3f3f3f); let system_params_manager = LocalSystemParamsManager::for_test(); // Create a `SourceExecutor` to read the changes. diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index d5c6e36679457..259353dc7c1d7 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -58,7 +58,7 @@ pub struct ActorContext { pub type ActorContextRef = Arc; impl ActorContext { - pub fn create(id: ActorId) -> ActorContextRef { + pub fn for_test(id: ActorId) -> ActorContextRef { Arc::new(Self { id, fragment_id: 0, @@ -73,7 +73,7 @@ impl ActorContext { }) } - pub fn create_with_metrics( + pub fn create( stream_actor: &PbStreamActor, total_mem_val: Arc>, streaming_metrics: Arc, diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs index 191c3048da7f0..5b95ee89b4b0f 100644 --- a/src/stream/src/executor/aggregation/distinct.rs +++ b/src/stream/src/executor/aggregation/distinct.rs @@ -392,7 +392,7 @@ mod tests { &agg_calls, Arc::new(AtomicU64::new(0)), &dedup_tables, - ActorContext::create(0), + ActorContext::for_test(0), ); // --- chunk 1 --- @@ -483,7 +483,7 @@ mod tests { &agg_calls, Arc::new(AtomicU64::new(0)), &dedup_tables, - ActorContext::create(0), + ActorContext::for_test(0), ); // --- chunk 3 --- @@ -572,7 +572,7 @@ mod tests { &agg_calls, Arc::new(AtomicU64::new(0)), &dedup_tables, - ActorContext::create(0), + ActorContext::for_test(0), ); let chunk = StreamChunk::from_pretty( diff --git a/src/stream/src/executor/barrier_recv.rs b/src/stream/src/executor/barrier_recv.rs index 745fee0f66881..66480ef1cb591 100644 --- a/src/stream/src/executor/barrier_recv.rs +++ b/src/stream/src/executor/barrier_recv.rs @@ -47,7 +47,7 @@ impl BarrierRecvExecutor { pub fn for_test(barrier_receiver: UnboundedReceiver) -> Self { Self::new( - ActorContext::create(0), + ActorContext::for_test(0), ExecutorInfo { schema: Schema::empty().clone(), pk_indices: PkIndices::new(), diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index 8fcbc3ad894ce..b2d17e9da638f 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -263,7 +263,7 @@ mod tests { Box::new(input), state_table, info, - ActorContext::create(123), + ActorContext::for_test(123), Arc::new(AtomicU64::new(0)), Arc::new(StreamingMetrics::unused()), )) diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 1f2388f6ed4f0..14cf7192bd4db 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -576,7 +576,7 @@ mod tests { }; let executor = DynamicFilterExecutor::::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source_l), Box::new(source_r), diff --git a/src/stream/src/executor/filter.rs b/src/stream/src/executor/filter.rs index 34ec98367cc7f..e51307b2ffee0 100644 --- a/src/stream/src/executor/filter.rs +++ b/src/stream/src/executor/filter.rs @@ -229,7 +229,7 @@ mod tests { let test_expr = build_from_pretty("(greater_than:boolean $0:int8 $1:int8)"); let filter = Box::new(FilterExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source), test_expr, diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index b3ad2f2471cd6..af35e7c7b9603 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -1404,7 +1404,7 @@ mod tests { }; let executor = HashJoinExecutor::::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source_l), Box::new(source_r), @@ -1494,7 +1494,7 @@ mod tests { }; let executor = HashJoinExecutor::::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source_l), Box::new(source_r), diff --git a/src/stream/src/executor/hop_window.rs b/src/stream/src/executor/hop_window.rs index 6c6f7b99a8876..4bcca4d593072 100644 --- a/src/stream/src/executor/hop_window.rs +++ b/src/stream/src/executor/hop_window.rs @@ -292,7 +292,7 @@ mod tests { .unwrap(); super::HopWindowExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), ExecutorInfo { // TODO: the schema is incorrect, but it seems useless here. schema, diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 3ba532976b079..0759f4bd2d813 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -47,7 +47,7 @@ async fn test_merger_sum_aggr() { time_zone: String::from("UTC"), }; - let actor_ctx = ActorContext::create(0); + let actor_ctx = ActorContext::for_test(0); // `make_actor` build an actor to do local aggregation let make_actor = |input_rx| { let _schema = Schema { diff --git a/src/stream/src/executor/lookup/tests.rs b/src/stream/src/executor/lookup/tests.rs index ec07a516740fb..7d152e8a5f4e7 100644 --- a/src/stream/src/executor/lookup/tests.rs +++ b/src/stream/src/executor/lookup/tests.rs @@ -205,7 +205,7 @@ async fn test_lookup_this_epoch() { identity: "LookupExecutor".to_string(), }; let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams { - ctx: ActorContext::create(0), + ctx: ActorContext::for_test(0), info, arrangement, stream, @@ -279,7 +279,7 @@ async fn test_lookup_last_epoch() { identity: "LookupExecutor".to_string(), }; let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams { - ctx: ActorContext::create(0), + ctx: ActorContext::for_test(0), info, arrangement, stream, diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 6e6b1d621b870..145a937fcdb2e 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -86,7 +86,7 @@ impl MergeExecutor { use crate::executor::exchange::input::Input; Self::new( - ActorContext::create(114), + ActorContext::for_test(114), ExecutorInfo { schema, pk_indices: vec![], @@ -593,7 +593,7 @@ mod tests { .unwrap(); let merge = MergeExecutor::new( - ActorContext::create(actor_id), + ActorContext::for_test(actor_id), ExecutorInfo { schema, pk_indices: vec![], diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 505bd0873c02c..69e238a476f74 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -295,7 +295,7 @@ impl MaterializeExecutor { input, state_table, arrange_key_indices: arrange_columns.clone(), - actor_context: ActorContext::create(0), + actor_context: ActorContext::for_test(0), info: ExecutorInfo { schema, pk_indices: arrange_columns, diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 8cbd7e66e4897..a5e73705f8ec4 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -320,7 +320,7 @@ mod tests { }; let project = Box::new(ProjectExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source), vec![test_expr], @@ -414,7 +414,7 @@ mod tests { }; let project = Box::new(ProjectExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source), vec![a_expr, b_expr, c_expr], diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index a141f12d16e6c..273a3fcf1b339 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -94,7 +94,7 @@ impl ReceiverExecutor { use crate::executor::ActorContext; Self::new( - ActorContext::create(114), + ActorContext::for_test(114), ExecutorInfo { schema: Schema::default(), pk_indices: vec![], @@ -278,7 +278,7 @@ mod tests { }; let receiver = ReceiverExecutor::new( - ActorContext::create(actor_id), + ActorContext::for_test(actor_id), info, fragment_id, upstream_fragment_id, diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 3bd6c0aca2167..fe0ed6d908925 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -167,7 +167,7 @@ mod tests { let row_id_generator = Bitmap::ones(VirtualNode::COUNT); let (mut tx, upstream) = MockSource::channel(schema.clone(), pk_indices.clone()); let row_id_gen_executor = Box::new(RowIdGenExecutor::new( - ActorContext::create(233), + ActorContext::for_test(233), ExecutorInfo { schema, pk_indices, diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index a6f74316f27a2..f957241a402c9 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -369,7 +369,7 @@ mod tests { ]; let simple_agg = new_boxed_simple_agg_executor( - ActorContext::create(123), + ActorContext::for_test(123), store, Box::new(source), false, diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 5aca2842f5ad6..1f8de0bb4dd92 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -510,7 +510,7 @@ mod test { }; let sink_executor = SinkExecutor::new( - ActorContext::create(0), + ActorContext::for_test(0), info, Box::new(mock), SinkWriterParam::for_test(), @@ -637,7 +637,7 @@ mod test { }; let sink_executor = SinkExecutor::new( - ActorContext::create(0), + ActorContext::for_test(0), info, Box::new(mock), SinkWriterParam::for_test(), @@ -761,7 +761,7 @@ mod test { }; let sink_executor = SinkExecutor::new( - ActorContext::create(0), + ActorContext::for_test(0), info, Box::new(mock), SinkWriterParam::for_test(), diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index aa8bc8011b521..786eabdeabbf5 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -213,7 +213,7 @@ mod tests { let (tx, source) = MockSource::channel(input_schema, input_pk_indices); let sort_executor = SortExecutor::new(SortExecutorArgs { - actor_ctx: ActorContext::create(123), + actor_ctx: ActorContext::for_test(123), info: ExecutorInfo { schema: source.schema().clone(), pk_indices: source.pk_indices().to_vec(), diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index c22d334112c5f..5611b429d3259 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -749,7 +749,7 @@ mod tests { let system_params_manager = LocalSystemParamsManager::for_test(); let executor = SourceExecutor::new( - ActorContext::create(0), + ActorContext::for_test(0), ExecutorInfo { schema, pk_indices, @@ -843,7 +843,7 @@ mod tests { let system_params_manager = LocalSystemParamsManager::for_test(); let executor = SourceExecutor::new( - ActorContext::create(0), + ActorContext::for_test(0), ExecutorInfo { schema, pk_indices, diff --git a/src/stream/src/executor/stateless_simple_agg.rs b/src/stream/src/executor/stateless_simple_agg.rs index 86b40f4086cc7..8a6334b7743b5 100644 --- a/src/stream/src/executor/stateless_simple_agg.rs +++ b/src/stream/src/executor/stateless_simple_agg.rs @@ -170,7 +170,7 @@ mod tests { let simple_agg = Box::new( StatelessSimpleAggExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source), agg_calls, @@ -228,7 +228,7 @@ mod tests { let simple_agg = Box::new( StatelessSimpleAggExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source), agg_calls, diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index dbf4d2142c2b1..cc7505164d154 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -487,7 +487,7 @@ pub mod agg_executor { version: PbAggNodeVersion::Max, input, - actor_ctx: ActorContext::create(123), + actor_ctx: ActorContext::for_test(123), info, extreme_cache_size, diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index fb87f52c91a11..ed0f64d098a7a 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -396,7 +396,7 @@ mod tests { let top_n_executor = Box::new( GroupTopNExecutor::::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (0, 2), @@ -498,7 +498,7 @@ mod tests { let top_n_executor = Box::new( GroupTopNExecutor::::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (1, 2), @@ -593,7 +593,7 @@ mod tests { let top_n_executor = Box::new( GroupTopNExecutor::::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (0, 2), diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs index 0f262b043face..8da016fd8c7ac 100644 --- a/src/stream/src/executor/top_n/top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/top_n_appendonly.rs @@ -259,7 +259,7 @@ mod tests { let top_n_executor = Box::new( AppendOnlyTopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key, (0, 5), @@ -346,7 +346,7 @@ mod tests { let top_n_executor = Box::new( AppendOnlyTopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (3, 4), diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index 357f1a099a795..8536d9d3273c3 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -303,7 +303,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (3, 1000), @@ -404,7 +404,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (0, 4), @@ -517,7 +517,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::<_, true>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (0, 4), @@ -629,7 +629,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (3, 4), @@ -861,7 +861,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (1, 3), @@ -945,7 +945,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (1, 3), @@ -1005,7 +1005,7 @@ mod tests { let top_n_executor_after_recovery = Box::new( TopNExecutor::<_, false>::new( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (1, 3), @@ -1139,7 +1139,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::new_with_ties_for_test( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (0, 3), @@ -1295,7 +1295,7 @@ mod tests { let top_n_executor = Box::new( TopNExecutor::new_with_ties_for_test( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (0, 3), @@ -1358,7 +1358,7 @@ mod tests { let top_n_executor_after_recovery = Box::new( TopNExecutor::new_with_ties_for_test( source as Box, - ActorContext::create(0), + ActorContext::for_test(0), info, storage_key(), (0, 3), diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs index 1ce6f27ded573..2a927f1a3a780 100644 --- a/src/stream/src/executor/values.rs +++ b/src/stream/src/executor/values.rs @@ -213,7 +213,7 @@ mod tests { identity: "ValuesExecutor".to_string(), }; let values_executor_struct = ValuesExecutor::new( - ActorContext::create(actor_id), + ActorContext::for_test(actor_id), info, progress, vec![exprs diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index bca8f6f872dda..43a6ba3add1ef 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -487,7 +487,7 @@ mod tests { ( WatermarkFilterExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), info, source.boxed(), watermark_expr, diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 4d0ffa6b215ac..80f2623407fe0 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -549,7 +549,7 @@ impl LocalStreamManager { let mut ret = Vec::with_capacity(actors.len()); for actor in actors { let actor_id = actor.actor_id; - let actor_context = ActorContext::create_with_metrics( + let actor_context = ActorContext::create( &actor, self.total_mem_val.clone(), self.streaming_metrics.clone(), diff --git a/src/stream/tests/integration_tests/eowc_over_window.rs b/src/stream/tests/integration_tests/eowc_over_window.rs index b085cccb76dc1..6941d86f3a759 100644 --- a/src/stream/tests/integration_tests/eowc_over_window.rs +++ b/src/stream/tests/integration_tests/eowc_over_window.rs @@ -67,7 +67,7 @@ async fn create_executor( let (tx, source) = MockSource::channel(input_schema, input_pk_indices.clone()); let executor = EowcOverWindowExecutor::new(EowcOverWindowExecutorArgs { - actor_ctx: ActorContext::create(123), + actor_ctx: ActorContext::for_test(123), info: ExecutorInfo { schema: output_schema, pk_indices: output_pk_indices, diff --git a/src/stream/tests/integration_tests/hop_window.rs b/src/stream/tests/integration_tests/hop_window.rs index 6f60b3bd1e9fd..f0bd65c84b69e 100644 --- a/src/stream/tests/integration_tests/hop_window.rs +++ b/src/stream/tests/integration_tests/hop_window.rs @@ -46,7 +46,7 @@ fn create_executor(output_indices: Vec) -> (MessageSender, BoxedMessageSt ( tx, HopWindowExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), ExecutorInfo { schema, pk_indices, diff --git a/src/stream/tests/integration_tests/over_window.rs b/src/stream/tests/integration_tests/over_window.rs index 1b7ee0e304899..78203d79a87ac 100644 --- a/src/stream/tests/integration_tests/over_window.rs +++ b/src/stream/tests/integration_tests/over_window.rs @@ -76,7 +76,7 @@ async fn create_executor( let (tx, source) = MockSource::channel(input_schema, input_pk_indices.clone()); let executor = OverWindowExecutor::new(OverWindowExecutorArgs { - actor_ctx: ActorContext::create(123), + actor_ctx: ActorContext::for_test(123), info: ExecutorInfo { schema: output_schema, pk_indices: output_pk_indices, diff --git a/src/stream/tests/integration_tests/project_set.rs b/src/stream/tests/integration_tests/project_set.rs index 8c05bc43aa694..79fcbfc0d48db 100644 --- a/src/stream/tests/integration_tests/project_set.rs +++ b/src/stream/tests/integration_tests/project_set.rs @@ -48,7 +48,7 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) { }; let project_set = Box::new(ProjectSetExecutor::new( - ActorContext::create(123), + ActorContext::for_test(123), info, Box::new(source), vec![