Skip to content

Commit

Permalink
poll response stream separately
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 23, 2024
1 parent c0d9225 commit d0af265
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 122 deletions.
51 changes: 39 additions & 12 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::{
streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamResponse,
};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
Expand All @@ -52,7 +54,7 @@ use self::progress::TrackingCommand;
use crate::barrier::info::InflightActorInfo;
use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::BarrierRpcManager;
use crate::barrier::rpc::{BarrierRpcManager, ControlStreamManager};
use crate::barrier::state::BarrierManagerState;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
Expand Down Expand Up @@ -191,6 +193,8 @@ pub struct GlobalBarrierManager {
rpc_manager: BarrierRpcManager,

active_streaming_nodes: ActiveStreamingWorkerNodes,

control_stream_manager: ControlStreamManager,
}

/// Controls the concurrent execution of commands.
Expand Down Expand Up @@ -422,7 +426,8 @@ impl GlobalBarrierManager {
env: env.clone(),
};

let rpc_manager = BarrierRpcManager::new(context.clone());
let rpc_manager = BarrierRpcManager::new();
let control_stream_manager = ControlStreamManager::new(context.clone());

Self {
enable_recovery,
Expand All @@ -434,6 +439,7 @@ impl GlobalBarrierManager {
checkpoint_control,
rpc_manager,
active_streaming_nodes,
control_stream_manager,
}
}

Expand Down Expand Up @@ -603,7 +609,7 @@ impl GlobalBarrierManager {
self.state
.resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned());
if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
self.rpc_manager.add_worker(&node).await;
self.control_stream_manager.add_worker(node).await;
}
}

Expand All @@ -617,6 +623,23 @@ impl GlobalBarrierManager {
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
}
}
resp_result = self.control_stream_manager.next_response() => {
match resp_result {
Ok((worker_id, prev_epoch, resp)) => {
let resp: StreamingControlStreamResponse = resp;
match resp.response {
Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => {
self.rpc_manager.barrier_collected(worker_id, prev_epoch, resp);
},
resp => unreachable!("invalid response: {:?}", resp),
}

}
Err(e) => {
self.failure_recovery(e, empty()).await;
}
}
}
// Barrier completes.
completion = self.rpc_manager.next_complete_barrier() => {
self.handle_barrier_complete(
Expand Down Expand Up @@ -673,12 +696,16 @@ impl GlobalBarrierManager {

send_latency_timer.observe_duration();

self.rpc_manager
let node_to_collect = self
.control_stream_manager
.inject_barrier(command_ctx.clone())
.inspect_err(|_| {
fail_point!("inject_barrier_err_success");
})?;

self.rpc_manager
.barrier_injected(command_ctx.clone(), node_to_collect);

// Notify about the injection.
let prev_paused_reason = self.state.paused_reason();
let curr_paused_reason = command_ctx.next_paused_reason();
Expand Down Expand Up @@ -713,9 +740,8 @@ impl GlobalBarrierManager {
if let Err(err) = result {
// FIXME: If it is a connector source error occurred in the init barrier, we should pass
// back to frontend
let fail_node = self.checkpoint_control.barrier_failed();
warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch");
self.failure_recovery(err, fail_node).await;
self.failure_recovery(err, empty()).await;
return;
}
// change the state to Complete
Expand All @@ -735,10 +761,7 @@ impl GlobalBarrierManager {
}
// Handle the error node and the nodes after it
if let Some(err) = err_msg {
let fail_nodes = complete_nodes
.drain(index..)
.chain(self.checkpoint_control.barrier_failed().into_iter())
.collect_vec();
let fail_nodes = complete_nodes.drain(index..);
warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch");
self.failure_recovery(err, fail_nodes).await;
}
Expand All @@ -750,8 +773,12 @@ impl GlobalBarrierManager {
fail_nodes: impl IntoIterator<Item = EpochNode>,
) {
self.checkpoint_control.clear_changes();
self.rpc_manager.clear();

for node in fail_nodes {
for node in fail_nodes
.into_iter()
.chain(self.checkpoint_control.barrier_failed().into_iter())
{
if let Some(timer) = node.timer {
timer.observe_duration();
}
Expand Down
39 changes: 18 additions & 21 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::barrier::command::CommandContext;
use crate::barrier::info::InflightActorInfo;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::BarrierRpcManager;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::schedule::ScheduledBarriers;
use crate::barrier::state::BarrierManagerState;
use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext};
Expand Down Expand Up @@ -413,9 +413,10 @@ impl GlobalBarrierManager {
})?
};

let mut rpc_manager = BarrierRpcManager::new(self.context.clone());
let mut control_stream_manager =
ControlStreamManager::new(self.context.clone());

rpc_manager
control_stream_manager
.reset(prev_epoch.value().0, active_streaming_nodes.current())
.await
.inspect_err(|err| {
Expand Down Expand Up @@ -470,28 +471,20 @@ impl GlobalBarrierManager {
tracing::Span::current(), // recovery span
));

rpc_manager.inject_barrier(command_ctx.clone())?;

let res = match rpc_manager.next_complete_barrier().await.result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
warn!(error = %err.as_report(), "post_collect failed");
Err(err)
} else {
Ok((new_epoch.clone(), response))
}
}
Err(err) => {
warn!(error = %err.as_report(), "inject_barrier failed");
Err(err)
let mut node_to_collect =
control_stream_manager.inject_barrier(command_ctx.clone())?;
loop {
let (worker_id, _, _) = control_stream_manager.next_response().await?;
assert!(node_to_collect.remove(&worker_id));
if node_to_collect.is_empty() {
break;
}
};
let (new_epoch, _) = res?;
}

(
BarrierManagerState::new(new_epoch, info, command_ctx.next_paused_reason()),
active_streaming_nodes,
rpc_manager,
control_stream_manager,
)
};
if recovery_result.is_err() {
Expand All @@ -507,7 +500,11 @@ impl GlobalBarrierManager {
recovery_timer.observe_duration();
self.scheduled_barriers.mark_ready();

(self.state, self.active_streaming_nodes, self.rpc_manager) = new_state;
(
self.state,
self.active_streaming_nodes,
self.control_stream_manager,
) = new_state;

tracing::info!(
epoch = self.state.in_flight_prev_epoch().value().0,
Expand Down
Loading

0 comments on commit d0af265

Please sign in to comment.