diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 73157f91c2d0c..709215e69eaa0 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -81,12 +81,18 @@ pub struct Reschedule { pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>, } +/// Replacing an old table with a new one. Used for `ALTER TABLE` and sink into table. All actors in the table job will be rebuilt. #[derive(Debug, Clone)] pub struct ReplaceTablePlan { pub old_table_fragments: TableFragments, pub new_table_fragments: TableFragments, pub merge_updates: Vec, pub dispatchers: HashMap>, + /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids. + /// We need to reassign splits for it. + /// + /// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about + /// backfill_splits. pub init_split_assignment: SplitAssignment, } diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index d876fbe6c33b5..993e717a12eee 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -49,6 +49,7 @@ pub struct ActorContext { pub streaming_metrics: Arc, + /// This is the number of dispatchers when the actor is created. It will not be updated during runtime when new downstreams are added. pub dispatch_num: usize, } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index ab53b02ef7ce9..f74259da9772b 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -372,6 +372,36 @@ impl Barrier { } } + /// Whether this barrier adds new downstream actors for the actor with `upstream_actor_id`. + pub fn has_new_downstream(&self, upstream_actor_id: ActorId) -> bool { + let Some(mutation) = self.mutation.as_deref() else { + return false; + }; + match mutation { + // Add is for mv, index and sink creation. + Mutation::Add(AddMutation { adds, .. }) => adds.get(&upstream_actor_id).is_some(), + // AddAndUpdate is for sink-into-table. + Mutation::AddAndUpdate( + AddMutation { adds, .. }, + UpdateMutation { + dispatchers, + actor_new_dispatchers, + .. + }, + ) => { + adds.get(&upstream_actor_id).is_some() + || actor_new_dispatchers.get(&upstream_actor_id).is_some() + || dispatchers.get(&upstream_actor_id).is_some() + } + Mutation::Update(_) + | Mutation::Stop(_) + | Mutation::Pause + | Mutation::Resume + | Mutation::SourceChangeSplit(_) + | Mutation::Throttle(_) => false, + } + } + /// Whether this barrier requires the executor to pause its data stream on startup. pub fn is_pause_on_startup(&self) -> bool { match self.mutation.as_deref() { diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 3215fdd0fc9c9..a46116d18e742 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -43,10 +43,10 @@ use crate::common::table::state_table::StateTableInner; use crate::executor::error::StreamExecutorError; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ - expect_first_barrier, ActorContext, ActorContextRef, AddMutation, BoxedMessageStream, Execute, - Executor, Message, Mutation, StreamExecutorResult, UpdateMutation, + expect_first_barrier, ActorContext, ActorContextRef, BoxedMessageStream, Execute, Executor, + Message, StreamExecutorResult, }; -use crate::task::{ActorId, AtomicU64Ref}; +use crate::task::AtomicU64Ref; /// `MaterializeExecutor` materializes changes in stream into a materialized view on storage. pub struct MaterializeExecutor { @@ -231,10 +231,9 @@ impl MaterializeExecutor { } } Message::Barrier(b) => { - let mutation = b.mutation.clone(); // If a downstream mv depends on the current table, we need to do conflict check again. if !self.state_table.is_consistent_op() - && Self::new_downstream_created(mutation, self.actor_context.id) + && b.has_new_downstream(self.actor_context.id) { assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite); self.state_table @@ -259,35 +258,6 @@ impl MaterializeExecutor { } } } - - fn new_downstream_created(mutation: Option>, actor_id: ActorId) -> bool { - let Some(mutation) = mutation.as_deref() else { - return false; - }; - match mutation { - // Add is for mv, index and sink creation. - Mutation::Add(AddMutation { adds, .. }) => adds.get(&actor_id).is_some(), - // AddAndUpdate is for sink-into-table. - Mutation::AddAndUpdate( - AddMutation { adds, .. }, - UpdateMutation { - dispatchers, - actor_new_dispatchers: actor_dispatchers, - .. - }, - ) => { - adds.get(&actor_id).is_some() - || actor_dispatchers.get(&actor_id).is_some() - || dispatchers.get(&actor_id).is_some() - } - Mutation::Update(_) - | Mutation::Stop(_) - | Mutation::Pause - | Mutation::Resume - | Mutation::SourceChangeSplit(_) - | Mutation::Throttle(_) => false, - } - } } impl MaterializeExecutor { diff --git a/src/stream/src/executor/source/source_backfill_state_table.rs b/src/stream/src/executor/source/source_backfill_state_table.rs index 678a76f39f927..be9abe8490e63 100644 --- a/src/stream/src/executor/source/source_backfill_state_table.rs +++ b/src/stream/src/executor/source/source_backfill_state_table.rs @@ -33,6 +33,7 @@ pub struct BackfillStateTableHandler { } impl BackfillStateTableHandler { + /// See also [`super::SourceStateTableHandler::from_table_catalog`] for how the state table looks like. pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self { Self { state_store: StateTable::from_table_catalog(table_catalog, store, None).await, diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index fa77b20b1dd4c..951bca8fd44b4 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -478,7 +478,7 @@ impl SourceExecutor { // init in-memory split states with persisted state if any core.init_split_state(boot_state.clone()); - let mut is_uninitialized = core.split_state_store.is_empty().await?; + let mut is_uninitialized = self.actor_ctx.dispatch_num == 0; // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = Some(core); @@ -532,6 +532,13 @@ impl SourceExecutor { let epoch = barrier.epoch; + if barrier.has_new_downstream(self.actor_ctx.id) { + if is_uninitialized { + stream.resume_stream(); + is_uninitialized = false; + } + } + if let Some(mutation) = barrier.mutation.as_deref() { match mutation { // XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic. @@ -570,16 +577,6 @@ impl SourceExecutor { .await?; } } - Mutation::Add(AddMutation { adds, .. }) => { - // The shared source executor has a downstream MV now. Let's start working! - if adds.contains_key(&self.actor_ctx.id) - && self.is_shared - && is_uninitialized - { - stream.resume_stream(); - is_uninitialized = false; - } - } _ => {} } } diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index ec7139b975ade..5f9bd2c9f2c55 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -38,7 +38,6 @@ use risingwave_connector::source::{SplitId, SplitImpl, SplitMetaData}; use risingwave_hummock_sdk::key::next_key; use risingwave_pb::catalog::PbTable; use risingwave_storage::store::PrefetchOptions; -use risingwave_storage::table::TableDistribution; use risingwave_storage::StateStore; use crate::common::table::state_table::StateTable; @@ -92,24 +91,6 @@ impl SourceStateTableHandler { .map_err(StreamExecutorError::from) } - /// This source has not consumed any data yet. - pub async fn is_empty(&self) -> StreamExecutorResult { - assert_eq!( - self.state_table.vnodes(), - TableDistribution::singleton_vnode_bitmap_ref(), - "SourceExecutor's state table should have singleton distribution" - ); - let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); - - let state_table_iter = self - .state_table - .iter_with_prefix(None::, sub_range, Default::default()) - .await?; - pin_mut!(state_table_iter); - - Ok(state_table_iter.next().await.is_none()) - } - /// this method should only be used by [`FsSourceExecutor`](super::FsSourceExecutor) pub(crate) async fn get_all_completed(&self) -> StreamExecutorResult> { let start = Bound::Excluded(row::once(Some(Self::string_to_scalar(