Skip to content

Commit

Permalink
fix: move creation of AggGroup after yielding first barrier to preven…
Browse files Browse the repository at this point in the history
…t panic loop

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Oct 26, 2023
1 parent 2b8d09e commit 652b395
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions src/stream/src/executor/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,19 @@ impl<S: StateStore> SimpleAggExecutor<S> {
table.init_epoch(barrier.epoch);
});

let mut distinct_dedup = DistinctDeduplicater::new(
&this.agg_calls,
&this.watermark_epoch,
&this.distinct_dedup_tables,
this.actor_ctx.id,
this.metrics.clone(),
);
distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.update_epoch(barrier.epoch.curr);
});

yield Message::Barrier(barrier);

let mut vars = ExecutionVars {
// This will fetch previous agg states from the intermediate state table.
agg_group: AggGroup::create(
Expand All @@ -277,22 +290,10 @@ impl<S: StateStore> SimpleAggExecutor<S> {
&this.input_schema,
)
.await?,
distinct_dedup: DistinctDeduplicater::new(
&this.agg_calls,
&this.watermark_epoch,
&this.distinct_dedup_tables,
this.actor_ctx.id,
this.metrics.clone(),
),
distinct_dedup,
state_changed: false,
};

vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.update_epoch(barrier.epoch.curr);
});

yield Message::Barrier(barrier);

#[for_await]
for msg in input {
let msg = msg?;
Expand Down

0 comments on commit 652b395

Please sign in to comment.