Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support inject and collect barrier from partial graph #17758

Merged
merged 24 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,18 @@ message DropActorsResponse {
common.Status status = 2;
}

// Per-partial-graph barrier scope on a worker node: which actors the barrier
// is sent to and collected from, and which tables to sync for this graph.
message PartialGraphInfo {
// Ids of actors the barrier should be sent (injected) to.
repeated uint32 actor_ids_to_send = 1;
// Ids of actors from which barrier collection is awaited.
repeated uint32 actor_ids_to_collect = 2;
// Ids of (state) tables to sync when the barrier completes.
// NOTE(review): presumably derived from the inflight graph's existing table ids — confirm against meta-side caller.
repeated uint32 table_ids_to_sync = 3;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
reserved 3, 4, 5;
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
reserved "actor_ids_to_send", "actor_ids_to_collect", "table_ids_to_sync";
map<uint32, PartialGraphInfo> graph_info = 6;
}

message BarrierCompleteResponse {
Expand All @@ -80,6 +86,9 @@ message BarrierCompleteResponse {
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
uint32 partial_graph_id = 8;
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
// prev_epoch of barrier
uint64 epoch = 9;
}

// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
Expand All @@ -100,9 +109,14 @@ message StreamingControlStreamRequest {
uint64 prev_epoch = 2;
}

message RemovePartialGraphRequest {
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
repeated uint32 partial_graph_ids = 1;
}

oneof request {
InitRequest init = 1;
InjectBarrierRequest inject_barrier = 2;
RemovePartialGraphRequest remove_partial_graph = 3;
}
}

Expand Down
45 changes: 30 additions & 15 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,10 @@ impl CheckpointControl {

/// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes
/// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
fn barrier_collected(
&mut self,
worker_id: WorkerId,
prev_epoch: u64,
resp: BarrierCompleteResponse,
) {
fn barrier_collected(&mut self, resp: BarrierCompleteResponse) {
let worker_id = resp.worker_id;
let prev_epoch = resp.epoch;
assert_eq!(resp.partial_graph_id, u32::MAX);
if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
assert!(node.state.node_to_collect.remove(&worker_id));
node.state.resps.push(resp);
Expand Down Expand Up @@ -391,10 +389,22 @@ impl CheckpointControl {
node.enqueue_time.observe_duration();
}
}

/// Return whether the collect failure on `worker_id` should trigger a recovery
fn on_collect_failure(&self, worker_id: WorkerId, e: &MetaError) -> bool {
for epoch_node in self.command_ctx_queue.values() {
if epoch_node.state.node_to_collect.contains(&worker_id) {
self.context
.report_collect_failure(&epoch_node.command_ctx, e);
return true;
}
}
false
}
}

/// The state and message of this barrier, a node for concurrent checkpoint.
pub struct EpochNode {
struct EpochNode {
/// Timer for recording barrier latency, taken after `complete_barriers`.
enqueue_time: HistogramTimer,

Expand Down Expand Up @@ -676,14 +686,19 @@ impl GlobalBarrierManager {
_ => {}
}
}
resp_result = self.control_stream_manager.next_complete_barrier_response() => {
(worker_id, resp_result) = self.control_stream_manager.next_complete_barrier_response() => {
match resp_result {
Ok((worker_id, prev_epoch, resp)) => {
self.checkpoint_control.barrier_collected(worker_id, prev_epoch, resp);
Ok(resp) => {
self.checkpoint_control.barrier_collected(resp);

}
Err(e) => {
self.failure_recovery(e).await;
if self.checkpoint_control.on_collect_failure(worker_id, &e)
|| self.state.inflight_actor_infos.actor_map.contains_key(&worker_id) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to return a bool for self.checkpoint_control.on_collect_failure. Simply checking self.state.inflight_actor_infos.actor_map seems to be sufficient if the purpose here is to detect whether there is any actor allocated for a worker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The self.state.inflight_actor_infos.actor_map stores the latest map after the mutations of all injected barriers have been applied. Therefore, for example, if we have a drop mview command that drops all mviews, then after its barrier is injected and while we wait for collection, the self.state.inflight_actor_infos.actor_map is empty, and checking only it would cause this error to be ignored — but we should not ignore it.

self.failure_recovery(e).await;
} else {
warn!(e = ?e.as_report(), worker_id, "no barrier to collect from worker, ignore err");
}
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -756,10 +771,10 @@ impl GlobalBarrierManager {

send_latency_timer.observe_duration();

let node_to_collect = match self.control_stream_manager.inject_barrier(
command_ctx.clone(),
self.state.inflight_actor_infos.existing_table_ids(),
) {
let node_to_collect = match self
.control_stream_manager
.inject_barrier(command_ctx.clone(), &self.state.inflight_actor_infos)
{
Ok(node_to_collect) => node_to_collect,
Err(err) => {
for notifier in notifiers {
Expand Down
11 changes: 6 additions & 5 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,14 @@ impl GlobalBarrierManager {
tracing::Span::current(), // recovery span
));

let mut node_to_collect = control_stream_manager
.inject_barrier(command_ctx.clone(), info.existing_table_ids())?;
let mut node_to_collect =
control_stream_manager.inject_barrier(command_ctx.clone(), &info)?;
while !node_to_collect.is_empty() {
let (worker_id, prev_epoch, _) = control_stream_manager
let (worker_id, result) = control_stream_manager
.next_complete_barrier_response()
.await?;
assert_eq!(prev_epoch, command_ctx.prev_epoch.value().0);
.await;
let resp = result?;
assert_eq!(resp.epoch, command_ctx.prev_epoch.value().0);
assert!(node_to_collect.remove(&worker_id));
}

Expand Down
82 changes: 40 additions & 42 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::future::Future;
use std::sync::Arc;
Expand All @@ -24,16 +24,15 @@ use futures::future::try_join_all;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::stream_plan::{Barrier, BarrierMutation};
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
BroadcastActorInfoTableRequest, BuildActorInfo, BuildActorsRequest, DropActorsRequest,
InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
UpdateActorsRequest,
InjectBarrierRequest, PbPartialGraphInfo, StreamingControlStreamRequest,
StreamingControlStreamResponse, UpdateActorsRequest,
};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::StreamClient;
Expand All @@ -47,6 +46,7 @@ use uuid::Uuid;

use super::command::CommandContext;
use super::GlobalBarrierManagerContext;
use crate::barrier::info::InflightActorInfo;
use crate::manager::{MetaSrvEnv, WorkerId};
use crate::{MetaError, MetaResult};

Expand All @@ -55,8 +55,6 @@ const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3);
struct ControlStreamNode {
worker: WorkerNode,
sender: UnboundedSender<StreamingControlStreamRequest>,
// earlier epoch at the front
inflight_barriers: VecDeque<Arc<CommandContext>>,
}

fn into_future(
Expand Down Expand Up @@ -180,43 +178,30 @@ impl ControlStreamManager {

pub(super) async fn next_complete_barrier_response(
&mut self,
) -> MetaResult<(WorkerId, u64, BarrierCompleteResponse)> {
loop {
) -> (WorkerId, MetaResult<BarrierCompleteResponse>) {
{
let (worker_id, result) = pending_on_none(self.next_response()).await;
match result {
Ok(resp) => match resp.response {
Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => {
let node = self
.nodes
.get_mut(&worker_id)
.expect("should exist when get collect resp");
let command = node
.inflight_barriers
.pop_front()
.expect("should exist when get collect resp");
break Ok((worker_id, command.prev_epoch.value().0, resp));
}
resp => {
break Err(anyhow!("get unexpected resp: {:?}", resp).into());
assert_eq!(worker_id, resp.worker_id);
(worker_id, Ok(resp))
}
resp => (
worker_id,
Err(anyhow!("get unexpected resp: {:?}", resp).into()),
),
},
Err(err) => {
let mut node = self
let node = self
.nodes
.remove(&worker_id)
.expect("should exist when get collect resp");
// Note: No need to use `?` as the backtrace is from meta and not useful.
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
if let Some(command) = node.inflight_barriers.pop_front() {
let errors = self.collect_errors(node.worker.id, err).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
self.context.report_collect_failure(&command, &err);
break Err(err);
} else {
// for node with no inflight barrier, simply ignore the error
info!(node = ?node.worker, "no inflight barrier no node. Ignore error");
continue;
}
let errors = self.collect_errors(node.worker.id, err).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
(worker_id, Err(err))
}
}
}
Expand Down Expand Up @@ -248,7 +233,7 @@ impl ControlStreamManager {
pub(super) fn inject_barrier(
&mut self,
command_context: Arc<CommandContext>,
table_ids_to_sync: HashSet<TableId>,
inflight_actor_info_after_apply_command: &InflightActorInfo,
) -> MetaResult<HashSet<WorkerId>> {
fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
"inject_barrier_err"
Expand All @@ -266,9 +251,25 @@ impl ControlStreamManager {
// No need to send or collect barrier for this node.
assert!(actor_ids_to_send.is_empty());
}
let graph_info = HashMap::from_iter([(
u32::MAX,
PbPartialGraphInfo {
actor_ids_to_send,
actor_ids_to_collect,
table_ids_to_sync: inflight_actor_info_after_apply_command
.existing_table_ids()
.iter()
.map(|table_id| table_id.table_id)
.collect(),
},
)]);

{
let Some(node) = self.nodes.get_mut(node_id) else {
if actor_ids_to_collect.is_empty() {
if graph_info
.values()
.all(|info| info.actor_ids_to_collect.is_empty())
{
// Worker node get disconnected but has no actor to collect. Simply skip it.
return Ok(());
}
Expand Down Expand Up @@ -298,12 +299,7 @@ impl ControlStreamManager {
InjectBarrierRequest {
request_id: StreamRpcManager::new_request_id(),
barrier: Some(barrier),
actor_ids_to_send,
actor_ids_to_collect,
table_ids_to_sync: table_ids_to_sync
.iter()
.map(|table_id| table_id.table_id)
.collect(),
graph_info,
},
),
),
Expand All @@ -316,7 +312,6 @@ impl ControlStreamManager {
))
})?;

node.inflight_barriers.push_back(command_context.clone());
node_need_collect.insert(*node_id);
Result::<_, MetaError>::Ok(())
}
Expand Down Expand Up @@ -359,14 +354,17 @@ impl GlobalBarrierManagerContext {
ControlStreamNode {
worker: node.clone(),
sender: handle.request_sender,
inflight_barriers: VecDeque::new(),
},
handle.response_stream,
))
}

/// Send barrier-complete-rpc and wait for responses from all CNs
fn report_collect_failure(&self, command_context: &CommandContext, error: &MetaError) {
pub(super) fn report_collect_failure(
&self,
command_context: &CommandContext,
error: &MetaError,
) {
// Record failure in event log.
use risingwave_pb::meta::event_log;
let event = event_log::EventCollectBarrierFail {
Expand Down
Loading
Loading