refactor: remove drop_actors rpc and simplify barrier worker state
wenym1 committed Sep 2, 2024
1 parent 602c6ad commit fe78fb3
Showing 14 changed files with 193 additions and 467 deletions.
11 changes: 0 additions & 11 deletions proto/stream_service.proto
@@ -17,16 +17,6 @@ message BuildActorInfo {
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

message DropActorsRequest {
string request_id = 1;
repeated uint32 actor_ids = 2;
}

message DropActorsResponse {
string request_id = 1;
common.Status status = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
@@ -109,7 +99,6 @@ message StreamingControlStreamResponse {
}

service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
}
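With DropActors removed, the compute-node service exposes only the unary WaitEpochCommit RPC and the bidirectional control stream, so actor teardown presumably has to be conveyed through that stream rather than a dedicated unary RPC. A rough, hedged sketch of the tonic-generated server trait for the trimmed service (the message structs are placeholders for the prost-generated types; real generated names and bounds may differ):

use futures::Stream;
use tonic::{Request, Response, Status, Streaming};

// Placeholder message types standing in for the prost-generated ones.
pub struct WaitEpochCommitRequest {}
pub struct WaitEpochCommitResponse {}
pub struct StreamingControlStreamRequest {}
pub struct StreamingControlStreamResponse {}

#[tonic::async_trait]
pub trait StreamService: Send + Sync + 'static {
    async fn wait_epoch_commit(
        &self,
        request: Request<WaitEpochCommitRequest>,
    ) -> Result<Response<WaitEpochCommitResponse>, Status>;

    // Response half of the bidirectional StreamingControlStream RPC.
    type StreamingControlStreamStream: Stream<Item = Result<StreamingControlStreamResponse, Status>>
        + Send
        + 'static;

    async fn streaming_control_stream(
        &self,
        request: Request<Streaming<StreamingControlStreamRequest>>,
    ) -> Result<Response<Self::StreamingControlStreamStream>, Status>;
}
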
14 changes: 0 additions & 14 deletions src/compute/src/rpc/service/stream_service.rs
@@ -40,20 +40,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn drop_actors(
&self,
request: Request<DropActorsRequest>,
) -> std::result::Result<Response<DropActorsResponse>, Status> {
let req = request.into_inner();
let actors = req.actor_ids;
self.mgr.drop_actors(actors).await?;
Ok(Response::new(DropActorsResponse {
request_id: req.request_id,
status: None,
}))
}

#[cfg_attr(coverage, coverage(off))]
async fn wait_epoch_commit(
&self,
6 changes: 0 additions & 6 deletions src/meta/node/src/server.rs
@@ -27,7 +27,6 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::{report_scarf_enabled, report_to_scarf, telemetry_env_enabled};
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common_service::{MetricsManager, TracingExtractLayer};
use risingwave_meta::barrier::StreamRpcManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::manager::{
@@ -550,12 +549,9 @@ pub async fn start_service_as_election_leader(
// TODO(shutdown): remove this as there's no need to gracefully shutdown some of these sub-tasks.
let mut sub_tasks = vec![shutdown_handle];

let stream_rpc_manager = StreamRpcManager::new(env.clone());

let scale_controller = Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
stream_rpc_manager.clone(),
env.clone(),
));

@@ -567,7 +563,6 @@
source_manager.clone(),
sink_manager.clone(),
meta_metrics.clone(),
stream_rpc_manager.clone(),
scale_controller.clone(),
)
.await;
@@ -585,7 +580,6 @@
metadata_manager.clone(),
barrier_scheduler.clone(),
source_manager.clone(),
stream_rpc_manager,
scale_controller.clone(),
)
.unwrap(),
32 changes: 0 additions & 32 deletions src/meta/src/barrier/command.rs
@@ -16,7 +16,6 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;

use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
@@ -951,19 +950,6 @@
}

impl CommandContext {
/// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands.
async fn clean_up(&self, actors: Vec<ActorId>) -> MetaResult<()> {
self.barrier_manager_context
.stream_rpc_manager
.drop_actors(
&self.node_map,
self.node_map
.keys()
.map(|worker_id| (*worker_id, actors.clone())),
)
.await
}

pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
let futures = self.node_map.values().map(|worker_node| async {
let client = self
@@ -1013,13 +999,9 @@
}

Command::DropStreamingJobs {
actors,
unregistered_state_table_ids,
..
} => {
// Tell compute nodes to drop actors.
self.clean_up(actors.clone()).await?;

self.barrier_manager_context
.hummock_manager
.unregister_table_ids(unregistered_state_table_ids.iter().cloned())
@@ -1028,7 +1010,6 @@

Command::CancelStreamingJob(table_fragments) => {
tracing::debug!(id = ?table_fragments.table_id(), "cancelling stream job");
self.clean_up(table_fragments.actor_ids()).await?;

// NOTE(kwannoel): At this point, meta has already registered the table ids.
// We should unregister them.
@@ -1128,8 +1109,6 @@
..
}) = job_type
{
self.clean_up(old_table_fragments.actor_ids()).await?;

// Drop fragment info in meta store.
mgr.fragment_manager
.post_replace_table(
@@ -1156,13 +1135,9 @@
new_table_fragments,
dispatchers,
init_split_assignment,
old_table_fragments,
..
}) = job_type
{
// Tell compute nodes to drop actors.
self.clean_up(old_table_fragments.actor_ids()).await?;

mgr.catalog_controller
.post_collect_table_fragments(
new_table_fragments.table_id().table_id as _,
@@ -1193,11 +1168,6 @@
table_parallelism,
..
} => {
let removed_actors = reschedules
.values()
.flat_map(|reschedule| reschedule.removed_actors.clone().into_iter())
.collect_vec();
self.clean_up(removed_actors).await?;
self.barrier_manager_context
.scale_controller
.post_apply_reschedule(reschedules, table_parallelism)
@@ -1212,8 +1182,6 @@
init_split_assignment,
..
}) => {
self.clean_up(old_table_fragments.actor_ids()).await?;

match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// Drop fragment info in meta store.
5 changes: 0 additions & 5 deletions src/meta/src/barrier/mod.rs
@@ -86,7 +86,6 @@ pub use self::command::{
Reschedule, SnapshotBackfillInfo,
};
pub use self::info::InflightSubscriptionInfo;
pub use self::rpc::StreamRpcManager;
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;

@@ -172,8 +171,6 @@ pub struct GlobalBarrierManagerContext {

pub(super) metrics: Arc<MetaMetrics>,

stream_rpc_manager: StreamRpcManager,

env: MetaSrvEnv,
}

@@ -596,7 +593,6 @@ impl GlobalBarrierManager {
source_manager: SourceManagerRef,
sink_manager: SinkCoordinatorManager,
metrics: Arc<MetaMetrics>,
stream_rpc_manager: StreamRpcManager,
scale_controller: ScaleControllerRef,
) -> Self {
let enable_recovery = env.opts.enable_recovery;
@@ -624,7 +620,6 @@
scale_controller,
sink_manager,
metrics,
stream_rpc_manager,
env: env.clone(),
};

99 changes: 4 additions & 95 deletions src/meta/src/barrier/rpc.rs
@@ -21,7 +21,7 @@ use anyhow::anyhow;
use fail::fail_point;
use futures::future::try_join_all;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{pin_mut, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
@@ -34,11 +34,9 @@ use risingwave_pb::stream_service::build_actor_info::SubscriptionIds;
use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest;
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
BuildActorInfo, DropActorsRequest, InjectBarrierRequest, StreamingControlStreamRequest,
BuildActorInfo, InjectBarrierRequest, StreamingControlStreamRequest,
StreamingControlStreamResponse,
};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::StreamClient;
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedSender;
@@ -50,7 +48,7 @@ use uuid::Uuid;
use super::command::CommandContext;
use super::{BarrierKind, GlobalBarrierManagerContext, TracedEpoch};
use crate::barrier::info::InflightGraphInfo;
use crate::manager::{MetaSrvEnv, WorkerId};
use crate::manager::WorkerId;
use crate::{MetaError, MetaResult};

const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3);
@@ -390,7 +388,7 @@ impl ControlStreamManager {
request: Some(
streaming_control_stream_request::Request::InjectBarrier(
InjectBarrierRequest {
request_id: StreamRpcManager::new_request_id(),
request_id: Uuid::new_v4().to_string(),
barrier: Some(barrier),
actor_ids_to_collect,
table_ids_to_sync,
@@ -509,95 +507,6 @@ impl GlobalBarrierManagerContext {
}
}

#[derive(Clone)]
pub struct StreamRpcManager {
env: MetaSrvEnv,
}

impl StreamRpcManager {
pub fn new(env: MetaSrvEnv) -> Self {
Self { env }
}

async fn make_request<REQ, RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
&self,
request: impl Iterator<Item = (&WorkerNode, REQ)>,
f: impl Fn(StreamClient, REQ) -> Fut,
) -> MetaResult<Vec<RSP>> {
let pool = self.env.stream_client_pool();
let f = &f;
let iters = request.map(|(node, input)| async move {
let client = pool.get(node).await.map_err(|e| (node.id, e))?;
f(client, input).await.map_err(|e| (node.id, e))
});
let result = try_join_all_with_error_timeout(iters, COLLECT_ERROR_TIMEOUT).await;
result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err))
}

fn new_request_id() -> String {
Uuid::new_v4().to_string()
}

pub async fn drop_actors(
&self,
node_map: &HashMap<WorkerId, WorkerNode>,
node_actors: impl Iterator<Item = (WorkerId, Vec<ActorId>)>,
) -> MetaResult<()> {
self.make_request(
node_actors
.map(|(worker_id, actor_ids)| (node_map.get(&worker_id).unwrap(), actor_ids)),
|client, actor_ids| async move {
client
.drop_actors(DropActorsRequest {
request_id: Self::new_request_id(),
actor_ids,
})
.await
},
)
.await?;
Ok(())
}
}

/// This function is similar to `try_join_all`, but it attempts to collect as many errors as possible within `error_timeout`.
async fn try_join_all_with_error_timeout<I, RSP, E, F>(
iters: I,
error_timeout: Duration,
) -> Result<Vec<RSP>, Vec<E>>
where
I: IntoIterator<Item = F>,
F: Future<Output = Result<RSP, E>>,
{
let stream = FuturesUnordered::from_iter(iters);
pin_mut!(stream);
let mut results_ok = vec![];
let mut results_err = vec![];
while let Some(result) = stream.next().await {
match result {
Ok(rsp) => {
results_ok.push(rsp);
}
Err(err) => {
results_err.push(err);
break;
}
}
}
if results_err.is_empty() {
return Ok(results_ok);
}
let _ = timeout(error_timeout, async {
while let Some(result) = stream.next().await {
if let Err(err) = result {
results_err.push(err);
}
}
})
.await;
Err(results_err)
}

pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
message: &str,
errors: impl IntoIterator<Item = (WorkerId, E)>,
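The `try_join_all_with_error_timeout` helper shown in this hunk is easiest to read from its call side. A minimal, hedged usage sketch, assuming the helper is in scope and with `Node` and `ping_worker` being made-up names: on success it yields every response; on the first failure it keeps draining further failures for up to the given window so the caller can report all failed workers together.

use std::time::Duration;

// Hypothetical worker handle and per-worker RPC, for illustration only.
struct Node {
    id: u32,
}

async fn ping_worker(node: &Node) -> Result<u32, String> {
    if node.id % 2 == 0 {
        Ok(node.id)
    } else {
        Err(format!("worker {} unreachable", node.id))
    }
}

// Ok(all responses) if every worker succeeded; otherwise Err with as many
// per-worker errors as could be collected within the 3-second window.
async fn ping_all(nodes: &[Node]) -> Result<Vec<u32>, Vec<String>> {
    try_join_all_with_error_timeout(nodes.iter().map(ping_worker), Duration::from_secs(3)).await
}
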
6 changes: 1 addition & 5 deletions src/meta/src/stream/scale.rs
@@ -49,7 +49,7 @@ use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule, StreamRpcManager};
use crate::barrier::{Command, Reschedule};
use crate::controller::scale::RescheduleWorkingSet;
use crate::manager::{
IdCategory, IdGenManagerImpl, LocalNotification, MetaSrvEnv, MetadataManager,
@@ -437,8 +437,6 @@ pub struct ScaleController {

pub source_manager: SourceManagerRef,

pub stream_rpc_manager: StreamRpcManager,

pub env: MetaSrvEnv,

pub reschedule_lock: RwLock<()>,
@@ -448,11 +446,9 @@ impl ScaleController {
pub fn new(
metadata_manager: &MetadataManager,
source_manager: SourceManagerRef,
stream_rpc_manager: StreamRpcManager,
env: MetaSrvEnv,
) -> Self {
Self {
stream_rpc_manager,
metadata_manager: metadata_manager.clone(),
source_manager,
env,
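With the `stream_rpc_manager` field and constructor parameter gone, call sites shrink to the remaining three arguments. A trimmed sketch of the updated construction, mirroring the src/meta/node/src/server.rs hunk earlier (surrounding setup omitted, not a complete excerpt):

// `metadata_manager`, `source_manager` and `env` are assumed to be set up
// as in start_service_as_election_leader above.
let scale_controller = Arc::new(ScaleController::new(
    &metadata_manager,
    source_manager.clone(),
    env.clone(),
));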