From be677902c158bb0e65e729af945fc0a0109f9fb4 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Sun, 11 Feb 2024 13:56:37 +0800 Subject: [PATCH] stall barrier to force a snapshot read --- .../src/executor/backfill/cdc/cdc_backfill.rs | 82 ++++++++++++++++++- 1 file changed, 79 insertions(+), 3 deletions(-) diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 4c365fa78ec8c..c66d146747586 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -21,6 +21,7 @@ use futures::{pin_mut, stream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::{DataChunk, StreamChunk}; +use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarRefImpl}; @@ -189,6 +190,7 @@ impl CdcBackfillExecutor { // drive the upstream changelog first to ensure we can receive timely changelog event, // otherwise the upstream changelog may be blocked by the snapshot read stream let _ = Pin::new(&mut upstream).peek().await; + let mut has_snapshot_read = false; // wait for a barrier to make sure the backfill starts after upstream source #[for_await] @@ -223,7 +225,7 @@ impl CdcBackfillExecutor { pin!(upstream_table_reader.snapshot_read(args).map(Either::Right)); // Prefer to select upstream, so we can stop snapshot stream when barrier comes. - let backfill_stream = + let mut backfill_stream = select_with_strategy(left_upstream, right_snapshot, |_: &mut ()| { stream::PollNext::Left }); @@ -232,15 +234,88 @@ impl CdcBackfillExecutor { let mut cur_barrier_upstream_processed_rows: u64 = 0; #[for_await] - for either in backfill_stream { + for either in &mut backfill_stream { match either { // Upstream Either::Left(msg) => { match msg? 
{ Message::Barrier(barrier) => { + if !has_snapshot_read { + let mut snapshot_stream_end = false; + // If no snapshot read happen, the current_pk_pos is unchanged in previous epoch. + // We stall the barrier message and force a snapshot read here to avoid starvation of the snapshot stream. + let (_, snapshot_stream) = backfill_stream.into_inner(); + #[for_await] + for msg in snapshot_stream { + let Either::Right(msg) = msg else { + bail!( + "BUG: snapshot_read contains upstream messages" + ); + }; + match msg? { + None => { + snapshot_stream_end = true; + break; + } + Some(chunk) => { + // Raise the current position. + // As snapshot read streams are ordered by pk, so we can + // just use the last row to update `current_pos`. + current_pk_pos = Some(get_new_pos( + &chunk, + &pk_in_output_indices, + )); + + tracing::trace!( + "got a snapshot chunk: len {}, current_pk_pos {:?}", + chunk.cardinality(), + current_pk_pos + ); + let chunk_cardinality = + chunk.cardinality() as u64; + cur_barrier_snapshot_processed_rows += + chunk_cardinality; + total_snapshot_row_count += chunk_cardinality; + yield Message::Chunk(mapping_chunk( + chunk, + &self.output_indices, + )); + break; + } + } + } + + if snapshot_stream_end { + // End of the snapshot read stream. + // Consume the buffered upstream chunk without filtering by `binlog_low`. + for chunk in upstream_chunk_buffer.drain(..) { + yield Message::Chunk(mapping_chunk( + chunk, + &self.output_indices, + )); + } + + // mark backfill has finished + state_impl + .mutate_state( + current_pk_pos, + last_binlog_offset.clone(), + total_snapshot_row_count, + true, + ) + .await?; + + // commit state because we have received a barrier message + state_impl.commit_state(barrier.epoch).await?; + yield Message::Barrier(barrier); + // end of backfill loop, since backfill has finished + break 'backfill_loop; + } + } + // If it is a barrier, switch snapshot and consume buffered // upstream chunk. 
- // If no current_pos, means we did not process any snapshot yet. + // If current_pk_pos is None, means we did not process any snapshot yet. // In that case we can just ignore the upstream buffer chunk. if let Some(current_pos) = &current_pk_pos { for chunk in upstream_chunk_buffer.drain(..) { @@ -356,6 +431,7 @@ impl CdcBackfillExecutor { } // Snapshot read Either::Right(msg) => { + has_snapshot_read = true; match msg? { None => { tracing::info!(