Skip to content

Commit

Permalink
fix(watermark): align watermark for idle input (#11554)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Aug 9, 2023
1 parent ce2a0c8 commit 55e3dde
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
current_watermark.clone(),
));

// If the input is idle
let mut idle_input = true;

#[for_await]
for msg in input {
let msg = msg?;
Expand Down Expand Up @@ -180,6 +183,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
yield Message::Chunk(output_chunk);
};

idle_input = false;
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
Expand All @@ -192,6 +196,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
let watermark = watermark.val;
if current_watermark.default_cmp(&watermark).is_lt() {
current_watermark = watermark;
idle_input = false;
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
Expand Down Expand Up @@ -229,6 +234,20 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
table.insert(row);
}
table.commit(barrier.epoch).await?;

if idle_input {
// Align watermark
let global_max_watermark =
Self::get_global_max_watermark(&table, watermark_type.clone())
.await?;
current_watermark = cmp::max_by(
current_watermark,
global_max_watermark,
DefaultOrd::default_cmp,
);
} else {
idle_input = true;
}
} else {
table.commit_no_data_expected(barrier.epoch);
}
Expand Down

0 comments on commit 55e3dde

Please sign in to comment.