Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Jul 18, 2024
1 parent 0b858bb commit 107c6d7
Showing 1 changed file with 37 additions and 29 deletions.
66 changes: 37 additions & 29 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl<S: StateStore> Execute for WatermarkFilterExecutor<S> {
self.execute_inner().boxed()
}
}
const UPDATE_GLIOBAL_WATERMARK_FREQUENCY: usize = 10;

impl<S: StateStore> WatermarkFilterExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
Expand Down Expand Up @@ -124,7 +125,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

// If the input is idle
let mut idle_input = true;

let mut barrier_num_during_idle = 0;
#[for_await]
for msg in input {
let msg = msg?;
Expand Down Expand Up @@ -255,35 +256,42 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
HummockReadEpoch::Committed(prev_epoch),
)
.await?;
} else if is_checkpoint {
if idle_input {
// Align watermark
// NOTE(st1page): Should be `NoWait` because it could lead to a degradation of concurrent checkpoint situations, as it would require waiting for the previous epoch
let global_max_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::NoWait(prev_epoch),
)
.await?;
}

current_watermark = if let Some(global_max_watermark) =
global_max_watermark.clone()
&& let Some(watermark) = current_watermark.clone()
{
Some(cmp::max_by(
watermark,
global_max_watermark,
DefaultOrd::default_cmp,
))
} else {
current_watermark.or(global_max_watermark)
};
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
if is_checkpoint {
if idle_input {
barrier_num_during_idle += 1;

if barrier_num_during_idle == UPDATE_GLIOBAL_WATERMARK_FREQUENCY {
barrier_num_during_idle = 0;
// Align watermark
// NOTE(st1page): Should be `NoWait` because it could lead to a degradation of concurrent checkpoint situations, as it would require waiting for the previous epoch
let global_max_watermark = Self::get_global_max_watermark(
&table,
&global_watermark_table,
HummockReadEpoch::NoWait(prev_epoch),
)
.await?;

current_watermark = if let Some(global_max_watermark) =
global_max_watermark.clone()
&& let Some(watermark) = current_watermark.clone()
{
Some(cmp::max_by(
watermark,
global_max_watermark,
DefaultOrd::default_cmp,
))
} else {
current_watermark.or(global_max_watermark)
};
if let Some(watermark) = current_watermark.clone() {
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
watermark,
));
}
}
} else {
idle_input = true;
Expand Down

0 comments on commit 107c6d7

Please sign in to comment.