Skip to content

Commit

Permalink
refactor(stream): explicitly wait first barrier in now and temporal join executor
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 1, 2024
1 parent 051fbad commit 6c667bf
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 47 deletions.
59 changes: 29 additions & 30 deletions src/stream/src/executor/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::ops::Bound;
use std::ops::Bound::Unbounded;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::row;
Expand Down Expand Up @@ -91,13 +92,34 @@ impl<S: StateStore> NowExecutor<S> {

let max_chunk_size = crate::config::chunk_size();

let mut barrier_stream = UnboundedReceiverStream::new(barrier_receiver);
let first_barrier = barrier_stream
.next()
.await
.ok_or_else(|| anyhow!("end of barrier stream"))?;

let first_epoch = first_barrier.epoch;

// Whether the executor is paused.
let mut paused = false;
// The last timestamp **sent** to the downstream.
let mut last_timestamp: Datum = None;
let mut paused = first_barrier.is_pause_on_startup();

// Whether the first barrier is handled and `last_timestamp` is initialized.
let mut initialized = false;
yield Message::Barrier(first_barrier);

state_table.init_epoch(first_epoch);
// The last timestamp **sent** to the downstream.
let mut last_timestamp: Datum = {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let data_iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await?;
pin_mut!(data_iter);
let initial_state_row = if let Some(keyed_row) = data_iter.next().await {
Some(keyed_row?)
} else {
None
};
initial_state_row.and_then(|row| row[0].clone())
};

let mut mode_vars = match &mode {
NowMode::UpdateCurrent => ModeVars::UpdateCurrent,
Expand All @@ -116,9 +138,7 @@ impl<S: StateStore> NowExecutor<S> {
const MAX_MERGE_BARRIER_SIZE: usize = 64;

#[for_await]
for barriers in
UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
{
for barriers in barrier_stream.ready_chunks(MAX_MERGE_BARRIER_SIZE) {
let mut curr_timestamp = None;
if barriers.len() > 1 {
warn!(
Expand All @@ -127,28 +147,7 @@ impl<S: StateStore> NowExecutor<S> {
);
}
for barrier in barriers {
if !initialized {
// Handle the initial barrier.
state_table.init_epoch(barrier.epoch);
let state_row = {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
&(Unbounded, Unbounded);
let data_iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await?;
pin_mut!(data_iter);
if let Some(keyed_row) = data_iter.next().await {
Some(keyed_row?)
} else {
None
}
};
last_timestamp = state_row.and_then(|row| row[0].clone());
paused = barrier.is_pause_on_startup();
initialized = true;
} else {
state_table.commit(barrier.epoch).await?;
}
state_table.commit(barrier.epoch).await?;

// Extract timestamp from the current epoch.
curr_timestamp = Some(barrier.get_curr_epoch().as_scalar());
Expand Down
46 changes: 29 additions & 17 deletions src/stream/src/executor/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::alloc::Global;
use std::collections::hash_map::Entry;
use std::collections::HashMap;

use anyhow::anyhow;
use either::Either;
use futures::stream::{self, PollNext};
use futures::TryStreamExt;
Expand Down Expand Up @@ -675,8 +676,6 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: b

let null_matched = K::Bitmap::from_bool_vec(self.null_safe);

let mut prev_epoch = None;

let full_schema: Vec<_> = self
.left
.schema()
Expand All @@ -685,10 +684,32 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: b
.chain(self.right.schema().data_types().into_iter())
.collect();

let mut wait_first_barrier = true;
let input = align_input::<true>(self.left, self.right);
pin_mut!(input);

let first_barrier = match input
.try_next()
.await?
.ok_or_else(|| anyhow!("end of input"))?
{
InternalMessage::Barrier(chunks, barrier) => {
assert!(chunks.is_empty());
barrier
}
_ => {
unreachable!("should receive first barrier")
}
};

let first_epoch = first_barrier.epoch;
yield Message::Barrier(first_barrier);
if let Some(memo_table) = &mut self.memo_table {
memo_table.init_epoch(first_epoch);
}
let mut prev_epoch = first_epoch.prev;

#[for_await]
for msg in align_input::<true>(self.left, self.right) {
for msg in input {
self.right_table.cache.evict();
self.metrics
.temporal_join_cached_entry_count
Expand All @@ -699,7 +720,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: b
yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
}
InternalMessage::Chunk(chunk) => {
let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
let epoch = prev_epoch;

let full_schema = full_schema.clone();

Expand Down Expand Up @@ -811,17 +832,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: b
}
}
InternalMessage::Barrier(updates, barrier) => {
if !APPEND_ONLY {
if wait_first_barrier {
wait_first_barrier = false;
self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch);
} else {
self.memo_table
.as_mut()
.unwrap()
.commit(barrier.epoch)
.await?;
}
if let Some(memo_table) = &mut self.memo_table {
memo_table.commit(barrier.epoch).await?;
}
if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) {
let prev_vnodes =
Expand All @@ -835,7 +847,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, const APPEND_ONLY: b
&self.right_join_keys,
&right_stream_key_indices,
)?;
prev_epoch = Some(barrier.epoch.curr);
prev_epoch = barrier.epoch.curr;
yield Message::Barrier(barrier)
}
}
Expand Down

0 comments on commit 6c667bf

Please sign in to comment.