diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs
index e25027b61573a..4e17d4ad606d2 100644
--- a/src/compute/tests/integration_tests.rs
+++ b/src/compute/tests/integration_tests.rs
@@ -174,6 +174,7 @@ async fn test_table_materialize() -> StreamResult<()> {
             barrier_rx,
             system_params_manager.get_params(),
             SourceCtrlOpts::default(),
+            false,
         )
         .boxed(),
     );
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index 09e0488f6ff91..7caa42f90ca26 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -258,6 +258,8 @@ where
     W: WatermarkBufferStrategy,
 {
     /// Create state table from table catalog and store.
+    ///
+    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
     pub async fn from_table_catalog(
         table_catalog: &Table,
         store: S,
@@ -1356,9 +1358,6 @@ where
     /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
     /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
     /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
-
-    /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
-    /// `vnode`.
     pub async fn iter_with_prefix(
         &self,
         pk_prefix: impl Row,
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 65635d0e40748..ceea232f6836f 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -61,6 +61,8 @@ pub struct SourceExecutor<S: StateStore> {
 
     // control options for connector level
     source_ctrl_opts: SourceCtrlOpts,
+
+    is_shared: bool,
 }
 
 impl<S: StateStore> SourceExecutor<S> {
@@ -71,6 +73,7 @@ impl<S: StateStore> SourceExecutor<S> {
         barrier_receiver: UnboundedReceiver<Barrier>,
         system_params: SystemParamsReaderRef,
         source_ctrl_opts: SourceCtrlOpts,
+        is_shared: bool,
     ) -> Self {
         Self {
             actor_ctx,
@@ -79,6 +82,7 @@ impl<S: StateStore> SourceExecutor<S> {
             barrier_receiver: Some(barrier_receiver),
             system_params,
             source_ctrl_opts,
+            is_shared,
         }
     }
 
@@ -400,6 +404,8 @@ impl<S: StateStore> SourceExecutor<S> {
                         boot_state.clone_from(splits);
                     }
                 }
+                // XXX: When the source executor is newly created, the first barrier must be `Add` or `Update` (created during scaling).
+                // But when the cluster restarts, is it possible that the first barrier is an `Add` that adds dispatchers?
                 _ => {}
             }
         }
@@ -418,6 +424,7 @@ impl<S: StateStore> SourceExecutor<S> {
 
         // init in-memory split states with persisted state if any
         core.init_split_state(boot_state.clone());
+        let mut is_uninitialized = core.split_state_store.is_empty().await?;
 
         // Return the ownership of `stream_source_core` to the source executor.
         self.stream_source_core = Some(core);
@@ -436,11 +443,11 @@ impl<S: StateStore> SourceExecutor<S> {
 
         let mut stream =
             StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);
 
-        // If the first barrier requires us to pause on startup, pause the stream.
-        if barrier.is_pause_on_startup() {
+        // - For shared source, pause until there's a MV.
+        // - If the first barrier requires us to pause on startup, pause the stream.
+        if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() {
             stream.pause_stream();
         }
-        // TODO: for shared source, pause until there's a MV.
 
         yield Message::Barrier(barrier);
@@ -473,6 +480,7 @@ impl<S: StateStore> SourceExecutor<S> {
 
                     if let Some(mutation) = barrier.mutation.as_deref() {
                         match mutation {
+                            // XXX: Is it possible that the stream is `self_paused`, and we receive a `Pause` mutation now? In that case, it will panic.
                             Mutation::Pause => stream.pause_stream(),
                             Mutation::Resume => stream.resume_stream(),
                             Mutation::SourceChangeSplit(actor_splits) => {
@@ -500,6 +508,16 @@ impl<S: StateStore> SourceExecutor<S> {
                                 )
                                 .await?;
                             }
+                            Mutation::Add(AddMutation { adds, .. }) => {
+                                // The shared source executor has a downstream MV now. Let's start working!
+                                if adds.contains_key(&self.actor_ctx.id)
+                                    && self.is_shared
+                                    && is_uninitialized
+                                {
+                                    stream.resume_stream();
+                                    is_uninitialized = false;
+                                }
+                            }
                             _ => {}
                         }
                     }
@@ -778,6 +796,7 @@ mod tests {
             barrier_rx,
             system_params_manager.get_params(),
             SourceCtrlOpts::default(),
+            false,
         );
 
         let mut executor = executor.boxed().execute();
@@ -866,6 +885,7 @@ mod tests {
             barrier_rx,
             system_params_manager.get_params(),
             SourceCtrlOpts::default(),
+            false,
         );
 
         let mut handler = executor.boxed().execute();
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index f85f12c793a2b..74a163b01e1b4 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -38,6 +38,7 @@ use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData};
 use risingwave_hummock_sdk::key::next_key;
 use risingwave_pb::catalog::PbTable;
 use risingwave_storage::store::PrefetchOptions;
+use risingwave_storage::table::TableDistribution;
 use risingwave_storage::StateStore;
 
 use crate::common::table::state_table::StateTable;
@@ -51,6 +52,7 @@ pub struct SourceStateTableHandler<S: StateStore> {
 }
 
 impl<S: StateStore> SourceStateTableHandler<S> {
+    /// Creates a state table with singleton distribution.
     pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
         Self {
             state_table: StateTable::from_table_catalog(table_catalog, store, None).await,
@@ -82,6 +84,24 @@ impl<S: StateStore> SourceStateTableHandler<S> {
             .map_err(StreamExecutorError::from)
     }
 
+    /// Whether the state table is empty, i.e., the source has not consumed any data yet.
+    pub async fn is_empty(&self) -> StreamExecutorResult<bool> {
+        assert_eq!(
+            self.state_table.vnodes(),
+            TableDistribution::singleton_vnode_bitmap_ref(),
+            "SourceExecutor's state table should have singleton distribution"
+        );
+        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
+
+        let state_table_iter = self
+            .state_table
+            .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
+            .await?;
+        pin_mut!(state_table_iter);
+
+        Ok(state_table_iter.next().await.is_none())
+    }
+
     /// this method should only be used by [`FsSourceExecutor`](super::FsSourceExecutor)
     pub(crate) async fn get_all_completed(&self) -> StreamExecutorResult<HashSet<SplitId>> {
         let start = Bound::Excluded(row::once(Some(Self::string_to_scalar(
diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs
index 6bbf76d0bf85c..dc9308f39bcd2 100644
--- a/src/stream/src/executor/stream_reader.rs
+++ b/src/stream/src/executor/stream_reader.rs
@@ -96,12 +96,14 @@ impl<const BIASED: bool, M> StreamReaderWithPause<BIASED, M> {
     /// Pause the data stream.
     pub fn pause_stream(&mut self) {
         assert!(!self.paused, "already paused");
+        tracing::info!("data stream paused");
         self.paused = true;
     }
 
     /// Resume the data stream. Panic if the data stream is not paused.
     pub fn resume_stream(&mut self) {
         assert!(self.paused, "not paused");
+        tracing::info!("data stream resumed");
         self.paused = false;
     }
 }
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 7f8bff9d3c6f1..51c8d8cc7439a 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -190,11 +190,12 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                     .map(|column| column.column_id)
                     .collect();
 
-                let state_table_handler = SourceStateTableHandler::from_table_catalog(
-                    source.state_table.as_ref().unwrap(),
-                    store.clone(),
-                )
-                .await;
+                let mut state_table = source.state_table.clone().unwrap();
+                // To make it possible to scan the whole state table.
+                // This is quite wild, can we do this?
+                state_table.read_prefix_len_hint = 0;
+                let state_table_handler =
+                    SourceStateTableHandler::from_table_catalog(&state_table, store.clone()).await;
                 let stream_source_core = StreamSourceCore::new(
                     source_id,
                     source_name,
@@ -233,6 +234,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                     )
                     .boxed()
                 } else {
+                    let is_shared = source.info.as_ref().is_some_and(|info| info.is_shared());
                     SourceExecutor::new(
                         params.actor_context.clone(),
                         Some(stream_source_core),
@@ -240,6 +242,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                         barrier_receiver,
                         system_params,
                         source_ctrl_opts.clone(),
+                        is_shared,
                     )
                     .boxed()
                 }
@@ -276,6 +279,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
             system_params,
             // we don't expect any data in, so no need to set chunk_sizes
             SourceCtrlOpts::default(),
+            false,
        );
        Ok((params.info, exec).into())
    }
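
Not part of the patch: a minimal, self-contained sketch of the pause/resume gating this diff introduces, so the intended behavior is easy to see in one place. `PausableStream`, `Mutation`, and `SharedSourceGate` below are simplified stand-ins invented for illustration; they model `StreamReaderWithPause`, the barrier mutation enum, and `SourceExecutor`, not the real types.

// Simplified model of the gating added in this PR: a shared, uninitialized
// source starts paused and resumes on the first `Add` mutation for its actor.
#[derive(Default)]
struct PausableStream {
    paused: bool,
}

impl PausableStream {
    fn pause_stream(&mut self) {
        assert!(!self.paused, "already paused");
        self.paused = true;
    }

    fn resume_stream(&mut self) {
        assert!(self.paused, "not paused");
        self.paused = false;
    }
}

/// Pared-down stand-in for the barrier mutations handled above.
enum Mutation {
    Pause,
    Resume,
    /// Actor ids that receive new dispatchers, i.e. a downstream MV was created.
    Add(Vec<u32>),
}

struct SharedSourceGate {
    actor_id: u32,
    is_shared: bool,
    /// Mirrors `is_uninitialized`: true until the split state table has any row.
    is_uninitialized: bool,
    stream: PausableStream,
}

impl SharedSourceGate {
    /// First barrier: a shared source with no persisted split state stays paused,
    /// as does any source when pause-on-startup is requested.
    fn on_first_barrier(&mut self, pause_on_startup: bool) {
        if (self.is_shared && self.is_uninitialized) || pause_on_startup {
            self.stream.pause_stream();
        }
    }

    /// Later barriers: the first `Add` targeting this actor resumes the stream.
    fn on_mutation(&mut self, mutation: &Mutation) {
        match mutation {
            Mutation::Pause => self.stream.pause_stream(),
            Mutation::Resume => self.stream.resume_stream(),
            Mutation::Add(adds) => {
                if adds.contains(&self.actor_id) && self.is_shared && self.is_uninitialized {
                    self.stream.resume_stream();
                    self.is_uninitialized = false;
                }
            }
        }
    }
}

fn main() {
    let mut gate = SharedSourceGate {
        actor_id: 1,
        is_shared: true,
        is_uninitialized: true,
        stream: PausableStream::default(),
    };
    gate.on_first_barrier(false);
    assert!(gate.stream.paused, "shared + uninitialized => paused at startup");
    gate.on_mutation(&Mutation::Add(vec![1]));
    assert!(!gate.stream.paused, "first downstream MV resumes the stream");
}

Running the sketch asserts the two properties the patch relies on: a shared source with an empty state table starts paused, and the first `Add` mutation that targets its actor resumes it exactly once.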