Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove drop_actors rpc and simplify barrier worker state #18354

Merged
merged 14 commits into from
Sep 6, 2024
11 changes: 0 additions & 11 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@ message BuildActorInfo {
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

message DropActorsRequest {
string request_id = 1;
repeated uint32 actor_ids = 2;
}

message DropActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
Expand Down Expand Up @@ -109,7 +99,6 @@ message StreamingControlStreamResponse {
}

service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}
Expand Down
14 changes: 0 additions & 14 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn drop_actors(
&self,
request: Request<DropActorsRequest>,
) -> std::result::Result<Response<DropActorsResponse>, Status> {
let req = request.into_inner();
let actors = req.actor_ids;
self.mgr.drop_actors(actors).await?;
Ok(Response::new(DropActorsResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn wait_epoch_commit(
&self,
Expand Down
6 changes: 0 additions & 6 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::barrier::StreamRpcManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::manager::{
Expand Down Expand Up @@ -550,12 +549,9 @@ pub async fn start_service_as_election_leader(
// TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks.
let mut sub_tasks = vec![shutdown_handle];

let stream_rpc_manager = StreamRpcManager::new(env.clone());

let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

Expand All @@ -567,7 +563,6 @@ pub async fn start_service_as_election_leader(
source_manager.clone(),
sink_manager.clone(),
meta_metrics.clone(),
stream_rpc_manager.clone(),
scale_controller.clone(),
)
.await;
Expand All @@ -585,7 +580,6 @@ pub async fn start_service_as_election_leader(
metadata_manager.clone(),
barrier_scheduler.clone(),
source_manager.clone(),
stream_rpc_manager,
scale_controller.clone(),
)
.unwrap(),
Expand Down
32 changes: 0 additions & 32 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
Expand Down Expand Up @@ -959,19 +958,6 @@ impl Command {
}

impl CommandContext {
/// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands.
async fn clean_up(&self, actors: Vec<ActorId>) -> MetaResult<()> {
self.barrier_manager_context
.stream_rpc_manager
.drop_actors(
&self.node_map,
self.node_map
.keys()
.map(|worker_id| (*worker_id, actors.clone())),
)
.await
}

pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
let futures = self.node_map.values().map(|worker_node| async {
let client = self
Expand Down Expand Up @@ -1021,13 +1007,9 @@ impl CommandContext {
}

Command::DropStreamingJobs {
actors,
unregistered_state_table_ids,
..
} => {
// Tell compute nodes to drop actors.
self.clean_up(actors.clone()).await?;

self.barrier_manager_context
.hummock_manager
.unregister_table_ids(unregistered_state_table_ids.iter().cloned())
Expand All @@ -1036,7 +1018,6 @@ impl CommandContext {

Command::CancelStreamingJob(table_fragments) => {
tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
self.clean_up(table_fragments.actor_ids()).await?;

// NOTE(kwannoel): At this point, meta has already registered the table ids.
// We should unregister them.
Expand Down Expand Up @@ -1136,8 +1117,6 @@ impl CommandContext {
..
}) = job_type
{
self.clean_up(old_table_fragments.actor_ids()).await?;

// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
Expand All @@ -1164,13 +1143,9 @@ impl CommandContext {
new_table_fragments,
dispatchers,
init_split_assignment,
old_table_fragments,
..
}) = job_type
{
// Tell compute nodes to drop actors.
self.clean_up(old_table_fragments.actor_ids()).await?;

mgr.catalog_controller
.post_collect_table_fragments(
new_table_fragments.table_id().table_id as _,
Expand Down Expand Up @@ -1201,11 +1176,6 @@ impl CommandContext {
table_parallelism,
..
} => {
let removed_actors = reschedules
.values()
.flat_map(|reschedule| reschedule.removed_actors.clone().into_iter())
.collect_vec();
self.clean_up(removed_actors).await?;
self.barrier_manager_context
.scale_controller
.post_apply_reschedule(reschedules, table_parallelism)
Expand All @@ -1220,8 +1190,6 @@ impl CommandContext {
init_split_assignment,
..
}) => {
self.clean_up(old_table_fragments.actor_ids()).await?;

match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// Drop fragment info in meta store.
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ pub use self::command::{
Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::rpc::StreamRpcManager;
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

Expand Down Expand Up @@ -172,8 +171,6 @@ pub struct GlobalBarrierManagerContext {

pub(super) metrics: Arc<MetaMetrics>,

stream_rpc_manager: StreamRpcManager,

env: MetaSrvEnv,
}

Expand Down Expand Up @@ -596,7 +593,6 @@ impl GlobalBarrierManager {
source_manager: SourceManagerRef,
sink_manager: SinkCoordinatorManager,
metrics: Arc<MetaMetrics>,
stream_rpc_manager: StreamRpcManager,
scale_controller: ScaleControllerRef,
) -> Self {
let enable_recovery = env.opts.enable_recovery;
Expand Down Expand Up @@ -624,7 +620,6 @@ impl GlobalBarrierManager {
scale_controller,
sink_manager,
metrics,
stream_rpc_manager,
env: env.clone(),
};

Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,14 @@ impl GlobalBarrierManagerContext {
return Err(anyhow!("actors dropped during update").into());
}

{
for (node_id, actors) in &info.actor_map {
if !actors.is_empty() && !all_node_actors.contains_key(node_id) {
return Err(anyhow!("streaming job dropped during update").into());
}
}
}

Ok(all_node_actors)
}
}
Expand Down
99 changes: 4 additions & 95 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use anyhow::anyhow;
use fail::fail_point;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{pin_mut, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
Expand All @@ -34,11 +34,9 @@ use risingwave_pb::stream_service::build_actor_info::SubscriptionIds;
use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest;
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
BuildActorInfo, DropActorsRequest, InjectBarrierRequest, StreamingControlStreamRequest,
BuildActorInfo, InjectBarrierRequest, StreamingControlStreamRequest,
StreamingControlStreamResponse,
};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::StreamClient;
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
Expand All @@ -50,7 +48,7 @@ use uuid::Uuid;
use super::command::CommandContext;
use super::{BarrierKind, GlobalBarrierManagerContext, TracedEpoch};
use crate::barrier::info::InflightGraphInfo;
use crate::manager::{MetaSrvEnv, WorkerId};
use crate::manager::WorkerId;
use crate::{MetaError, MetaResult};

const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3);
Expand Down Expand Up @@ -393,7 +391,7 @@ impl ControlStreamManager {
request: Some(
streaming_control_stream_request::Request::InjectBarrier(
InjectBarrierRequest {
request_id: StreamRpcManager::new_request_id(),
request_id: Uuid::new_v4().to_string(),
barrier: Some(barrier),
actor_ids_to_collect,
table_ids_to_sync,
Expand Down Expand Up @@ -512,95 +510,6 @@ impl GlobalBarrierManagerContext {
}
}

#[derive(Clone)]
pub struct StreamRpcManager {
env: MetaSrvEnv,
}

impl StreamRpcManager {
pub fn new(env: MetaSrvEnv) -> Self {
Self { env }
}

async fn make_request<REQ, RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
&self,
request: impl Iterator<Item = (&WorkerNode, REQ)>,
f: impl Fn(StreamClient, REQ) -> Fut,
) -> MetaResult<Vec<RSP>> {
let pool = self.env.stream_client_pool();
let f = &f;
let iters = request.map(|(node, input)| async move {
let client = pool.get(node).await.map_err(|e| (node.id, e))?;
f(client, input).await.map_err(|e| (node.id, e))
});
let result = try_join_all_with_error_timeout(iters, COLLECT_ERROR_TIMEOUT).await;
result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err))
}

fn new_request_id() -> String {
Uuid::new_v4().to_string()
}

pub async fn drop_actors(
&self,
node_map: &HashMap<WorkerId, WorkerNode>,
node_actors: impl Iterator<Item = (WorkerId, Vec<ActorId>)>,
) -> MetaResult<()> {
self.make_request(
node_actors
.map(|(worker_id, actor_ids)| (node_map.get(&worker_id).unwrap(), actor_ids)),
|client, actor_ids| async move {
client
.drop_actors(DropActorsRequest {
request_id: Self::new_request_id(),
actor_ids,
})
.await
},
)
.await?;
Ok(())
}
}

/// This function is similar to `try_join_all`, but it attempts to collect as many error as possible within `error_timeout`.
async fn try_join_all_with_error_timeout<I, RSP, E, F>(
iters: I,
error_timeout: Duration,
) -> Result<Vec<RSP>, Vec<E>>
where
I: IntoIterator<Item = F>,
F: Future<Output = Result<RSP, E>>,
{
let stream = FuturesUnordered::from_iter(iters);
pin_mut!(stream);
let mut results_ok = vec![];
let mut results_err = vec![];
while let Some(result) = stream.next().await {
match result {
Ok(rsp) => {
results_ok.push(rsp);
}
Err(err) => {
results_err.push(err);
break;
}
}
}
if results_err.is_empty() {
return Ok(results_ok);
}
let _ = timeout(error_timeout, async {
while let Some(result) = stream.next().await {
if let Err(err) = result {
results_err.push(err);
}
}
})
.await;
Err(results_err)
}

pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
message: &str,
errors: impl IntoIterator<Item = (WorkerId, E)>,
Expand Down
6 changes: 1 addition & 5 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule, StreamRpcManager};
use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{
IdCategory, IdGenManagerImpl, LocalNotification, MetaSrvEnv, MetadataManager,
Expand Down Expand Up @@ -437,8 +437,6 @@ pub struct ScaleController {

pub source_manager: SourceManagerRef,

pub stream_rpc_manager: StreamRpcManager,

pub env: MetaSrvEnv,

pub reschedule_lock: RwLock<()>,
Expand All @@ -448,11 +446,9 @@ impl ScaleController {
pub fn new(
metadata_manager: &MetadataManager,
source_manager: SourceManagerRef,
stream_rpc_manager: StreamRpcManager,
env: MetaSrvEnv,
) -> Self {
Self {
stream_rpc_manager,
metadata_manager: metadata_manager.clone(),
source_manager,
env,
Expand Down
Loading
Loading