fix: watermark filter use committed epoch to read global watermark #17724
Merged: st1page merged 13 commits into main from sts/change_watermark_filter_to_wait_committed_epoch on Jul 19, 2024. The diff below shows the changes from 8 of the 13 commits.

Commits (all by st1page):
- 36935a0 fix: watermark filter use commited epoch to read global watermark
- 42b8936 fix
- c9e4851 try fix
- a619ac7 fix back
- 7fcdd8f fix deadlock
- 0b858bb try fix
- 107c6d7 fix
- a389507 rename
- 2a0642a fmt
- 4c382d9 typo
- fb6b75e fix watermark test
- 2564e5a fix watermark test
- 3fe576e Merge branch 'main' into sts/change_watermark_filter_to_wait_committe…
```diff
@@ -77,6 +77,7 @@ impl<S: StateStore> Execute for WatermarkFilterExecutor<S> {
         self.execute_inner().boxed()
     }
 }
+const UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE: usize = 10;
 
 impl<S: StateStore> WatermarkFilterExecutor<S> {
     #[try_stream(ok = Message, error = StreamExecutorError)]
```
```diff
@@ -99,13 +100,18 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
         let mut input = input.execute();
 
         let first_barrier = expect_first_barrier(&mut input).await?;
+        let prev_epoch = first_barrier.epoch.prev;
         table.init_epoch(first_barrier.epoch);
         // The first barrier message should be propagated.
         yield Message::Barrier(first_barrier);
 
         // Initiate and yield the first watermark.
-        let mut current_watermark =
-            Self::get_global_max_watermark(&table, &global_watermark_table).await?;
+        let mut current_watermark = Self::get_global_max_watermark(
+            &table,
+            &global_watermark_table,
+            HummockReadEpoch::Committed(prev_epoch),
+        )
+        .await?;
 
         let mut last_checkpoint_watermark = None;
 
```
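The key change in this hunk is that the startup read of the global watermark now happens at `HummockReadEpoch::Committed(prev_epoch)`, the committed epoch sealed by the first barrier, rather than via a `NoWait` read at the state table's own epoch. A minimal sketch of the distinction, using simplified stand-in types (`EpochPair` and `ReadEpoch` here are illustrative substitutes for RisingWave's actual barrier epoch and `HummockReadEpoch` types):

```rust
// A barrier carries an epoch pair: everything up to `prev` is sealed by the
// barrier, so `prev` is the newest epoch that can safely be awaited as
// committed when the executor (re)starts.
#[derive(Clone, Copy)]
struct EpochPair {
    prev: u64, // sealed by this barrier; safe to read as committed
    curr: u64, // opened by this barrier; still being written
}

// Simplified stand-in for `HummockReadEpoch`.
#[derive(Clone, Copy)]
enum ReadEpoch {
    Committed(u64), // wait for this epoch to commit, then read
    NoWait(u64),    // read whatever snapshot is available right now
}

fn startup_read_epoch(first_barrier_epoch: EpochPair) -> ReadEpoch {
    // The fix: read the global watermark at the committed previous epoch,
    // instead of a NoWait read at the table's local epoch.
    ReadEpoch::Committed(first_barrier_epoch.prev)
}

fn main() {
    let epoch = EpochPair { prev: 100, curr: 101 };
    assert_eq!(epoch.curr, epoch.prev + 1);
    match startup_read_epoch(epoch) {
        ReadEpoch::Committed(e) => assert_eq!(e, 100),
        ReadEpoch::NoWait(_) => unreachable!(),
    }
}
```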
```diff
@@ -119,7 +125,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
 
         // If the input is idle
         let mut idle_input = true;
-
+        let mut barrier_num_during_idle = 0;
         #[for_await]
         for msg in input {
             let msg = msg?;
```
```diff
@@ -208,6 +214,9 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
                     }
                 }
                 Message::Barrier(barrier) => {
+                    let prev_epoch = barrier.epoch.prev;
+                    let is_checkpoint = barrier.kind.is_checkpoint();
+                    let mut need_update_global_max_watermark = false;
                     // Update the vnode bitmap for state tables of all agg calls if asked.
                     if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
                         let other_vnodes_bitmap = Arc::new(
```
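A plausible reading of why `prev_epoch` and `is_checkpoint` are hoisted into locals here: the restructured arm yields `Message::Barrier(barrier)` at its very end, which moves the barrier, so any values needed around that move point are copied out up front rather than borrowed from `barrier`. A toy illustration of that pattern (types are hypothetical, not the executor's real ones):

```rust
struct Barrier {
    prev_epoch: u64,
    checkpoint: bool,
}

// Stands in for `yield Message::Barrier(barrier)`, which consumes the value.
fn yield_downstream(b: Barrier) {
    let _ = b;
}

fn main() {
    let barrier = Barrier { prev_epoch: 100, checkpoint: true };
    // Copy out the fields we still need once the barrier has been moved.
    let prev_epoch = barrier.prev_epoch;
    let is_checkpoint = barrier.checkpoint;
    yield_downstream(barrier);
    println!("prev_epoch={prev_epoch}, is_checkpoint={is_checkpoint}");
}
```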
```diff
@@ -220,15 +229,11 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
 
                         // Take the global max watermark when scaling happens.
                         if previous_vnode_bitmap != vnode_bitmap {
-                            current_watermark =
-                                Self::get_global_max_watermark(&table, &global_watermark_table)
-                                    .await?;
+                            need_update_global_max_watermark = true;
                         }
                     }
 
-                    if barrier.kind.is_checkpoint()
-                        && last_checkpoint_watermark != current_watermark
-                    {
+                    if is_checkpoint && last_checkpoint_watermark != current_watermark {
                         last_checkpoint_watermark.clone_from(&current_watermark);
                         // Persist the watermark when checkpoint arrives.
                         if let Some(watermark) = current_watermark.clone() {
```
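Instead of reading the global max watermark inline while handling the vnode-bitmap update, the rewrite only records that a refresh is needed; the actual read is deferred to the next hunk, after `table.commit(...)`, where a committed epoch is available. A small sketch of this defer-then-refresh pattern (the names here are illustrative, not the executor's real API):

```rust
struct WatermarkState {
    current: Option<u64>,
    need_refresh: bool,
}

impl WatermarkState {
    // On scaling (vnode bitmap changed): just mark, don't read yet.
    fn on_scale(&mut self) {
        self.need_refresh = true;
    }

    // After committing the barrier's epoch: perform the deferred read once.
    fn after_commit(&mut self, read_global: impl FnOnce() -> Option<u64>) {
        if std::mem::take(&mut self.need_refresh) {
            self.current = read_global();
        }
    }
}

fn main() {
    let mut state = WatermarkState { current: None, need_refresh: false };
    state.on_scale();
    state.after_commit(|| Some(123)); // stand-in for the committed-epoch read
    assert_eq!(state.current, Some(123));
}
```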
```diff
@@ -242,39 +247,57 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
                     }
 
                     table.commit(barrier.epoch).await?;
-                    yield Message::Barrier(barrier);
 
-                    if barrier.kind.is_checkpoint() {
+                    if need_update_global_max_watermark {
+                        current_watermark = Self::get_global_max_watermark(
+                            &table,
+                            &global_watermark_table,
+                            HummockReadEpoch::Committed(prev_epoch),
+                        )
+                        .await?;
+                    }
+
+                    if is_checkpoint {
                         if idle_input {
-                            // Align watermark
-                            let global_max_watermark =
-                                Self::get_global_max_watermark(&table, &global_watermark_table)
-                                    .await?;
-
-                            current_watermark = if let Some(global_max_watermark) =
-                                global_max_watermark.clone()
-                                && let Some(watermark) = current_watermark.clone()
-                            {
-                                Some(cmp::max_by(
-                                    watermark,
-                                    global_max_watermark,
-                                    DefaultOrd::default_cmp,
-                                ))
-                            } else {
-                                current_watermark.or(global_max_watermark)
-                            };
-                            if let Some(watermark) = current_watermark.clone() {
-                                yield Message::Watermark(Watermark::new(
-                                    event_time_col_idx,
-                                    watermark_type.clone(),
-                                    watermark,
-                                ));
+                            barrier_num_during_idle += 1;
+
+                            if barrier_num_during_idle == UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE {
+                                barrier_num_during_idle = 0;
+                                // Align watermark.
+                                // NOTE(st1page): This read should be `NoWait`: waiting for the
+                                // previous epoch to commit could degrade concurrent
+                                // checkpointing.
+                                let global_max_watermark = Self::get_global_max_watermark(
+                                    &table,
+                                    &global_watermark_table,
+                                    HummockReadEpoch::NoWait(prev_epoch),
+                                )
+                                .await?;
+
+                                current_watermark = if let Some(global_max_watermark) =
+                                    global_max_watermark.clone()
+                                    && let Some(watermark) = current_watermark.clone()
+                                {
+                                    Some(cmp::max_by(
+                                        watermark,
+                                        global_max_watermark,
+                                        DefaultOrd::default_cmp,
+                                    ))
+                                } else {
+                                    current_watermark.or(global_max_watermark)
+                                };
+                                if let Some(watermark) = current_watermark.clone() {
+                                    yield Message::Watermark(Watermark::new(
+                                        event_time_col_idx,
+                                        watermark_type.clone(),
+                                        watermark,
+                                    ));
+                                }
                             }
                         } else {
                             idle_input = true;
+                            barrier_num_during_idle = 0;
                         }
                     }
+
+                    yield Message::Barrier(barrier);
                 }
             }
         }
```

Review comment (on the `barrier_num_during_idle` check): Without it, the issue would still occur on "replace the table", as it cannot be guaranteed that all actors have been dropped by the time execution reaches this point. After adding this logic, the executor waits for 10 barriers before proceeding here, thereby avoiding the issue.
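Two pieces of logic in this hunk are worth isolating: the idle throttle (re-align only once every `UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE` checkpoint barriers, resetting whenever data arrives again) and the watermark merge rule (take the max when both the local and global watermark exist, otherwise whichever is present). A condensed, self-contained sketch of both, using plain integers in place of `ScalarImpl`:

```rust
const UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE: usize = 10;

/// Returns true when an idle executor should re-read the global watermark.
fn should_align(idle: bool, barrier_num_during_idle: &mut usize) -> bool {
    if !idle {
        // Data arrived: reset the counter, no alignment read needed.
        *barrier_num_during_idle = 0;
        return false;
    }
    *barrier_num_during_idle += 1;
    if *barrier_num_during_idle == UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE {
        *barrier_num_during_idle = 0;
        true
    } else {
        false
    }
}

/// The merge rule from the diff: max when both sides exist, else `or`.
fn merge(current: Option<i64>, global: Option<i64>) -> Option<i64> {
    match (current, global) {
        (Some(c), Some(g)) => Some(c.max(g)),
        (c, g) => c.or(g),
    }
}

fn main() {
    let mut n = 0;
    // Only the 10th consecutive idle checkpoint triggers an alignment read.
    assert!((0..9).all(|_| !should_align(true, &mut n)));
    assert!(should_align(true, &mut n));
    assert_eq!(merge(Some(5), Some(7)), Some(7));
    assert_eq!(merge(None, Some(7)), Some(7));
}
```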
```diff
@@ -301,8 +324,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
     async fn get_global_max_watermark(
         table: &StateTable<S>,
         global_watermark_table: &StorageTable<S>,
+        wait_epoch: HummockReadEpoch,
     ) -> StreamExecutorResult<Option<ScalarImpl>> {
-        let epoch = table.epoch();
         let handle_watermark_row = |watermark_row: Option<OwnedRow>| match watermark_row {
             Some(row) => {
                 if row.len() == 1 {
```
```diff
@@ -319,9 +342,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
             .iter_vnodes()
             .map(|vnode| async move {
                 let pk = row::once(vnode.to_datum());
-                let watermark_row: Option<OwnedRow> = global_watermark_table
-                    .get_row(pk, HummockReadEpoch::NoWait(epoch))
-                    .await?;
+                let watermark_row: Option<OwnedRow> =
+                    global_watermark_table.get_row(pk, wait_epoch).await?;
                 handle_watermark_row(watermark_row)
             });
         let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move {
```
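These last two hunks turn the read epoch into a parameter: `get_global_max_watermark` no longer derives an epoch from the state table itself, so each call site picks its own semantics (`Committed` on startup and after scaling, `NoWait` in the idle path). A simplified sketch of the shape of this API change (`Store`, its `get` method, and `ReadEpoch` are stand-ins, not RisingWave's `StorageTable` or `HummockReadEpoch`):

```rust
#[derive(Clone, Copy)]
enum ReadEpoch {
    Committed(u64), // wait until the epoch is committed, then read
    NoWait(u64),    // read immediately, without waiting
}

struct Store;

impl Store {
    // Stand-in for StorageTable::get_row: look up one vnode's watermark
    // at the snapshot selected by `epoch`.
    fn get(&self, vnode: u32, _epoch: ReadEpoch) -> Option<u64> {
        Some(u64::from(vnode) * 10)
    }
}

// After the refactor, the caller supplies the read epoch.
fn global_max_watermark(store: &Store, vnodes: &[u32], epoch: ReadEpoch) -> Option<u64> {
    vnodes.iter().filter_map(|&v| store.get(v, epoch)).max()
}

fn main() {
    let store = Store;
    // Startup / post-scaling path: read at the committed epoch.
    assert_eq!(global_max_watermark(&store, &[1, 2, 3], ReadEpoch::Committed(100)), Some(30));
    // Idle alignment path: read without waiting.
    assert_eq!(global_max_watermark(&store, &[1, 2, 3], ReadEpoch::NoWait(100)), Some(30));
}
```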
Review comment (on the `NoWait` read in the idle path): I guess `NoWait` is still okay here, because we're sure the `prev_epoch` of the `Pause` checkpoint is committed at this point (see risingwave/src/meta/src/barrier/command.rs, lines 892 to 900 at 4605773). By doing a `NoWait` read on `update.prev_epoch` (i.e. `pause.curr_epoch`), we're essentially reading the exact same snapshot as `pause.prev_epoch`, because we ensure that no data is written during the `Pause` barrier.
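The reviewer's argument can be made concrete with made-up epoch numbers: if `pause.prev_epoch = 100` is committed and nothing is written during the `Pause` barrier's epoch 101, then a `NoWait` read at 101 observes exactly the snapshot of 100. A toy model of that claim:

```rust
// Toy snapshot model: the "snapshot at epoch e" is every write with epoch <= e.
fn snapshot<'a>(epoch: u64, writes: &[(u64, &'a str)]) -> Vec<&'a str> {
    writes
        .iter()
        .filter(|(e, _)| *e <= epoch)
        .map(|&(_, w)| w)
        .collect()
}

fn main() {
    // Writes exist up to the committed epoch 100; epoch 101 (the Pause
    // barrier's curr_epoch) has no writes because the graph is paused.
    let writes = [(99, "w1"), (100, "w2")];
    // Hence a NoWait read at 101 sees the same snapshot as a read at 100.
    assert_eq!(snapshot(101, &writes), snapshot(100, &writes));
}
```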