Skip to content

Commit

Permalink
feat(cdc-backfill): support pause_on_startup
Browse files Browse the repository at this point in the history
Signed-off-by: TennyZhuang <[email protected]>
  • Loading branch information
TennyZhuang committed Jan 16, 2024
1 parent 9162478 commit 0000002
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::{pin, Pin};
use std::pin::Pin;
use std::sync::Arc;

use auto_enums::auto_enum;
use either::Either;
use futures::stream::select_with_strategy;
use futures::{pin_mut, stream, StreamExt};
Expand Down Expand Up @@ -122,6 +123,8 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;

let paused = first_barrier.is_pause_on_startup();

// Check whether this parallelism has been assigned splits,
// if not, we should bypass the backfill directly.
let mut state_impl = CdcBackfillState::new(
Expand Down Expand Up @@ -219,8 +222,13 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let left_upstream = upstream.by_ref().map(Either::Left);

let args = SnapshotReadArgs::new_for_cdc(current_pk_pos.clone(), self.chunk_size);
let right_snapshot =
pin!(upstream_table_reader.snapshot_read(args).map(Either::Right));

#[auto_enum(futures03::Stream)]
let right_snapshot = if !paused {
upstream_table_reader.snapshot_read(args).map(Either::Right)
} else {
futures::stream::pending()
};

// Prefer to select upstream, so we can stop snapshot stream when barrier comes.
let backfill_stream =
Expand Down

0 comments on commit 0000002

Please sign in to comment.