Skip to content

Commit

Permalink
chore(stream): rename ActorContext creation methods (#14750)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Jan 23, 2024
1 parent 2e16c26 commit c75db2f
Show file tree
Hide file tree
Showing 33 changed files with 58 additions and 58 deletions.
4 changes: 2 additions & 2 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
let pk_indices = vec![0];

let (mut tx, source) = MockSource::channel(schema.clone(), pk_indices.clone());
let _actor_ctx = ActorContext::create(0x3a3a3a);
let _actor_ctx = ActorContext::for_test(0x3a3a3a);

// mock upstream offset (start from "1.binlog, pos=0") for ingested chunks
let mock_offset_executor =
Expand Down Expand Up @@ -230,7 +230,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
identity: "CdcBackfillExecutor".to_string(),
};
let cdc_backfill = CdcBackfillExecutor::new(
ActorContext::create(actor_id),
ActorContext::for_test(actor_id),
info,
external_table,
Box::new(mock_offset_executor),
Expand Down
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async fn test_table_materialize() -> StreamResult<()> {
let barrier_tx = Arc::new(barrier_tx);
let vnodes = Bitmap::from_bytes(&[0b11111111]);

let actor_ctx = ActorContext::create(0x3f3f3f);
let actor_ctx = ActorContext::for_test(0x3f3f3f);
let system_params_manager = LocalSystemParamsManager::for_test();

// Create a `SourceExecutor` to read the changes.
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct ActorContext {
pub type ActorContextRef = Arc<ActorContext>;

impl ActorContext {
pub fn create(id: ActorId) -> ActorContextRef {
pub fn for_test(id: ActorId) -> ActorContextRef {
Arc::new(Self {
id,
fragment_id: 0,
Expand All @@ -73,7 +73,7 @@ impl ActorContext {
})
}

pub fn create_with_metrics(
pub fn create(
stream_actor: &PbStreamActor,
total_mem_val: Arc<TrAdder<i64>>,
streaming_metrics: Arc<StreamingMetrics>,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/aggregation/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ mod tests {
&agg_calls,
Arc::new(AtomicU64::new(0)),
&dedup_tables,
ActorContext::create(0),
ActorContext::for_test(0),
);

// --- chunk 1 ---
Expand Down Expand Up @@ -483,7 +483,7 @@ mod tests {
&agg_calls,
Arc::new(AtomicU64::new(0)),
&dedup_tables,
ActorContext::create(0),
ActorContext::for_test(0),
);

// --- chunk 3 ---
Expand Down Expand Up @@ -572,7 +572,7 @@ mod tests {
&agg_calls,
Arc::new(AtomicU64::new(0)),
&dedup_tables,
ActorContext::create(0),
ActorContext::for_test(0),
);

let chunk = StreamChunk::from_pretty(
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/barrier_recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl BarrierRecvExecutor {

pub fn for_test(barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
Self::new(
ActorContext::create(0),
ActorContext::for_test(0),
ExecutorInfo {
schema: Schema::empty().clone(),
pk_indices: PkIndices::new(),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/dedup/append_only_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ mod tests {
Box::new(input),
state_table,
info,
ActorContext::create(123),
ActorContext::for_test(123),
Arc::new(AtomicU64::new(0)),
Arc::new(StreamingMetrics::unused()),
))
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ mod tests {
};

let executor = DynamicFilterExecutor::<MemoryStateStore, false>::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source_l),
Box::new(source_r),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ mod tests {
let test_expr = build_from_pretty("(greater_than:boolean $0:int8 $1:int8)");

let filter = Box::new(FilterExecutor::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source),
test_expr,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ mod tests {
};

let executor = HashJoinExecutor::<Key64, MemoryStateStore, T>::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source_l),
Box::new(source_r),
Expand Down Expand Up @@ -1494,7 +1494,7 @@ mod tests {
};

let executor = HashJoinExecutor::<Key128, MemoryStateStore, T>::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source_l),
Box::new(source_r),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ mod tests {
.unwrap();

super::HopWindowExecutor::new(
ActorContext::create(123),
ActorContext::for_test(123),
ExecutorInfo {
// TODO: the schema is incorrect, but it seems useless here.
schema,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async fn test_merger_sum_aggr() {
time_zone: String::from("UTC"),
};

let actor_ctx = ActorContext::create(0);
let actor_ctx = ActorContext::for_test(0);
// `make_actor` build an actor to do local aggregation
let make_actor = |input_rx| {
let _schema = Schema {
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/lookup/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ async fn test_lookup_this_epoch() {
identity: "LookupExecutor".to_string(),
};
let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
ctx: ActorContext::create(0),
ctx: ActorContext::for_test(0),
info,
arrangement,
stream,
Expand Down Expand Up @@ -279,7 +279,7 @@ async fn test_lookup_last_epoch() {
identity: "LookupExecutor".to_string(),
};
let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
ctx: ActorContext::create(0),
ctx: ActorContext::for_test(0),
info,
arrangement,
stream,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl MergeExecutor {
use crate::executor::exchange::input::Input;

Self::new(
ActorContext::create(114),
ActorContext::for_test(114),
ExecutorInfo {
schema,
pk_indices: vec![],
Expand Down Expand Up @@ -593,7 +593,7 @@ mod tests {
.unwrap();

let merge = MergeExecutor::new(
ActorContext::create(actor_id),
ActorContext::for_test(actor_id),
ExecutorInfo {
schema,
pk_indices: vec![],
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
input,
state_table,
arrange_key_indices: arrange_columns.clone(),
actor_context: ActorContext::create(0),
actor_context: ActorContext::for_test(0),
info: ExecutorInfo {
schema,
pk_indices: arrange_columns,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ mod tests {
};

let project = Box::new(ProjectExecutor::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source),
vec![test_expr],
Expand Down Expand Up @@ -414,7 +414,7 @@ mod tests {
};

let project = Box::new(ProjectExecutor::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source),
vec![a_expr, b_expr, c_expr],
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl ReceiverExecutor {
use crate::executor::ActorContext;

Self::new(
ActorContext::create(114),
ActorContext::for_test(114),
ExecutorInfo {
schema: Schema::default(),
pk_indices: vec![],
Expand Down Expand Up @@ -278,7 +278,7 @@ mod tests {
};

let receiver = ReceiverExecutor::new(
ActorContext::create(actor_id),
ActorContext::for_test(actor_id),
info,
fragment_id,
upstream_fragment_id,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/row_id_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ mod tests {
let row_id_generator = Bitmap::ones(VirtualNode::COUNT);
let (mut tx, upstream) = MockSource::channel(schema.clone(), pk_indices.clone());
let row_id_gen_executor = Box::new(RowIdGenExecutor::new(
ActorContext::create(233),
ActorContext::for_test(233),
ExecutorInfo {
schema,
pk_indices,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ mod tests {
];

let simple_agg = new_boxed_simple_agg_executor(
ActorContext::create(123),
ActorContext::for_test(123),
store,
Box::new(source),
false,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ mod test {
};

let sink_executor = SinkExecutor::new(
ActorContext::create(0),
ActorContext::for_test(0),
info,
Box::new(mock),
SinkWriterParam::for_test(),
Expand Down Expand Up @@ -637,7 +637,7 @@ mod test {
};

let sink_executor = SinkExecutor::new(
ActorContext::create(0),
ActorContext::for_test(0),
info,
Box::new(mock),
SinkWriterParam::for_test(),
Expand Down Expand Up @@ -761,7 +761,7 @@ mod test {
};

let sink_executor = SinkExecutor::new(
ActorContext::create(0),
ActorContext::for_test(0),
info,
Box::new(mock),
SinkWriterParam::for_test(),
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ mod tests {

let (tx, source) = MockSource::channel(input_schema, input_pk_indices);
let sort_executor = SortExecutor::new(SortExecutorArgs {
actor_ctx: ActorContext::create(123),
actor_ctx: ActorContext::for_test(123),
info: ExecutorInfo {
schema: source.schema().clone(),
pk_indices: source.pk_indices().to_vec(),
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ mod tests {
let system_params_manager = LocalSystemParamsManager::for_test();

let executor = SourceExecutor::new(
ActorContext::create(0),
ActorContext::for_test(0),
ExecutorInfo {
schema,
pk_indices,
Expand Down Expand Up @@ -843,7 +843,7 @@ mod tests {
let system_params_manager = LocalSystemParamsManager::for_test();

let executor = SourceExecutor::new(
ActorContext::create(0),
ActorContext::for_test(0),
ExecutorInfo {
schema,
pk_indices,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/stateless_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ mod tests {

let simple_agg = Box::new(
StatelessSimpleAggExecutor::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source),
agg_calls,
Expand Down Expand Up @@ -228,7 +228,7 @@ mod tests {

let simple_agg = Box::new(
StatelessSimpleAggExecutor::new(
ActorContext::create(123),
ActorContext::for_test(123),
info,
Box::new(source),
agg_calls,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ pub mod agg_executor {
version: PbAggNodeVersion::Max,

input,
actor_ctx: ActorContext::create(123),
actor_ctx: ActorContext::for_test(123),
info,

extreme_cache_size,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/top_n/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ mod tests {
let top_n_executor = Box::new(
GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
source as Box<dyn Executor>,
ActorContext::create(0),
ActorContext::for_test(0),
info,
storage_key(),
(0, 2),
Expand Down Expand Up @@ -498,7 +498,7 @@ mod tests {
let top_n_executor = Box::new(
GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
source as Box<dyn Executor>,
ActorContext::create(0),
ActorContext::for_test(0),
info,
storage_key(),
(1, 2),
Expand Down Expand Up @@ -593,7 +593,7 @@ mod tests {
let top_n_executor = Box::new(
GroupTopNExecutor::<SerializedKey, MemoryStateStore, false>::new(
source as Box<dyn Executor>,
ActorContext::create(0),
ActorContext::for_test(0),
info,
storage_key(),
(0, 2),
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/top_n/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ mod tests {
let top_n_executor = Box::new(
AppendOnlyTopNExecutor::<_, false>::new(
source as Box<dyn Executor>,
ActorContext::create(0),
ActorContext::for_test(0),
info,
storage_key,
(0, 5),
Expand Down Expand Up @@ -346,7 +346,7 @@ mod tests {
let top_n_executor = Box::new(
AppendOnlyTopNExecutor::<_, false>::new(
source as Box<dyn Executor>,
ActorContext::create(0),
ActorContext::for_test(0),
info,
storage_key(),
(3, 4),
Expand Down
Loading

0 comments on commit c75db2f

Please sign in to comment.