feat: pause shared source until a MV is created
Signed-off-by: xxchan <[email protected]>
xxchan committed Apr 17, 2024
1 parent fdf05fb commit 302266f
Showing 6 changed files with 57 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/compute/tests/integration_tests.rs
@@ -174,6 +174,7 @@ async fn test_table_materialize() -> StreamResult<()> {
barrier_rx,
system_params_manager.get_params(),
SourceCtrlOpts::default(),
false,
)
.boxed(),
);
5 changes: 2 additions & 3 deletions src/stream/src/common/table/state_table.rs
@@ -258,6 +258,8 @@ where
W: WatermarkBufferStrategy,
{
/// Create state table from table catalog and store.
///
/// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
pub async fn from_table_catalog(
table_catalog: &Table,
store: S,
@@ -1356,9 +1358,6 @@ where
/// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
/// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
/// `vnode`.
pub async fn iter_with_prefix(
&self,
pk_prefix: impl Row,
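For illustration only (not part of this commit), here is a minimal sketch tying the two documented behaviors above together: constructing a state table with `vnodes = None` (hence singleton distribution) and scanning everything under that single vnode via `iter_with_prefix` with an empty prefix and an unbounded sub-range, the same pattern the new `SourceStateTableHandler::is_empty` below relies on. The helper name is hypothetical and the import paths are approximate.

use std::ops::Bound;

use futures::{pin_mut, StreamExt};
use risingwave_common::row::OwnedRow;
use risingwave_pb::catalog::PbTable;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
use crate::executor::StreamExecutorResult;

/// Hypothetical helper: does this singleton-distributed table hold any row?
async fn table_has_rows<S: StateStore>(
    table_catalog: &PbTable,
    store: S,
) -> StreamExecutorResult<bool> {
    // `vnodes = None` => singleton distribution, as documented above.
    let table = StateTable::from_table_catalog(table_catalog, store, None).await;

    // Empty prefix + unbounded sub-range => scan everything under the vnode.
    let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
    let iter = table
        .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
        .await?;
    pin_mut!(iter);

    Ok(iter.next().await.is_some())
}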
26 changes: 23 additions & 3 deletions src/stream/src/executor/source/source_executor.rs
@@ -61,6 +61,8 @@ pub struct SourceExecutor<S: StateStore> {

// control options for connector level
source_ctrl_opts: SourceCtrlOpts,

is_shared: bool,
}

impl<S: StateStore> SourceExecutor<S> {
@@ -71,6 +73,7 @@ impl<S: StateStore> SourceExecutor<S> {
barrier_receiver: UnboundedReceiver<Barrier>,
system_params: SystemParamsReaderRef,
source_ctrl_opts: SourceCtrlOpts,
is_shared: bool,
) -> Self {
Self {
actor_ctx,
@@ -79,6 +82,7 @@ impl<S: StateStore> SourceExecutor<S> {
barrier_receiver: Some(barrier_receiver),
system_params,
source_ctrl_opts,
is_shared,
}
}

@@ -400,6 +404,8 @@ impl<S: StateStore> SourceExecutor<S> {
boot_state.clone_from(splits);
}
}
// XXX: When the source executor is newly created, the first barrier must be Add or Update (created during scaling).
// But when restarting the cluster, is it possible that the first barrier is an Add adding dispatchers?
_ => {}
}
}
@@ -418,6 +424,7 @@ impl<S: StateStore> SourceExecutor<S> {

// init in-memory split states with persisted state if any
core.init_split_state(boot_state.clone());
let mut is_uninitialized = core.split_state_store.is_empty().await?;

// Return the ownership of `stream_source_core` to the source executor.
self.stream_source_core = Some(core);
@@ -436,11 +443,11 @@ impl<S: StateStore> SourceExecutor<S> {
let mut stream =
StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);

// If the first barrier requires us to pause on startup, pause the stream.
if barrier.is_pause_on_startup() {
// - For shared source, pause until there's a MV.
// - If the first barrier requires us to pause on startup, pause the stream.
if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() {
stream.pause_stream();
}
// TODO: for shared source, pause until there's a MV.

yield Message::Barrier(barrier);

@@ -473,6 +480,7 @@ impl<S: StateStore> SourceExecutor<S> {

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
// XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic.
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
Mutation::SourceChangeSplit(actor_splits) => {
@@ -500,6 +508,16 @@ impl<S: StateStore> SourceExecutor<S> {
)
.await?;
}
Mutation::Add(AddMutation { adds, .. }) => {
// The shared source executor has a downstream MV now. Let's start working!
if adds.contains_key(&self.actor_ctx.id)
&& self.is_shared
&& is_uninitialized
{
stream.resume_stream();
is_uninitialized = false;
}
}
_ => {}
}
}
@@ -778,6 +796,7 @@ mod tests {
barrier_rx,
system_params_manager.get_params(),
SourceCtrlOpts::default(),
false,
);
let mut executor = executor.boxed().execute();

@@ -866,6 +885,7 @@ mod tests {
barrier_rx,
system_params_manager.get_params(),
SourceCtrlOpts::default(),
false,
);
let mut handler = executor.boxed().execute();

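To summarize the new control flow in one place, here is a self-contained toy model of the gating added above. This is not RisingWave code; names like SharedSourceGate and BarrierEvent are invented for illustration. The idea: a shared source that has not consumed anything yet starts paused, and resumes once an Add barrier adds a dispatcher for this actor, i.e. a downstream MV now exists.

/// Toy model of the pause-until-MV gating introduced in this commit.
struct SharedSourceGate {
    is_shared: bool,
    /// True until the source has consumed anything (its split state is empty).
    is_uninitialized: bool,
    paused: bool,
}

enum BarrierEvent<'a> {
    Pause,
    Resume,
    /// Actor ids that an `Add` mutation adds dispatchers for.
    Add { adds: &'a [u32] },
}

impl SharedSourceGate {
    fn on_first_barrier(&mut self, pause_on_startup: bool) {
        // A shared, never-consumed source stays paused until a MV appears;
        // otherwise honor the usual pause-on-startup flag.
        if (self.is_shared && self.is_uninitialized) || pause_on_startup {
            self.paused = true;
        }
    }

    fn on_barrier(&mut self, actor_id: u32, event: BarrierEvent<'_>) {
        match event {
            BarrierEvent::Pause => self.paused = true,
            BarrierEvent::Resume => self.paused = false,
            BarrierEvent::Add { adds } => {
                // A downstream MV now targets this actor: start emitting data.
                if self.is_shared && self.is_uninitialized && adds.contains(&actor_id) {
                    self.paused = false;
                    self.is_uninitialized = false;
                }
            }
        }
    }
}

In the real executor, is_uninitialized comes from the new split_state_store.is_empty() check and the pause/resume transitions go through StreamReaderWithPause, as the hunks above show.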
20 changes: 20 additions & 0 deletions src/stream/src/executor/source/state_table_handler.rs
@@ -38,6 +38,7 @@ use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
use risingwave_hummock_sdk::key::next_key;
use risingwave_pb::catalog::PbTable;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::TableDistribution;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
@@ -51,6 +52,7 @@ pub struct SourceStateTableHandler<S: StateStore> {
}

impl<S: StateStore> SourceStateTableHandler<S> {
/// Creates a state table with singleton distribution.
pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
Self {
state_table: StateTable::from_table_catalog(table_catalog, store, None).await,
@@ -82,6 +84,24 @@ impl<S: StateStore> SourceStateTableHandler<S> {
.map_err(StreamExecutorError::from)
}

/// This source has not consumed any data yet.
pub async fn is_empty(&self) -> StreamExecutorResult<bool> {
assert_eq!(
self.state_table.vnodes(),
TableDistribution::singleton_vnode_bitmap_ref(),
"SourceExecutor's state table should have singleton distribution"
);
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);

let state_table_iter = self
.state_table
.iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
.await?;
pin_mut!(state_table_iter);

Ok(state_table_iter.next().await.is_none())
}

/// this method should only be used by [`FsSourceExecutor`](super::FsSourceExecutor)
pub(crate) async fn get_all_completed(&self) -> StreamExecutorResult<HashSet<SplitId>> {
let start = Bound::Excluded(row::once(Some(Self::string_to_scalar(
2 changes: 2 additions & 0 deletions src/stream/src/executor/stream_reader.rs
@@ -96,12 +96,14 @@ impl<const BIASED: bool, M: Send + 'static> StreamReaderWithPause<BIASED, M> {
/// Pause the data stream.
pub fn pause_stream(&mut self) {
assert!(!self.paused, "already paused");
tracing::info!("data stream paused");
self.paused = true;
}

/// Resume the data stream. Panic if the data stream is not paused.
pub fn resume_stream(&mut self) {
assert!(self.paused, "not paused");
tracing::info!("data stream resumed");
self.paused = false;
}
}
14 changes: 9 additions & 5 deletions src/stream/src/from_proto/source/trad_source.rs
@@ -190,11 +190,12 @@ impl ExecutorBuilder for SourceExecutorBuilder {
.map(|column| column.column_id)
.collect();

let state_table_handler = SourceStateTableHandler::from_table_catalog(
source.state_table.as_ref().unwrap(),
store.clone(),
)
.await;
let mut state_table = source.state_table.clone().unwrap();
// To make it possible to scan the whole state table.
// This is quite wild, can we do this?
state_table.read_prefix_len_hint = 0;
let state_table_handler =
SourceStateTableHandler::from_table_catalog(&state_table, store.clone()).await;
let stream_source_core = StreamSourceCore::new(
source_id,
source_name,
@@ -233,13 +234,15 @@ impl ExecutorBuilder for SourceExecutorBuilder {
)
.boxed()
} else {
let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
SourceExecutor::new(
params.actor_context.clone(),
Some(stream_source_core),
params.executor_stats.clone(),
barrier_receiver,
system_params,
source_ctrl_opts.clone(),
is_shared,
)
.boxed()
}
@@ -276,6 +279,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
system_params,
// we don't expect any data in, so no need to set chunk_sizes
SourceCtrlOpts::default(),
false,
);
Ok((params.info, exec).into())
}
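For context on the builder changes above, a hedged sketch of how the pieces fit together (the helper name is hypothetical and the import paths are approximate): the state-table catalog is cloned with read_prefix_len_hint zeroed, per the comment in the hunk above, so that the whole table can be scanned, and the resulting handler's is_empty() tells the executor whether this shared source has ever consumed data.

use risingwave_pb::catalog::PbTable;
use risingwave_storage::StateStore;

use crate::executor::source::SourceStateTableHandler;
use crate::executor::StreamExecutorResult;

/// Hypothetical helper mirroring the builder code above.
async fn source_is_uninitialized<S: StateStore>(
    state_table_catalog: &PbTable,
    store: S,
) -> StreamExecutorResult<bool> {
    // Clone the catalog and clear the prefix-length hint so the handler can
    // scan the whole state table (see the comment in the hunk above).
    let mut catalog = state_table_catalog.clone();
    catalog.read_prefix_len_hint = 0;

    let handler = SourceStateTableHandler::from_table_catalog(&catalog, store).await;
    // Empty split state means no downstream MV has consumed this source yet.
    handler.is_empty().await
}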
