Skip to content

Commit

Permalink
stall barrier to force a snapshot read
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Feb 11, 2024
1 parent 347fa58 commit be67790
Showing 1 changed file with 79 additions and 3 deletions.
82 changes: 79 additions & 3 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use futures::{pin_mut, stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarRefImpl};
Expand Down Expand Up @@ -189,6 +190,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// drive the upstream changelog first to ensure we can receive timely changelog event,
// otherwise the upstream changelog may be blocked by the snapshot read stream
let _ = Pin::new(&mut upstream).peek().await;
let mut has_snapshot_read = false;

// wait for a barrier to make sure the backfill starts after upstream source
#[for_await]
Expand Down Expand Up @@ -223,7 +225,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
pin!(upstream_table_reader.snapshot_read(args).map(Either::Right));

// Prefer to select upstream, so we can stop snapshot stream when barrier comes.
let backfill_stream =
let mut backfill_stream =
select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| {
stream::PollNext::Left
});
Expand All @@ -232,15 +234,88 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let mut cur_barrier_upstream_processed_rows: u64 = 0;

#[for_await]
for either in backfill_stream {
for either in &mut backfill_stream {
match either {
// Upstream
Either::Left(msg) => {
match msg? {
Message::Barrier(barrier) => {
if !has_snapshot_read {
let mut snapshot_stream_end = false;
// If no snapshot read happen, the current_pk_pos is unchanged in previous epoch.
// We stall the barrier message and force a snapshot read here to avoid starvation of the snapshot stream.
let (_, snapshot_stream) = backfill_stream.into_inner();
#[for_await]
for msg in snapshot_stream {
let Either::Right(msg) = msg else {
bail!(
"BUG: snapshot_read contains upstream messages"
);
};
match msg? {
None => {
snapshot_stream_end = true;
break;
}
Some(chunk) => {
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
current_pk_pos = Some(get_new_pos(
&chunk,
&pk_in_output_indices,
));

tracing::trace!(
"got a snapshot chunk: len {}, current_pk_pos {:?}",
chunk.cardinality(),
current_pk_pos
);
let chunk_cardinality =
chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows +=
chunk_cardinality;
total_snapshot_row_count += chunk_cardinality;
yield Message::Chunk(mapping_chunk(
chunk,
&self.output_indices,
));
break;
}
}
}

if snapshot_stream_end {
// End of the snapshot read stream.
// Consume the buffered upstream chunk without filtering by `binlog_low`.
for chunk in upstream_chunk_buffer.drain(..) {
yield Message::Chunk(mapping_chunk(
chunk,
&self.output_indices,
));
}

// mark backfill has finished
state_impl
.mutate_state(
current_pk_pos,
last_binlog_offset.clone(),
total_snapshot_row_count,
true,
)
.await?;

// commit state because we have received a barrier message
state_impl.commit_state(barrier.epoch).await?;
yield Message::Barrier(barrier);
// end of backfill loop, since backfill has finished
break 'backfill_loop;
}
}

// If it is a barrier, switch snapshot and consume buffered
// upstream chunk.
// If no current_pos, means we did not process any snapshot yet.
// If current_pk_pos is None, means we did not process any snapshot yet.
// In that case we can just ignore the upstream buffer chunk.
if let Some(current_pos) = &current_pk_pos {
for chunk in upstream_chunk_buffer.drain(..) {
Expand Down Expand Up @@ -356,6 +431,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
// Snapshot read
Either::Right(msg) => {
has_snapshot_read = true;
match msg? {
None => {
tracing::info!(
Expand Down

0 comments on commit be67790

Please sign in to comment.