feat(cdc-backfill): support pause and resume #14590

Merged
merged 13 commits into from
Jan 17, 2024
29 changes: 26 additions & 3 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::pin::{pin, Pin};
+use std::pin::Pin;
use std::sync::Arc;

use either::Either;
@@ -32,6 +32,7 @@ use risingwave_connector::parser::{
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_connector::source::{SourceColumnDesc, SourceContext};
use risingwave_storage::StateStore;
+use rw_futures_util::pausable;

use crate::common::table::state_table::StateTable;
use crate::executor::backfill::cdc::state::CdcBackfillState;
@@ -122,6 +123,8 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// Poll the upstream to get the first barrier.
let first_barrier = expect_first_barrier(&mut upstream).await?;

+let mut paused = first_barrier.is_pause_on_startup();
+
// Check whether this parallelism has been assigned splits,
// if not, we should bypass the backfill directly.
let mut state_impl = CdcBackfillState::new(
@@ -219,8 +222,13 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let left_upstream = upstream.by_ref().map(Either::Left);

let args = SnapshotReadArgs::new_for_cdc(current_pk_pos.clone(), self.chunk_size);
-let right_snapshot =
-pin!(upstream_table_reader.snapshot_read(args).map(Either::Right));
+
+let (right_snapshot, valve) =
+pausable(upstream_table_reader.snapshot_read(args).map(Either::Right));
+
+if paused {
+valve.pause();
+}

// Prefer to select upstream, so we can stop snapshot stream when barrier comes.
let backfill_stream =
@@ -238,6 +246,21 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
Either::Left(msg) => {
match msg? {
Message::Barrier(barrier) => {
+if let Some(mutation) = barrier.mutation.as_deref() {
+use crate::executor::Mutation;
+match mutation {
+Mutation::Pause => {
+paused = true;
+valve.pause();
+}
+Mutation::Resume => {
+paused = false;
+valve.resume();
+}
+_ => (),
+}
+}
+
// If it is a barrier, switch snapshot and consume buffered
// upstream chunk.
// If no current_pos, means we did not process any snapshot yet.
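
The barrier handling in this hunk can be read as a small state machine: a `paused` flag seeded from the first barrier, flipped by `Pause` and `Resume` mutations, and mirrored onto the valve. The sketch below models only that logic with the standard library. `Mutation`, `Valve`, `apply_mutation`, and `replay` are simplified, hypothetical stand-ins, not the RisingWave types; the real valve also wakes the stream on resume.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the executor's mutation type. Only the two
/// variants handled in the diff are modelled, plus a catch-all.
#[derive(Debug, Clone, Copy)]
enum Mutation {
    Pause,
    Resume,
    Other,
}

/// Simplified stand-in for the valve: just the shared flag, no waker.
#[derive(Clone, Default)]
struct Valve {
    paused: Arc<AtomicBool>,
}

impl Valve {
    fn pause(&self) {
        self.paused.store(true, Ordering::Relaxed);
    }

    fn resume(&self) {
        self.paused.store(false, Ordering::Relaxed);
    }

    fn is_paused(&self) -> bool {
        self.paused.load(Ordering::Relaxed)
    }
}

/// Apply the mutation carried by a barrier (if any) to the valve and
/// return the new value of the executor-side `paused` flag.
fn apply_mutation(paused: bool, valve: &Valve, mutation: Option<Mutation>) -> bool {
    match mutation {
        Some(Mutation::Pause) => {
            valve.pause();
            true
        }
        Some(Mutation::Resume) => {
            valve.resume();
            false
        }
        // Any other mutation, or a barrier without one, leaves the state alone.
        Some(Mutation::Other) | None => paused,
    }
}

/// Replay a sequence of barriers and report `(paused flag, valve state)`.
fn replay(pause_on_startup: bool, mutations: &[Option<Mutation>]) -> (bool, bool) {
    let valve = Valve::default();
    let mut paused = pause_on_startup;
    // Mirrors `if paused { valve.pause(); }` right after the valve is created.
    if paused {
        valve.pause();
    }
    for mutation in mutations {
        paused = apply_mutation(paused, &valve, *mutation);
    }
    (paused, valve.is_paused())
}
```

Keeping `paused` next to the valve looks redundant in this sketch. In the diff it presumably matters because `pausable(...)` is called whenever the snapshot stream is rebuilt, so each fresh valve has to be re-paused from the saved flag.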
12 changes: 11 additions & 1 deletion src/utils/futures_util/src/lib.rs
@@ -17,13 +17,23 @@
use std::future::Future;

use futures::stream::TryStream;
-use futures::TryFuture;
+use futures::{Stream, TryFuture};

mod buffered_with_fence;
mod misc;
+mod pausable;

use buffered_with_fence::{Fenced, MaybeFence, TryBufferedWithFence};
pub use misc::*;
+pub use pausable::{Pausable, Valve};

+/// Create a pausable stream, which can be paused or resumed by a valve.
+pub fn pausable<St>(stream: St) -> (Pausable<St>, Valve)
+where
+St: Stream,
+{
+Pausable::new(stream)
+}

pub trait RwTryStreamExt: TryStream {
/// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence.
96 changes: 96 additions & 0 deletions src/utils/futures_util/src/pausable.rs
@@ -0,0 +1,96 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

use futures::Stream;
use pin_project_lite::pin_project;

pin_project! {
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pausable<St>
where St: Stream
{
#[pin]
stream: St,
paused: Arc<AtomicBool>,
waker: Arc<Mutex<Option<Waker>>>,
}
}

/// A valve is a handle that can control the [`Pausable`] stream.
#[derive(Clone)]
pub struct Valve {
paused: Arc<AtomicBool>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl Valve {
/// Pause the stream controlled by the valve.
pub fn pause(&self) {
self.paused.store(true, Ordering::Relaxed);
}

/// Resume the stream controlled by the valve.
pub fn resume(&self) {
self.paused.store(false, Ordering::Relaxed);
if let Some(waker) = self.waker.lock().unwrap().as_ref() {
waker.wake_by_ref()
}
}
}

impl<St> Pausable<St>
where
St: Stream,
{
pub(crate) fn new(stream: St) -> (Self, Valve) {
let paused = Arc::new(AtomicBool::new(false));
let waker = Arc::new(Mutex::new(None));
(
Pausable {
stream,
paused: paused.clone(),
waker: waker.clone(),
},
Valve { paused, waker },
)
}
}

impl<St> Stream for Pausable<St>
where
St: Stream,
{
type Item = St::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if this.paused.load(Ordering::Relaxed) {
let mut waker = this.waker.lock().unwrap();
*waker = Some(cx.waker().clone());
Poll::Pending

Review comment from BugenZhao (Member), Jan 16, 2024:

Returning Pending without doing anything with the waker could potentially lead to deadlock.

Contributor Author:

Can you give me more explanation? I can only find https://docs.rs/futures-util/0.3.30/src/futures_util/stream/pending.rs.html#32

Contributor Author:

Found the article, but I can't find the solution.

Contributor Author:

Oh, should I register the paused as a signal?

BugenZhao (Member), Jan 16, 2024:

> Oh, should I register the paused as a signal?

Yeah. I guess using anything else with a wait-queue mechanism should work.

I guess the deadlock won't occur with the way this PR uses it, but since it's made a general utility, we should be more careful about it.

} else {
this.stream.poll_next(cx)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
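
The review thread above turns on the contract that a `poll` returning `Pending` must arrange for the task's waker to be called later; otherwise nothing may ever poll it again. The following is a minimal sketch of that contract using only `std` and a `Future` in place of a `Stream`. `Shared`, `Gate`, `resume`, `CountingWaker`, and `demo` are hypothetical names for illustration and are not part of `rw_futures_util`. As an assumption of this sketch, the flag is checked while the waker lock is held, so a concurrent `resume` either finds the stored waker or the poll observes the cleared flag.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// State shared between the polled side and the control side.
#[derive(Default)]
struct Shared {
    paused: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

/// Hypothetical future that is ready only while not paused.
struct Gate(Arc<Shared>);

impl Future for Gate {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let shared = &self.0;
        // Take the lock first, then check the flag (sketch-specific choice).
        let mut slot = shared.waker.lock().unwrap();
        if shared.paused.load(Ordering::Relaxed) {
            // Register the waker before returning `Pending`, so that
            // `resume` has something to wake.
            *slot = Some(cx.waker().clone());
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Clear the flag, then wake whoever was parked on it.
fn resume(shared: &Shared) {
    shared.paused.store(false, Ordering::Relaxed);
    if let Some(waker) = shared.waker.lock().unwrap().take() {
        waker.wake();
    }
}

/// Test waker that only counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (pending while paused, wake-ups after resume, ready after resume).
fn demo() -> (bool, usize, bool) {
    let shared = Arc::new(Shared::default());
    shared.paused.store(true, Ordering::Relaxed);

    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut gate = Gate(shared.clone());
    let pending_while_paused = Pin::new(&mut gate).poll(&mut cx).is_pending();

    resume(&shared);
    let wakes = counter.0.load(Ordering::SeqCst);
    let ready_after_resume = Pin::new(&mut gate).poll(&mut cx).is_ready();

    (pending_while_paused, wakes, ready_after_resume)
}
```

If the line that stores the waker is removed, `resume` has nothing to wake and the counter stays at zero, which is the situation the reviewer warns about: a real executor would have no reason to poll the task again.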