Skip to content

Commit

Permalink
all_state_tables_mut
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Feb 23, 2023
1 parent 9b61a7c commit 37cb6e0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 54 deletions.
35 changes: 20 additions & 15 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ struct ExecutorInner<S: StateStore> {
extreme_cache_size: usize,
}

impl<S: StateStore> ExecutorInner<S> {
fn all_state_tables_mut(&mut self) -> impl Iterator<Item = &mut StateTable<S>> {
iter_table_storage(&mut self.storages)
.chain(self.distinct_dedup_tables.values_mut())
.chain(std::iter::once(&mut self.result_table))
}

fn all_state_tables_except_result_mut(&mut self) -> impl Iterator<Item = &mut StateTable<S>> {
iter_table_storage(&mut self.storages).chain(self.distinct_dedup_tables.values_mut())
}
}

struct ExecutionVars<S: StateStore> {
/// The single [`AggGroup`].
agg_group: AggGroup<S>,
Expand Down Expand Up @@ -215,9 +227,8 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {

// Commit all state tables except for result table.
futures::future::try_join_all(
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.map(|state_table| state_table.commit(epoch)),
this.all_state_tables_except_result_mut()
.map(|table| table.commit(epoch)),
)
.await?;

Expand Down Expand Up @@ -263,12 +274,9 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
} else {
// No state is changed.
// Call commit on state table to increment the epoch.
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.for_each(|state_table| {
state_table.commit_no_data_expected(epoch);
});
this.result_table.commit_no_data_expected(epoch);
this.all_state_tables_mut().for_each(|table| {
table.commit_no_data_expected(epoch);
});
Ok(None)
}
}
Expand All @@ -282,12 +290,9 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {

let mut input = input.execute();
let barrier = expect_first_barrier(&mut input).await?;
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.for_each(|state_table| {
state_table.init_epoch(barrier.epoch);
});
this.result_table.init_epoch(barrier.epoch);
this.all_state_tables_mut().for_each(|table| {
table.init_epoch(barrier.epoch);
});

let mut vars = ExecutionVars {
// Create `AggGroup`. This will fetch previous agg result from the result table.
Expand Down
66 changes: 27 additions & 39 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ struct ExecutorInner<K: HashKey, S: StateStore> {
metrics: Arc<StreamingMetrics>,
}

impl<K: HashKey, S: StateStore> ExecutorInner<K, S> {
fn all_state_tables_mut(&mut self) -> impl Iterator<Item = &mut StateTable<S>> {
iter_table_storage(&mut self.storages)
.chain(self.distinct_dedup_tables.values_mut())
.chain(std::iter::once(&mut self.result_table))
}
}

struct ExecutionVars<K: HashKey, S: StateStore> {
stats: ExecutionStats,

Expand Down Expand Up @@ -484,39 +492,25 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
vars.distinct_dedup.flush(&mut this.distinct_dedup_tables)?;

// Commit all state tables.
futures::future::try_join_all(
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.map(|state_table| async {
if let Some(watermark) = state_clean_watermark.as_ref() {
state_table.update_watermark(watermark.clone())
};
state_table.commit(epoch).await
}),
)
futures::future::try_join_all(this.all_state_tables_mut().map(|table| async {
if let Some(watermark) = state_clean_watermark.as_ref() {
table.update_watermark(watermark.clone())
};
table.commit(epoch).await
}))
.await?;
if let Some(watermark) = state_clean_watermark.as_ref() {
this.result_table.update_watermark(watermark.clone());
};
this.result_table.commit(epoch).await?;

// Evict cache to target capacity.
vars.agg_group_cache.evict();
} else {
// Nothing to flush.
// Call commit on state table to increment the epoch.
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.for_each(|state_table| {
if let Some(watermark) = state_clean_watermark.as_ref() {
state_table.update_watermark(watermark.clone())
};
state_table.commit_no_data_expected(epoch);
});
if let Some(watermark) = state_clean_watermark.as_ref() {
this.result_table.update_watermark(watermark.clone());
};
this.result_table.commit_no_data_expected(epoch);
this.all_state_tables_mut().for_each(|table| {
if let Some(watermark) = state_clean_watermark.as_ref() {
table.update_watermark(watermark.clone())
};
table.commit_no_data_expected(epoch);
});
return Ok(());
}
}
Expand Down Expand Up @@ -551,12 +545,9 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
// First barrier
let mut input = input.execute();
let barrier = expect_first_barrier(&mut input).await?;
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.for_each(|state_table| {
state_table.init_epoch(barrier.epoch);
});
this.result_table.init_epoch(barrier.epoch);
this.all_state_tables_mut().for_each(|table| {
table.init_epoch(barrier.epoch);
});
vars.agg_group_cache.update_epoch(barrier.epoch.curr);
vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.update_epoch(barrier.epoch.curr);
Expand Down Expand Up @@ -593,13 +584,10 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {

// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) {
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.for_each(|state_table| {
let _ = state_table.update_vnode_bitmap(vnode_bitmap.clone());
});
let previous_vnode_bitmap =
this.result_table.update_vnode_bitmap(vnode_bitmap.clone());
let previous_vnode_bitmap = this.result_table.vnodes().clone();
this.all_state_tables_mut().for_each(|table| {
let _ = table.update_vnode_bitmap(vnode_bitmap.clone());
});

// Manipulate the cache if necessary.
if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
Expand Down

0 comments on commit 37cb6e0

Please sign in to comment.