Skip to content

Commit

Permalink
fix: watermark filter use commited epoch to read global watermark (#1…
Browse files Browse the repository at this point in the history
…7724) (#17761)

Co-authored-by: stonepage <[email protected]>
  • Loading branch information
github-actions[bot] and st1page authored Jul 23, 2024
1 parent 1bc6890 commit c0d6adc
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 39 deletions.
2 changes: 1 addition & 1 deletion e2e_test/streaming/watermark.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ statement ok
insert into t values ('2023-05-06 16:56:01', 1);

skipif in-memory
sleep 10s
sleep 20s

skipif in-memory
query TI
Expand Down
100 changes: 62 additions & 38 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl<S: StateStore> Execute for WatermarkFilterExecutor<S> {
self.execute_inner().boxed()
}
}
const UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE: usize = 5;

impl<S: StateStore> WatermarkFilterExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
Expand All @@ -99,13 +100,18 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
let mut input = input.execute();

let first_barrier = expect_first_barrier(&mut input).await?;
let prev_epoch = first_barrier.epoch.prev;
table.init_epoch(first_barrier.epoch);
// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);

// Initiate and yield the first watermark.
let mut current_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table).await?;
let mut current_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::Committed(prev_epoch),
)
.await?;

let mut last_checkpoint_watermark = None;

Expand All @@ -119,7 +125,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

// If the input is idle
let mut idle_input = true;

let mut barrier_num_during_idle = 0;
#[for_await]
for msg in input {
let msg = msg?;
Expand Down Expand Up @@ -208,6 +214,9 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
}
}
Message::Barrier(barrier) => {
let prev_epoch = barrier.epoch.prev;
let is_checkpoint = barrier.kind.is_checkpoint();
let mut need_update_global_max_watermark = false;
// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
let other_vnodes_bitmap = Arc::new(
Expand All @@ -220,15 +229,11 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

// Take the global max watermark when scaling happens.
if previous_vnode_bitmap != vnode_bitmap {
current_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table)
.await?;
need_update_global_max_watermark = true;
}
}

if barrier.kind.is_checkpoint()
&& last_checkpoint_watermark != current_watermark
{
if is_checkpoint && last_checkpoint_watermark != current_watermark {
last_checkpoint_watermark.clone_from(&current_watermark);
// Persist the watermark when checkpoint arrives.
if let Some(watermark) = current_watermark.clone() {
Expand All @@ -242,39 +247,59 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
}

table.commit(barrier.epoch).await?;
yield Message::Barrier(barrier);

if need_update_global_max_watermark {
current_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::Committed(prev_epoch),
)
.await?;
}

if barrier.kind.is_checkpoint() {
if is_checkpoint {
if idle_input {
// Align watermark
let global_max_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table)
.await?;

current_watermark = if let Some(global_max_watermark) =
global_max_watermark.clone()
&& let Some(watermark) = current_watermark.clone()
barrier_num_during_idle += 1;

if barrier_num_during_idle
== UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE
{
Some(cmp::max_by(
watermark,
global_max_watermark,
DefaultOrd::default_cmp,
))
} else {
current_watermark.or(global_max_watermark)
};
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
barrier_num_during_idle = 0;
// Align watermark
// NOTE(st1page): Should be `NoWait` because it could lead to a degradation of concurrent checkpoint situations, as it would require waiting for the previous epoch
let global_max_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::NoWait(prev_epoch),
)
.await?;

current_watermark = if let Some(global_max_watermark) =
global_max_watermark.clone()
&& let Some(watermark) = current_watermark.clone()
{
Some(cmp::max_by(
watermark,
global_max_watermark,
DefaultOrd::default_cmp,
))
} else {
current_watermark.or(global_max_watermark)
};
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
}
}
} else {
idle_input = true;
barrier_num_during_idle = 0;
}
}

yield Message::Barrier(barrier);
}
}
}
Expand All @@ -301,8 +326,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
async fn get_global_max_watermark(
table: &StateTable<S>,
global_watermark_table: &StorageTable<S>,
wait_epoch: HummockReadEpoch,
) -> StreamExecutorResult<Option<ScalarImpl>> {
let epoch = table.epoch();
let handle_watermark_row = |watermark_row: Option<OwnedRow>| match watermark_row {
Some(row) => {
if row.len() == 1 {
Expand All @@ -319,9 +344,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
.iter_vnodes()
.map(|vnode| async move {
let pk = row::once(vnode.to_datum());
let watermark_row: Option<OwnedRow> = global_watermark_table
.get_row(pk, HummockReadEpoch::NoWait(epoch))
.await?;
let watermark_row: Option<OwnedRow> =
global_watermark_table.get_row(pk, wait_epoch).await?;
handle_watermark_row(watermark_row)
});
let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move {
Expand Down

0 comments on commit c0d6adc

Please sign in to comment.