Skip to content

Commit

Permalink
refactor: actor wait barrier manager inject barrier (#17613)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jul 19, 2024
1 parent dba2c53 commit bd3b9a1
Show file tree
Hide file tree
Showing 17 changed files with 588 additions and 321 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl BatchEnvironment {
BatchManagerMetrics::for_test(),
u64::MAX,
)),
server_addr: "127.0.0.1:5688".parse().unwrap(),
server_addr: "127.0.0.1:2333".parse().unwrap(),
config: Arc::new(BatchConfig::default()),
worker_id: WorkerNodeId::default(),
state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
Expand Down
13 changes: 3 additions & 10 deletions src/compute/src/rpc/service/exchange_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_pb::task_service::{
permits, GetDataRequest, GetDataResponse, GetStreamRequest, GetStreamResponse, PbPermits,
};
use risingwave_stream::executor::exchange::permit::{MessageWithPermits, Receiver};
use risingwave_stream::executor::Message;
use risingwave_stream::executor::DispatcherMessage;
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -169,21 +169,14 @@ impl ExchangeServiceImpl {
Either::Left(permits_to_add) => {
permits.add_permits(permits_to_add);
}
Either::Right(MessageWithPermits {
mut message,
permits,
}) => {
// Erase the mutation of the barrier to avoid decoding in remote side.
if let Message::Barrier(barrier) = &mut message {
barrier.mutation = None;
}
Either::Right(MessageWithPermits { message, permits }) => {
let proto = message.to_protobuf();
// forward the acquired permit to the downstream
let response = GetStreamResponse {
message: Some(proto),
permits: Some(PbPermits { value: permits }),
};
let bytes = Message::get_encoded_len(&response);
let bytes = DispatcherMessage::get_encoded_len(&response);

yield response;

Expand Down
18 changes: 3 additions & 15 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ impl GlobalBarrierManager {
let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);

self.recovery(paused_reason).instrument(span).await;
self.recovery(paused_reason, None).instrument(span).await;
}

self.context.set_status(BarrierManagerStatus::Running);
Expand Down Expand Up @@ -789,12 +789,6 @@ impl GlobalBarrierManager {
}

async fn failure_recovery(&mut self, err: MetaError) {
self.context
.tracker
.lock()
.await
.abort_all(&err, &self.context)
.await;
self.checkpoint_control.clear_on_err(&err).await;
self.pending_non_checkpoint_barriers.clear();

Expand All @@ -813,7 +807,7 @@ impl GlobalBarrierManager {

// No need to clean dirty tables for barrier recovery;
// the foreground stream jobs should clean up their own tables.
self.recovery(None).instrument(span).await;
self.recovery(None, Some(err)).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running);
} else {
panic!("failed to execute barrier: {}", err.as_report());
Expand All @@ -822,12 +816,6 @@ impl GlobalBarrierManager {

async fn adhoc_recovery(&mut self) {
let err = MetaErrorInner::AdhocRecovery.into();
self.context
.tracker
.lock()
.await
.abort_all(&err, &self.context)
.await;
self.checkpoint_control.clear_on_err(&err).await;

self.context
Expand All @@ -842,7 +830,7 @@ impl GlobalBarrierManager {

// No need to clean dirty tables for barrier recovery;
// the foreground stream jobs should clean up their own tables.
self.recovery(None).instrument(span).await;
self.recovery(None, Some(err)).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running);
}
}
Expand Down
13 changes: 11 additions & 2 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::controller::catalog::ReleaseContext;
use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
use crate::{model, MetaResult};
use crate::{model, MetaError, MetaResult};

impl GlobalBarrierManager {
// Retry base interval in milliseconds.
Expand Down Expand Up @@ -224,7 +224,7 @@ impl GlobalBarrierManager {
/// the cluster or `risectl` command. Used for debugging purposes.
///
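/// This function returns `()`; the new state of the barrier manager after recovery is not
/// returned to the caller. NOTE(review): presumably it is stored on `self` — confirm.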
pub async fn recovery(&mut self, paused_reason: Option<PausedReason>) {
pub async fn recovery(&mut self, paused_reason: Option<PausedReason>, err: Option<MetaError>) {
let prev_epoch = TracedEpoch::new(
self.context
.hummock_manager
Expand All @@ -246,6 +246,15 @@ impl GlobalBarrierManager {
let new_state = tokio_retry::Retry::spawn(retry_strategy, || {
async {
let recovery_result: MetaResult<_> = try {
if let Some(err) = &err {
self.context
.tracker
.lock()
.await
.abort_all(err, &self.context)
.await;
}

self.context
.clean_dirty_streaming_jobs()
.await
Expand Down
96 changes: 66 additions & 30 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use tokio::time::Instant;
use tracing::{event, Instrument};

use super::exchange::output::{new_output, BoxedOutput};
use super::{AddMutation, TroublemakerExecutor, UpdateMutation};
use super::{
AddMutation, DispatcherBarrier, DispatcherMessage, TroublemakerExecutor, UpdateMutation,
};
use crate::executor::prelude::*;
use crate::executor::StreamConsumer;
use crate::task::{DispatcherId, SharedContext};
Expand Down Expand Up @@ -142,7 +144,9 @@ impl DispatchExecutorInner {
.map(Ok)
.try_for_each_concurrent(limit, |dispatcher| async {
let start_time = Instant::now();
dispatcher.dispatch_barrier(barrier.clone()).await?;
dispatcher
.dispatch_barrier(barrier.clone().into_dispatcher())
.await?;
dispatcher
.actor_output_buffer_blocking_duration_ns
.inc_by(start_time.elapsed().as_nanos() as u64);
Expand Down Expand Up @@ -497,7 +501,7 @@ macro_rules! impl_dispatcher {
}
}

pub async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> {
pub async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> {
match self {
$( Self::$variant_name(inner) => inner.dispatch_barrier(barrier).await, )*
}
Expand Down Expand Up @@ -561,7 +565,7 @@ pub trait Dispatcher: Debug + 'static {
/// Dispatch a data chunk to downstream actors.
fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>;
/// Dispatch a barrier to downstream actors, generally by broadcasting it.
fn dispatch_barrier(&mut self, barrier: Barrier) -> impl DispatchFuture<'_>;
fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> impl DispatchFuture<'_>;
/// Dispatch a watermark to downstream actors, generally by broadcasting it.
fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>;

Expand Down Expand Up @@ -591,7 +595,7 @@ pub trait Dispatcher: Debug + 'static {
/// always unlimited.
async fn broadcast_concurrent(
outputs: impl IntoIterator<Item = &'_ mut BoxedOutput>,
message: Message,
message: DispatcherMessage,
) -> StreamResult<()> {
futures::future::try_join_all(
outputs
Expand Down Expand Up @@ -637,21 +641,24 @@ impl Dispatcher for RoundRobinDataDispatcher {
chunk.project(&self.output_indices)
};

self.outputs[self.cur].send(Message::Chunk(chunk)).await?;
self.outputs[self.cur]
.send(DispatcherMessage::Chunk(chunk))
.await?;
self.cur += 1;
self.cur %= self.outputs.len();
Ok(())
}

async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> {
async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> {
// always broadcast barrier
broadcast_concurrent(&mut self.outputs, Message::Barrier(barrier)).await
broadcast_concurrent(&mut self.outputs, DispatcherMessage::Barrier(barrier)).await
}

async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
// always broadcast watermark
broadcast_concurrent(&mut self.outputs, Message::Watermark(watermark)).await?;
broadcast_concurrent(&mut self.outputs, DispatcherMessage::Watermark(watermark))
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -725,15 +732,16 @@ impl Dispatcher for HashDataDispatcher {
self.outputs.extend(outputs);
}

async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> {
async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> {
// always broadcast barrier
broadcast_concurrent(&mut self.outputs, Message::Barrier(barrier)).await
broadcast_concurrent(&mut self.outputs, DispatcherMessage::Barrier(barrier)).await
}

async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
// always broadcast watermark
broadcast_concurrent(&mut self.outputs, Message::Watermark(watermark)).await?;
broadcast_concurrent(&mut self.outputs, DispatcherMessage::Watermark(watermark))
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -818,7 +826,9 @@ impl Dispatcher for HashDataDispatcher {
"send = \n{:#?}",
new_stream_chunk
);
output.send(Message::Chunk(new_stream_chunk)).await?;
output
.send(DispatcherMessage::Chunk(new_stream_chunk))
.await?;
}
StreamResult::Ok(())
}),
Expand Down Expand Up @@ -888,18 +898,26 @@ impl Dispatcher for BroadcastDispatcher {
} else {
chunk.project(&self.output_indices)
};
broadcast_concurrent(self.outputs.values_mut(), Message::Chunk(chunk)).await
broadcast_concurrent(self.outputs.values_mut(), DispatcherMessage::Chunk(chunk)).await
}

async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> {
async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> {
// always broadcast barrier
broadcast_concurrent(self.outputs.values_mut(), Message::Barrier(barrier)).await
broadcast_concurrent(
self.outputs.values_mut(),
DispatcherMessage::Barrier(barrier),
)
.await
}

async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
// always broadcast watermark
broadcast_concurrent(self.outputs.values_mut(), Message::Watermark(watermark)).await?;
broadcast_concurrent(
self.outputs.values_mut(),
DispatcherMessage::Watermark(watermark),
)
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -970,10 +988,12 @@ impl Dispatcher for SimpleDispatcher {
assert!(self.output.len() <= 2);
}

async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> {
async fn dispatch_barrier(&mut self, barrier: DispatcherBarrier) -> StreamResult<()> {
// Only barriers are allowed to be dispatched to multiple outputs during migration.
for output in &mut self.output {
output.send(Message::Barrier(barrier.clone())).await?;
output
.send(DispatcherMessage::Barrier(barrier.clone()))
.await?;
}
Ok(())
}
Expand All @@ -992,7 +1012,7 @@ impl Dispatcher for SimpleDispatcher {
} else {
chunk.project(&self.output_indices)
};
output.send(Message::Chunk(chunk)).await
output.send(DispatcherMessage::Chunk(chunk)).await
}

async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> {
Expand All @@ -1003,7 +1023,7 @@ impl Dispatcher for SimpleDispatcher {
.expect("expect exactly one output");

if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) {
output.send(Message::Watermark(watermark)).await?;
output.send(DispatcherMessage::Watermark(watermark)).await?;
}
Ok(())
}
Expand Down Expand Up @@ -1044,23 +1064,25 @@ mod tests {
use crate::executor::exchange::output::Output;
use crate::executor::exchange::permit::channel_for_test;
use crate::executor::receiver::ReceiverExecutor;
use crate::executor::{BarrierInner as Barrier, MessageInner as Message};
use crate::task::barrier_test_utils::LocalBarrierTestEnv;
use crate::task::test_utils::helper_make_local_actor;

#[derive(Debug)]
pub struct MockOutput {
actor_id: ActorId,
data: Arc<Mutex<Vec<Message>>>,
data: Arc<Mutex<Vec<DispatcherMessage>>>,
}

impl MockOutput {
pub fn new(actor_id: ActorId, data: Arc<Mutex<Vec<Message>>>) -> Self {
pub fn new(actor_id: ActorId, data: Arc<Mutex<Vec<DispatcherMessage>>>) -> Self {
Self { actor_id, data }
}
}

#[async_trait]
impl Output for MockOutput {
async fn send(&mut self, message: Message) -> StreamResult<()> {
async fn send(&mut self, message: DispatcherMessage) -> StreamResult<()> {
self.data.lock().unwrap().push(message);
Ok(())
}
Expand Down Expand Up @@ -1154,7 +1176,11 @@ mod tests {
let (tx, rx) = channel_for_test();
let actor_id = 233;
let fragment_id = 666;
let input = Executor::new(Default::default(), ReceiverExecutor::for_test(rx).boxed());
let barrier_test_env = LocalBarrierTestEnv::for_test().await;
let input = Executor::new(
Default::default(),
ReceiverExecutor::for_test(233, rx, barrier_test_env.shared_context.clone()).boxed(),
);
let ctx = Arc::new(SharedContext::for_test());
let metrics = Arc::new(StreamingMetrics::unused());

Expand Down Expand Up @@ -1245,7 +1271,10 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
tx.send(Message::Barrier(b1)).await.unwrap();
barrier_test_env.inject_barrier(&b1, [], [actor_id]);
tx.send(Message::Barrier(b1.clone().into_dispatcher()))
.await
.unwrap();
executor.next().await.unwrap().unwrap();

// 5. Check downstream.
Expand All @@ -1261,7 +1290,9 @@ mod tests {
try_recv!(old_simple).unwrap().as_barrier().unwrap(); // Untouched.

// 6. Send another barrier.
tx.send(Message::Barrier(Barrier::new_test_barrier(test_epoch(2))))
let b2 = Barrier::new_test_barrier(test_epoch(2));
barrier_test_env.inject_barrier(&b2, [], [actor_id]);
tx.send(Message::Barrier(b2.into_dispatcher()))
.await
.unwrap();
executor.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -1299,7 +1330,10 @@ mod tests {
actor_new_dispatchers: Default::default(),
},
));
tx.send(Message::Barrier(b3)).await.unwrap();
barrier_test_env.inject_barrier(&b3, [], [actor_id]);
tx.send(Message::Barrier(b3.into_dispatcher()))
.await
.unwrap();
executor.next().await.unwrap().unwrap();

// 10. Check downstream.
Expand All @@ -1309,7 +1343,9 @@ mod tests {
try_recv!(new_simple).unwrap().as_barrier().unwrap(); // Since it's just added, it won't receive the chunk.

// 11. Send another barrier.
tx.send(Message::Barrier(Barrier::new_test_barrier(test_epoch(4))))
let b4 = Barrier::new_test_barrier(test_epoch(4));
barrier_test_env.inject_barrier(&b4, [], [actor_id]);
tx.send(Message::Barrier(b4.into_dispatcher()))
.await
.unwrap();
executor.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -1403,7 +1439,7 @@ mod tests {
} else {
let message = guard.first().unwrap();
let real_chunk = match message {
Message::Chunk(chunk) => chunk,
DispatcherMessage::Chunk(chunk) => chunk,
_ => panic!(),
};
real_chunk
Expand Down
Loading

0 comments on commit bd3b9a1

Please sign in to comment.