diff --git a/src/batch/src/task/env.rs b/src/batch/src/task/env.rs index ecb7a3a8d3eb..6c4f32ac92e7 100644 --- a/src/batch/src/task/env.rs +++ b/src/batch/src/task/env.rs @@ -112,7 +112,7 @@ impl BatchEnvironment { BatchManagerMetrics::for_test(), u64::MAX, )), - server_addr: "127.0.0.1:5688".parse().unwrap(), + server_addr: "127.0.0.1:2333".parse().unwrap(), config: Arc::new(BatchConfig::default()), worker_id: WorkerNodeId::default(), state_store: StateStoreImpl::shared_in_memory_store(Arc::new( diff --git a/src/compute/src/rpc/service/exchange_service.rs b/src/compute/src/rpc/service/exchange_service.rs index f44f6f7552bb..e4082a88ea9e 100644 --- a/src/compute/src/rpc/service/exchange_service.rs +++ b/src/compute/src/rpc/service/exchange_service.rs @@ -24,7 +24,7 @@ use risingwave_pb::task_service::{ permits, GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits, }; use risingwave_stream::executor::exchange::permit::{MessageWithPermits, Receiver}; -use risingwave_stream::executor::Message; +use risingwave_stream::executor::DispatcherMessage; use risingwave_stream::task::LocalStreamManager; use thiserror_ext::AsReport; use tokio_stream::wrappers::ReceiverStream; @@ -169,21 +169,14 @@ impl ExchangeServiceImpl { Either::Left(permits_to_add) => { permits.add_permits(permits_to_add); } - Either::Right(MessageWithPermits { - mut message, - permits, - }) => { - // Erase the mutation of the barrier to avoid decoding in remote side. - if let Message::Barrier(barrier) = &mut message { - barrier.mutation = None; - } + Either::Right(MessageWithPermits { message, permits }) => { let proto = message.to_protobuf(); // forward the acquired permit to the downstream let response = GetStreamResponse { message: Some(proto), permits: Some(PbPermits { value: permits }), }; - let bytes = Message::get_encoded_len(&response); + let bytes = DispatcherMessage::get_encoded_len(&response); yield response; diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 1a792b5ebfab..1c05497665fe 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -579,7 +579,7 @@ impl GlobalBarrierManager { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(paused_reason).instrument(span).await; + self.recovery(paused_reason, None).instrument(span).await; } self.context.set_status(BarrierManagerStatus::Running); @@ -789,12 +789,6 @@ impl GlobalBarrierManager { } async fn failure_recovery(&mut self, err: MetaError) { - self.context - .tracker - .lock() - .await - .abort_all(&err, &self.context) - .await; self.checkpoint_control.clear_on_err(&err).await; self.pending_non_checkpoint_barriers.clear(); @@ -813,7 +807,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. - self.recovery(None).instrument(span).await; + self.recovery(None, Some(err)).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running); } else { panic!("failed to execute barrier: {}", err.as_report()); @@ -822,12 +816,6 @@ impl GlobalBarrierManager { async fn adhoc_recovery(&mut self) { let err = MetaErrorInner::AdhocRecovery.into(); - self.context - .tracker - .lock() - .await - .abort_all(&err, &self.context) - .await; self.checkpoint_control.clear_on_err(&err).await; self.context @@ -842,7 +830,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. - self.recovery(None).instrument(span).await; + self.recovery(None, Some(err)).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running); } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 4bb9d2f669c0..f9bba534e561 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -43,7 +43,7 @@ use crate::controller::catalog::ReleaseContext; use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId}; use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy}; -use crate::{model, MetaResult}; +use crate::{model, MetaError, MetaResult}; impl GlobalBarrierManager { // Retry base interval in milliseconds. @@ -224,7 +224,7 @@ impl GlobalBarrierManager { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery(&mut self, paused_reason: Option) { + pub async fn recovery(&mut self, paused_reason: Option, err: Option) { let prev_epoch = TracedEpoch::new( self.context .hummock_manager @@ -246,6 +246,15 @@ impl GlobalBarrierManager { let new_state = tokio_retry::Retry::spawn(retry_strategy, || { async { let recovery_result: MetaResult<_> = try { + if let Some(err) = &err { + self.context + .tracker + .lock() + .await + .abort_all(err, &self.context) + .await; + } + self.context .clean_dirty_streaming_jobs() .await diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index cf788e9ebd7f..9f452dc1863b 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -31,7 +31,9 @@ use tokio::time::Instant; use tracing::{event, Instrument}; use super::exchange::output::{new_output, BoxedOutput}; -use super::{AddMutation, TroublemakerExecutor, UpdateMutation}; +use super::{ + AddMutation, DispatcherBarrier, DispatcherMessage, TroublemakerExecutor, UpdateMutation, +}; use crate::executor::prelude::*; use crate::executor::StreamConsumer; use crate::task::{DispatcherId, SharedContext}; @@ -142,7 +144,9 @@ impl DispatchExecutorInner { .map(Ok) .try_for_each_concurrent(limit, |dispatcher| async { let start_time = Instant::now(); - dispatcher.dispatch_barrier(barrier.clone()).await?; + dispatcher + .dispatch_barrier(barrier.clone().into_dispatcher()) + .await?; dispatcher .actor_output_buffer_blocking_duration_ns .inc_by(start_time.elapsed().as_nanos() as u64); @@ -497,7 +501,7 @@ macro_rules! impl_dispatcher { } } - pub async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + pub async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { match self { $( Self::$variant_name(inner) => inner.dispatch_barrier(barrier).await, )* } @@ -561,7 +565,7 @@ pub trait Dispatcher: Debug + 'static { /// Dispatch a data chunk to downstream actors. fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>; /// Dispatch a barrier to downstream actors, generally by broadcasting it. - fn dispatch_barrier(&mut self, barrier: Barrier) -> impl DispatchFuture<'_>; + fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> impl DispatchFuture<'_>; /// Dispatch a watermark to downstream actors, generally by broadcasting it. fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>; @@ -591,7 +595,7 @@ pub trait Dispatcher: Debug + 'static { /// always unlimited. async fn broadcast_concurrent( outputs: impl IntoIterator, - message: Message, + message: DispatcherMessage, ) -> StreamResult<()> { futures::future::try_join_all( outputs @@ -637,21 +641,24 @@ impl Dispatcher for RoundRobinDataDispatcher { chunk.project(&self.output_indices) }; - self.outputs[self.cur].send(Message::Chunk(chunk)).await?; + self.outputs[self.cur] + .send(DispatcherMessage::Chunk(chunk)) + .await?; self.cur += 1; self.cur %= self.outputs.len(); Ok(()) } - async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { // always broadcast barrier - broadcast_concurrent(&mut self.outputs, Message::Barrier(barrier)).await + broadcast_concurrent(&mut self.outputs, DispatcherMessage::Barrier(barrier)).await } async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { // always broadcast watermark - broadcast_concurrent(&mut self.outputs, Message::Watermark(watermark)).await?; + broadcast_concurrent(&mut self.outputs, DispatcherMessage::Watermark(watermark)) + .await?; } Ok(()) } @@ -725,15 +732,16 @@ impl Dispatcher for HashDataDispatcher { self.outputs.extend(outputs); } - async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { // always broadcast barrier - broadcast_concurrent(&mut self.outputs, Message::Barrier(barrier)).await + broadcast_concurrent(&mut self.outputs, DispatcherMessage::Barrier(barrier)).await } async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { // always broadcast watermark - broadcast_concurrent(&mut self.outputs, Message::Watermark(watermark)).await?; + broadcast_concurrent(&mut self.outputs, DispatcherMessage::Watermark(watermark)) + .await?; } Ok(()) } @@ -818,7 +826,9 @@ impl Dispatcher for HashDataDispatcher { "send = \n{:#?}", new_stream_chunk ); - output.send(Message::Chunk(new_stream_chunk)).await?; + output + .send(DispatcherMessage::Chunk(new_stream_chunk)) + .await?; } StreamResult::Ok(()) }), @@ -888,18 +898,26 @@ impl Dispatcher for BroadcastDispatcher { } else { chunk.project(&self.output_indices) }; - broadcast_concurrent(self.outputs.values_mut(), Message::Chunk(chunk)).await + broadcast_concurrent(self.outputs.values_mut(), DispatcherMessage::Chunk(chunk)).await } - async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { // always broadcast barrier - broadcast_concurrent(self.outputs.values_mut(), Message::Barrier(barrier)).await + broadcast_concurrent( + self.outputs.values_mut(), + DispatcherMessage::Barrier(barrier), + ) + .await } async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { // always broadcast watermark - broadcast_concurrent(self.outputs.values_mut(), Message::Watermark(watermark)).await?; + broadcast_concurrent( + self.outputs.values_mut(), + DispatcherMessage::Watermark(watermark), + ) + .await?; } Ok(()) } @@ -970,10 +988,12 @@ impl Dispatcher for SimpleDispatcher { assert!(self.output.len() <= 2); } - async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> { // Only barrier is allowed to be dispatched to multiple outputs during migration. for output in &mut self.output { - output.send(Message::Barrier(barrier.clone())).await?; + output + .send(DispatcherMessage::Barrier(barrier.clone())) + .await?; } Ok(()) } @@ -992,7 +1012,7 @@ impl Dispatcher for SimpleDispatcher { } else { chunk.project(&self.output_indices) }; - output.send(Message::Chunk(chunk)).await + output.send(DispatcherMessage::Chunk(chunk)).await } async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { @@ -1003,7 +1023,7 @@ impl Dispatcher for SimpleDispatcher { .expect("expect exactly one output"); if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { - output.send(Message::Watermark(watermark)).await?; + output.send(DispatcherMessage::Watermark(watermark)).await?; } Ok(()) } @@ -1044,23 +1064,25 @@ mod tests { use crate::executor::exchange::output::Output; use crate::executor::exchange::permit::channel_for_test; use crate::executor::receiver::ReceiverExecutor; + use crate::executor::{BarrierInner as Barrier, MessageInner as Message}; + use crate::task::barrier_test_utils::LocalBarrierTestEnv; use crate::task::test_utils::helper_make_local_actor; #[derive(Debug)] pub struct MockOutput { actor_id: ActorId, - data: Arc>>, + data: Arc>>, } impl MockOutput { - pub fn new(actor_id: ActorId, data: Arc>>) -> Self { + pub fn new(actor_id: ActorId, data: Arc>>) -> Self { Self { actor_id, data } } } #[async_trait] impl Output for MockOutput { - async fn send(&mut self, message: Message) -> StreamResult<()> { + async fn send(&mut self, message: DispatcherMessage) -> StreamResult<()> { self.data.lock().unwrap().push(message); Ok(()) } @@ -1154,7 +1176,11 @@ mod tests { let (tx, rx) = channel_for_test(); let actor_id = 233; let fragment_id = 666; - let input = Executor::new(Default::default(), ReceiverExecutor::for_test(rx).boxed()); + let barrier_test_env = LocalBarrierTestEnv::for_test().await; + let input = Executor::new( + Default::default(), + ReceiverExecutor::for_test(233, rx, barrier_test_env.shared_context.clone()).boxed(), + ); let ctx = Arc::new(SharedContext::for_test()); let metrics = Arc::new(StreamingMetrics::unused()); @@ -1245,7 +1271,10 @@ mod tests { actor_new_dispatchers: Default::default(), }, )); - tx.send(Message::Barrier(b1)).await.unwrap(); + barrier_test_env.inject_barrier(&b1, [], [actor_id]); + tx.send(Message::Barrier(b1.clone().into_dispatcher())) + .await + .unwrap(); executor.next().await.unwrap().unwrap(); // 5. Check downstream. @@ -1261,7 +1290,9 @@ mod tests { try_recv!(old_simple).unwrap().as_barrier().unwrap(); // Untouched. // 6. Send another barrier. - tx.send(Message::Barrier(Barrier::new_test_barrier(test_epoch(2)))) + let b2 = Barrier::new_test_barrier(test_epoch(2)); + barrier_test_env.inject_barrier(&b2, [], [actor_id]); + tx.send(Message::Barrier(b2.into_dispatcher())) .await .unwrap(); executor.next().await.unwrap().unwrap(); @@ -1299,7 +1330,10 @@ mod tests { actor_new_dispatchers: Default::default(), }, )); - tx.send(Message::Barrier(b3)).await.unwrap(); + barrier_test_env.inject_barrier(&b3, [], [actor_id]); + tx.send(Message::Barrier(b3.into_dispatcher())) + .await + .unwrap(); executor.next().await.unwrap().unwrap(); // 10. Check downstream. @@ -1309,7 +1343,9 @@ mod tests { try_recv!(new_simple).unwrap().as_barrier().unwrap(); // Since it's just added, it won't receive the chunk. // 11. Send another barrier. - tx.send(Message::Barrier(Barrier::new_test_barrier(test_epoch(4)))) + let b4 = Barrier::new_test_barrier(test_epoch(4)); + barrier_test_env.inject_barrier(&b4, [], [actor_id]); + tx.send(Message::Barrier(b4.into_dispatcher())) .await .unwrap(); executor.next().await.unwrap().unwrap(); @@ -1403,7 +1439,7 @@ mod tests { } else { let message = guard.first().unwrap(); let real_chunk = match message { - Message::Chunk(chunk) => chunk, + DispatcherMessage::Chunk(chunk) => chunk, _ => panic!(), }; real_chunk diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index bd348e65defc..be64af33acd7 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -22,10 +22,12 @@ use pin_project::pin_project; use risingwave_common::util::addr::{is_local_address, HostAddr}; use risingwave_pb::task_service::{permits, GetStreamResponse}; use risingwave_rpc_client::ComputeClientPool; +use tokio::sync::mpsc; use super::error::ExchangeChannelClosed; use super::permit::Receiver; use crate::executor::prelude::*; +use crate::executor::{DispatcherBarrier, DispatcherMessage}; use crate::task::{ FragmentId, LocalBarrierManager, SharedContext, UpDownActorIds, UpDownFragmentIds, }; @@ -64,23 +66,80 @@ pub struct LocalInput { } type LocalInputStreamInner = impl MessageStream; +async fn process_msg<'a>( + msg: DispatcherMessage, + get_mutation_subscriber: impl for<'b> FnOnce( + &'b DispatcherBarrier, + ) + -> &'a mut mpsc::UnboundedReceiver + + 'a, +) -> StreamExecutorResult { + let barrier = match msg { + DispatcherMessage::Chunk(c) => { + return Ok(Message::Chunk(c)); + } + DispatcherMessage::Barrier(b) => b, + DispatcherMessage::Watermark(watermark) => { + return Ok(Message::Watermark(watermark)); + } + }; + let mutation_subscriber = get_mutation_subscriber(&barrier); + + let mutation = mutation_subscriber + .recv() + .await + .ok_or_else(|| anyhow!("failed to receive mutation of barrier {:?}", barrier)) + .map(|(prev_epoch, mutation)| { + assert_eq!(prev_epoch, barrier.epoch.prev); + mutation + })?; + Ok(Message::Barrier(Barrier { + epoch: barrier.epoch, + mutation, + kind: barrier.kind, + tracing_context: barrier.tracing_context, + passed_actors: barrier.passed_actors, + })) +} + impl LocalInput { - pub fn new(channel: Receiver, actor_id: ActorId) -> Self { + pub fn new( + channel: Receiver, + upstream_actor_id: ActorId, + self_actor_id: ActorId, + local_barrier_manager: LocalBarrierManager, + ) -> Self { Self { - inner: Self::run(channel, actor_id), - actor_id, + inner: Self::run( + channel, + upstream_actor_id, + self_actor_id, + local_barrier_manager, + ), + actor_id: upstream_actor_id, } } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn run(mut channel: Receiver, actor_id: ActorId) { - let span: await_tree::Span = format!("LocalInput (actor {actor_id})").into(); + async fn run( + mut channel: Receiver, + upstream_actor_id: ActorId, + self_actor_id: ActorId, + local_barrier_manager: LocalBarrierManager, + ) { + let span: await_tree::Span = format!("LocalInput (actor {upstream_actor_id})").into(); + let mut mutation_subscriber = None; while let Some(msg) = channel.recv().verbose_instrument_await(span.clone()).await { - yield msg; + yield process_msg(msg, |barrier| { + mutation_subscriber.get_or_insert_with(|| { + local_barrier_manager.subscribe_barrier_mutation(self_actor_id, barrier) + }) + }) + .await?; } // Always emit an error outside the loop. This is because we use barrier as the control // message to stop the stream. Reaching here means the channel is closed unexpectedly. - Err(ExchangeChannelClosed::local_input(actor_id))? + Err(ExchangeChannelClosed::local_input(upstream_actor_id))? } } @@ -170,11 +229,11 @@ impl RemoteInput { match data_res { Ok(GetStreamResponse { message, permits }) => { let msg = message.unwrap(); - let bytes = Message::get_encoded_len(&msg); + let bytes = DispatcherMessage::get_encoded_len(&msg); exchange_frag_recv_size_metrics.inc_by(bytes as u64); - let msg_res = Message::from_protobuf(&msg); + let msg_res = DispatcherMessage::from_protobuf(&msg); if let Some(add_back_permits) = match permits.unwrap().value { // For records, batch the permits we received to reduce the backward // `AddPermits` messages. @@ -196,35 +255,14 @@ impl RemoteInput { .context("RemoteInput backward permits channel closed.")?; } - let mut msg = msg_res.context("RemoteInput decode message error")?; - - // Read barrier mutation from local barrier manager and attach it to the barrier message. - if cfg!(not(test)) { - if let Message::Barrier(barrier) = &mut msg { - assert!( - barrier.mutation.is_none(), - "Mutation should be erased in remote side" - ); - let mutation_subscriber = - mutation_subscriber.get_or_insert_with(|| { - local_barrier_manager - .subscribe_barrier_mutation(self_actor_id, barrier) - }); - - let mutation = mutation_subscriber - .recv() - .await - .ok_or_else(|| { - anyhow!("failed to receive mutation of barrier {:?}", barrier) - }) - .map(|(prev_epoch, mutation)| { - assert_eq!(prev_epoch, barrier.epoch.prev); - mutation - })?; - barrier.mutation = mutation; - } - } - yield msg; + let msg = msg_res.context("RemoteInput decode message error")?; + + yield process_msg(msg, |barrier| { + mutation_subscriber.get_or_insert_with(|| { + local_barrier_manager.subscribe_barrier_mutation(self_actor_id, barrier) + }) + }) + .await?; } Err(e) => Err(ExchangeChannelClosed::remote_input(up_down_ids.0, Some(e)))?, @@ -270,6 +308,8 @@ pub(crate) fn new_input( LocalInput::new( context.take_receiver((upstream_actor_id, actor_id))?, upstream_actor_id, + actor_id, + context.local_barrier_manager.clone(), ) .boxed_input() } else { diff --git a/src/stream/src/executor/exchange/output.rs b/src/stream/src/executor/exchange/output.rs index 41b4b5b84475..145286f561e1 100644 --- a/src/stream/src/executor/exchange/output.rs +++ b/src/stream/src/executor/exchange/output.rs @@ -22,7 +22,7 @@ use risingwave_common::util::addr::is_local_address; use super::error::ExchangeChannelClosed; use super::permit::Sender; use crate::error::StreamResult; -use crate::executor::Message; +use crate::executor::DispatcherMessage as Message; use crate::task::{ActorId, SharedContext}; /// `Output` provides an interface for `Dispatcher` to send data into downstream actors. diff --git a/src/stream/src/executor/exchange/permit.rs b/src/stream/src/executor/exchange/permit.rs index 159494355cff..8c86eb275381 100644 --- a/src/stream/src/executor/exchange/permit.rs +++ b/src/stream/src/executor/exchange/permit.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use risingwave_pb::task_service::permits; use tokio::sync::{mpsc, AcquireError, Semaphore, SemaphorePermit}; -use crate::executor::Message; +use crate::executor::DispatcherMessage as Message; /// Message with its required permits. /// @@ -214,7 +214,7 @@ mod tests { use futures::FutureExt; use super::*; - use crate::executor::Barrier; + use crate::executor::DispatcherBarrier as Barrier; #[test] fn test_channel_close() { diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 8f73fe26ee7d..0b7415adac38 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -34,7 +34,8 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::test_utils::agg_executor::{ generate_agg_schema, new_boxed_simple_agg_executor, }; -use crate::task::{LocalBarrierManager, SharedContext}; +use crate::executor::{BarrierInner as Barrier, MessageInner as Message}; +use crate::task::barrier_test_utils::LocalBarrierTestEnv; /// This test creates a merger-dispatcher pair, and run a sum. Each chunk /// has 0~9 elements. We first insert the 10 chunks, then delete them, @@ -45,9 +46,19 @@ async fn test_merger_sum_aggr() { time_zone: String::from("UTC"), }; - let actor_ctx = ActorContext::for_test(0); + let barrier_test_env = LocalBarrierTestEnv::for_test().await; + let mut next_actor_id = 0; + let next_actor_id = &mut next_actor_id; + let mut actors = HashSet::new(); + let mut gen_next_actor_id = || { + *next_actor_id += 1; + actors.insert(*next_actor_id); + *next_actor_id + }; // `make_actor` build an actor to do local aggregation - let make_actor = |input_rx| { + let mut make_actor = |input_rx| { + let actor_id = gen_next_actor_id(); + let actor_ctx = ActorContext::for_test(actor_id); let input_schema = Schema { fields: vec![Field::unnamed(DataType::Int64)], }; @@ -57,7 +68,8 @@ async fn test_merger_sum_aggr() { pk_indices: PkIndices::new(), identity: "ReceiverExecutor".to_string(), }, - ReceiverExecutor::for_test(input_rx).boxed(), + ReceiverExecutor::for_test(actor_id, input_rx, barrier_test_env.shared_context.clone()) + .boxed(), ); let agg_calls = vec![ AggCall::from_pretty("(count:int8)"), @@ -72,13 +84,17 @@ async fn test_merger_sum_aggr() { input: aggregator.boxed(), channel: Box::new(LocalOutput::new(233, tx)), }; + let actor = Actor::new( consumer, vec![], StreamingMetrics::unused().into(), - actor_ctx.clone(), + actor_ctx, expr_context.clone(), - LocalBarrierManager::for_test(), + barrier_test_env + .shared_context + .local_barrier_manager + .clone(), ); (actor, rx) }; @@ -90,7 +106,6 @@ async fn test_merger_sum_aggr() { let mut inputs = vec![]; let mut outputs = vec![]; - let ctx = Arc::new(SharedContext::for_test()); let metrics = Arc::new(StreamingMetrics::unused()); // create 17 local aggregation actors @@ -103,6 +118,8 @@ async fn test_merger_sum_aggr() { } // create a round robin dispatcher, which dispatches messages to the actors + + let actor_id = gen_next_actor_id(); let (input, rx) = channel_for_test(); let receiver_op = Executor::new( ExecutorInfo { @@ -111,7 +128,7 @@ async fn test_merger_sum_aggr() { pk_indices: PkIndices::new(), identity: "ReceiverExecutor".to_string(), }, - ReceiverExecutor::for_test(rx).boxed(), + ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()).boxed(), ); let dispatcher = DispatchExecutor::new( receiver_op, @@ -122,7 +139,7 @@ async fn test_merger_sum_aggr() { ))], 0, 0, - ctx, + barrier_test_env.shared_context.clone(), metrics, config::default::developer::stream_chunk_size(), ); @@ -130,12 +147,17 @@ async fn test_merger_sum_aggr() { dispatcher, vec![], StreamingMetrics::unused().into(), - actor_ctx.clone(), + ActorContext::for_test(actor_id), expr_context.clone(), - LocalBarrierManager::for_test(), + barrier_test_env + .shared_context + .local_barrier_manager + .clone(), ); handles.push(tokio::spawn(actor.run())); + let actor_ctx = ActorContext::for_test(gen_next_actor_id()); + // use a merge operator to collect data from dispatchers before sending them to aggregator let merger = Executor::new( ExecutorInfo { @@ -147,7 +169,12 @@ async fn test_merger_sum_aggr() { pk_indices: PkIndices::new(), identity: "MergeExecutor".to_string(), }, - MergeExecutor::for_test(outputs).boxed(), + MergeExecutor::for_test( + actor_ctx.id, + outputs, + barrier_test_env.shared_context.clone(), + ) + .boxed(), ); // for global aggregator, we need to sum data and sum row count @@ -192,13 +219,18 @@ async fn test_merger_sum_aggr() { StreamingMetrics::unused().into(), actor_ctx.clone(), expr_context.clone(), - LocalBarrierManager::for_test(), + barrier_test_env + .shared_context + .local_barrier_manager + .clone(), ); handles.push(tokio::spawn(actor.run())); let mut epoch = test_epoch(1); + let b1 = Barrier::new_test_barrier(epoch); + barrier_test_env.inject_barrier(&b1, [], actors.clone()); input - .send(Message::Barrier(Barrier::new_test_barrier(epoch))) + .send(Message::Barrier(b1.into_dispatcher())) .await .unwrap(); epoch.inc_epoch(); @@ -211,17 +243,19 @@ async fn test_merger_sum_aggr() { ); input.send(Message::Chunk(chunk)).await.unwrap(); } + let b = Barrier::new_test_barrier(epoch); + barrier_test_env.inject_barrier(&b, [], actors.clone()); input - .send(Message::Barrier(Barrier::new_test_barrier(epoch))) + .send(Message::Barrier(b.into_dispatcher())) .await .unwrap(); epoch.inc_epoch(); } + let b = Barrier::new_test_barrier(epoch) + .with_mutation(Mutation::Stop(actors.clone().into_iter().collect())); + barrier_test_env.inject_barrier(&b, [], actors); input - .send(Message::Barrier( - Barrier::new_test_barrier(epoch) - .with_mutation(Mutation::Stop([0].into_iter().collect())), - )) + .send(Message::Barrier(b.into_dispatcher())) .await .unwrap(); @@ -241,7 +275,7 @@ struct MockConsumer { } impl StreamConsumer for MockConsumer { - type BarrierStream = impl Stream> + Send; + type BarrierStream = impl Stream> + Send; fn execute(self: Box) -> Self::BarrierStream { let mut input = self.input.execute(); @@ -268,7 +302,7 @@ pub struct SenderConsumer { } impl StreamConsumer for SenderConsumer { - type BarrierStream = impl Stream> + Send; + type BarrierStream = impl Stream> + Send; fn execute(self: Box) -> Self::BarrierStream { let mut input = self.input.execute(); @@ -279,7 +313,16 @@ impl StreamConsumer for SenderConsumer { let msg = item?; let barrier = msg.as_barrier().cloned(); - channel.send(msg).await.expect("failed to send message"); + channel + .send(match msg { + Message::Chunk(chunk) => DispatcherMessage::Chunk(chunk), + Message::Barrier(barrier) => { + DispatcherMessage::Barrier(barrier.into_dispatcher()) + } + Message::Watermark(watermark) => DispatcherMessage::Watermark(watermark), + }) + .await + .expect("failed to send message"); if let Some(barrier) = barrier { yield barrier; diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 19124fe8c22d..60f88e866c6b 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -74,20 +74,32 @@ impl MergeExecutor { } #[cfg(test)] - pub fn for_test(inputs: Vec) -> Self { + pub fn for_test( + actor_id: ActorId, + inputs: Vec, + shared_context: Arc, + ) -> Self { use super::exchange::input::LocalInput; use crate::executor::exchange::input::Input; Self::new( - ActorContext::for_test(114), + ActorContext::for_test(actor_id), 514, 1919, inputs .into_iter() .enumerate() - .map(|(idx, input)| LocalInput::new(input, idx as ActorId).boxed_input()) + .map(|(idx, input)| { + LocalInput::new( + input, + idx as ActorId, + actor_id, + shared_context.local_barrier_manager.clone(), + ) + .boxed_input() + }) .collect(), - SharedContext::for_test().into(), + shared_context, 810, StreamingMetrics::unused().into(), ) @@ -474,10 +486,11 @@ mod tests { use tonic::{Request, Response, Status, Streaming}; use super::*; - use crate::executor::exchange::input::RemoteInput; + use crate::executor::exchange::input::{Input, RemoteInput}; use crate::executor::exchange::permit::channel_for_test; + use crate::executor::{BarrierInner as Barrier, MessageInner as Message}; + use crate::task::barrier_test_utils::LocalBarrierTestEnv; use crate::task::test_utils::helper_make_local_actor; - use crate::task::LocalBarrierManager; fn build_test_chunk(epoch: u64) -> StreamChunk { // The number of items in `ops` is the epoch count. @@ -495,64 +508,80 @@ mod tests { txs.push(tx); rxs.push(rx); } - let merger = MergeExecutor::for_test(rxs); + let barrier_test_env = LocalBarrierTestEnv::for_test().await; + let merger = MergeExecutor::for_test(233, rxs, barrier_test_env.shared_context.clone()); + let actor_id = merger.actor_context.id; let mut handles = Vec::with_capacity(CHANNEL_NUMBER); - let epochs = (10..1000u64).step_by(10).collect_vec(); + let epochs = (10..1000u64) + .step_by(10) + .map(|idx| (idx, test_epoch(idx))) + .collect_vec(); + let mut prev_epoch = 0; + let prev_epoch = &mut prev_epoch; + let barriers: HashMap<_, _> = epochs + .iter() + .map(|(_, epoch)| { + let barrier = Barrier::with_prev_epoch_for_test(*epoch, *prev_epoch); + *prev_epoch = *epoch; + barrier_test_env.inject_barrier(&barrier, [], [actor_id]); + (*epoch, barrier) + }) + .collect(); + let b2 = Barrier::with_prev_epoch_for_test(test_epoch(1000), *prev_epoch) + .with_mutation(Mutation::Stop(HashSet::default())); + barrier_test_env.inject_barrier(&b2, [], [actor_id]); for (tx_id, tx) in txs.into_iter().enumerate() { let epochs = epochs.clone(); + let barriers = barriers.clone(); + let b2 = b2.clone(); let handle = tokio::spawn(async move { - for epoch in epochs { - if epoch % 20 == 0 { - tx.send(Message::Chunk(build_test_chunk(epoch))) + for (idx, epoch) in epochs { + if idx % 20 == 0 { + tx.send(Message::Chunk(build_test_chunk(idx))) .await .unwrap(); } else { tx.send(Message::Watermark(Watermark { - col_idx: (epoch as usize / 20 + tx_id) % CHANNEL_NUMBER, + col_idx: (idx as usize / 20 + tx_id) % CHANNEL_NUMBER, data_type: DataType::Int64, - val: ScalarImpl::Int64(epoch as i64), + val: ScalarImpl::Int64(idx as i64), })) .await .unwrap(); } - tx.send(Message::Barrier(Barrier::new_test_barrier(test_epoch( - epoch, - )))) - .await - .unwrap(); + tx.send(Message::Barrier(barriers[&epoch].clone().into_dispatcher())) + .await + .unwrap(); sleep(Duration::from_millis(1)).await; } - tx.send(Message::Barrier( - Barrier::new_test_barrier(test_epoch(1000)) - .with_mutation(Mutation::Stop(HashSet::default())), - )) - .await - .unwrap(); + tx.send(Message::Barrier(b2.clone().into_dispatcher())) + .await + .unwrap(); }); handles.push(handle); } let mut merger = merger.boxed().execute(); - for epoch in epochs { + for (idx, epoch) in epochs { // expect n chunks - if epoch % 20 == 0 { + if idx % 20 == 0 { for _ in 0..CHANNEL_NUMBER { assert_matches!(merger.next().await.unwrap().unwrap(), Message::Chunk(chunk) => { - assert_eq!(chunk.ops().len() as u64, epoch); + assert_eq!(chunk.ops().len() as u64, idx); }); } - } else if epoch as usize / 20 >= CHANNEL_NUMBER - 1 { + } else if idx as usize / 20 >= CHANNEL_NUMBER - 1 { for _ in 0..CHANNEL_NUMBER { assert_matches!(merger.next().await.unwrap().unwrap(), Message::Watermark(watermark) => { - assert_eq!(watermark.val, ScalarImpl::Int64((epoch - 20 * (CHANNEL_NUMBER as u64 - 1)) as i64)); + assert_eq!(watermark.val, ScalarImpl::Int64((idx - 20 * (CHANNEL_NUMBER as u64 - 1)) as i64)); }); } } // expect a barrier assert_matches!(merger.next().await.unwrap().unwrap(), Message::Barrier(Barrier{epoch:barrier_epoch,mutation:_,..}) => { - assert_eq!(barrier_epoch.curr, test_epoch(epoch)); + assert_eq!(barrier_epoch.curr, epoch); }); } assert_matches!( @@ -572,7 +601,8 @@ mod tests { async fn test_configuration_change() { let actor_id = 233; let (untouched, old, new) = (234, 235, 238); // upstream actors - let ctx = Arc::new(SharedContext::for_test()); + let barrier_test_env = LocalBarrierTestEnv::for_test().await; + let ctx = barrier_test_env.shared_context.clone(); let metrics = Arc::new(StreamingMetrics::unused()); // 1. Register info in context. @@ -628,9 +658,21 @@ mod tests { } }; } + + macro_rules! assert_recv_pending { + () => { + assert!(merge + .next() + .now_or_never() + .flatten() + .transpose() + .unwrap() + .is_none()); + }; + } macro_rules! recv { () => { - merge.next().now_or_never().flatten().transpose().unwrap() + merge.next().await.transpose().unwrap() }; } @@ -638,7 +680,7 @@ mod tests { send!([untouched, old], Message::Chunk(StreamChunk::default())); recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk twice. recv!().unwrap().as_chunk().unwrap(); - assert!(recv!().is_none()); + assert_recv_pending!(); // 4. Send a configuration change barrier. let merge_updates = maplit::hashmap! { @@ -661,23 +703,31 @@ mod tests { actor_new_dispatchers: Default::default(), }, )); - send!([untouched, old], Message::Barrier(b1.clone())); - assert!(recv!().is_none()); // We should not receive the barrier, since merger is waiting for the new upstream new. + barrier_test_env.inject_barrier(&b1, [], [actor_id]); + send!( + [untouched, old], + Message::Barrier(b1.clone().into_dispatcher()) + ); + assert_recv_pending!(); // We should not receive the barrier, since merger is waiting for the new upstream new. - send!([new], Message::Barrier(b1.clone())); + send!([new], Message::Barrier(b1.clone().into_dispatcher())); recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier. // 5. Send a chunk. send!([untouched, new], Message::Chunk(StreamChunk::default())); recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk twice, since old is removed. recv!().unwrap().as_chunk().unwrap(); - assert!(recv!().is_none()); + assert_recv_pending!(); } struct FakeExchangeService { rpc_called: Arc, } + fn exchange_client_test_barrier() -> crate::executor::Barrier { + Barrier::new_test_barrier(test_epoch(1)) + } + #[async_trait::async_trait] impl ExchangeService for FakeExchangeService { type GetDataStream = ReceiverStream>; @@ -711,7 +761,7 @@ mod tests { .await .unwrap(); // send barrier - let barrier = Barrier::new_test_barrier(test_epoch(1)); + let barrier = exchange_client_test_barrier(); tx.send(Ok(GetStreamResponse { message: Some(StreamMessage { stream_message: Some( @@ -755,10 +805,12 @@ mod tests { sleep(Duration::from_secs(1)).await; assert!(server_run.load(Ordering::SeqCst)); + let test_env = LocalBarrierTestEnv::for_test().await; + let remote_input = { let pool = ComputeClientPool::default(); RemoteInput::new( - LocalBarrierManager::for_test(), + test_env.shared_context.local_barrier_manager.clone(), pool, addr.into(), (0, 0), @@ -768,6 +820,12 @@ mod tests { ) }; + test_env.inject_barrier( + &exchange_client_test_barrier(), + [], + [remote_input.actor_id()], + ); + pin_mut!(remote_input); assert_matches!(remote_input.next().await.unwrap().unwrap(), Message::Chunk(chunk) => { diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index dc63e62b1d58..a1ef0691d14e 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -41,9 +41,9 @@ use risingwave_pb::stream_plan::stream_message::StreamMessage; use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate}; use risingwave_pb::stream_plan::{ BarrierMutation, CombinedMutation, CreateSubscriptionMutation, Dispatchers, - DropSubscriptionMutation, PauseMutation, PbAddMutation, PbBarrier, PbDispatcher, - PbStreamMessage, PbUpdateMutation, PbWatermark, ResumeMutation, SourceChangeSplitMutation, - StopMutation, ThrottleMutation, + DropSubscriptionMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation, + PbDispatcher, PbStreamMessage, PbUpdateMutation, PbWatermark, ResumeMutation, + SourceChangeSplitMutation, StopMutation, ThrottleMutation, }; use smallvec::SmallVec; @@ -297,20 +297,28 @@ pub enum Mutation { }, } +/// The generic type `M` is the mutation type of the barrier. +/// +/// For barrier of in the dispatcher, `M` is `()`, which means the mutation is erased. +/// For barrier flowing within the streaming actor, `M` is the normal `BarrierMutationType`. #[derive(Debug, Clone)] -pub struct Barrier { +pub struct BarrierInner { pub epoch: EpochPair, - pub mutation: Option>, + pub mutation: M, pub kind: BarrierKind, /// Tracing context for the **current** epoch of this barrier. - tracing_context: TracingContext, + pub tracing_context: TracingContext, /// The actors that this barrier has passed locally. Used for debugging only. pub passed_actors: Vec, } -impl Barrier { +pub type BarrierMutationType = Option>; +pub type Barrier = BarrierInner; +pub type DispatcherBarrier = BarrierInner<()>; + +impl BarrierInner { /// Create a plain barrier. pub fn new_test_barrier(epoch: u64) -> Self { Self { @@ -331,6 +339,18 @@ impl Barrier { passed_actors: Default::default(), } } +} + +impl Barrier { + pub fn into_dispatcher(self) -> DispatcherBarrier { + DispatcherBarrier { + epoch: self.epoch, + mutation: (), + kind: self.kind, + tracing_context: self.tracing_context, + passed_actors: self.passed_actors, + } + } #[must_use] pub fn with_mutation(self, mutation: Mutation) -> Self { @@ -493,7 +513,7 @@ impl Barrier { } } -impl PartialEq for Barrier { +impl PartialEq for BarrierInner { fn eq(&self, other: &Self) -> bool { self.epoch == other.epoch && self.mutation == other.mutation } @@ -751,50 +771,72 @@ impl Mutation { } } -impl Barrier { - pub fn to_protobuf(&self) -> PbBarrier { - let Barrier { +impl BarrierInner { + fn to_protobuf_inner(&self, barrier_fn: impl FnOnce(&M) -> Option) -> PbBarrier { + let Self { epoch, mutation, kind, passed_actors, tracing_context, .. - } = self.clone(); + } = self; PbBarrier { epoch: Some(PbEpoch { curr: epoch.curr, prev: epoch.prev, }), - mutation: mutation.map(|m| BarrierMutation { - mutation: Some(m.to_protobuf()), + mutation: Some(PbBarrierMutation { + mutation: barrier_fn(mutation), }), tracing_context: tracing_context.to_protobuf(), - kind: kind as _, - passed_actors, + kind: *kind as _, + passed_actors: passed_actors.clone(), } } - pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult { - let mutation = prost - .mutation - .as_ref() - .map(|m| Mutation::from_protobuf(m.mutation.as_ref().unwrap())) - .transpose()? - .map(Arc::new); + fn from_protobuf_inner( + prost: &PbBarrier, + mutation_from_pb: impl FnOnce(Option<&PbMutation>) -> StreamExecutorResult, + ) -> StreamExecutorResult { let epoch = prost.get_epoch()?; - Ok(Barrier { + Ok(Self { kind: prost.kind(), epoch: EpochPair::new(epoch.curr, epoch.prev), - mutation, + mutation: mutation_from_pb( + prost + .mutation + .as_ref() + .and_then(|mutation| mutation.mutation.as_ref()), + )?, passed_actors: prost.get_passed_actors().clone(), tracing_context: TracingContext::from_protobuf(&prost.tracing_context), }) } } +impl DispatcherBarrier { + pub fn to_protobuf(&self) -> PbBarrier { + self.to_protobuf_inner(|_| None) + } +} + +impl Barrier { + pub fn to_protobuf(&self) -> PbBarrier { + self.to_protobuf_inner(|mutation| mutation.as_ref().map(|mutation| mutation.to_protobuf())) + } + + pub fn from_protobuf(prost: &PbBarrier) -> StreamExecutorResult { + Self::from_protobuf_inner(prost, |mutation| { + mutation + .map(|m| Mutation::from_protobuf(m).map(Arc::new)) + .transpose() + }) + } +} + #[derive(Debug, PartialEq, Eq, Clone)] pub struct Watermark { pub col_idx: usize, @@ -871,12 +913,15 @@ impl Watermark { } #[derive(Debug, EnumAsInner, PartialEq, Clone)] -pub enum Message { +pub enum MessageInner { Chunk(StreamChunk), - Barrier(Barrier), + Barrier(BarrierInner), Watermark(Watermark), } +pub type Message = MessageInner; +pub type DispatcherMessage = MessageInner<()>; + impl From for Message { fn from(chunk: StreamChunk) -> Self { Message::Chunk(chunk) @@ -910,7 +955,9 @@ impl Message { }) if mutation.as_ref().unwrap().is_stop() ) } +} +impl DispatcherMessage { pub fn to_protobuf(&self) -> PbStreamMessage { let prost = match self { Self::Chunk(stream_chunk) => { @@ -927,10 +974,21 @@ impl Message { pub fn from_protobuf(prost: &PbStreamMessage) -> StreamExecutorResult { let res = match prost.get_stream_message()? { - StreamMessage::StreamChunk(chunk) => Message::Chunk(StreamChunk::from_protobuf(chunk)?), - StreamMessage::Barrier(barrier) => Message::Barrier(Barrier::from_protobuf(barrier)?), + StreamMessage::StreamChunk(chunk) => Self::Chunk(StreamChunk::from_protobuf(chunk)?), + StreamMessage::Barrier(barrier) => Self::Barrier( + DispatcherBarrier::from_protobuf_inner(barrier, |mutation| { + if mutation.is_some() { + if cfg!(debug_assertions) { + panic!("should not receive message of barrier with mutation"); + } else { + warn!(?barrier, "receive message of barrier with mutation"); + } + } + Ok(()) + })?, + ), StreamMessage::Watermark(watermark) => { - Message::Watermark(Watermark::from_protobuf(watermark)?) + Self::Watermark(Watermark::from_protobuf(watermark)?) } }; Ok(res) diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs index effe773d5459..58700d2a1350 100644 --- a/src/stream/src/executor/receiver.rs +++ b/src/stream/src/executor/receiver.rs @@ -72,16 +72,26 @@ impl ReceiverExecutor { } #[cfg(test)] - pub fn for_test(input: super::exchange::permit::Receiver) -> Self { + pub fn for_test( + actor_id: ActorId, + input: super::exchange::permit::Receiver, + shared_context: Arc, + ) -> Self { use super::exchange::input::LocalInput; use crate::executor::exchange::input::Input; Self::new( - ActorContext::for_test(114), + ActorContext::for_test(actor_id), 514, 1919, - LocalInput::new(input, 0).boxed_input(), - SharedContext::for_test().into(), + LocalInput::new( + input, + 0, + actor_id, + shared_context.local_barrier_manager.clone(), + ) + .boxed_input(), + shared_context, 810, StreamingMetrics::unused().into(), ) @@ -194,7 +204,8 @@ mod tests { use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use super::*; - use crate::executor::UpdateMutation; + use crate::executor::{MessageInner as Message, UpdateMutation}; + use crate::task::barrier_test_utils::LocalBarrierTestEnv; use crate::task::test_utils::helper_make_local_actor; #[tokio::test] @@ -202,7 +213,9 @@ mod tests { let actor_id = 233; let (old, new) = (114, 514); // old and new upstream actor id - let ctx = Arc::new(SharedContext::for_test()); + let barrier_test_env = LocalBarrierTestEnv::for_test().await; + + let ctx = barrier_test_env.shared_context.clone(); let metrics = Arc::new(StreamingMetrics::unused()); // 1. Register info in context. @@ -261,21 +274,28 @@ mod tests { } }; } - macro_rules! recv { + macro_rules! assert_recv_pending { () => { - receiver + assert!(receiver .next() .now_or_never() .flatten() .transpose() .unwrap() + .is_none()); + }; + } + + macro_rules! recv { + () => { + receiver.next().await.transpose().unwrap() }; } // 3. Send a chunk. send!([old], Message::Chunk(StreamChunk::default())); recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk. - assert!(recv!().is_none()); + assert_recv_pending!(); // 4. Send a configuration change barrier. let merge_updates = maplit::hashmap! { @@ -298,19 +318,22 @@ mod tests { actor_new_dispatchers: Default::default(), }, )); - send!([new], Message::Barrier(b1.clone())); - assert!(recv!().is_none()); // We should not receive the barrier, as new is not the upstream. - send!([old], Message::Barrier(b1.clone())); + barrier_test_env.inject_barrier(&b1, [], [actor_id]); + + send!([new], Message::Barrier(b1.clone().into_dispatcher())); + assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream. + + send!([old], Message::Barrier(b1.clone().into_dispatcher())); recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier. // 5. Send a chunk to the removed upstream. send_error!([old], Message::Chunk(StreamChunk::default())); - assert!(recv!().is_none()); + assert_recv_pending!(); // 6. Send a chunk to the added upstream. send!([new], Message::Chunk(StreamChunk::default())); recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk. - assert!(recv!().is_none()); + assert_recv_pending!(); } } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index b0ce6ad30540..1fd932d96675 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -63,7 +63,10 @@ use risingwave_pb::stream_service::{ use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{Actor, Barrier, DispatchExecutor, Mutation, StreamExecutorError}; +use crate::executor::{ + Actor, Barrier, BarrierInner, DispatchExecutor, DispatcherBarrier, Mutation, + StreamExecutorError, +}; use crate::task::barrier_manager::managed_state::ManagedBarrierStateDebugInfo; use crate::task::barrier_manager::progress::BackfillState; @@ -182,12 +185,12 @@ impl CreateActorContext { } } -pub(super) type SubscribeMutationItem = (u64, Option>); +pub(crate) type SubscribeMutationItem = (u64, Option>); pub(super) enum LocalBarrierEvent { ReportActorCollected { actor_id: ActorId, - barrier: Barrier, + epoch: EpochPair, }, ReportCreateProgress { current_epoch: u64, @@ -508,8 +511,8 @@ impl LocalBarrierWorker { fn handle_barrier_event(&mut self, event: LocalBarrierEvent) { match event { - LocalBarrierEvent::ReportActorCollected { actor_id, barrier } => { - self.collect(actor_id, &barrier) + LocalBarrierEvent::ReportActorCollected { actor_id, epoch } => { + self.collect(actor_id, epoch) } LocalBarrierEvent::ReportCreateProgress { current_epoch, @@ -764,8 +767,8 @@ impl LocalBarrierWorker { /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report /// and collect this barrier with its own `actor_id` using this function. - fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) { - self.state.collect(actor_id, barrier) + fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) { + self.state.collect(actor_id, epoch) } /// When a actor exit unexpectedly, the error is reported using this function. The control stream @@ -904,10 +907,10 @@ impl LocalBarrierManager { /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report /// and collect this barrier with its own `actor_id` using this function. - pub fn collect(&self, actor_id: ActorId, barrier: &Barrier) { + pub fn collect(&self, actor_id: ActorId, barrier: &BarrierInner) { self.send_event(LocalBarrierEvent::ReportActorCollected { actor_id, - barrier: barrier.clone(), + epoch: barrier.epoch, }) } @@ -923,7 +926,7 @@ impl LocalBarrierManager { pub fn subscribe_barrier_mutation( &self, actor_id: ActorId, - first_barrier: &Barrier, + first_barrier: &DispatcherBarrier, ) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded_channel(); self.send_event(LocalBarrierEvent::SubscribeBarrierMutation { @@ -996,7 +999,7 @@ pub fn try_find_root_actor_failure<'a>( #[cfg(test)] impl LocalBarrierManager { - pub(super) fn spawn_for_test() -> EventSender { + fn spawn_for_test() -> EventSender { use std::sync::atomic::AtomicU64; let (tx, rx) = unbounded_channel(); let _join_handle = LocalBarrierWorker::spawn( @@ -1028,3 +1031,85 @@ impl LocalBarrierManager { rx.await.unwrap() } } + +#[cfg(test)] +pub(crate) mod barrier_test_utils { + use std::sync::Arc; + + use assert_matches::assert_matches; + use futures::StreamExt; + use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest; + use risingwave_pb::stream_service::{ + streaming_control_stream_request, streaming_control_stream_response, InjectBarrierRequest, + StreamingControlStreamRequest, StreamingControlStreamResponse, + }; + use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + use tokio_stream::wrappers::UnboundedReceiverStream; + use tonic::Status; + + use crate::executor::Barrier; + use crate::task::barrier_manager::{ControlStreamHandle, EventSender, LocalActorOperation}; + use crate::task::{ActorId, LocalBarrierManager, SharedContext}; + + pub(crate) struct LocalBarrierTestEnv { + pub shared_context: Arc, + pub(super) actor_op_tx: EventSender, + pub request_tx: UnboundedSender>, + pub response_rx: UnboundedReceiver>, + } + + impl LocalBarrierTestEnv { + pub(crate) async fn for_test() -> Self { + let actor_op_tx = LocalBarrierManager::spawn_for_test(); + + let (request_tx, request_rx) = unbounded_channel(); + let (response_tx, mut response_rx) = unbounded_channel(); + + actor_op_tx.send_event(LocalActorOperation::NewControlStream { + handle: ControlStreamHandle::new( + response_tx, + UnboundedReceiverStream::new(request_rx).boxed(), + ), + init_request: InitRequest { prev_epoch: 0 }, + }); + + assert_matches!( + response_rx.recv().await.unwrap().unwrap().response.unwrap(), + streaming_control_stream_response::Response::Init(_) + ); + + let shared_context = actor_op_tx + .send_and_await(LocalActorOperation::GetCurrentSharedContext) + .await + .unwrap(); + + Self { + shared_context, + actor_op_tx, + request_tx, + response_rx, + } + } + + pub(crate) fn inject_barrier( + &self, + barrier: &Barrier, + actor_to_send: impl IntoIterator, + actor_to_collect: impl IntoIterator, + ) { + self.request_tx + .send(Ok(StreamingControlStreamRequest { + request: Some(streaming_control_stream_request::Request::InjectBarrier( + InjectBarrierRequest { + request_id: "".to_string(), + barrier: Some(barrier.to_protobuf()), + actor_ids_to_send: actor_to_send.into_iter().collect(), + actor_ids_to_collect: actor_to_collect.into_iter().collect(), + table_ids_to_sync: vec![], + }, + )), + })) + .unwrap(); + } + } +} diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index f4a3fb31c03c..ae1a576fe7c4 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -476,21 +476,21 @@ impl ManagedBarrierState { } /// Collect a `barrier` from the actor with `actor_id`. - pub(super) fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) { + pub(super) fn collect(&mut self, actor_id: ActorId, epoch: EpochPair) { tracing::debug!( target: "events::stream::barrier::manager::collect", - epoch = ?barrier.epoch, actor_id, state = ?self.epoch_barrier_state_map, + ?epoch, actor_id, state = ?self.epoch_barrier_state_map, "collect_barrier", ); - match self.epoch_barrier_state_map.get_mut(&barrier.epoch.prev) { + match self.epoch_barrier_state_map.get_mut(&epoch.prev) { None => { // If the barrier's state is stashed, this occurs exclusively in scenarios where the barrier has not been // injected by the barrier manager, or the barrier message is blocked at the `RemoteInput` side waiting for injection. // Given these conditions, it's inconceivable for an actor to attempt collect at this point. panic!( "cannot collect new actor barrier {:?} at current state: None", - barrier.epoch, + epoch, ) } Some(&mut BarrierState { @@ -506,15 +506,15 @@ impl ManagedBarrierState { assert!( exist, "the actor doesn't exist. actor_id: {:?}, curr_epoch: {:?}", - actor_id, barrier.epoch.curr + actor_id, epoch.curr ); - assert_eq!(curr_epoch, barrier.epoch.curr); - self.may_have_collected_all(barrier.epoch.prev); + assert_eq!(curr_epoch, epoch.curr); + self.may_have_collected_all(epoch.prev); } Some(BarrierState { inner, .. }) => { panic!( "cannot collect new actor barrier {:?} at current state: {:?}", - barrier.epoch, inner + epoch, inner ) } } @@ -723,8 +723,8 @@ mod tests { managed_barrier_state.transform_to_issued(&barrier1, actor_ids_to_collect1, HashSet::new()); managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new()); managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new()); - managed_barrier_state.collect(1, &barrier1); - managed_barrier_state.collect(2, &barrier1); + managed_barrier_state.collect(1, barrier1.epoch); + managed_barrier_state.collect(2, barrier1.epoch); assert_eq!( managed_barrier_state.pop_next_completed_epoch().await, test_epoch(0) @@ -737,9 +737,9 @@ mod tests { .0, &test_epoch(1) ); - managed_barrier_state.collect(1, &barrier2); - managed_barrier_state.collect(1, &barrier3); - managed_barrier_state.collect(2, &barrier2); + managed_barrier_state.collect(1, barrier2.epoch); + managed_barrier_state.collect(1, barrier3.epoch); + managed_barrier_state.collect(2, barrier2.epoch); assert_eq!( managed_barrier_state.pop_next_completed_epoch().await, test_epoch(1) @@ -752,8 +752,8 @@ mod tests { .0, { &test_epoch(2) } ); - managed_barrier_state.collect(2, &barrier3); - managed_barrier_state.collect(3, &barrier3); + managed_barrier_state.collect(2, barrier3.epoch); + managed_barrier_state.collect(3, barrier3.epoch); assert_eq!( managed_barrier_state.pop_next_completed_epoch().await, test_epoch(2) @@ -774,12 +774,12 @@ mod tests { managed_barrier_state.transform_to_issued(&barrier2, actor_ids_to_collect2, HashSet::new()); managed_barrier_state.transform_to_issued(&barrier3, actor_ids_to_collect3, HashSet::new()); - managed_barrier_state.collect(1, &barrier1); - managed_barrier_state.collect(1, &barrier2); - managed_barrier_state.collect(1, &barrier3); - managed_barrier_state.collect(2, &barrier1); - managed_barrier_state.collect(2, &barrier2); - managed_barrier_state.collect(2, &barrier3); + managed_barrier_state.collect(1, barrier1.epoch); + managed_barrier_state.collect(1, barrier2.epoch); + managed_barrier_state.collect(1, barrier3.epoch); + managed_barrier_state.collect(2, barrier1.epoch); + managed_barrier_state.collect(2, barrier2.epoch); + managed_barrier_state.collect(2, barrier3.epoch); assert_eq!( managed_barrier_state .epoch_barrier_state_map @@ -788,8 +788,8 @@ mod tests { .0, &0 ); - managed_barrier_state.collect(3, &barrier1); - managed_barrier_state.collect(3, &barrier2); + managed_barrier_state.collect(3, barrier1.epoch); + managed_barrier_state.collect(3, barrier2.epoch); assert_eq!( managed_barrier_state .epoch_barrier_state_map @@ -798,7 +798,7 @@ mod tests { .0, &0 ); - managed_barrier_state.collect(4, &barrier1); + managed_barrier_state.collect(4, barrier1.epoch); assert_eq!( managed_barrier_state.pop_next_completed_epoch().await, test_epoch(0) diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs index 60b3867a1a0c..0d1bb159ea5f 100644 --- a/src/stream/src/task/barrier_manager/tests.rs +++ b/src/stream/src/task/barrier_manager/tests.rs @@ -17,44 +17,21 @@ use std::iter::once; use std::pin::pin; use std::task::Poll; -use assert_matches::assert_matches; use futures::future::join_all; use futures::FutureExt; use risingwave_common::util::epoch::test_epoch; -use risingwave_pb::stream_service::{streaming_control_stream_request, InjectBarrierRequest}; -use tokio_stream::wrappers::UnboundedReceiverStream; use super::*; +use crate::task::barrier_test_utils::LocalBarrierTestEnv; #[tokio::test] async fn test_managed_barrier_collection() -> StreamResult<()> { - let actor_op_tx = LocalBarrierManager::spawn_for_test(); + let mut test_env = LocalBarrierTestEnv::for_test().await; - let (request_tx, request_rx) = unbounded_channel(); - let (response_tx, mut response_rx) = unbounded_channel(); - - actor_op_tx.send_event(LocalActorOperation::NewControlStream { - handle: ControlStreamHandle::new( - response_tx, - UnboundedReceiverStream::new(request_rx).boxed(), - ), - init_request: InitRequest { prev_epoch: 0 }, - }); - - assert_matches!( - response_rx.recv().await.unwrap().unwrap().response.unwrap(), - streaming_control_stream_response::Response::Init(_) - ); - - let context = actor_op_tx - .send_and_await(LocalActorOperation::GetCurrentSharedContext) - .await - .unwrap(); - - let manager = &context.local_barrier_manager; + let manager = &test_env.shared_context.local_barrier_manager; let register_sender = |actor_id: u32| { - let actor_op_tx = &actor_op_tx; + let actor_op_tx = &test_env.actor_op_tx; async move { let (barrier_tx, barrier_rx) = unbounded_channel(); actor_op_tx @@ -79,19 +56,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { let barrier = Barrier::new_test_barrier(curr_epoch); let epoch = barrier.epoch.prev; - request_tx - .send(Ok(StreamingControlStreamRequest { - request: Some(streaming_control_stream_request::Request::InjectBarrier( - InjectBarrierRequest { - request_id: "".to_string(), - barrier: Some(barrier.to_protobuf()), - actor_ids_to_send: actor_ids.clone(), - actor_ids_to_collect: actor_ids, - table_ids_to_sync: vec![], - }, - )), - })) - .unwrap(); + test_env.inject_barrier(&barrier, actor_ids.clone(), actor_ids); // Collect barriers from actors let collected_barriers = join_all(rxs.iter_mut().map(|(actor_id, rx)| async move { @@ -101,7 +66,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { })) .await; - let mut await_epoch_future = pin!(response_rx.recv().map(|result| { + let mut await_epoch_future = pin!(test_env.response_rx.recv().map(|result| { let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); let resp = resp.response.unwrap(); match resp { @@ -124,33 +89,12 @@ async fn test_managed_barrier_collection() -> StreamResult<()> { #[tokio::test] async fn test_managed_barrier_collection_separately() -> StreamResult<()> { - let actor_op_tx = LocalBarrierManager::spawn_for_test(); - - let (request_tx, request_rx) = unbounded_channel(); - let (response_tx, mut response_rx) = unbounded_channel(); + let mut test_env = LocalBarrierTestEnv::for_test().await; - actor_op_tx.send_event(LocalActorOperation::NewControlStream { - handle: ControlStreamHandle::new( - response_tx, - UnboundedReceiverStream::new(request_rx).boxed(), - ), - init_request: InitRequest { prev_epoch: 0 }, - }); - - assert_matches!( - response_rx.recv().await.unwrap().unwrap().response.unwrap(), - streaming_control_stream_response::Response::Init(_) - ); - - let context = actor_op_tx - .send_and_await(LocalActorOperation::GetCurrentSharedContext) - .await - .unwrap(); - - let manager = &context.local_barrier_manager; + let manager = &test_env.shared_context.local_barrier_manager; let register_sender = |actor_id: u32| { - let actor_op_tx = &actor_op_tx; + let actor_op_tx = &test_env.actor_op_tx; async move { let (barrier_tx, barrier_rx) = unbounded_channel(); actor_op_tx @@ -179,27 +123,16 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { // Prepare the barrier let curr_epoch = test_epoch(2); - let barrier = Barrier::new_test_barrier(curr_epoch); + let barrier = Barrier::new_test_barrier(curr_epoch).with_stop(); - let mut mutation_subscriber = manager.subscribe_barrier_mutation(extra_actor_id, &barrier); + let mut mutation_subscriber = + manager.subscribe_barrier_mutation(extra_actor_id, &barrier.clone().into_dispatcher()); // Read the mutation after receiving the barrier from remote input. let mut mutation_reader = pin!(mutation_subscriber.recv()); assert!(poll_fn(|cx| Poll::Ready(mutation_reader.as_mut().poll(cx).is_pending())).await); - request_tx - .send(Ok(StreamingControlStreamRequest { - request: Some(streaming_control_stream_request::Request::InjectBarrier( - InjectBarrierRequest { - request_id: "".to_string(), - barrier: Some(barrier.to_protobuf()), - actor_ids_to_send, - actor_ids_to_collect, - table_ids_to_sync: vec![], - }, - )), - })) - .unwrap(); + test_env.inject_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect); let (epoch, mutation) = mutation_reader.await.unwrap(); assert_eq!((epoch, &mutation), (barrier.epoch.prev, &barrier.mutation)); @@ -215,7 +148,7 @@ async fn test_managed_barrier_collection_separately() -> StreamResult<()> { })) .await; - let mut await_epoch_future = pin!(response_rx.recv().map(|result| { + let mut await_epoch_future = pin!(test_env.response_rx.recv().map(|result| { let resp: StreamingControlStreamResponse = result.unwrap().unwrap(); let resp = resp.response.unwrap(); match resp { diff --git a/src/stream/src/task/env.rs b/src/stream/src/task/env.rs index a47eb8279224..75c64f9ae8bf 100644 --- a/src/stream/src/task/env.rs +++ b/src/stream/src/task/env.rs @@ -89,7 +89,7 @@ impl StreamEnvironment { use risingwave_dml::dml_manager::DmlManager; use risingwave_storage::monitor::MonitoredStorageMetrics; StreamEnvironment { - server_addr: "127.0.0.1:5688".parse().unwrap(), + server_addr: "127.0.0.1:2333".parse().unwrap(), config: Arc::new(StreamingConfig::default()), worker_id: WorkerNodeId::default(), state_store: StateStoreImpl::shared_in_memory_store(Arc::new( diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs index 77f21b52406f..9a337e1dc0ab 100644 --- a/src/stream/src/task/mod.rs +++ b/src/stream/src/task/mod.rs @@ -28,6 +28,7 @@ mod barrier_manager; mod env; mod stream_manager; +pub(crate) use barrier_manager::SubscribeMutationItem; pub use barrier_manager::*; pub use env::*; pub use stream_manager::*;