fix: watermark filter use committed epoch to read global watermark #17724

Merged: 13 commits, Jul 19, 2024
100 changes: 61 additions & 39 deletions src/stream/src/executor/watermark_filter.rs
@@ -77,6 +77,7 @@ impl<S: StateStore> Execute for WatermarkFilterExecutor<S> {
self.execute_inner().boxed()
}
}
const UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE: usize = 10;

impl<S: StateStore> WatermarkFilterExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
@@ -99,13 +100,18 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
let mut input = input.execute();

let first_barrier = expect_first_barrier(&mut input).await?;
let prev_epoch = first_barrier.epoch.prev;
table.init_epoch(first_barrier.epoch);
// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);

// Initiate and yield the first watermark.
let mut current_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table).await?;
let mut current_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::Committed(prev_epoch),
)
.await?;

let mut last_checkpoint_watermark = None;

@@ -119,7 +125,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

// If the input is idle
let mut idle_input = true;

let mut barrier_num_during_idle = 0;
#[for_await]
for msg in input {
let msg = msg?;
@@ -208,6 +214,9 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
}
}
Message::Barrier(barrier) => {
let prev_epoch = barrier.epoch.prev;
let is_checkpoint = barrier.kind.is_checkpoint();
let mut need_update_global_max_watermark = false;
// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
let other_vnodes_bitmap = Arc::new(
@@ -220,15 +229,11 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

// Take the global max watermark when scaling happens.
if previous_vnode_bitmap != vnode_bitmap {
current_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table)
.await?;
need_update_global_max_watermark = true;
}
}

if barrier.kind.is_checkpoint()
&& last_checkpoint_watermark != current_watermark
{
if is_checkpoint && last_checkpoint_watermark != current_watermark {
last_checkpoint_watermark.clone_from(&current_watermark);
// Persist the watermark when checkpoint arrives.
if let Some(watermark) = current_watermark.clone() {
@@ -242,39 +247,57 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
}

table.commit(barrier.epoch).await?;
yield Message::Barrier(barrier);

if need_update_global_max_watermark {
current_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::Committed(prev_epoch),
Review comment from @BugenZhao (Member), Jul 19, 2024:

I guess NoWait is still okay here, because we're sure the prev_epoch of the Pause checkpoint is already committed at this point.

Command::Pause(reason) => {
if let PausedReason::ConfigChange = reason {
// After the `Pause` barrier is collected and committed, we must ensure that the
// storage version with this epoch is synced to all compute nodes before the
// execution of the next command of `Update`, as some newly created operators
// may immediately initialize their states on that barrier.
self.wait_epoch_commit(self.prev_epoch.value().0).await?;
}
}

By using NoWait on update.prev_epoch (i.e. pause.curr_epoch), we essentially read the exact same snapshot as at pause.prev_epoch, because we ensure that no data is written during the Pause barrier.
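
For reference, a minimal sketch contrasting the two read modes discussed here. It only uses items that already appear in this diff (StorageTable, HummockReadEpoch, row::once, get_row); the wrapper function read_watermark_row and its VirtualNode parameter are illustrative assumptions, not code from the PR.

// Sketch only: not part of this PR. Assumes the imports already present in
// watermark_filter.rs (plus VirtualNode if it is not already in scope there).
async fn read_watermark_row<S: StateStore>(
    global_watermark_table: &StorageTable<S>,
    vnode: VirtualNode,
    prev_epoch: u64,
    wait_for_commit: bool,
) -> StreamExecutorResult<Option<OwnedRow>> {
    let pk = row::once(vnode.to_datum());
    let read_epoch = if wait_for_commit {
        // Committed: wait until prev_epoch is committed, so the read is
        // guaranteed to observe all data of that epoch (what this PR uses).
        HummockReadEpoch::Committed(prev_epoch)
    } else {
        // NoWait: read the locally available snapshot without waiting. Per the
        // discussion above, this is also safe around a Pause barrier, since no
        // data is written while the stream is paused.
        HummockReadEpoch::NoWait(prev_epoch)
    };
    let row = global_watermark_table.get_row(pk, read_epoch).await?;
    Ok(row)
}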

)
.await?;
}

if barrier.kind.is_checkpoint() {
if is_checkpoint {
if idle_input {
// Align watermark
let global_max_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table)
.await?;

current_watermark = if let Some(global_max_watermark) =
global_max_watermark.clone()
&& let Some(watermark) = current_watermark.clone()
{
Some(cmp::max_by(
watermark,
global_max_watermark,
DefaultOrd::default_cmp,
))
} else {
current_watermark.or(global_max_watermark)
};
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
barrier_num_during_idle += 1;

if barrier_num_during_idle == UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE {
Review comment from the PR author (Contributor):

Without it, the issue will still occur when replacing the table, as it cannot be guaranteed that all actors have been dropped by the time execution reaches here. After adding this logic, the executor waits for 10 idle barriers before re-reading the global watermark at this point, thereby avoiding the issue.
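
As a standalone illustration of the throttling described in this comment, here is a minimal sketch of the counter pattern in isolation; the IdleRefreshThrottle struct and its method names are made up for the example, while the constant and the reset/refresh behavior mirror the diff below.

// Sketch only: an isolated view of the idle-refresh throttle added by this PR.
// The real logic lives inline in the executor loop shown in this diff.
const UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE: usize = 10;

#[derive(Default)]
struct IdleRefreshThrottle {
    barrier_num_during_idle: usize,
}

impl IdleRefreshThrottle {
    /// Call on every checkpoint barrier while the input is idle.
    /// Returns true once every 10 idle checkpoints, i.e. when the global
    /// watermark should be re-read from the storage table.
    fn on_idle_checkpoint(&mut self) -> bool {
        self.barrier_num_during_idle += 1;
        if self.barrier_num_during_idle == UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE {
            self.barrier_num_during_idle = 0;
            true
        } else {
            false
        }
    }

    /// Call when the input produces data again, resetting the idle counter.
    fn on_active(&mut self) {
        self.barrier_num_during_idle = 0;
    }
}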

barrier_num_during_idle = 0;
// Align watermark
// NOTE(st1page): This should be `NoWait`, because waiting for the committed epoch here could degrade concurrent checkpointing, as it would require waiting for the previous epoch to be committed.
let global_max_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::NoWait(prev_epoch),
)
.await?;

current_watermark = if let Some(global_max_watermark) =
global_max_watermark.clone()
&& let Some(watermark) = current_watermark.clone()
{
Some(cmp::max_by(
watermark,
global_max_watermark,
DefaultOrd::default_cmp,
))
} else {
current_watermark.or(global_max_watermark)
};
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
}
}
} else {
idle_input = true;
barrier_num_during_idle = 0;
}
}

yield Message::Barrier(barrier);
}
}
}
@@ -301,8 +324,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
async fn get_global_max_watermark(
table: &StateTable<S>,
global_watermark_table: &StorageTable<S>,
wait_epoch: HummockReadEpoch,
) -> StreamExecutorResult<Option<ScalarImpl>> {
let epoch = table.epoch();
let handle_watermark_row = |watermark_row: Option<OwnedRow>| match watermark_row {
Some(row) => {
if row.len() == 1 {
Expand All @@ -319,9 +342,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
.iter_vnodes()
.map(|vnode| async move {
let pk = row::once(vnode.to_datum());
let watermark_row: Option<OwnedRow> = global_watermark_table
.get_row(pk, HummockReadEpoch::NoWait(epoch))
.await?;
let watermark_row: Option<OwnedRow> =
global_watermark_table.get_row(pk, wait_epoch).await?;
handle_watermark_row(watermark_row)
});
let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move {