From a3c6e48c67cf666d382c8e3fcb2afc451c8bcdc5 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Wed, 18 Sep 2024 14:33:19 +0800 Subject: [PATCH] refactor: unify to subscribe mutation via barrier sender (#18255) --- proto/stream_service.proto | 10 - src/meta/src/barrier/creating_job/mod.rs | 35 +-- src/meta/src/barrier/creating_job/status.rs | 4 +- src/meta/src/barrier/mod.rs | 8 - src/meta/src/barrier/recovery.rs | 1 - src/meta/src/barrier/rpc.rs | 17 +- .../executor/backfill/snapshot_backfill.rs | 63 ++-- src/stream/src/executor/dispatch.rs | 9 +- src/stream/src/executor/exchange/input.rs | 154 ++++------ src/stream/src/executor/integration_tests.rs | 276 +++++++++--------- src/stream/src/executor/merge.rs | 214 ++++++++++---- src/stream/src/executor/mod.rs | 1 + src/stream/src/executor/receiver.rs | 32 +- src/stream/src/executor/test_utils.rs | 16 +- src/stream/src/from_proto/merge.rs | 88 +++--- src/stream/src/from_proto/mod.rs | 2 +- src/stream/src/from_proto/stream_scan.rs | 31 +- src/stream/src/task/barrier_manager.rs | 33 +-- .../src/task/barrier_manager/managed_state.rs | 129 +------- src/stream/src/task/barrier_manager/tests.rs | 14 +- src/stream/src/task/mod.rs | 5 - src/stream/src/task/stream_manager.rs | 166 +++++++++-- 22 files changed, 621 insertions(+), 687 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 0f030b2898f3..c13ee8875b43 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -15,16 +15,6 @@ message InjectBarrierRequest { repeated uint32 actor_ids_to_collect = 4; repeated uint32 table_ids_to_sync = 5; uint32 partial_graph_id = 6; - // Actors in the partial graphs of the creating jobs that need to be pre-synced the barrier mutation to. - // - // This is required because in snapshot backfill, the snapshot backfill executor receive barriers from - // both local barrier manager and upstream. If we don't pre-sync the barrier mutations, when an input executor - // of an snapshot backfill actor receive a barrier, it will be blocked when trying the fetch the mutation - // of this upstream barrier. The reason for blocking is that, the snapshot backfill have slower progress, - // and therefore won't be synced with the mutation of barrier in upstream. To solve this issue of blocking, - // we specify the set of snapshot backfill actors that needs to be pre-synced with the upstream barrier mutation, - // so that the input executor won't be blocked at waiting for the mutation of upstream barriers. 
- repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7; repeated common.ActorInfo broadcast_info = 8; repeated stream_plan.StreamActor actors_to_build = 9; diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 108f13197ed5..0e2a948b6eb4 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -16,7 +16,7 @@ mod barrier_control; mod status; use std::cmp::max; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::mem::take; use std::sync::Arc; use std::time::Duration; @@ -43,7 +43,6 @@ use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::{Command, CreateStreamingJobCommandInfo, SnapshotBackfillInfo}; use crate::manager::WorkerId; -use crate::model::ActorId; use crate::rpc::metrics::MetaMetrics; use crate::MetaResult; @@ -77,18 +76,6 @@ impl CreatingStreamingJobControl { let mut create_mview_tracker = CreateMviewProgressTracker::default(); create_mview_tracker.update_tracking_jobs(Some((&info, None)), [], version_stat); let fragment_info: HashMap<_, _> = info.new_fragment_info().collect(); - let snapshot_backfill_actors_set = info.table_fragments.snapshot_backfill_actor_ids(); - let mut snapshot_backfill_actors: HashMap<_, HashSet<_>> = HashMap::new(); - for fragment in fragment_info.values() { - for (actor_id, worker_node) in &fragment.actors { - if snapshot_backfill_actors_set.contains(actor_id) { - snapshot_backfill_actors - .entry(*worker_node) - .or_default() - .insert(*actor_id); - } - } - } let table_id = info.table_fragments.table_id(); let table_id_str = format!("{}", table_id.table_id); @@ -108,7 +95,6 @@ impl CreatingStreamingJobControl { graph_info: InflightGraphInfo::new(fragment_info), backfill_epoch, pending_non_checkpoint_barriers: vec![], - snapshot_backfill_actors, initial_barrier_info: Some((actors_to_create, initial_mutation)), }, upstream_lag: metrics @@ -139,22 +125,6 @@ impl CreatingStreamingJobControl { } } - pub(super) fn actors_to_pre_sync_barrier( - &self, - ) -> impl Iterator)> + '_ { - if let CreatingStreamingJobStatus::ConsumingSnapshot { - snapshot_backfill_actors, - .. - } = &self.status - { - Some(snapshot_backfill_actors) - } else { - None - } - .into_iter() - .flat_map(|actors| actors.iter()) - } - pub(super) fn gen_ddl_progress(&self) -> DdlProgress { let progress = match &self.status { CreatingStreamingJobStatus::ConsumingSnapshot { @@ -278,7 +248,6 @@ impl CreatingStreamingJobControl { &kind, graph_info, Some(graph_info), - HashMap::new(), new_actors, vec![], vec![], @@ -345,7 +314,6 @@ impl CreatingStreamingJobControl { &command_ctx.kind, graph_info, Some(graph_info), - HashMap::new(), None, vec![], vec![], @@ -394,7 +362,6 @@ impl CreatingStreamingJobControl { } else { Some(graph_info) }, - HashMap::new(), None, vec![], vec![], diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index 547347d8a085..6f205d7ced99 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
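The pre-sync field removed above is superseded by the per-actor barrier subscription that the stream-side changes in this patch rely on: the injected barrier, mutation included, reaches each actor through a registered barrier sender, so the input executors of snapshot backfill actors no longer block waiting for a separately synced mutation. A minimal sketch of that replacement path, assuming the crate's `LocalBarrierManager` API as it appears later in this diff; the wrapper function is illustrative only and not part of the patch:

```rust
use tokio::sync::mpsc;

use crate::executor::Barrier;
use crate::task::{ActorId, LocalBarrierManager};

/// Illustrative only: the mechanism replacing the removed pre-sync field.
/// Every actor (snapshot-backfill actors included) receives the full barrier,
/// mutation and all, through a channel registered with the barrier worker.
fn subscribe_full_barriers(
    local_barrier_manager: &LocalBarrierManager,
    actor_id: ActorId,
) -> mpsc::UnboundedReceiver<Barrier> {
    // `subscribe_barrier` sends a `RegisterBarrierSender` event to the local
    // barrier worker; each later `issue_barrier` fans the injected barrier out
    // to all registered senders, so the mutation travels with the barrier.
    local_barrier_manager.subscribe_barrier(actor_id)
}
```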
-use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::mem::take; use std::sync::Arc; @@ -27,7 +27,6 @@ use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::{BarrierKind, TracedEpoch}; use crate::manager::WorkerId; -use crate::model::ActorId; #[derive(Debug)] pub(super) enum CreatingStreamingJobStatus { @@ -40,7 +39,6 @@ pub(super) enum CreatingStreamingJobStatus { backfill_epoch: u64, /// The `prev_epoch` of pending non checkpoint barriers pending_non_checkpoint_barriers: Vec, - snapshot_backfill_actors: HashMap>, /// Info of the first barrier: (`actors_to_create`, `mutation`) /// Take the mutation out when injecting the first barrier initial_barrier_info: Option<(HashMap>, Mutation)>, diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 3225278b7ae6..c1dfcaba9650 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1046,17 +1046,10 @@ impl GlobalBarrierManager { let table_ids_to_commit: HashSet<_> = pre_applied_graph_info.existing_table_ids().collect(); - let mut actors_to_pre_sync_barrier: HashMap<_, Vec<_>> = HashMap::new(); let mut jobs_to_wait = HashSet::new(); for (table_id, creating_job) in &mut self.checkpoint_control.creating_streaming_job_controls { - for (worker_id, actors) in creating_job.actors_to_pre_sync_barrier() { - actors_to_pre_sync_barrier - .entry(*worker_id) - .or_default() - .extend(actors.iter().cloned()) - } if let Some(wait_job) = creating_job.on_new_command(&mut self.control_stream_manager, &command_ctx)? { @@ -1071,7 +1064,6 @@ impl GlobalBarrierManager { &command_ctx, &pre_applied_graph_info, Some(&self.state.inflight_graph_info), - actors_to_pre_sync_barrier, ) { Ok(node_to_collect) => node_to_collect, Err(err) => { diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 7aa75d3e0e86..f74e6a23aa74 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -398,7 +398,6 @@ impl GlobalBarrierManager { &BarrierKind::Initial, &info, Some(&info), - HashMap::new(), Some(node_actors), vec![], vec![], diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index c86095f779f2..0177259486b7 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -23,7 +23,6 @@ use futures::stream::{BoxStream, FuturesUnordered}; use futures::StreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::hash::ActorId; use risingwave_common::util::tracing::TracingContext; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::common::{ActorInfo, WorkerNode}; @@ -275,7 +274,6 @@ impl ControlStreamManager { command_ctx: &CommandContext, pre_applied_graph_info: &InflightGraphInfo, applied_graph_info: Option<&InflightGraphInfo>, - actor_ids_to_pre_sync_mutation: HashMap>, ) -> MetaResult> { let mutation = command_ctx.to_mutation(); let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation { @@ -295,14 +293,12 @@ impl ControlStreamManager { &command_ctx.kind, pre_applied_graph_info, applied_graph_info, - actor_ids_to_pre_sync_mutation, command_ctx.command.actors_to_create(), subscriptions_to_add, subscriptions_to_remove, ) } - #[expect(clippy::too_many_arguments)] pub(super) fn inject_barrier( &mut self, creating_table_id: Option, @@ -311,7 +307,6 @@ impl ControlStreamManager { kind: &BarrierKind, pre_applied_graph_info: &InflightGraphInfo, applied_graph_info: Option<&InflightGraphInfo>, - 
actor_ids_to_pre_sync_mutation: HashMap>, mut new_actors: Option>>, subscriptions_to_add: Vec, subscriptions_to_remove: Vec, @@ -321,10 +316,7 @@ impl ControlStreamManager { )); let partial_graph_id = creating_table_id - .map(|table_id| { - assert!(actor_ids_to_pre_sync_mutation.is_empty()); - table_id.table_id - }) + .map(|table_id| table_id.table_id) .unwrap_or(u32::MAX); for worker_id in pre_applied_graph_info @@ -401,13 +393,6 @@ impl ControlStreamManager { actor_ids_to_collect, table_ids_to_sync, partial_graph_id, - actor_ids_to_pre_sync_barrier_mutation: - actor_ids_to_pre_sync_mutation - .get(node_id) - .into_iter() - .flatten() - .cloned() - .collect(), broadcast_info: new_actors_location_to_broadcast.clone(), actors_to_build: new_actors .as_mut() diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index 593a13df9cbc..a3cba25a0572 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -40,7 +40,7 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::{try_stream, StreamExt}; use crate::executor::{ expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, - DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation, + DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult, }; use crate::task::CreateMviewProgressReporter; @@ -50,7 +50,7 @@ pub struct SnapshotBackfillExecutor { upstream_table: StorageTable, /// Upstream with the same schema with the upstream table. - upstream: Executor, + upstream: MergeExecutorInput, /// The column indices need to be forwarded to the downstream from the upstream and table scan. 
output_indices: Vec, @@ -68,9 +68,9 @@ pub struct SnapshotBackfillExecutor { impl SnapshotBackfillExecutor { #[expect(clippy::too_many_arguments)] - pub fn new( + pub(crate) fn new( upstream_table: StorageTable, - upstream: Executor, + upstream: MergeExecutorInput, output_indices: Vec, actor_ctx: ActorContextRef, progress: CreateMviewProgressReporter, @@ -101,15 +101,14 @@ impl SnapshotBackfillExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { debug!("snapshot backfill executor start"); - let mut upstream = erase_upstream_mutation(self.upstream.execute()); let upstream_table_id = self.upstream_table.table_id(); - let first_barrier = expect_first_barrier(&mut upstream).await?; + let first_barrier = expect_first_barrier(&mut self.upstream).await?; debug!(epoch = ?first_barrier.epoch, "get first upstream barrier"); let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier"); let should_backfill = first_barrier.epoch != first_recv_barrier.epoch; - { + let mut barrier_epoch = { if should_backfill { let subscriber_ids = first_recv_barrier .added_subscriber_on_mv_table(upstream_table_id) @@ -140,7 +139,7 @@ impl SnapshotBackfillExecutor { .with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]); let mut upstream_buffer = UpstreamBuffer::new( - &mut upstream, + &mut self.upstream, upstream_table_id, snapshot_backfill_table_fragment_id, consume_upstream_row_count, @@ -250,6 +249,7 @@ impl SnapshotBackfillExecutor { table_id = self.upstream_table.table_id().table_id, "finish consuming log store" ); + barrier_epoch } else { info!( table_id = self.upstream_table.table_id().table_id, @@ -258,19 +258,17 @@ impl SnapshotBackfillExecutor { let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; assert_eq!(first_barrier.epoch, first_recv_barrier.epoch); yield Message::Barrier(first_recv_barrier); + first_barrier.epoch } - } + }; + let mut upstream = self.upstream.into_executor(self.barrier_rx).execute(); // Phase 3: consume upstream while let Some(msg) = upstream.try_next().await? { - yield match msg { - DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk), - DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark), - DispatcherMessage::Barrier(barrier) => { - let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; - assert_eq!(barrier.epoch, recv_barrier.epoch); - Message::Barrier(recv_barrier) - } - }; + if let Message::Barrier(barrier) = &msg { + assert_eq!(barrier.epoch.prev, barrier_epoch.curr); + barrier_epoch = barrier.epoch; + } + yield msg; } } } @@ -404,33 +402,8 @@ impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> { } } -mod erase_upstream_mutation { - use futures::TryStreamExt; - - use crate::executor::prelude::Stream; - use crate::executor::{BoxedMessageStream, DispatcherMessageStreamItem}; - - pub(super) fn erase_upstream_mutation(upstream: BoxedMessageStream) -> UpstreamStream { - upstream.map_ok(|msg| { - msg.map_mutation(|mutation| { - if let Some(mutation) = mutation { - // TODO: assert none mutation after we explicitly erase mutation - warn!( - ?mutation, - "receive non-empty mutation from upstream. 
ignored" - ); - }; - }) - }) - } - - pub(super) type UpstreamStream = impl Stream + Unpin; -} - -use erase_upstream_mutation::*; - struct UpstreamBuffer<'a, S> { - upstream: &'a mut UpstreamStream, + upstream: &'a mut MergeExecutorInput, state: S, consume_upstream_row_count: LabelGuardedIntCounter<3>, upstream_table_id: TableId, @@ -439,7 +412,7 @@ struct UpstreamBuffer<'a, S> { impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> { fn new( - upstream: &'a mut UpstreamStream, + upstream: &'a mut MergeExecutorInput, upstream_table_id: TableId, current_subscriber_id: u32, consume_upstream_row_count: LabelGuardedIntCounter<3>, diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index bb1db4662b0d..39f561633b8d 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1180,10 +1180,6 @@ mod tests { let actor_id = 233; let fragment_id = 666; let barrier_test_env = LocalBarrierTestEnv::for_test().await; - let input = Executor::new( - Default::default(), - ReceiverExecutor::for_test(233, rx, barrier_test_env.shared_context.clone()).boxed(), - ); let ctx = Arc::new(SharedContext::for_test()); let metrics = Arc::new(StreamingMetrics::unused()); @@ -1252,6 +1248,11 @@ mod tests { .flush_all_events() .await; + let input = Executor::new( + Default::default(), + ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()) + .boxed(), + ); let executor = Box::new(DispatchExecutor::new( input, vec![broadcast_dispatcher, simple_dispatcher], diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 7ecac2c625e6..5437a4ae977c 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -24,14 +24,15 @@ use tokio::sync::mpsc; use super::permit::Receiver; use crate::executor::prelude::*; -use crate::executor::{DispatcherBarrier, DispatcherMessage}; -use crate::task::{ - FragmentId, LocalBarrierManager, SharedContext, UpDownActorIds, UpDownFragmentIds, +use crate::executor::{ + BarrierInner, DispatcherBarrier, DispatcherMessage, DispatcherMessageStream, + DispatcherMessageStreamItem, }; +use crate::task::{FragmentId, SharedContext, UpDownActorIds, UpDownFragmentIds}; /// `Input` provides an interface for [`MergeExecutor`](crate::executor::MergeExecutor) and /// [`ReceiverExecutor`](crate::executor::ReceiverExecutor) to receive data from upstream actors. -pub trait Input: MessageStream { +pub trait Input: DispatcherMessageStream { /// The upstream actor id. 
fn actor_id(&self) -> ActorId; @@ -62,56 +63,47 @@ pub struct LocalInput { actor_id: ActorId, } -async fn process_msg<'a>( - msg: DispatcherMessage, - get_mutation_subscriber: impl for<'b> FnOnce( - &'b DispatcherBarrier, - ) - -> &'a mut mpsc::UnboundedReceiver - + 'a, +pub(crate) fn assert_equal_dispatcher_barrier( + first: &BarrierInner, + second: &BarrierInner, +) { + assert_eq!(first.epoch, second.epoch); + assert_eq!(first.kind, second.kind); +} + +pub(crate) fn apply_dispatcher_barrier( + recv_barrier: &mut Barrier, + dispatcher_barrier: DispatcherBarrier, +) { + assert_equal_dispatcher_barrier(recv_barrier, &dispatcher_barrier); + recv_barrier + .passed_actors + .extend(dispatcher_barrier.passed_actors); +} + +pub(crate) async fn process_dispatcher_msg( + dispatcher_msg: DispatcherMessage, + barrier_rx: &mut mpsc::UnboundedReceiver, ) -> StreamExecutorResult { - let barrier = match msg { - DispatcherMessage::Chunk(c) => { - return Ok(Message::Chunk(c)); - } - DispatcherMessage::Barrier(b) => b, - DispatcherMessage::Watermark(watermark) => { - return Ok(Message::Watermark(watermark)); + let msg = match dispatcher_msg { + DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk), + DispatcherMessage::Barrier(barrier) => { + let mut recv_barrier = barrier_rx + .recv() + .await + .ok_or_else(|| anyhow!("end of barrier recv"))?; + apply_dispatcher_barrier(&mut recv_barrier, barrier); + Message::Barrier(recv_barrier) } + DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark), }; - let mutation_subscriber = get_mutation_subscriber(&barrier); - - let mutation = mutation_subscriber - .recv() - .await - .ok_or_else(|| anyhow!("failed to receive mutation of barrier {:?}", barrier)) - .map(|(prev_epoch, mutation)| { - assert_eq!(prev_epoch, barrier.epoch.prev); - mutation - })?; - Ok(Message::Barrier(Barrier { - epoch: barrier.epoch, - mutation, - kind: barrier.kind, - tracing_context: barrier.tracing_context, - passed_actors: barrier.passed_actors, - })) + Ok(msg) } impl LocalInput { - pub fn new( - channel: Receiver, - upstream_actor_id: ActorId, - self_actor_id: ActorId, - local_barrier_manager: LocalBarrierManager, - ) -> Self { + pub fn new(channel: Receiver, upstream_actor_id: ActorId) -> Self { Self { - inner: local_input::run( - channel, - upstream_actor_id, - self_actor_id, - local_barrier_manager, - ), + inner: local_input::run(channel, upstream_actor_id), actor_id: upstream_actor_id, } } @@ -121,44 +113,22 @@ mod local_input { use await_tree::InstrumentAwait; use crate::executor::exchange::error::ExchangeChannelClosed; - use crate::executor::exchange::input::process_msg; use crate::executor::exchange::permit::Receiver; use crate::executor::prelude::try_stream; - use crate::executor::{Message, StreamExecutorError}; - use crate::task::{ActorId, LocalBarrierManager}; + use crate::executor::{DispatcherMessage, StreamExecutorError}; + use crate::task::ActorId; - pub(super) type LocalInputStreamInner = impl crate::executor::MessageStream; + pub(super) type LocalInputStreamInner = impl crate::executor::DispatcherMessageStream; - pub(super) fn run( - channel: Receiver, - upstream_actor_id: ActorId, - self_actor_id: ActorId, - local_barrier_manager: LocalBarrierManager, - ) -> LocalInputStreamInner { - run_inner( - channel, - upstream_actor_id, - self_actor_id, - local_barrier_manager, - ) + pub(super) fn run(channel: Receiver, upstream_actor_id: ActorId) -> LocalInputStreamInner { + run_inner(channel, upstream_actor_id) } - #[try_stream(ok = Message, error = 
StreamExecutorError)] - async fn run_inner( - mut channel: Receiver, - upstream_actor_id: ActorId, - self_actor_id: ActorId, - local_barrier_manager: LocalBarrierManager, - ) { + #[try_stream(ok = DispatcherMessage, error = StreamExecutorError)] + async fn run_inner(mut channel: Receiver, upstream_actor_id: ActorId) { let span: await_tree::Span = format!("LocalInput (actor {upstream_actor_id})").into(); - let mut mutation_subscriber = None; while let Some(msg) = channel.recv().verbose_instrument_await(span.clone()).await { - yield process_msg(msg, |barrier| { - mutation_subscriber.get_or_insert_with(|| { - local_barrier_manager.subscribe_barrier_mutation(self_actor_id, barrier) - }) - }) - .await?; + yield msg; } // Always emit an error outside the loop. This is because we use barrier as the control // message to stop the stream. Reaching here means the channel is closed unexpectedly. @@ -167,7 +137,7 @@ mod local_input { } impl Stream for LocalInput { - type Item = MessageStreamItem; + type Item = DispatcherMessageStreamItem; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // TODO: shall we pass the error with local exchange? @@ -196,7 +166,6 @@ impl RemoteInput { /// Create a remote input from compute client and related info. Should provide the corresponding /// compute client of where the actor is placed. pub fn new( - local_barrier_manager: LocalBarrierManager, client_pool: ComputeClientPool, upstream_addr: HostAddr, up_down_ids: UpDownActorIds, @@ -209,7 +178,6 @@ impl RemoteInput { Self { actor_id, inner: remote_input::run( - local_barrier_manager, client_pool, upstream_addr, up_down_ids, @@ -231,16 +199,14 @@ mod remote_input { use risingwave_rpc_client::ComputeClientPool; use crate::executor::exchange::error::ExchangeChannelClosed; - use crate::executor::exchange::input::process_msg; use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::{pin_mut, try_stream, StreamExt}; - use crate::executor::{DispatcherMessage, Message, StreamExecutorError}; - use crate::task::{LocalBarrierManager, UpDownActorIds, UpDownFragmentIds}; + use crate::executor::{DispatcherMessage, StreamExecutorError}; + use crate::task::{UpDownActorIds, UpDownFragmentIds}; - pub(super) type RemoteInputStreamInner = impl crate::executor::MessageStream; + pub(super) type RemoteInputStreamInner = impl crate::executor::DispatcherMessageStream; pub(super) fn run( - local_barrier_manager: LocalBarrierManager, client_pool: ComputeClientPool, upstream_addr: HostAddr, up_down_ids: UpDownActorIds, @@ -249,7 +215,6 @@ mod remote_input { batched_permits_limit: usize, ) -> RemoteInputStreamInner { run_inner( - local_barrier_manager, client_pool, upstream_addr, up_down_ids, @@ -259,9 +224,8 @@ mod remote_input { ) } - #[try_stream(ok = Message, error = StreamExecutorError)] + #[try_stream(ok = DispatcherMessage, error = StreamExecutorError)] async fn run_inner( - local_barrier_manager: LocalBarrierManager, client_pool: ComputeClientPool, upstream_addr: HostAddr, up_down_ids: UpDownActorIds, @@ -269,7 +233,6 @@ mod remote_input { metrics: Arc, batched_permits_limit: usize, ) { - let self_actor_id = up_down_ids.1; let client = client_pool.get_by_addr(upstream_addr).await?; let (stream, permits_tx) = client .get_stream(up_down_ids.0, up_down_ids.1, up_down_frag.0, up_down_frag.1) @@ -285,7 +248,6 @@ mod remote_input { let span: await_tree::Span = format!("RemoteInput (actor {up_actor_id})").into(); let mut batched_permits_accumulated = 0; - let mut mutation_subscriber = None; 
pin_mut!(stream); while let Some(data_res) = stream.next().verbose_instrument_await(span.clone()).await { @@ -320,12 +282,7 @@ mod remote_input { let msg = msg_res.context("RemoteInput decode message error")?; - yield process_msg(msg, |barrier| { - mutation_subscriber.get_or_insert_with(|| { - local_barrier_manager.subscribe_barrier_mutation(self_actor_id, barrier) - }) - }) - .await?; + yield msg; } Err(e) => Err(ExchangeChannelClosed::remote_input(up_down_ids.0, Some(e)))?, @@ -339,7 +296,7 @@ mod remote_input { } impl Stream for RemoteInput { - type Item = MessageStreamItem; + type Item = DispatcherMessageStreamItem; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().inner.poll_next(cx) @@ -371,13 +328,10 @@ pub(crate) fn new_input( LocalInput::new( context.take_receiver((upstream_actor_id, actor_id))?, upstream_actor_id, - actor_id, - context.local_barrier_manager.clone(), ) .boxed_input() } else { RemoteInput::new( - context.local_barrier_manager.clone(), context.compute_client_pool.as_ref().to_owned(), upstream_addr, (upstream_actor_id, actor_id), diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 13e9a67d1c52..6ea34de85721 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -64,41 +64,45 @@ async fn test_merger_sum_aggr() { let input_schema = Schema { fields: vec![Field::unnamed(DataType::Int64)], }; - let input = Executor::new( - ExecutorInfo { - schema: input_schema, - pk_indices: PkIndices::new(), - identity: "ReceiverExecutor".to_string(), - }, - ReceiverExecutor::for_test(actor_id, input_rx, barrier_test_env.shared_context.clone()) - .boxed(), - ); - let agg_calls = vec![ - AggCall::from_pretty("(count:int8)"), - AggCall::from_pretty("(sum:int8 $0:int8)"), - ]; - let schema = generate_agg_schema(&input, &agg_calls, None); - // for the local aggregator, we need two states: row count and sum - let aggregator = - StatelessSimpleAggExecutor::new(actor_ctx.clone(), input, schema, agg_calls).unwrap(); + let shared_context = barrier_test_env.shared_context.clone(); + let expr_context = expr_context.clone(); let (tx, rx) = channel_for_test(); - let consumer = SenderConsumer { - input: aggregator.boxed(), - channel: Box::new(LocalOutput::new(233, tx)), - }; + let actor_future = async move { + let input = Executor::new( + ExecutorInfo { + schema: input_schema, + pk_indices: PkIndices::new(), + identity: "ReceiverExecutor".to_string(), + }, + ReceiverExecutor::for_test(actor_id, input_rx, shared_context.clone()).boxed(), + ); + let agg_calls = vec![ + AggCall::from_pretty("(count:int8)"), + AggCall::from_pretty("(sum:int8 $0:int8)"), + ]; + let schema = generate_agg_schema(&input, &agg_calls, None); + // for the local aggregator, we need two states: row count and sum + let aggregator = + StatelessSimpleAggExecutor::new(actor_ctx.clone(), input, schema, agg_calls) + .unwrap(); + let consumer = SenderConsumer { + input: aggregator.boxed(), + channel: Box::new(LocalOutput::new(233, tx)), + }; - let actor = Actor::new( - consumer, - vec![], - StreamingMetrics::unused().into(), - actor_ctx, - expr_context.clone(), - barrier_test_env - .shared_context - .local_barrier_manager - .clone(), - ); - (actor, rx) + let actor = Actor::new( + consumer, + vec![], + StreamingMetrics::unused().into(), + actor_ctx, + expr_context, + shared_context.local_barrier_manager.clone(), + ); + + actor.run().await + } + .boxed(); + (actor_future, rx) }; // join 
handles of all actors @@ -113,9 +117,9 @@ async fn test_merger_sum_aggr() { // create 17 local aggregation actors for _ in 0..17 { let (tx, rx) = channel_for_test(); - let (actor, channel) = make_actor(rx); + let (actor_future, channel) = make_actor(rx); outputs.push(channel); - actor_futures.push(actor.run().boxed()); + actor_futures.push(actor_future); inputs.push(Box::new(LocalOutput::new(233, tx)) as BoxedOutput); } @@ -123,111 +127,117 @@ async fn test_merger_sum_aggr() { let actor_id = gen_next_actor_id(); let (input, rx) = channel_for_test(); - let receiver_op = Executor::new( - ExecutorInfo { - // input schema of local simple agg - schema: Schema::new(vec![Field::unnamed(DataType::Int64)]), - pk_indices: PkIndices::new(), - identity: "ReceiverExecutor".to_string(), - }, - ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()).boxed(), - ); - let dispatcher = DispatchExecutor::new( - receiver_op, - vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new( - inputs, - vec![0], - 0, - ))], - 0, - 0, - barrier_test_env.shared_context.clone(), - metrics, - config::default::developer::stream_chunk_size(), - ); - let actor = Actor::new( - dispatcher, - vec![], - StreamingMetrics::unused().into(), - ActorContext::for_test(actor_id), - expr_context.clone(), - barrier_test_env - .shared_context - .local_barrier_manager - .clone(), - ); - actor_futures.push(actor.run().boxed()); + let actor_future = { + let shared_context = barrier_test_env.shared_context.clone(); + let expr_context = expr_context.clone(); + async move { + let receiver_op = Executor::new( + ExecutorInfo { + // input schema of local simple agg + schema: Schema::new(vec![Field::unnamed(DataType::Int64)]), + pk_indices: PkIndices::new(), + identity: "ReceiverExecutor".to_string(), + }, + ReceiverExecutor::for_test(actor_id, rx, shared_context.clone()).boxed(), + ); + let dispatcher = DispatchExecutor::new( + receiver_op, + vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new( + inputs, + vec![0], + 0, + ))], + 0, + 0, + shared_context.clone(), + metrics, + config::default::developer::stream_chunk_size(), + ); + let actor = Actor::new( + dispatcher, + vec![], + StreamingMetrics::unused().into(), + ActorContext::for_test(actor_id), + expr_context, + shared_context.local_barrier_manager.clone(), + ); + actor.run().await + } + .boxed() + }; + actor_futures.push(actor_future); let actor_ctx = ActorContext::for_test(gen_next_actor_id()); - // use a merge operator to collect data from dispatchers before sending them to aggregator - let merger = Executor::new( - ExecutorInfo { - // output schema of local simple agg - schema: Schema::new(vec![ - Field::unnamed(DataType::Int64), - Field::unnamed(DataType::Int64), - ]), - pk_indices: PkIndices::new(), - identity: "MergeExecutor".to_string(), - }, - MergeExecutor::for_test( - actor_ctx.id, - outputs, - barrier_test_env.shared_context.clone(), - ) - .boxed(), - ); + let items = Arc::new(Mutex::new(vec![])); + let actor_future = { + let shared_context = barrier_test_env.shared_context.clone(); + let expr_context = expr_context.clone(); + let items = items.clone(); + async move { + // use a merge operator to collect data from dispatchers before sending them to aggregator + let merger = Executor::new( + ExecutorInfo { + // output schema of local simple agg + schema: Schema::new(vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + ]), + pk_indices: PkIndices::new(), + identity: "MergeExecutor".to_string(), + }, + 
MergeExecutor::for_test(actor_ctx.id, outputs, shared_context.clone()).boxed(), + ); - // for global aggregator, we need to sum data and sum row count - let is_append_only = false; - let aggregator = new_boxed_simple_agg_executor( - actor_ctx.clone(), - MemoryStateStore::new(), - merger, - is_append_only, - vec![ - AggCall::from_pretty("(sum0:int8 $0:int8)"), - AggCall::from_pretty("(sum:int8 $1:int8)"), - AggCall::from_pretty("(count:int8)"), - ], - 2, // row_count_index - vec![], - 2, - false, - ) - .await; + // for global aggregator, we need to sum data and sum row count + let is_append_only = false; + let aggregator = new_boxed_simple_agg_executor( + actor_ctx.clone(), + MemoryStateStore::new(), + merger, + is_append_only, + vec![ + AggCall::from_pretty("(sum0:int8 $0:int8)"), + AggCall::from_pretty("(sum:int8 $1:int8)"), + AggCall::from_pretty("(count:int8)"), + ], + 2, // row_count_index + vec![], + 2, + false, + ) + .await; - let projection = ProjectExecutor::new( - actor_ctx.clone(), - aggregator, - vec![ - // TODO: use the new streaming_if_null expression here, and add `None` tests - NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)), - ], - MultiMap::new(), - vec![], - 0.0, - false, - ); + let projection = ProjectExecutor::new( + actor_ctx.clone(), + aggregator, + vec![ + // TODO: use the new streaming_if_null expression here, and add `None` tests + NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)), + ], + MultiMap::new(), + vec![], + 0.0, + false, + ); - let items = Arc::new(Mutex::new(vec![])); - let consumer = MockConsumer { - input: projection.boxed(), - data: items.clone(), + let consumer = MockConsumer { + input: projection.boxed(), + data: items.clone(), + }; + let actor = Actor::new( + consumer, + vec![], + StreamingMetrics::unused().into(), + actor_ctx.clone(), + expr_context, + shared_context.local_barrier_manager.clone(), + ); + actor.run().await + } + .boxed() }; - let actor = Actor::new( - consumer, - vec![], - StreamingMetrics::unused().into(), - actor_ctx.clone(), - expr_context.clone(), - barrier_test_env - .shared_context - .local_barrier_manager - .clone(), - ); - actor_futures.push(actor.run().boxed()); + actor_futures.push(actor_future); let mut epoch = test_epoch(1); let b1 = Barrier::new_test_barrier(epoch); diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index d45d75604fa5..0316f7cf3679 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -21,15 +21,90 @@ use futures::stream::{FusedStream, FuturesUnordered, StreamFuture}; use prometheus::Histogram; use risingwave_common::config::MetricLevel; use risingwave_common::metrics::LabelGuardedMetric; +use tokio::sync::mpsc; use tokio::time::Instant; use super::exchange::input::BoxedInput; use super::watermark::*; use super::*; -use crate::executor::exchange::input::new_input; +use crate::executor::exchange::input::{ + assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg, +}; use crate::executor::prelude::*; use crate::task::SharedContext; +pub(crate) enum MergeExecutorUpstream { + Singleton(BoxedInput), + Merge(SelectReceivers), +} + +pub(crate) struct MergeExecutorInput { + upstream: MergeExecutorUpstream, + actor_context: ActorContextRef, + upstream_fragment_id: UpstreamFragmentId, + shared_context: Arc, + executor_stats: Arc, + info: ExecutorInfo, +} + +impl MergeExecutorInput { + pub(crate) fn new( + upstream: MergeExecutorUpstream, + actor_context: ActorContextRef, + 
upstream_fragment_id: UpstreamFragmentId, + shared_context: Arc, + executor_stats: Arc, + info: ExecutorInfo, + ) -> Self { + Self { + upstream, + actor_context, + upstream_fragment_id, + shared_context, + executor_stats, + info, + } + } + + pub(crate) fn into_executor(self, barrier_rx: mpsc::UnboundedReceiver) -> Executor { + let fragment_id = self.actor_context.fragment_id; + let executor = match self.upstream { + MergeExecutorUpstream::Singleton(input) => ReceiverExecutor::new( + self.actor_context, + fragment_id, + self.upstream_fragment_id, + input, + self.shared_context, + self.executor_stats, + barrier_rx, + ) + .boxed(), + MergeExecutorUpstream::Merge(inputs) => MergeExecutor::new( + self.actor_context, + fragment_id, + self.upstream_fragment_id, + inputs, + self.shared_context, + self.executor_stats, + barrier_rx, + ) + .boxed(), + }; + (self.info, executor).into() + } +} + +impl Stream for MergeExecutorInput { + type Item = DispatcherMessageStreamItem; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut self.get_mut().upstream { + MergeExecutorUpstream::Singleton(input) => input.poll_next_unpin(cx), + MergeExecutorUpstream::Merge(inputs) => inputs.poll_next_unpin(cx), + } + } +} + /// `MergeExecutor` merges data from multiple channels. Dataflow from one channel /// will be stopped on barrier. pub struct MergeExecutor { @@ -37,7 +112,7 @@ pub struct MergeExecutor { actor_context: ActorContextRef, /// Upstream channels. - upstreams: Vec, + upstreams: SelectReceivers, /// Belonged fragment id. fragment_id: FragmentId, @@ -50,6 +125,8 @@ pub struct MergeExecutor { /// Streaming metrics. metrics: Arc, + + barrier_rx: mpsc::UnboundedReceiver, } impl MergeExecutor { @@ -58,18 +135,19 @@ impl MergeExecutor { ctx: ActorContextRef, fragment_id: FragmentId, upstream_fragment_id: FragmentId, - inputs: Vec, + upstreams: SelectReceivers, context: Arc, - _receiver_id: u64, metrics: Arc, + barrier_rx: mpsc::UnboundedReceiver, ) -> Self { Self { actor_context: ctx, - upstreams: inputs, + upstreams, fragment_id, upstream_fragment_id, context, metrics, + barrier_rx, } } @@ -82,38 +160,45 @@ impl MergeExecutor { use super::exchange::input::LocalInput; use crate::executor::exchange::input::Input; - Self::new( - ActorContext::for_test(actor_id), - 514, - 1919, + let barrier_rx = shared_context + .local_barrier_manager + .subscribe_barrier(actor_id); + + let metrics = StreamingMetrics::unused(); + let actor_ctx = ActorContext::for_test(actor_id); + let upstream = Self::new_select_receiver( inputs .into_iter() .enumerate() - .map(|(idx, input)| { - LocalInput::new( - input, - idx as ActorId, - actor_id, - shared_context.local_barrier_manager.clone(), - ) - .boxed_input() - }) + .map(|(idx, input)| LocalInput::new(input, idx as ActorId).boxed_input()) .collect(), + &metrics, + &actor_ctx, + ); + + Self::new( + actor_ctx, + 514, + 1919, + upstream, shared_context, - 810, - StreamingMetrics::unused().into(), + metrics.into(), + barrier_rx, ) } - #[try_stream(ok = Message, error = StreamExecutorError)] - async fn execute_inner(mut self: Box) { - let merge_barrier_align_duration = if self.metrics.level >= MetricLevel::Debug { + pub(crate) fn new_select_receiver( + upstreams: Vec, + metrics: &StreamingMetrics, + actor_context: &ActorContext, + ) -> SelectReceivers { + let merge_barrier_align_duration = if metrics.level >= MetricLevel::Debug { Some( - self.metrics + metrics .merge_barrier_align_duration .with_guarded_label_values(&[ - &self.actor_context.id.to_string(), - 
&self.actor_context.fragment_id.to_string(), + &actor_context.id.to_string(), + &actor_context.fragment_id.to_string(), ]), ) } else { @@ -121,11 +206,16 @@ impl MergeExecutor { }; // Futures of all active upstreams. - let select_all = SelectReceivers::new( - self.actor_context.id, - self.upstreams, + SelectReceivers::new( + actor_context.id, + upstreams, merge_barrier_align_duration.clone(), - ); + ) + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_inner(mut self: Box) { + let select_all = self.upstreams; let actor_id = self.actor_context.id; let mut metrics = self.metrics.new_actor_input_metrics( @@ -141,7 +231,8 @@ impl MergeExecutor { metrics .actor_input_buffer_blocking_duration_ns .inc_by(start_time.elapsed().as_nanos() as u64); - let mut msg: Message = msg?; + let msg: DispatcherMessage = msg?; + let mut msg: Message = process_dispatcher_msg(msg, &mut self.barrier_rx).await?; match &mut msg { Message::Watermark(_) => { @@ -217,10 +308,10 @@ impl MergeExecutor { let mut select_new = SelectReceivers::new( self.actor_context.id, new_upstreams, - merge_barrier_align_duration.clone(), + select_all.merge_barrier_align_duration(), ); let new_barrier = expect_first_barrier(&mut select_new).await?; - assert_eq!(barrier, &new_barrier); + assert_equal_dispatcher_barrier(barrier, &new_barrier); // Add the new upstreams to select. select_all.add_upstreams_from(select_new); @@ -254,6 +345,11 @@ impl MergeExecutor { select_all.update_actor_ids(); } + + if barrier.is_stop(actor_id) { + yield msg; + break; + } } } @@ -272,7 +368,7 @@ impl Execute for MergeExecutor { /// A stream for merging messages from multiple upstreams. pub struct SelectReceivers { /// The barrier we're aligning to. If this is `None`, then `blocked_upstreams` is empty. - barrier: Option, + barrier: Option, /// The upstreams that're blocked by the `barrier`. blocked: Vec, /// The upstreams that're not blocked and can be polled. @@ -289,7 +385,7 @@ pub struct SelectReceivers { } impl Stream for SelectReceivers { - type Item = std::result::Result; + type Item = DispatcherMessageStreamItem; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.active.is_terminated() { @@ -309,19 +405,21 @@ impl Stream for SelectReceivers { Some((Some(Ok(message)), remaining)) => { let actor_id = remaining.actor_id(); match message { - Message::Chunk(chunk) => { + DispatcherMessage::Chunk(chunk) => { // Continue polling this upstream by pushing it back to `active`. self.active.push(remaining.into_future()); - return Poll::Ready(Some(Ok(Message::Chunk(chunk)))); + return Poll::Ready(Some(Ok(DispatcherMessage::Chunk(chunk)))); } - Message::Watermark(watermark) => { + DispatcherMessage::Watermark(watermark) => { // Continue polling this upstream by pushing it back to `active`. self.active.push(remaining.into_future()); if let Some(watermark) = self.handle_watermark(actor_id, watermark) { - return Poll::Ready(Some(Ok(Message::Watermark(watermark)))); + return Poll::Ready(Some(Ok(DispatcherMessage::Watermark( + watermark, + )))); } } - Message::Barrier(barrier) => { + DispatcherMessage::Barrier(barrier) => { // Block this upstream by pushing it to `blocked`. 
if self.blocked.is_empty() && self.merge_barrier_align_duration.is_some() @@ -333,8 +431,8 @@ impl Stream for SelectReceivers { if current_barrier.epoch != barrier.epoch { return Poll::Ready(Some(Err( StreamExecutorError::align_barrier( - current_barrier.clone(), - barrier, + current_barrier.clone().map_mutation(|_| None), + barrier.map_mutation(|_| None), ), ))); } @@ -369,17 +467,11 @@ impl Stream for SelectReceivers { assert!(self.active.is_terminated()); let barrier = self.barrier.take().unwrap(); - // If this barrier asks the actor to stop, we do not reset the active upstreams so that the - // next call would return `Poll::Ready(None)` due to `is_terminated`. let upstreams = std::mem::take(&mut self.blocked); - if barrier.is_stop(self.actor_id) { - drop(upstreams); - } else { - self.extend_active(upstreams); - assert!(!self.active.is_terminated()); - } + self.extend_active(upstreams); + assert!(!self.active.is_terminated()); - Poll::Ready(Some(Ok(Message::Barrier(barrier)))) + Poll::Ready(Some(Ok(DispatcherMessage::Barrier(barrier)))) } } @@ -462,6 +554,10 @@ impl SelectReceivers { .filter(|u| !upstream_actor_ids.contains(&u.actor_id())); self.extend_active(new_upstreams); } + + fn merge_barrier_align_duration(&self) -> Option> { + self.merge_barrier_align_duration.clone() + } } #[cfg(test)] @@ -509,8 +605,7 @@ mod tests { rxs.push(rx); } let barrier_test_env = LocalBarrierTestEnv::for_test().await; - let merger = MergeExecutor::for_test(233, rxs, barrier_test_env.shared_context.clone()); - let actor_id = merger.actor_context.id; + let actor_id = 233; let mut handles = Vec::with_capacity(CHANNEL_NUMBER); let epochs = (10..1000u64) @@ -568,6 +663,8 @@ mod tests { handles.push(handle); } + let merger = + MergeExecutor::for_test(actor_id, rxs, barrier_test_env.shared_context.clone()); let mut merger = merger.boxed().execute(); for (idx, epoch) in epochs { // expect n chunks @@ -666,14 +763,18 @@ mod tests { .flush_all_events() .await; + let barrier_rx = ctx.local_barrier_manager.subscribe_barrier(actor_id); + let actor_ctx = ActorContext::for_test(actor_id); + let upstream = MergeExecutor::new_select_receiver(inputs, &metrics, &actor_ctx); + let mut merge = MergeExecutor::new( - ActorContext::for_test(actor_id), + actor_ctx, fragment_id, upstream_fragment_id, - inputs, + upstream, ctx.clone(), - 233, metrics.clone(), + barrier_rx, ) .boxed() .execute(); @@ -820,7 +921,6 @@ mod tests { let remote_input = { let pool = ComputeClientPool::for_test(); RemoteInput::new( - test_env.shared_context.local_barrier_manager.clone(), pool, addr.into(), (0, 0), diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 3d1ca35b6d61..a053e7dc5021 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -138,6 +138,7 @@ pub use join::{AsOfDesc, AsOfJoinType, JoinType}; pub use lookup::*; pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; +pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream}; pub use mview::*; pub use no_op::NoOpExecutor; pub use now::*; diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index 9a99e59214bd..c3fd4f9f7e7e 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -14,11 +14,15 @@ use anyhow::Context; use itertools::Itertools; +use tokio::sync::mpsc; use tokio::time::Instant; use super::exchange::input::BoxedInput; -use crate::executor::exchange::input::new_input; +use crate::executor::exchange::input::{ + 
assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg, +}; use crate::executor::prelude::*; +use crate::executor::DispatcherMessage; use crate::task::{FragmentId, SharedContext}; /// `ReceiverExecutor` is used along with a channel. After creating a mpsc channel, @@ -42,6 +46,8 @@ pub struct ReceiverExecutor { /// Metrics metrics: Arc, + + barrier_rx: mpsc::UnboundedReceiver, } impl std::fmt::Debug for ReceiverExecutor { @@ -58,8 +64,8 @@ impl ReceiverExecutor { upstream_fragment_id: FragmentId, input: BoxedInput, context: Arc, - _receiver_id: u64, metrics: Arc, + barrier_rx: mpsc::UnboundedReceiver, ) -> Self { Self { input, @@ -68,6 +74,7 @@ impl ReceiverExecutor { metrics, fragment_id, context, + barrier_rx, } } @@ -80,20 +87,18 @@ impl ReceiverExecutor { use super::exchange::input::LocalInput; use crate::executor::exchange::input::Input; + let barrier_rx = shared_context + .local_barrier_manager + .subscribe_barrier(actor_id); + Self::new( ActorContext::for_test(actor_id), 514, 1919, - LocalInput::new( - input, - 0, - actor_id, - shared_context.local_barrier_manager.clone(), - ) - .boxed_input(), + LocalInput::new(input, 0).boxed_input(), shared_context, - 810, StreamingMetrics::unused().into(), + barrier_rx, ) } } @@ -115,7 +120,8 @@ impl Execute for ReceiverExecutor { metrics .actor_input_buffer_blocking_duration_ns .inc_by(start_time.elapsed().as_nanos() as u64); - let mut msg: Message = msg?; + let msg: DispatcherMessage = msg?; + let mut msg = process_dispatcher_msg(msg, &mut self.barrier_rx).await?; match &mut msg { Message::Watermark(_) => { @@ -171,7 +177,7 @@ impl Execute for ReceiverExecutor { // Poll the first barrier from the new upstream. It must be the same as // the one we polled from original upstream. let new_barrier = expect_first_barrier(&mut new_upstream).await?; - assert_eq!(barrier, &new_barrier); + assert_equal_dispatcher_barrier(barrier, &new_barrier); // Replace the input. 
self.input = new_upstream; @@ -276,8 +282,8 @@ mod tests { upstream_fragment_id, input, ctx.clone(), - 233, metrics.clone(), + ctx.local_barrier_manager.subscribe_barrier(actor_id), ) .boxed() .execute(); diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 65347e18e525..f4e0f40761aa 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -325,13 +325,11 @@ pub mod agg_executor { agg_call: &AggCall, group_key_indices: &[usize], pk_indices: &[usize], - input_ref: &Executor, + input_fields: Vec, is_append_only: bool, ) -> AggStateStorage { match agg_call.kind { AggKind::Builtin(PbAggKind::Min | PbAggKind::Max) if !is_append_only => { - let input_fields = input_ref.schema().fields(); - let mut column_descs = Vec::new(); let mut order_types = Vec::new(); let mut upstream_columns = Vec::new(); @@ -402,10 +400,8 @@ pub mod agg_executor { table_id: TableId, agg_calls: &[AggCall], group_key_indices: &[usize], - input_ref: &Executor, + input_fields: Vec, ) -> StateTable { - let input_fields = input_ref.schema().fields(); - let mut column_descs = Vec::new(); let mut order_types = Vec::new(); @@ -464,7 +460,7 @@ pub mod agg_executor { agg_call, &group_key_indices, &pk_indices, - &input, + input.info.schema.fields.clone(), is_append_only, ) .await, @@ -476,7 +472,7 @@ pub mod agg_executor { TableId::new(agg_calls.len() as u32), &agg_calls, &group_key_indices, - &input, + input.info.schema.fields.clone(), ) .await; @@ -533,7 +529,7 @@ pub mod agg_executor { agg_call, &[], &pk_indices, - &input, + input.info.schema.fields.clone(), is_append_only, ) })) @@ -544,7 +540,7 @@ pub mod agg_executor { TableId::new(agg_calls.len() as u32), &agg_calls, &[], - &input, + input.info.schema.fields.clone(), ) .await; diff --git a/src/stream/src/from_proto/merge.rs b/src/stream/src/from_proto/merge.rs index eded1f59d294..d6c7ce157a93 100644 --- a/src/stream/src/from_proto/merge.rs +++ b/src/stream/src/from_proto/merge.rs @@ -12,22 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. 
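The merge builder below splits construction into two steps: `new_input` resolves the upstream exchanges into a `MergeExecutorInput`, and `into_executor` later attaches the actor's barrier subscription, producing a `ReceiverExecutor` for a single (no-shuffle) upstream or a `MergeExecutor` otherwise. A sketch of the intended call pattern under that assumption; the free function `build_merge_executor` is illustrative and not part of the patch, its arguments are assumed to be supplied by the caller as in `ExecutorBuilder::new_boxed_executor` below:

```rust
use std::sync::Arc;

use risingwave_pb::stream_plan::MergeNode;

use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{ActorContextRef, Executor, ExecutorInfo};
use crate::from_proto::MergeExecutorBuilder;
use crate::task::SharedContext;

fn build_merge_executor(
    shared_context: Arc<SharedContext>,
    executor_stats: Arc<StreamingMetrics>,
    actor_context: ActorContextRef,
    info: ExecutorInfo,
    node: &MergeNode,
) -> StreamResult<Executor> {
    // Step 1: resolve the upstream inputs (local or remote exchanges) only;
    // no barrier wiring happens here.
    let input = MergeExecutorBuilder::new_input(
        shared_context.clone(),
        executor_stats,
        actor_context.clone(),
        info,
        node,
    )?;
    // Step 2: subscribe this actor to barriers injected via the local barrier
    // manager and materialize the executor.
    let barrier_rx = shared_context
        .local_barrier_manager
        .subscribe_barrier(actor_context.id);
    Ok(input.into_executor(barrier_rx))
}
```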
+use std::sync::Arc; + use risingwave_pb::stream_plan::{DispatcherType, MergeNode}; use super::*; use crate::executor::exchange::input::new_input; -use crate::executor::{MergeExecutor, ReceiverExecutor}; +use crate::executor::monitor::StreamingMetrics; +use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream}; +use crate::task::SharedContext; pub struct MergeExecutorBuilder; -impl ExecutorBuilder for MergeExecutorBuilder { - type Node = MergeNode; - - async fn new_boxed_executor( - params: ExecutorParams, - node: &Self::Node, - _store: impl StateStore, - ) -> StreamResult { +impl MergeExecutorBuilder { + pub(crate) fn new_input( + shared_context: Arc, + executor_stats: Arc, + actor_context: ActorContextRef, + info: ExecutorInfo, + node: &MergeNode, + ) -> StreamResult { let upstreams = node.get_upstream_actor_id(); let upstream_fragment_id = node.get_upstream_fragment_id(); @@ -35,10 +39,10 @@ impl ExecutorBuilder for MergeExecutorBuilder { .iter() .map(|&upstream_actor_id| { new_input( - ¶ms.shared_context, - params.executor_stats.clone(), - params.actor_context.id, - params.fragment_id, + &shared_context, + executor_stats.clone(), + actor_context.id, + actor_context.fragment_id, upstream_actor_id, upstream_fragment_id, ) @@ -56,29 +60,45 @@ impl ExecutorBuilder for MergeExecutorBuilder { DispatcherType::NoShuffle => true, }; - let exec = if always_single_input { - ReceiverExecutor::new( - params.actor_context, - params.fragment_id, - upstream_fragment_id, - inputs.into_iter().exactly_one().unwrap(), - params.shared_context.clone(), - params.operator_id, - params.executor_stats.clone(), - ) - .boxed() + let upstreams = if always_single_input { + MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap()) } else { - MergeExecutor::new( - params.actor_context, - params.fragment_id, - upstream_fragment_id, + MergeExecutorUpstream::Merge(MergeExecutor::new_select_receiver( inputs, - params.shared_context.clone(), - params.operator_id, - params.executor_stats.clone(), - ) - .boxed() + &executor_stats, + &actor_context, + )) }; - Ok((params.info, exec).into()) + Ok(MergeExecutorInput::new( + upstreams, + actor_context, + upstream_fragment_id, + shared_context, + executor_stats, + info, + )) + } +} + +impl ExecutorBuilder for MergeExecutorBuilder { + type Node = MergeNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &Self::Node, + _store: impl StateStore, + ) -> StreamResult { + let barrier_rx = params + .shared_context + .local_barrier_manager + .subscribe_barrier(params.actor_context.id); + Ok(Self::new_input( + params.shared_context, + params.executor_stats, + params.actor_context, + params.info, + node, + )? 
+ .into_executor(barrier_rx)) } } diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index 1f63b6cd5db8..9a51dd10ddfb 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -81,7 +81,7 @@ use self::hash_join::*; use self::hop_window::*; use self::lookup::*; use self::lookup_union::*; -use self::merge::*; +pub(crate) use self::merge::MergeExecutorBuilder; use self::mview::*; use self::no_op::*; use self::now::NowExecutorBuilder; diff --git a/src/stream/src/from_proto/stream_scan.rs b/src/stream/src/from_proto/stream_scan.rs index eaa436752455..e5428d54fa38 100644 --- a/src/stream/src/from_proto/stream_scan.rs +++ b/src/stream/src/from_proto/stream_scan.rs @@ -25,7 +25,7 @@ use super::*; use crate::common::table::state_table::{ReplicatedStateTable, StateTable}; use crate::executor::{ ArrangementBackfillExecutor, BackfillExecutor, ChainExecutor, RearrangedChainExecutor, - SnapshotBackfillExecutor, TroublemakerExecutor, + TroublemakerExecutor, }; pub struct StreamScanExecutorBuilder; @@ -144,34 +144,7 @@ impl ExecutorBuilder for StreamScanExecutorBuilder { } } StreamScanType::SnapshotBackfill => { - let table_desc: &StorageTableDesc = node.get_table_desc()?; - - let column_ids = node - .upstream_column_ids - .iter() - .map(ColumnId::from) - .collect_vec(); - - let vnodes = params.vnode_bitmap.map(Arc::new); - let barrier_rx = params - .shared_context - .local_barrier_manager - .subscribe_barrier(params.actor_context.id); - - let upstream_table = - StorageTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc); - SnapshotBackfillExecutor::new( - upstream_table, - upstream, - output_indices, - params.actor_context.clone(), - progress, - params.env.config().developer.chunk_size, - node.rate_limit.map(|x| x as _), - barrier_rx, - params.executor_stats.clone(), - ) - .boxed() + unreachable!("SnapshotBackfillExecutor is handled specially when in `StreamActorManager::create_nodes_inner`") } StreamScanType::Unspecified => unreachable!(), }; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 16c12411e6e0..242f21c17272 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -62,7 +62,7 @@ use risingwave_pb::stream_service::{ use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{Barrier, BarrierInner, DispatcherBarrier, Mutation, StreamExecutorError}; +use crate::executor::{Barrier, BarrierInner, StreamExecutorError}; use crate::task::barrier_manager::managed_state::ManagedBarrierStateDebugInfo; use crate::task::barrier_manager::progress::BackfillState; @@ -177,8 +177,6 @@ impl ControlStreamHandle { } } -pub(crate) type SubscribeMutationItem = (u64, Option>); - pub(super) enum LocalBarrierEvent { ReportActorCollected { actor_id: ActorId, @@ -189,11 +187,6 @@ pub(super) enum LocalBarrierEvent { actor: ActorId, state: BackfillState, }, - SubscribeBarrierMutation { - actor_id: ActorId, - epoch: EpochPair, - mutation_sender: mpsc::UnboundedSender, - }, RegisterBarrierSender { actor_id: ActorId, barrier_sender: mpsc::UnboundedSender, @@ -414,14 +407,6 @@ impl LocalBarrierWorker { } => { self.update_create_mview_progress(epoch, actor, state); } - LocalBarrierEvent::SubscribeBarrierMutation { - actor_id, - epoch, - mutation_sender, - } => { - self.state - .subscribe_actor_mutation(actor_id, epoch.prev, mutation_sender); - } LocalBarrierEvent::RegisterBarrierSender { actor_id, 
barrier_sender, @@ -765,21 +750,6 @@ impl LocalBarrierManager { .send((actor_id, err.into_unexpected_exit(actor_id))); } - /// When a `RemoteInput` get a barrier, it should wait and read the barrier mutation from the barrier manager. - pub fn subscribe_barrier_mutation( - &self, - actor_id: ActorId, - first_barrier: &DispatcherBarrier, - ) -> mpsc::UnboundedReceiver { - let (tx, rx) = mpsc::unbounded_channel(); - self.send_event(LocalBarrierEvent::SubscribeBarrierMutation { - actor_id, - epoch: first_barrier.epoch, - mutation_sender: tx, - }); - rx - } - pub fn subscribe_barrier(&self, actor_id: ActorId) -> UnboundedReceiver { let (tx, rx) = mpsc::unbounded_channel(); self.send_event(LocalBarrierEvent::RegisterBarrierSender { @@ -981,7 +951,6 @@ pub(crate) mod barrier_test_utils { actor_ids_to_collect: actor_to_collect.into_iter().collect(), table_ids_to_sync: vec![], partial_graph_id: u32::MAX, - actor_ids_to_pre_sync_barrier_mutation: vec![], broadcast_info: vec![], actors_to_build: vec![], subscriptions_to_add: vec![], diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 7682c37071be..4d0e82661fad 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -38,7 +38,7 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; use super::progress::BackfillState; -use super::{BarrierCompleteResult, SubscribeMutationItem}; +use super::BarrierCompleteResult; use crate::error::{StreamError, StreamResult}; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; @@ -265,12 +265,9 @@ impl InflightActorStatus { pub(crate) struct InflightActorState { actor_id: ActorId, - pending_subscribers: BTreeMap>>, barrier_senders: Vec>, /// `prev_epoch` -> partial graph id pub(super) inflight_barriers: BTreeMap, - /// `prev_epoch` -> (`mutation`, `curr_epoch`) - barrier_mutations: BTreeMap>, u64)>, status: InflightActorStatus, /// Whether the actor has been issued a stop barrier is_stopping: bool, @@ -289,16 +286,11 @@ impl InflightActorState { ) -> Self { Self { actor_id, - pending_subscribers: Default::default(), barrier_senders: vec![], inflight_barriers: BTreeMap::from_iter([( initial_barrier.epoch.prev, initial_partial_graph_id, )]), - barrier_mutations: BTreeMap::from_iter([( - initial_barrier.epoch.prev, - (initial_barrier.mutation.clone(), initial_barrier.epoch.curr), - )]), status: InflightActorStatus::IssuedFirst(vec![initial_barrier.clone()]), is_stopping: false, join_handle, @@ -306,25 +298,6 @@ impl InflightActorState { } } - pub(super) fn sync_barrier(&mut self, barrier: &Barrier) { - if let Some(mut subscribers) = self.pending_subscribers.remove(&barrier.epoch.prev) { - subscribers.retain(|tx| { - tx.send((barrier.epoch.prev, barrier.mutation.clone())) - .is_ok() - }); - if !subscribers.is_empty() { - self.pending_subscribers - .entry(barrier.epoch.curr) - .or_default() - .extend(subscribers); - } - } - self.barrier_mutations.insert( - barrier.epoch.prev, - (barrier.mutation.clone(), barrier.epoch.curr), - ); - } - pub(super) fn issue_barrier( &mut self, partial_graph_id: PartialGraphId, @@ -333,28 +306,6 @@ impl InflightActorState { ) -> StreamResult<()> { assert!(barrier.epoch.prev > self.status.max_issued_epoch()); - if let Some((first_epoch, _)) = self.pending_subscribers.first_key_value() { - assert!( - *first_epoch >= barrier.epoch.prev, - "barrier epoch {:?} skip subscribed epoch {}", - barrier.epoch, - first_epoch - 
); - if *first_epoch == barrier.epoch.prev { - let (_, mut subscribers) = self.pending_subscribers.pop_first().expect("non empty"); - subscribers.retain(|tx| { - tx.send((barrier.epoch.prev, barrier.mutation.clone())) - .is_ok() - }); - if !is_stop && !subscribers.is_empty() { - self.pending_subscribers - .entry(barrier.epoch.curr) - .or_default() - .extend(subscribers); - } - } - } - for barrier_sender in &self.barrier_senders { barrier_sender.send(barrier.clone()).map_err(|_| { StreamError::barrier_send( @@ -370,13 +321,6 @@ impl InflightActorState { .insert(barrier.epoch.prev, partial_graph_id) .is_none()); - if let Some((_, curr_epoch)) = self.barrier_mutations.insert( - barrier.epoch.prev, - (barrier.mutation.clone(), barrier.epoch.curr), - ) { - assert_eq!(curr_epoch, barrier.epoch.curr); - } - match &mut self.status { InflightActorStatus::IssuedFirst(pending_barriers) => { pending_barriers.push(barrier.clone()); @@ -387,7 +331,6 @@ impl InflightActorState { }; if is_stop { - assert!(self.pending_subscribers.is_empty()); assert!(!self.is_stopping, "stopped actor should not issue barrier"); self.is_stopping = true; } @@ -398,8 +341,6 @@ impl InflightActorState { let (prev_epoch, prev_partial_graph_id) = self.inflight_barriers.pop_first().expect("should exist"); assert_eq!(prev_epoch, epoch.prev); - let (min_mutation_epoch, _) = self.barrier_mutations.pop_first().expect("should exist"); - assert_eq!(min_mutation_epoch, epoch.prev); match &self.status { InflightActorStatus::IssuedFirst(pending_barriers) => { assert_eq!( @@ -529,52 +470,6 @@ impl ManagedBarrierState { } impl InflightActorState { - pub(super) fn subscribe_actor_mutation( - &mut self, - start_prev_epoch: u64, - tx: mpsc::UnboundedSender, - ) { - if let Some((mutation, start_curr_epoch)) = self.barrier_mutations.get(&start_prev_epoch) { - if tx.send((start_prev_epoch, mutation.clone())).is_err() { - return; - } - let mut prev_epoch = *start_curr_epoch; - for (mutation_prev_epoch, (mutation, mutation_curr_epoch)) in - self.barrier_mutations.range(start_curr_epoch..) - { - if prev_epoch == *mutation_prev_epoch { - if tx.send((prev_epoch, mutation.clone())).is_err() { - // No more subscribe on the mutation. Simply return. - return; - } - prev_epoch = *mutation_curr_epoch; - } else { - assert!(prev_epoch < *mutation_prev_epoch); - break; - } - } - if !self.is_stopping { - // Only add the subscribers when the actor is not stopped yet. 
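What survives this removal in `InflightActorState` is the ordered map `inflight_barriers` from a barrier's `prev` epoch to its partial graph id, plus the registered `barrier_senders`. A minimal, standalone sketch of the collect-order invariant that the `pop_first()`/`assert_eq!` pair above relies on (toy code, not part of the patch):

use std::collections::BTreeMap;

fn main() {
    // Keyed by the barrier's prev epoch; the value stands in for a partial graph id.
    let mut inflight_barriers: BTreeMap<u64, u32> = BTreeMap::new();
    inflight_barriers.insert(100, 0); // issued first
    inflight_barriers.insert(200, 0); // issued later

    // Barriers are collected in issue order, so the smallest in-flight prev epoch
    // must be the one being collected.
    let collected_prev_epoch = 100u64;
    let (prev_epoch, _partial_graph_id) = inflight_barriers.pop_first().expect("should exist");
    assert_eq!(prev_epoch, collected_prev_epoch);
}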
- self.pending_subscribers - .entry(prev_epoch) - .or_default() - .push(tx); - } - } else { - let max_issued_epoch = self.status.max_issued_epoch(); - assert!( - max_issued_epoch < start_prev_epoch, - "later barrier {} has been issued, but skip the start epoch {:?}", - max_issued_epoch, - start_prev_epoch - ); - self.pending_subscribers - .entry(start_prev_epoch) - .or_default() - .push(tx); - } - } - pub(super) fn register_barrier_sender( &mut self, tx: mpsc::UnboundedSender, @@ -601,18 +496,6 @@ impl InflightActorState { } impl ManagedBarrierState { - pub(super) fn subscribe_actor_mutation( - &mut self, - actor_id: ActorId, - start_prev_epoch: u64, - tx: mpsc::UnboundedSender, - ) { - self.actor_states - .get_mut(&actor_id) - .expect("should exist") - .subscribe_actor_mutation(start_prev_epoch, tx); - } - pub(super) fn register_barrier_sender( &mut self, actor_id: ActorId, @@ -773,16 +656,6 @@ impl ManagedBarrierState { .issue_barrier(partial_graph_id, barrier, is_stop_actor(*actor_id))?; } - if partial_graph_id.is_global_graph() { - for actor_id in request.actor_ids_to_pre_sync_barrier_mutation { - self.actor_states - .get_mut(&actor_id) - .expect("should exist") - .sync_barrier(barrier); - } - } else { - assert!(request.actor_ids_to_pre_sync_barrier_mutation.is_empty()); - } Ok(()) } diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index 112ee533d8e6..a9ba0b4b7ed0 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -124,15 +124,17 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { .map(register_sender) .collect_vec(); - let mut mutation_subscriber = - manager.subscribe_barrier_mutation(extra_actor_id, &barrier.clone().into_dispatcher()); + let mut barrier_subscriber = manager.subscribe_barrier(extra_actor_id); // Read the mutation after receiving the barrier from remote input. 
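With the mutation bookkeeping gone, the consumer side changes from pulling mutations by epoch to receiving whole barriers through the unified barrier sender. A minimal sketch of the migration (illustrative only; `apply_mutation` is a hypothetical stand-in for whatever the executor does with it):

// Before (removed above): subscribe to bare mutations, keyed by the upstream
// barrier's prev epoch and pre-synced from meta via
// `actor_ids_to_pre_sync_barrier_mutation`.
//     let mut mutation_rx =
//         barrier_manager.subscribe_barrier_mutation(actor_id, &first_dispatcher_barrier);
//     let (prev_epoch, mutation) = mutation_rx.recv().await.expect("channel open");
//
// After: subscribe once and read the mutation off each received barrier; epoch and
// mutation always arrive together, so no separate pre-sync list is needed.
let mut barrier_rx = barrier_manager.subscribe_barrier(actor_id);
while let Some(barrier) = barrier_rx.recv().await {
    apply_mutation(barrier.epoch, barrier.mutation.clone()); // hypothetical helper
}

The surrounding test exercises exactly this path: it polls the new receiver, asserts that nothing arrives before the barrier is injected, and then checks that the received barrier carries the expected epoch and mutation.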
- let mut mutation_reader = pin!(mutation_subscriber.recv()); + let mut mutation_reader = pin!(barrier_subscriber.recv()); assert!(poll_fn(|cx| Poll::Ready(mutation_reader.as_mut().poll(cx).is_pending())).await); - let (epoch, mutation) = mutation_reader.await.unwrap(); - assert_eq!((epoch, &mutation), (barrier.epoch.prev, &barrier.mutation)); + let recv_barrier = mutation_reader.await.unwrap(); + assert_eq!( + (recv_barrier.epoch, &recv_barrier.mutation), + (barrier.epoch, &barrier.mutation) + ); // Collect a barrier before sending manager.collect(extra_actor_id, &barrier); @@ -140,7 +142,7 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { // Collect barriers from actors let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { let barrier = rx.recv().await.unwrap(); - assert_eq!(barrier.epoch.prev, epoch); + assert_eq!(barrier.epoch, recv_barrier.epoch); (*actor_id, barrier) })) .await; diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index 59851fdf09ad..889495fbefed 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -28,7 +28,6 @@ mod barrier_manager; mod env; mod stream_manager; -pub(crate) use barrier_manager::SubscribeMutationItem; pub use barrier_manager::*; pub use env::*; pub use stream_manager::*; @@ -47,10 +46,6 @@ impl PartialGraphId { fn new(id: u32) -> Self { Self(id) } - - fn is_global_graph(&self) -> bool { - self.0 == u32::MAX - } } impl From for u32 { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 604055409d55..83a7137d4403 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -24,20 +24,22 @@ use await_tree::InstrumentAwait; use futures::stream::BoxStream; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; -use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; -use risingwave_common::catalog::{Field, Schema, TableId}; +use risingwave_common::catalog::{ColumnId, Field, Schema, TableId}; use risingwave_common::config::MetricLevel; +use risingwave_common::{bail, must_match}; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::common::ActorInfo; +use risingwave_pb::plan_common::StorageTableDesc; use risingwave_pb::stream_plan; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::{StreamActor, StreamNode}; +use risingwave_pb::stream_plan::{StreamActor, StreamNode, StreamScanNode, StreamScanType}; use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; use risingwave_pb::stream_service::{ StreamingControlStreamRequest, StreamingControlStreamResponse, }; use risingwave_storage::monitor::HummockTraceFutureExt; +use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::{dispatch_state_store, StateStore}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -50,10 +52,11 @@ use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; use crate::executor::{ - Actor, ActorContext, ActorContextRef, DispatchExecutor, DispatcherImpl, Executor, ExecutorInfo, + Actor, ActorContext, ActorContextRef, DispatchExecutor, DispatcherImpl, Execute, Executor, + ExecutorInfo, MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor, }; -use crate::from_proto::create_executor; +use crate::from_proto::{create_executor, 
MergeExecutorBuilder}; use crate::task::barrier_manager::{ ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker, }; @@ -290,6 +293,128 @@ impl StreamActorManager { )) } + fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 { + // We assume that the operator_id of different instances from the same RelNode will be the + // same. + unique_executor_id(actor_context.id, node.operator_id) + } + + fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo { + let schema: Schema = node.fields.iter().map(Field::from).collect(); + + let pk_indices = node + .get_stream_key() + .iter() + .map(|idx| *idx as usize) + .collect::>(); + + let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id); + ExecutorInfo { + schema, + pk_indices, + identity, + } + } + + fn create_snapshot_backfill_input( + &self, + upstream_node: &StreamNode, + actor_context: &ActorContextRef, + shared_context: &Arc, + ) -> StreamResult { + let info = Self::get_executor_info( + upstream_node, + Self::get_executor_id(actor_context, upstream_node), + ); + + let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => { + upstream_merge + }); + + MergeExecutorBuilder::new_input( + shared_context.clone(), + self.streaming_metrics.clone(), + actor_context.clone(), + info, + upstream_merge, + ) + } + + #[expect(clippy::too_many_arguments)] + fn create_snapshot_backfill_node( + &self, + stream_node: &StreamNode, + node: &StreamScanNode, + actor_context: &ActorContextRef, + vnode_bitmap: Option, + shared_context: &Arc, + env: StreamEnvironment, + state_store: impl StateStore, + ) -> StreamResult { + let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap(); + let upstream = + self.create_snapshot_backfill_input(upstream_node, actor_context, shared_context)?; + + let table_desc: &StorageTableDesc = node.get_table_desc()?; + + let output_indices = node + .output_indices + .iter() + .map(|&i| i as usize) + .collect_vec(); + + let column_ids = node + .upstream_column_ids + .iter() + .map(ColumnId::from) + .collect_vec(); + + let progress = shared_context + .local_barrier_manager + .register_create_mview_progress(actor_context.id); + + let vnodes = vnode_bitmap.map(Arc::new); + let barrier_rx = shared_context + .local_barrier_manager + .subscribe_barrier(actor_context.id); + + let upstream_table = + StorageTable::new_partial(state_store.clone(), column_ids, vnodes, table_desc); + + let executor = SnapshotBackfillExecutor::new( + upstream_table, + upstream, + output_indices, + actor_context.clone(), + progress, + env.config().developer.chunk_size, + node.rate_limit.map(|x| x as _), + barrier_rx, + self.streaming_metrics.clone(), + ) + .boxed(); + + let info = Self::get_executor_info( + stream_node, + Self::get_executor_id(actor_context, stream_node), + ); + + if crate::consistency::insane() { + let mut troubled_info = info.clone(); + troubled_info.identity = format!("{} (troubled)", info.identity); + Ok(( + info, + TroublemakerExecutor::new( + (troubled_info, executor).into(), + env.config().developer.chunk_size, + ), + ) + .into()) + } else { + Ok((info, executor).into()) + } + } + /// Create a chain(tree) of nodes, with given `store`. 
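A note on the `must_match!` call in `create_snapshot_backfill_input` above: it asserts that the upstream of a snapshot backfill scan is a `Merge` node and binds it, panicking otherwise. A rough hand-written equivalent (a sketch, not the macro's actual expansion):

let upstream_merge = match upstream_node.get_node_body().unwrap() {
    NodeBody::Merge(upstream_merge) => upstream_merge,
    other => panic!(
        "expected NodeBody::Merge as the upstream of a snapshot backfill scan, got {:?}",
        other
    ),
};

Correspondingly, `create_snapshot_backfill_node` takes only the first of the scan node's two inputs as the upstream to merge from, and builds the snapshot-read side from the table descriptor via `StorageTable::new_partial` rather than from the second input.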
#[allow(clippy::too_many_arguments)] #[async_recursion] @@ -305,6 +430,22 @@ impl StreamActorManager { subtasks: &mut Vec, shared_context: &Arc, ) -> StreamResult { + if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap() + && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type() + { + return dispatch_state_store!(env.state_store(), store, { + self.create_snapshot_backfill_node( + node, + stream_scan, + actor_context, + vnode_bitmap, + shared_context, + env, + store, + ) + }); + } + // The "stateful" here means that the executor may issue read operations to the state store // massively and continuously. Used to decide whether to apply the optimization of subtasks. fn is_stateful_executor(stream_node: &StreamNode) -> bool { @@ -343,24 +484,13 @@ impl StreamActorManager { } let op_info = node.get_identity().clone(); - let pk_indices = node - .get_stream_key() - .iter() - .map(|idx| *idx as usize) - .collect::>(); // We assume that the operator_id of different instances from the same RelNode will be the // same. - let executor_id = unique_executor_id(actor_context.id, node.operator_id); + let executor_id = Self::get_executor_id(actor_context, node); let operator_id = unique_operator_id(fragment_id, node.operator_id); - let schema: Schema = node.fields.iter().map(Field::from).collect(); - let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id); - let info = ExecutorInfo { - schema, - pk_indices, - identity, - }; + let info = Self::get_executor_info(node, executor_id); let eval_error_report = ActorEvalErrorReport { actor_context: actor_context.clone(),