Skip to content

Commit

Permalink
fix(watermark): stop generating watermark messages when stream is paused (#19199)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 4, 2024
1 parent 07a4d1b commit 22c7459
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 31 deletions.
51 changes: 37 additions & 14 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ struct Inner {
nondecreasing_expr_indices: Vec<usize>,
/// Last seen values of nondecreasing expressions, buffered to periodically produce watermarks.
last_nondec_expr_values: Vec<Option<ScalarImpl>>,
/// Whether the stream is paused.
is_paused: bool,

/// The selectivity threshold, which should be in `[0, 1]`. For a chunk with selectivity less
/// than the threshold, the Project executor will construct a new chunk before expr evaluation,
Expand Down Expand Up @@ -70,6 +72,7 @@ impl ProjectExecutor {
watermark_derivations,
nondecreasing_expr_indices,
last_nondec_expr_values: vec![None; n_nondecreasing_exprs],
is_paused: false,
materialize_selectivity_threshold,
noop_update_hint,
},
Expand Down Expand Up @@ -143,8 +146,13 @@ impl Inner {

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute(mut self, input: Executor) {
let mut input = input.execute();
let first_barrier = expect_first_barrier(&mut input).await?;
self.is_paused = first_barrier.is_pause_on_startup();
yield Message::Barrier(first_barrier);

#[for_await]
for msg in input.execute() {
for msg in input {
let msg = msg?;
match msg {
Message::Watermark(w) => {
Expand Down Expand Up @@ -174,21 +182,36 @@ impl Inner {
}
None => continue,
},
barrier @ Message::Barrier(_) => {
for (&expr_idx, value) in self
.nondecreasing_expr_indices
.iter()
.zip_eq_fast(&mut self.last_nondec_expr_values)
{
if let Some(value) = std::mem::take(value) {
yield Message::Watermark(Watermark::new(
expr_idx,
self.exprs[expr_idx].return_type(),
value,
))
Message::Barrier(barrier) => {
if !self.is_paused {
for (&expr_idx, value) in self
.nondecreasing_expr_indices
.iter()
.zip_eq_fast(&mut self.last_nondec_expr_values)
{
if let Some(value) = std::mem::take(value) {
yield Message::Watermark(Watermark::new(
expr_idx,
self.exprs[expr_idx].return_type(),
value,
))
}
}
}

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
self.is_paused = true;
}
Mutation::Resume => {
self.is_paused = false;
}
_ => (),
}
}
yield barrier;

yield Message::Barrier(barrier);
}
}
}
Expand Down
49 changes: 35 additions & 14 deletions src/stream/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ impl Execute for ProjectSetExecutor {
impl Inner {
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute(self, input: Executor) {
let mut input = input.execute();
let first_barrier = expect_first_barrier(&mut input).await?;
let mut is_paused = first_barrier.is_pause_on_startup();
yield Message::Barrier(first_barrier);

assert!(!self.select_list.is_empty());
// First column will be `projected_row_id`, which represents the index in the
// output table
Expand All @@ -104,30 +109,46 @@ impl Inner {
let mut builder = StreamChunkBuilder::new(self.chunk_size, data_types);

let mut last_nondec_expr_values = vec![None; self.nondecreasing_expr_indices.len()];

#[for_await]
for msg in input.execute() {
for msg in input {
match msg? {
Message::Watermark(watermark) => {
let watermarks = self.handle_watermark(watermark).await?;
for watermark in watermarks {
yield Message::Watermark(watermark)
}
}
m @ Message::Barrier(_) => {
for (&expr_idx, value) in self
.nondecreasing_expr_indices
.iter()
.zip_eq_fast(&mut last_nondec_expr_values)
{
if let Some(value) = std::mem::take(value) {
yield Message::Watermark(Watermark::new(
expr_idx + PROJ_ROW_ID_OFFSET,
self.select_list[expr_idx].return_type(),
value,
))
Message::Barrier(barrier) => {
if !is_paused {
for (&expr_idx, value) in self
.nondecreasing_expr_indices
.iter()
.zip_eq_fast(&mut last_nondec_expr_values)
{
if let Some(value) = std::mem::take(value) {
yield Message::Watermark(Watermark::new(
expr_idx + PROJ_ROW_ID_OFFSET,
self.select_list[expr_idx].return_type(),
value,
))
}
}
}
yield m

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
is_paused = true;
}
Mutation::Resume => {
is_paused = false;
}
_ => (),
}
}

yield Message::Barrier(barrier);
}
Message::Chunk(chunk) => {
let mut results = Vec::with_capacity(self.select_list.len());
Expand Down
21 changes: 18 additions & 3 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
let first_barrier = expect_first_barrier(&mut input).await?;
let prev_epoch = first_barrier.epoch.prev;
table.init_epoch(first_barrier.epoch);
let mut is_paused = first_barrier.is_pause_on_startup();
// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);

Expand All @@ -115,7 +116,9 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

let mut last_checkpoint_watermark = None;

if let Some(watermark) = current_watermark.clone() {
if let Some(watermark) = current_watermark.clone()
&& !is_paused
{
yield Message::Watermark(Watermark::new(
event_time_col_idx,
watermark_type.clone(),
Expand Down Expand Up @@ -245,8 +248,20 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
}
}
}

table.commit(barrier.epoch).await?;

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
is_paused = true;
}
Mutation::Resume => {
is_paused = false;
}
_ => (),
}
}

yield Message::Barrier(barrier);

if need_update_global_max_watermark {
Expand All @@ -258,7 +273,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
.await?;
}

if is_checkpoint {
if is_checkpoint && !is_paused {
if idle_input {
barrier_num_during_idle += 1;

Expand Down
3 changes: 3 additions & 0 deletions src/stream/tests/integration_tests/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use multimap::MultiMap;
use risingwave_common::util::epoch::test_epoch;
use risingwave_expr::table_function::repeat;
use risingwave_stream::executor::ProjectSetExecutor;
use risingwave_stream::task::ActorEvalErrorReport;
Expand Down Expand Up @@ -60,6 +61,7 @@ fn create_executor() -> (MessageSender, BoxedMessageStream) {
async fn test_project_set() {
let (mut tx, mut project_set) = create_executor();

tx.push_barrier(test_epoch(1), false);
tx.push_chunk(StreamChunk::from_pretty(
" I I
+ 1 4
Expand All @@ -76,6 +78,7 @@ async fn test_project_set() {
check_until_pending(
&mut project_set,
expect_test::expect![[r#"
- !barrier 1
- !chunk |-
+---+---+---+---+---+---+
| + | 0 | 5 | 2 | 1 | 2 |
Expand Down

0 comments on commit 22c7459

Please sign in to comment.