Skip to content

Commit

Permalink
feat(now): merge watermark of multiple ready barriers (#13271)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 25, 2023
1 parent 8fd89c4 commit ecd82b8
Showing 1 changed file with 51 additions and 35 deletions.
86 changes: 51 additions & 35 deletions src/stream/src/executor/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::row::{self, OwnedRow};
use risingwave_common::types::{DataType, Datum};
use risingwave_storage::StateStore;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::{
Barrier, BoxedMessageStream, Executor, ExecutorInfo, Message, Mutation, PkIndicesRef,
Expand Down Expand Up @@ -55,7 +56,7 @@ impl<S: StateStore> NowExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(self) {
let Self {
mut barrier_receiver,
barrier_receiver,
mut state_table,
info,
..
Expand All @@ -68,45 +69,60 @@ impl<S: StateStore> NowExecutor<S> {
// Whether the first barrier is handled and `last_timestamp` is initialized.
let mut initialized = false;

while let Some(barrier) = barrier_receiver.recv().await {
if !initialized {
// Handle the first barrier.
state_table.init_epoch(barrier.epoch);
let state_row = {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let data_iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await?;
pin_mut!(data_iter);
if let Some(keyed_row) = data_iter.next().await {
Some(keyed_row?)
} else {
None
}
};
last_timestamp = state_row.and_then(|row| row[0].clone());
paused = barrier.is_pause_on_startup();
initialized = true;
} else if paused {
// Assert that no data is updated.
state_table.commit_no_data_expected(barrier.epoch);
} else {
state_table.commit(barrier.epoch).await?;
const MAX_MERGE_BARRIER_SIZE: usize = 64;

#[for_await]
for barriers in
UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
{
let mut timestamp = None;
if barriers.len() > 1 {
warn!(
"handle multiple barriers at once in now executor: {}",
barriers.len()
);
}
for barrier in barriers {
if !initialized {
// Handle the first barrier.
state_table.init_epoch(barrier.epoch);
let state_row = {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
&(Unbounded, Unbounded);
let data_iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await?;
pin_mut!(data_iter);
if let Some(keyed_row) = data_iter.next().await {
Some(keyed_row?)
} else {
None
}
};
last_timestamp = state_row.and_then(|row| row[0].clone());
paused = barrier.is_pause_on_startup();
initialized = true;
} else if paused {
// Assert that no data is updated.
state_table.commit_no_data_expected(barrier.epoch);
} else {
state_table.commit(barrier.epoch).await?;
}

// Extract timestamp from the current epoch.
let timestamp = Some(barrier.get_curr_epoch().as_scalar());
// Extract timestamp from the current epoch.
timestamp = Some(barrier.get_curr_epoch().as_scalar());

// Update paused state.
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => paused = true,
Mutation::Resume => paused = false,
_ => {}
// Update paused state.
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => paused = true,
Mutation::Resume => paused = false,
_ => {}
}
}
}

yield Message::Barrier(barrier.clone());
yield Message::Barrier(barrier);
}

// Do not yield any messages if paused.
if paused {
Expand Down

0 comments on commit ecd82b8

Please sign in to comment.