feat(streaming): do not backfill for empty table (#7009)
If the snapshot is empty, we don't need to backfill at all and can immediately finish the creation progress. This can speed up some tests.

```
dev=> create materialized view mv2 as select * from t;
CREATE_MATERIALIZED_VIEW
Time: 1033.834 ms (00:01.034)

dev=> delete from t;
DELETE 1
Time: 9.869 ms

dev=> create materialized view mv3 as select * from t;
CREATE_MATERIALIZED_VIEW
Time: 18.550 ms
```

Note that every executor requires a barrier as its first message. So if the table contains only a few records (but is not empty), we cannot apply this optimization, and the creation still has to wait for the next checkpoint. A further plan might be to issue the next checkpoints more frequently in this case.
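
To illustrate the idea, here's a minimal, self-contained sketch of the new decision (using only the `futures` crate; the string items and the hand-rolled stream are stand-ins for the `StreamChunk`s and the storage-table scan in `snapshot_read`):

```rust
use futures::{executor::block_on, pin_mut, stream, Stream, StreamExt};

// Stand-in for the executor's snapshot stream: it yields `Some(chunk)` for
// every chunk of the scan and a final `None` once the scan is exhausted,
// similar to `BackfillExecutor::snapshot_read`.
type SnapshotItem = Option<&'static str>;

// Mirrors the new check in `execute_inner`: peek exactly one item from the
// snapshot; if the very first item is already the end-of-stream marker, the
// table is empty and no backfill is needed.
async fn should_backfill(
    to_create_mv: bool,
    snapshot: impl Stream<Item = SnapshotItem>,
) -> bool {
    pin_mut!(snapshot);
    let is_snapshot_empty = snapshot.next().await.flatten().is_none();
    to_create_mv && !is_snapshot_empty
}

fn main() {
    // Empty table: the first item is the `None` marker, so the executor can
    // finish the creation progress right after the first barrier.
    let empty = stream::iter(vec![None::<&'static str>]);
    assert!(!block_on(should_backfill(true, empty)));

    // Non-empty table: a chunk comes first, so the usual backfill loop runs.
    let non_empty = stream::iter(vec![Some("chunk"), None]);
    assert!(block_on(should_backfill(true, non_empty)));
}
```

In the actual executor (see the `backfill.rs` diff below), this peek uses an unordered `snapshot_read(&self.table, init_epoch, None, false)` since only emptiness matters, and `progress.finish(first_barrier.epoch.curr)` is called before the first barrier is yielded.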


Approved-By: chenzl25

Co-Authored-By: Bugen Zhao <[email protected]>
BugenZhao authored and lmatz committed Jan 3, 2023
1 parent 6c7ff63 commit f7d9e30
Showing 2 changed files with 49 additions and 39 deletions.
4 changes: 2 additions & 2 deletions e2e_test/source/basic/nexmark/nexmark_endless_part1.slt.part
@@ -2,7 +2,7 @@ include ../../../nexmark/create_sources.slt.part

include ../../../streaming/nexmark/views/q3.slt.part

sleep 5s
sleep 10s

query I
select count(*) > 0 from (select * from nexmark_q3 limit 1);
@@ -17,7 +17,7 @@ include ../../../nexmark/drop_sources.slt.part
include ../../../nexmark/create_sources.slt.part
include ../../../streaming/nexmark/views/q4.slt.part

sleep 5s
sleep 10s

query I
select count(*) > 0 from (select * from nexmark_q4 limit 1);
84 changes: 47 additions & 37 deletions src/stream/src/executor/backfill.rs
@@ -14,12 +14,11 @@

use std::cmp::Ordering;
use std::ops::Bound;
use std::sync::Arc;

use async_stack_trace::StackTrace;
use either::Either;
use futures::stream::select_with_strategy;
use futures::{pin_mut, stream, StreamExt};
use futures::{pin_mut, stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
@@ -31,7 +30,7 @@ use risingwave_storage::table::TableIter;
use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message};
use super::{expect_first_barrier, BoxedExecutor, Executor, ExecutorInfo, Message, PkIndicesRef};
use crate::executor::PkIndices;
use crate::task::{ActorId, CreateMviewProgress};

@@ -63,11 +62,7 @@ pub struct BackfillExecutor<S: StateStore> {
upstream: BoxedExecutor,

/// The column indices need to be forwarded to the downstream.
upstream_indices: Arc<[usize]>,

/// Current position of the table storage primary key.
/// None means it starts from the beginning.
current_pos: Option<OwnedRow>,
upstream_indices: Vec<usize>,

progress: CreateMviewProgress,

@@ -98,8 +93,7 @@
},
table,
upstream,
upstream_indices: upstream_indices.into(),
current_pos: None,
upstream_indices,
actor_id: progress.actor_id(),
progress,
}
@@ -108,21 +102,34 @@
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
// Table storage primary key.
let table_pk_indices = self.table.pk_indices().to_vec();
let upstream_indices = self.upstream_indices.to_vec();
let table_pk_indices = self.table.pk_indices();
let upstream_indices = self.upstream_indices;

let mut upstream = self.upstream.execute();

// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;
let init_epoch = first_barrier.epoch.prev;

// If the barrier is a conf change of creating this mview, init backfill from its epoch.
// Otherwise, it means we've recovered and the backfill is already finished.
let to_backfill = first_barrier.is_add_dispatcher(self.actor_id);
// If the barrier is a conf change of creating this mview, we follow the procedure of
// backfill. Otherwise, it means we've recovered and we can forward the upstream messages
// directly.
let to_create_mv = first_barrier.is_add_dispatcher(self.actor_id);
// If the snapshot is empty, we don't need to backfill.
let is_snapshot_empty: bool = {
let snapshot = Self::snapshot_read(&self.table, init_epoch, None, false);
pin_mut!(snapshot);
snapshot.try_next().await?.unwrap().is_none()
};
let to_backfill = to_create_mv && !is_snapshot_empty;

if to_create_mv && is_snapshot_empty {
// Directly finish the progress as the snapshot is empty.
self.progress.finish(first_barrier.epoch.curr);
}

// The first barrier message should be propagated.
yield Message::Barrier(first_barrier.clone());
yield Message::Barrier(first_barrier);

if !to_backfill {
// Forward messages directly to the downstream.
@@ -132,12 +139,17 @@
for message in upstream {
yield message?;
}

return Ok(());
}

// The epoch used to snapshot read upstream mv.
let mut snapshot_read_epoch = init_epoch;

// Current position of the table storage primary key.
// `None` means it starts from the beginning.
let mut current_pos: Option<OwnedRow> = None;

// Backfill Algorithm:
//
// backfill_stream
@@ -164,17 +176,17 @@
'backfill_loop: loop {
let mut upstream_chunk_buffer: Vec<StreamChunk> = vec![];

let mut left_upstream = (&mut upstream).map(Either::Left);
let left_upstream = upstream.by_ref().map(Either::Left);

let right_snapshot = Box::pin(
Self::snapshot_read(&self.table, snapshot_read_epoch, self.current_pos.clone())
Self::snapshot_read(&self.table, snapshot_read_epoch, current_pos.clone(), true)
.map(Either::Right),
);

// Prefer to select upstream, so we can stop snapshot stream as soon as the barrier
// comes.
let backfill_stream =
select_with_strategy(&mut left_upstream, right_snapshot, |_: &mut ()| {
select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
stream::PollNext::Left
});

@@ -189,9 +201,9 @@

// Consume upstream buffer chunk
for chunk in upstream_chunk_buffer.drain(..) {
if let Some(current_pos) = self.current_pos.as_ref() {
if let Some(current_pos) = &current_pos {
yield Message::Chunk(Self::mapping_chunk(
Self::mark_chunk(chunk, current_pos, &table_pk_indices),
Self::mark_chunk(chunk, current_pos, table_pk_indices),
&upstream_indices,
));
}
@@ -238,33 +250,24 @@
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
self.current_pos = Some(
current_pos = Some(
chunk
.rows()
.last()
.unwrap()
.1
.project(&table_pk_indices)
.project(table_pk_indices)
.into_owned_row(),
);

yield Message::Chunk(Self::mapping_chunk(
chunk,
&self.upstream_indices,
));
yield Message::Chunk(Self::mapping_chunk(chunk, &upstream_indices));
}
}
}
}
}
}

let mut finish_on_barrier = |msg: &Message| {
if let Some(barrier) = msg.as_barrier() {
self.progress.finish(barrier.epoch.curr);
}
};

tracing::trace!(
actor = self.actor_id,
"Backfill has already finished and forward messages directly to the downstream"
@@ -277,14 +280,21 @@
#[for_await]
for msg in upstream {
let msg: Message = msg?;
finish_on_barrier(&msg);
if let Some(barrier) = msg.as_barrier() {
self.progress.finish(barrier.epoch.curr);
}
yield msg;
}
}

#[expect(clippy::needless_lifetimes, reason = "code generated by try_stream")]
#[try_stream(ok = Option<StreamChunk>, error = StreamExecutorError)]
async fn snapshot_read(table: &StorageTable<S>, epoch: u64, current_pos: Option<OwnedRow>) {
async fn snapshot_read(
table: &StorageTable<S>,
epoch: u64,
current_pos: Option<OwnedRow>,
ordered: bool,
) {
// `current_pos` is None means it needs to scan from the beginning, so we use Unbounded to
// scan. Otherwise, use Excluded.
let range_bounds = if let Some(current_pos) = current_pos {
@@ -299,7 +309,7 @@
HummockReadEpoch::NoWait(epoch),
OwnedRow::empty(),
range_bounds,
true,
ordered,
)
.await?;

@@ -326,7 +336,7 @@
fn mark_chunk(
chunk: StreamChunk,
current_pos: &OwnedRow,
table_pk_indices: &PkIndices,
table_pk_indices: PkIndicesRef<'_>,
) -> StreamChunk {
let chunk = chunk.compact();
let (data, ops) = chunk.into_parts();
