refactor: own global barrier manager in worker loop (#14410)
wenym1 authored Jan 8, 2024
Parent: f52c046 · Commit: 414c6ec
Showing 6 changed files with 369 additions and 348 deletions.
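
In short, the commit stops sharing the barrier manager itself: GlobalBarrierManager is no longer wrapped in an Arc, the worker loop owns it and its mutable state, and other components instead hold a clone of the cheaply cloneable GlobalBarrierManagerContext (the meta env with its stream client pool, metadata manager, hummock manager, source manager, scale controller). A minimal, self-contained sketch of that ownership split, using simplified stand-in types rather than the real RisingWave definitions:

    use std::sync::Arc;

    // Stand-ins for the handles the real context carries (MetaSrvEnv, MetadataManager,
    // HummockManagerRef, SourceManagerRef, ScaleControllerRef).
    #[derive(Clone)]
    struct Env;
    #[derive(Clone)]
    struct MetadataManager;
    struct HummockManager;
    struct SourceManager;
    struct ScaleController;

    // Cheaply cloneable bundle of shared services, playing the role of GlobalBarrierManagerContext.
    #[derive(Clone)]
    struct BarrierContext {
        env: Env,
        metadata_manager: MetadataManager,
        hummock_manager: Arc<HummockManager>,
        source_manager: Arc<SourceManager>,
        scale_controller: Option<Arc<ScaleController>>,
    }

    // The manager itself is owned by the worker loop; only the context is handed out.
    struct BarrierManager {
        context: BarrierContext,
        in_flight_epochs: Vec<u64>, // loop-local mutable state, mutated without shared locking
    }

    impl BarrierManager {
        fn context(&self) -> &BarrierContext {
            &self.context
        }
    }

    fn main() {
        let manager = BarrierManager {
            context: BarrierContext {
                env: Env,
                metadata_manager: MetadataManager,
                hummock_manager: Arc::new(HummockManager),
                source_manager: Arc::new(SourceManager),
                scale_controller: None,
            },
            in_flight_epochs: Vec::new(),
        };
        // Other components clone the context instead of holding Arc<BarrierManager>.
        let _shared: BarrierContext = manager.context().clone();
    }
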
8 changes: 4 additions & 4 deletions src/meta/node/src/server.rs
@@ -535,15 +535,15 @@ pub async fn start_service_as_election_leader(
let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker();
let mut sub_tasks = vec![shutdown_handle];

- let barrier_manager = Arc::new(GlobalBarrierManager::new(
+ let barrier_manager = GlobalBarrierManager::new(
scheduled_barriers,
env.clone(),
metadata_manager.clone(),
hummock_manager.clone(),
source_manager.clone(),
sink_manager.clone(),
meta_metrics.clone(),
- ));
+ );

{
let source_manager = source_manager.clone();
@@ -611,7 +611,7 @@ pub async fn start_service_as_election_leader(
metadata_manager.clone(),
stream_manager.clone(),
source_manager.clone(),
- barrier_manager.clone(),
+ barrier_manager.context().clone(),
sink_manager.clone(),
)
.await;
@@ -622,7 +622,7 @@ pub async fn start_service_as_election_leader(
metadata_manager.clone(),
source_manager,
stream_manager.clone(),
- barrier_manager.clone(),
+ barrier_manager.context().clone(),
);

let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone());
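
The two call sites above show the consumer side: services that previously received barrier_manager.clone() (an Arc<GlobalBarrierManager>) now receive a clone of the shared context via barrier_manager.context().clone(). A short sketch of the same hand-off, with a hypothetical SomeService standing in for the services constructed in server.rs:

    #[derive(Clone)]
    struct BarrierContext;

    struct BarrierManager {
        context: BarrierContext,
    }

    impl BarrierManager {
        fn context(&self) -> &BarrierContext {
            &self.context
        }
    }

    // Hypothetical stand-in for the services built in start_service_as_election_leader.
    struct SomeService {
        barrier: BarrierContext,
    }

    impl SomeService {
        fn new(barrier: BarrierContext) -> Self {
            Self { barrier }
        }
    }

    fn main() {
        let barrier_manager = BarrierManager { context: BarrierContext };
        // Before: SomeService::new(barrier_manager.clone()) with an Arc'd manager.
        // After: hand over only the shared context.
        let _service = SomeService::new(barrier_manager.context().clone());
    }
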
78 changes: 40 additions & 38 deletions src/meta/src/barrier/command.rs
@@ -33,19 +33,14 @@ use risingwave_pb::stream_plan::{
UpdateMutation,
};
use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest};
- use risingwave_rpc_client::StreamClientPoolRef;
use uuid::Uuid;

use super::info::BarrierActorInfo;
use super::trace::TracedEpoch;
- use crate::barrier::CommandChanges;
- use crate::hummock::HummockManagerRef;
+ use crate::barrier::{CommandChanges, GlobalBarrierManagerContext};
use crate::manager::{DdlType, MetadataManager, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
- use crate::stream::{
- build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment,
- ThrottleConfig,
- };
+ use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
@@ -266,12 +261,6 @@ impl Command {
/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
/// [`Command`].
pub struct CommandContext {
- pub metadata_manager: MetadataManager,
-
- hummock_manager: HummockManagerRef,
-
- client_pool: StreamClientPoolRef,
-
/// Resolved info in this barrier loop.
// TODO: this could be stale when we are calling `post_collect`, check if it matters
pub info: Arc<BarrierActorInfo>,
@@ -285,9 +274,7 @@ pub struct CommandContext {

pub kind: BarrierKind,

- source_manager: SourceManagerRef,
-
- scale_controller: Option<ScaleControllerRef>,
+ barrier_manager_context: GlobalBarrierManagerContext,

/// The tracing span of this command.
///
@@ -300,34 +287,30 @@ impl CommandContext {
impl CommandContext {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
- metadata_manager: MetadataManager,
- hummock_manager: HummockManagerRef,
- client_pool: StreamClientPoolRef,
info: BarrierActorInfo,
prev_epoch: TracedEpoch,
curr_epoch: TracedEpoch,
current_paused_reason: Option<PausedReason>,
command: Command,
kind: BarrierKind,
- source_manager: SourceManagerRef,
- scale_controller: Option<ScaleControllerRef>,
+ barrier_manager_context: GlobalBarrierManagerContext,
span: tracing::Span,
) -> Self {
Self {
- metadata_manager,
- hummock_manager,
- client_pool,
info: Arc::new(info),
prev_epoch,
curr_epoch,
current_paused_reason,
command,
kind,
- source_manager,
- scale_controller,
+ barrier_manager_context,
span,
}
}

+ pub fn metadata_manager(&self) -> &MetadataManager {
+ &self.barrier_manager_context.metadata_manager
+ }
}

impl CommandContext {
@@ -382,7 +365,8 @@ impl CommandContext {
}

Command::DropStreamingJobs(table_ids) => {
- let MetadataManager::V1(mgr) = &self.metadata_manager else {
+ let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+ else {
unreachable!("only available in v1");
};

@@ -477,7 +461,8 @@ impl CommandContext {
),

Command::RescheduleFragment { reschedules, .. } => {
- let MetadataManager::V1(mgr) = &self.metadata_manager else {
+ let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+ else {
unimplemented!("implement scale functions in v2");
};
let mut dispatcher_update = HashMap::new();
@@ -736,7 +721,12 @@ impl CommandContext {
let request_id = Uuid::new_v4().to_string();

async move {
- let client = self.client_pool.get(node).await?;
+ let client = self
+ .barrier_manager_context
+ .env
+ .stream_client_pool()
+ .get(node)
+ .await?;
let request = DropActorsRequest {
request_id,
actor_ids: actors.to_owned(),
@@ -751,7 +741,12 @@

pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
let futures = self.info.node_map.values().map(|worker_node| async {
- let client = self.client_pool.get(worker_node).await?;
+ let client = self
+ .barrier_manager_context
+ .env
+ .stream_client_pool()
+ .get(worker_node)
+ .await?;
let request = WaitEpochCommitRequest { epoch };
client.wait_epoch_commit(request).await
});
@@ -782,19 +777,22 @@ impl CommandContext {
Command::Resume(_) => {}

Command::SourceSplitAssignment(split_assignment) => {
- let MetadataManager::V1(mgr) = &self.metadata_manager else {
+ let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+ else {
unimplemented!("implement config change funcs in v2");
};
mgr.fragment_manager
.update_actor_splits_by_split_assignment(split_assignment)
.await?;
- self.source_manager
+ self.barrier_manager_context
+ .source_manager
.apply_source_change(None, Some(split_assignment.clone()), None)
.await;
}

Command::DropStreamingJobs(table_ids) => {
- let MetadataManager::V1(mgr) = &self.metadata_manager else {
+ let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+ else {
unreachable!("only available in v1");
};
// Tell compute nodes to drop actors.
@@ -834,11 +832,12 @@ impl CommandContext {
let table_id = table_fragments.table_id().table_id;
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
- self.hummock_manager
+ self.barrier_manager_context
+ .hummock_manager
.unregister_table_ids_fail_fast(&table_ids)
.await;

- match &self.metadata_manager {
+ match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
// NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
// We need to cleanup the table state. So we can do it here.
@@ -889,7 +888,7 @@ impl CommandContext {
replace_table,
..
} => {
- match &self.metadata_manager {
+ match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
let mut dependent_table_actors =
Vec::with_capacity(upstream_mview_actors.len());
@@ -944,7 +943,8 @@ impl CommandContext {
// Extract the fragments that include source operators.
let source_fragments = table_fragments.stream_source_fragments();

- self.source_manager
+ self.barrier_manager_context
+ .source_manager
.apply_source_change(
Some(source_fragments),
Some(init_split_assignment.clone()),
@@ -958,6 +958,7 @@ impl CommandContext {
table_parallelism,
} => {
let node_dropped_actors = self
+ .barrier_manager_context
.scale_controller
.as_ref()
.unwrap()
@@ -973,7 +974,8 @@ impl CommandContext {
dispatchers,
init_split_assignment,
}) => {
- let MetadataManager::V1(mgr) = &self.metadata_manager else {
+ let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+ else {
unimplemented!("implement replace funcs in v2");
};
let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));
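
On the CommandContext side, the individually stored handles (metadata_manager, hummock_manager, client_pool, source_manager, scale_controller) are replaced by one embedded GlobalBarrierManagerContext, and a small metadata_manager() accessor keeps call sites short. A simplified, self-contained sketch of both, with stand-in types; the real stream client lookup is async and returns a MetaResult:

    struct MetadataManager;
    struct StreamClient;
    struct StreamClientPool;

    impl StreamClientPool {
        fn get(&self, _node_id: u32) -> StreamClient {
            StreamClient
        }
    }

    struct Env {
        pool: StreamClientPool,
    }

    impl Env {
        fn stream_client_pool(&self) -> &StreamClientPool {
            &self.pool
        }
    }

    struct BarrierContext {
        env: Env,
        metadata_manager: MetadataManager,
    }

    struct CommandContext {
        barrier_manager_context: BarrierContext,
    }

    impl CommandContext {
        // Mirrors the accessor added in this diff, so call sites keep a short form.
        fn metadata_manager(&self) -> &MetadataManager {
            &self.barrier_manager_context.metadata_manager
        }

        // Mirrors `self.barrier_manager_context.env.stream_client_pool().get(node)` above.
        fn stream_client(&self, node_id: u32) -> StreamClient {
            self.barrier_manager_context.env.stream_client_pool().get(node_id)
        }
    }

    fn main() {
        let ctx = CommandContext {
            barrier_manager_context: BarrierContext {
                env: Env { pool: StreamClientPool },
                metadata_manager: MetadataManager,
            },
        };
        let _mgr = ctx.metadata_manager();
        let _client = ctx.stream_client(1);
    }
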
(Diffs for the remaining 4 changed files were not loaded on this page.)
