Skip to content

Commit

Permalink
fix(meta): avoid removing control stream node when failing to send req…
Browse files Browse the repository at this point in the history
…uest (#18767)
  • Loading branch information
wenym1 authored Oct 8, 2024
1 parent ba761f2 commit 24a6851
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 115 deletions.
8 changes: 5 additions & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,9 +879,11 @@ impl GlobalBarrierManager {
assert_matches!(output.command_ctx.kind, BarrierKind::Barrier);
self.scheduled_barriers.force_checkpoint_in_next_barrier();
}
self.control_stream_manager.remove_partial_graph(
output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect()
);
if !output.table_ids_to_finish.is_empty() {
self.control_stream_manager.remove_partial_graph(
output.table_ids_to_finish.iter().map(|table_id| table_id.table_id).collect()
);
}
}
Ok(None) => {}
Err(e) => {
Expand Down
195 changes: 83 additions & 112 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::future::poll_fn;
use std::task::Poll;
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::StreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
Expand All @@ -31,11 +32,11 @@ use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor, Subscrip
use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest;
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
InjectBarrierRequest, StreamingControlStreamRequest,
};
use risingwave_rpc_client::StreamingControlHandle;
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{sleep, timeout};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{error, info, warn};
Expand All @@ -51,63 +52,19 @@ const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3);

struct ControlStreamNode {
worker: WorkerNode,
sender: UnboundedSender<StreamingControlStreamRequest>,
handle: StreamingControlHandle,
}

// Helper module wrapping a worker's streaming-control response stream in a
// one-shot future that yields the next response together with the worker id
// and the remaining stream, so the caller can re-arm it after each message
// (e.g. by pushing a new future into a `FuturesUnordered`).
mod response_stream_future {
use std::future::Future;

use anyhow::anyhow;
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use risingwave_pb::stream_service::StreamingControlStreamResponse;

use crate::manager::WorkerId;
use crate::MetaResult;

// Turns `stream` into a future resolving to `(worker_id, remaining_stream,
// next_response)`. An exhausted stream (`None`) is surfaced as an
// "end of stream" error; RPC errors are converted into `MetaResult` errors.
pub(super) fn into_future(
worker_id: WorkerId,
stream: BoxStream<
'static,
risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
>,
) -> ResponseStreamFuture {
stream.into_future().map(move |(opt, stream)| {
(
worker_id,
// Hand the stream back so the caller can poll it again.
stream,
// `opt` is `None` once the stream has ended.
opt.ok_or_else(|| anyhow!("end of stream").into())
.and_then(|result| result.map_err(|e| e.into())),
)
})
}

// Opaque name (type-alias-impl-trait) for the future returned by
// `into_future`, allowing it to be stored in homogeneous collections
// such as `FuturesUnordered<ResponseStreamFuture>`.
pub(super) type ResponseStreamFuture = impl Future<
Output = (
WorkerId,
BoxStream<
'static,
risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
>,
MetaResult<StreamingControlStreamResponse>,
),
> + 'static;
}

use response_stream_future::*;

pub(super) struct ControlStreamManager {
context: GlobalBarrierManagerContext,
nodes: HashMap<WorkerId, ControlStreamNode>,
response_streams: FuturesUnordered<ResponseStreamFuture>,
}

impl ControlStreamManager {
pub(super) fn new(context: GlobalBarrierManagerContext) -> Self {
Self {
context,
nodes: Default::default(),
response_streams: FuturesUnordered::new(),
}
}

Expand Down Expand Up @@ -141,10 +98,8 @@ impl ControlStreamManager {
)
.await
{
Ok((stream_node, response_stream)) => {
let _ = self.nodes.insert(node_id, stream_node);
self.response_streams
.push(into_future(node_id, response_stream));
Ok(stream_node) => {
assert!(self.nodes.insert(node_id, stream_node).is_none());
info!(?node_host, "add control stream worker");
return;
}
Expand Down Expand Up @@ -179,11 +134,8 @@ impl ControlStreamManager {
}))
.await?;
self.nodes.clear();
self.response_streams.clear();
for (worker_id, (node, response_stream)) in nodes {
self.nodes.insert(worker_id, node);
self.response_streams
.push(into_future(worker_id, response_stream));
for (worker_id, node) in nodes {
assert!(self.nodes.insert(worker_id, node).is_none());
}

Ok(())
Expand All @@ -196,17 +148,56 @@ impl ControlStreamManager {

async fn next_response(
&mut self,
) -> Option<(WorkerId, MetaResult<StreamingControlStreamResponse>)> {
let (worker_id, response_stream, result) = self.response_streams.next().await?;

match result.as_ref().map(|r| r.response.as_ref().unwrap()) {
Ok(streaming_control_stream_response::Response::Shutdown(_)) | Err(_) => {
// Do not add it back to the `response_streams` so that it will not be polled again.
}
_ => {
self.response_streams
.push(into_future(worker_id, response_stream));
) -> Option<(
WorkerId,
MetaResult<streaming_control_stream_response::Response>,
)> {
if self.nodes.is_empty() {
return None;
}
let (worker_id, result) = poll_fn(|cx| {
for (worker_id, node) in &mut self.nodes {
match node.handle.response_stream.poll_next_unpin(cx) {
Poll::Ready(result) => {
return Poll::Ready((
*worker_id,
result
.ok_or_else(|| anyhow!("end of stream").into())
.and_then(|result| {
result.map_err(Into::<MetaError>::into).and_then(|resp| {
match resp
.response
.ok_or_else(||anyhow!("empty response"))?
{
streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
"worker node {worker_id} is shutting down"
)
.into()),
streaming_control_stream_response::Response::Init(_) => {
// This arm should be unreachable.
Err(anyhow!("get unexpected init response").into())
}
resp => Ok(resp),
}
})
}),
));
}
Poll::Pending => {
continue;
}
}
}
Poll::Pending
})
.await;

if let Err(err) = &result {
let node = self
.nodes
.remove(&worker_id)
.expect("should exist when get shutdown resp");
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
}

Some((worker_id, result))
Expand All @@ -219,30 +210,16 @@ impl ControlStreamManager {

{
let (worker_id, result) = pending_on_none(self.next_response()).await;
let result = match result {
Ok(resp) => match resp.response.unwrap() {
Response::CompleteBarrier(resp) => {
assert_eq!(worker_id, resp.worker_id);
Ok(resp)
}
Response::Shutdown(_) => {
Err(anyhow!("worker node {worker_id} is shutting down").into())
}
Response::Init(_) => {
// This arm should be unreachable.
Err(anyhow!("get unexpected init response").into())

(
worker_id,
result.map(|resp| match resp {
Response::CompleteBarrier(resp) => resp,
Response::Shutdown(_) | Response::Init(_) => {
unreachable!("should be treated as error")
}
},
Err(err) => Err(err),
};
if let Err(err) = &result {
let node = self
.nodes
.remove(&worker_id)
.expect("should exist when get shutdown resp");
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
}
(worker_id, result)
}),
)
}
}

Expand Down Expand Up @@ -356,7 +333,7 @@ impl ControlStreamManager {
.collect_vec();

self.nodes
.iter_mut()
.iter()
.try_for_each(|(node_id, node)| {
let actor_ids_to_collect: Vec<_> = pre_applied_graph_info
.actor_ids_to_collect(*node_id)
Expand All @@ -383,7 +360,8 @@ impl ControlStreamManager {
passed_actors: vec![],
};

node.sender
node.handle
.request_sender
.send(StreamingControlStreamRequest {
request: Some(
streaming_control_stream_request::Request::InjectBarrier(
Expand Down Expand Up @@ -436,22 +414,21 @@ impl ControlStreamManager {
}

pub(super) fn remove_partial_graph(&mut self, partial_graph_ids: Vec<u32>) {
self.nodes.retain(|_, node| {
if node
.sender
self.nodes.iter().for_each(|(_, node)| {
if node.handle
.request_sender
.send(StreamingControlStreamRequest {
request: Some(
streaming_control_stream_request::Request::RemovePartialGraph(
RemovePartialGraphRequest { partial_graph_ids: partial_graph_ids.clone() },
RemovePartialGraphRequest {
partial_graph_ids: partial_graph_ids.clone(),
},
),
),
})
.is_ok()
.is_err()
{
true
} else {
warn!(id = node.worker.id, host = ?node.worker.host, ?partial_graph_ids, "fail to remove_partial_graph request, node removed");
false
warn!(worker_id = node.worker.id,node = ?node.worker.host,"failed to send remove partial graph request");
}
})
}
Expand All @@ -463,24 +440,18 @@ impl GlobalBarrierManagerContext {
node: WorkerNode,
initial_version_id: HummockVersionId,
mv_depended_subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
) -> MetaResult<(
ControlStreamNode,
BoxStream<'static, risingwave_rpc_client::error::Result<StreamingControlStreamResponse>>,
)> {
) -> MetaResult<ControlStreamNode> {
let handle = self
.env
.stream_client_pool()
.get(&node)
.await?
.start_streaming_control(initial_version_id, mv_depended_subscriptions)
.await?;
Ok((
ControlStreamNode {
worker: node.clone(),
sender: handle.request_sender,
},
handle.response_stream,
))
Ok(ControlStreamNode {
worker: node.clone(),
handle,
})
}

/// Send barrier-complete-rpc and wait for responses from all CNs
Expand Down

0 comments on commit 24a6851

Please sign in to comment.