
Commit

refactor
wenym1 committed Aug 7, 2024
1 parent edafeeb commit 911a072
Showing 9 changed files with 131 additions and 196 deletions.
17 changes: 8 additions & 9 deletions src/meta/src/barrier/command.rs
@@ -14,7 +14,6 @@

use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;
use std::sync::Arc;

use futures::future::try_join_all;
use itertools::Itertools;
@@ -26,6 +25,7 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::PbWorkerNode;
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
@@ -42,7 +42,7 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;
use tracing::warn;

use super::info::{CommandFragmentChanges, InflightActorInfo};
use super::info::CommandFragmentChanges;
use super::trace::TracedEpoch;
use crate::barrier::{GlobalBarrierManagerContext, InflightSubscriptionInfo};
use crate::manager::{DdlType, InflightFragmentInfo, MetadataManager, StreamingJob, WorkerId};
@@ -418,7 +418,7 @@ impl BarrierKind {
/// [`Command`].
pub struct CommandContext {
/// Resolved info in this barrier loop.
pub info: Arc<InflightActorInfo>,
pub node_map: HashMap<WorkerId, PbWorkerNode>,
pub subscription_info: InflightSubscriptionInfo,
pub table_ids_to_commit: HashSet<TableId>,

@@ -455,7 +455,7 @@ impl std::fmt::Debug for CommandContext {
impl CommandContext {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
info: InflightActorInfo,
node_map: HashMap<WorkerId, PbWorkerNode>,
subscription_info: InflightSubscriptionInfo,
table_ids_to_commit: HashSet<TableId>,
prev_epoch: TracedEpoch,
@@ -467,7 +467,7 @@ impl CommandContext {
span: tracing::Span,
) -> Self {
Self {
info: Arc::new(info),
node_map,
subscription_info,
table_ids_to_commit,
prev_epoch,
@@ -862,17 +862,16 @@ impl CommandContext {
self.barrier_manager_context
.stream_rpc_manager
.drop_actors(
&self.info.node_map,
self.info
.node_map
&self.node_map,
self.node_map
.keys()
.map(|worker_id| (*worker_id, actors.clone())),
)
.await
}

pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
let futures = self.info.node_map.values().map(|worker_node| async {
let futures = self.node_map.values().map(|worker_node| async {
let client = self
.barrier_manager_context
.env
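Net effect of the command.rs changes: `CommandContext` now owns the worker map directly as `node_map: HashMap<WorkerId, PbWorkerNode>` instead of reaching through `Arc<InflightActorInfo>`, so `drop_actors` and `wait_epoch_commit` read `self.node_map` rather than `self.info.node_map`. A minimal sketch of the new access pattern, assuming the field as declared above (the `worker_ids` helper is illustrative only and not part of this commit):

// Illustrative sketch, not part of the commit: with `node_map` as a direct
// field, helpers can iterate workers without touching InflightActorInfo.
impl CommandContext {
    fn worker_ids(&self) -> impl Iterator<Item = WorkerId> + '_ {
        // `node_map: HashMap<WorkerId, PbWorkerNode>` after this refactor.
        self.node_map.keys().cloned()
    }
}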
99 changes: 25 additions & 74 deletions src/meta/src/barrier/info.rs
@@ -12,16 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::PbWorkerNode;
use tracing::warn;

use crate::barrier::Command;
use crate::manager::{
ActiveStreamingWorkerNodes, InflightFragmentInfo, InflightGraphInfo, WorkerId,
};
use crate::manager::{ActorInfos, InflightFragmentInfo, WorkerId};
use crate::model::{ActorId, FragmentId};

#[derive(Debug, Clone)]
@@ -40,39 +37,33 @@ pub struct InflightSubscriptionInfo {
pub mv_depended_subscriptions: HashMap<TableId, HashMap<u32, u64>>,
}

/// [`InflightActorInfo`] resolves the actor info read from meta store for
/// [`InflightGraphInfo`] resolves the actor info read from meta store for
/// [`crate::barrier::GlobalBarrierManager`].
#[derive(Default, Clone)]
pub struct InflightActorInfo {
/// `node_id` => node
pub node_map: HashMap<WorkerId, PbWorkerNode>,

pub(super) struct InflightGraphInfo {
/// `node_id` => actors
pub actor_map: HashMap<WorkerId, HashSet<ActorId>>,

/// `actor_id` => `WorkerId`
pub actor_location_map: HashMap<ActorId, WorkerId>,

pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}

impl InflightActorInfo {
impl InflightGraphInfo {
/// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
pub fn resolve(
active_nodes: &ActiveStreamingWorkerNodes,
graph_info: &InflightGraphInfo,
) -> Self {
let node_map = active_nodes.current().clone();

pub fn resolve(actor_infos: ActorInfos) -> Self {
let actor_map = {
let mut map: HashMap<_, HashSet<_>> = HashMap::new();
for info in graph_info.fragment_infos.values() {
for info in actor_infos.fragment_infos.values() {
for (actor_id, worker_id) in &info.actors {
map.entry(*worker_id).or_default().insert(*actor_id);
}
}
map
};

let actor_location_map = graph_info
let actor_location_map = actor_infos
.fragment_infos
.values()
.flat_map(|fragment| {
@@ -84,49 +75,18 @@ impl InflightActorInfo {
.collect();

Self {
node_map,
actor_map,
actor_location_map,
fragment_infos: actor_infos.fragment_infos,
}
}

/// Update worker nodes snapshot. We need to support incremental updates for it in the future.
pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator<Item = PbWorkerNode>) {
let new_node_map = all_nodes
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();

let mut deleted_actors = BTreeMap::new();
for (&actor_id, &location) in &self.actor_location_map {
if !new_node_map.contains_key(&location) {
deleted_actors
.entry(location)
.or_insert_with(BTreeSet::new)
.insert(actor_id);
}
}
for (node_id, actors) in deleted_actors {
let node = self.node_map.get(&node_id);
warn!(
node_id,
?node,
?actors,
"node with running actors is deleted"
);
}

self.node_map = new_node_map;
}
}

impl InflightGraphInfo {
/// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
/// the info correspondingly.
pub(crate) fn pre_apply(
&mut self,
fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
) -> HashMap<ActorId, WorkerId> {
) {
{
let mut to_add = HashMap::new();
for (fragment_id, change) in fragment_changes {
@@ -154,16 +114,7 @@ impl InflightGraphInfo {
CommandFragmentChanges::RemoveFragment => {}
}
}
to_add
}
}
}

impl InflightActorInfo {
pub fn pre_apply(&mut self, actors_to_add: Option<HashMap<ActorId, WorkerId>>) {
{
for (actor_id, node_id) in actors_to_add.into_iter().flatten() {
assert!(self.node_map.contains_key(&node_id));
for (actor_id, node_id) in to_add {
assert!(
self.actor_map.entry(node_id).or_default().insert(actor_id),
"duplicate actor in command changes"
@@ -203,7 +154,7 @@ impl InflightGraphInfo {
pub(crate) fn post_apply(
&mut self,
fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
) -> HashSet<ActorId> {
) {
{
let mut all_to_remove = HashSet::new();
for (fragment_id, changes) in fragment_changes {
@@ -224,21 +175,13 @@
.fragment_infos
.remove(fragment_id)
.expect("should exist");
for actor_id in info.actors.keys() {
assert!(all_to_remove.insert(*actor_id));
for (actor_id, _) in info.actors {
assert!(all_to_remove.insert(actor_id));
}
}
}
}
all_to_remove
}
}
}

impl InflightActorInfo {
pub fn post_apply(&mut self, actors_to_remove: Option<HashSet<ActorId>>) {
{
for actor_id in actors_to_remove.into_iter().flatten() {
for actor_id in all_to_remove {
let node_id = self
.actor_location_map
.remove(&actor_id)
@@ -314,4 +257,12 @@ impl InflightGraphInfo {
.values()
.flat_map(|info| info.state_table_ids.iter().cloned())
}

pub fn worker_ids(&self) -> impl Iterator<Item = WorkerId> + '_ {
self.actor_map.keys().cloned()
}

pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
self.actor_map.contains_key(&worker_id)
}
}
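After this change `InflightGraphInfo` carries only actor placement (`actor_map`, `actor_location_map`, `fragment_infos`) and no worker-node snapshot; it is rebuilt from the `ActorInfos` loaded from the meta store, and callers query workers through the new `worker_ids`/`contains_worker` helpers. A rough usage sketch, with `all_actor_infos` standing in for the value loaded from the meta store (the real call sites are in mod.rs below):

// Sketch only: rebuild the in-flight graph info and query actor placement.
let graph_info = InflightGraphInfo::resolve(all_actor_infos);
for worker_id in graph_info.worker_ids() {
    // Every worker reported by worker_ids() currently hosts at least one actor.
    assert!(graph_info.contains_worker(worker_id));
}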
35 changes: 20 additions & 15 deletions src/meta/src/barrier/mod.rs
@@ -52,7 +52,7 @@ use tracing::{error, info, warn, Instrument};

use self::command::CommandContext;
use self::notifier::Notifier;
use crate::barrier::info::InflightActorInfo;
use crate::barrier::info::InflightGraphInfo;
use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
@@ -61,8 +61,8 @@ use crate::error::MetaErrorInner;
use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, InflightGraphInfo, LocalNotification,
MetaSrvEnv, MetadataManager, SystemParamsManagerImpl, WorkerId,
ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
MetadataManager, SystemParamsManagerImpl, WorkerId,
};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
@@ -490,7 +490,6 @@ impl GlobalBarrierManager {

let initial_invalid_state = BarrierManagerState::new(
TracedEpoch::new(Epoch(INVALID_EPOCH)),
InflightActorInfo::default(),
InflightGraphInfo::default(),
InflightSubscriptionInfo::default(),
None,
@@ -719,8 +718,6 @@ impl GlobalBarrierManager {

info!(?changed_worker, "worker changed");

self.state
.resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned());
if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
self.control_stream_manager.add_worker(node).await;
}
@@ -751,7 +748,7 @@
Err(e) => {
let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id);
if failed_command.is_some()
|| self.state.inflight_actor_infos.actor_map.contains_key(&worker_id) {
|| self.state.inflight_actor_infos.contains_worker(worker_id) {
let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
if let Some(failed_command) = failed_command {
@@ -802,7 +799,7 @@ impl GlobalBarrierManager {
span,
} = scheduled;

let (pre_applied_actor_info, pre_applied_graph_info, pre_applied_subscription_info) =
let (pre_applied_actor_info, pre_applied_subscription_info) =
self.state.apply_command(&command);

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
@@ -822,9 +819,9 @@
span.record("epoch", curr_epoch.value().0);

let command_ctx = Arc::new(CommandContext::new(
pre_applied_actor_info,
self.active_streaming_nodes.current().clone(),
pre_applied_subscription_info,
pre_applied_graph_info.existing_table_ids().collect(),
pre_applied_actor_info.existing_table_ids().collect(),
prev_epoch.clone(),
curr_epoch.clone(),
self.state.paused_reason(),
@@ -838,8 +835,8 @@

let node_to_collect = match self.control_stream_manager.inject_barrier(
&command_ctx,
&pre_applied_graph_info,
Some(&self.state.inflight_graph_info),
&pre_applied_actor_info,
Some(&self.state.inflight_actor_infos),
) {
Ok(node_to_collect) => node_to_collect,
Err(err) => {
@@ -1192,10 +1189,18 @@ impl GlobalBarrierManagerContext {
/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
/// will create or drop before this barrier flow through them.
async fn load_graph_info(&self) -> MetaResult<InflightGraphInfo> {
async fn resolve_actor_info(&self) -> MetaResult<InflightGraphInfo> {
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr.fragment_manager.load_graph_info().await,
MetadataManager::V2(mgr) => mgr.catalog_controller.load_graph_info().await?,
MetadataManager::V1(mgr) => {
let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

InflightGraphInfo::resolve(all_actor_infos)
}
MetadataManager::V2(mgr) => {
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightGraphInfo::resolve(all_actor_infos)
}
};

Ok(info)
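With the worker-node snapshot gone from barrier state, mod.rs now takes the node map straight from `self.active_streaming_nodes` when building the `CommandContext`, checks worker failures via `contains_worker`, and loads graph info through the renamed `resolve_actor_info`. A hypothetical recovery-side caller, shown only to illustrate how both metadata backends funnel into `InflightGraphInfo::resolve` (the wrapper function is assumed, not part of this commit):

// Hypothetical wrapper; only resolve_actor_info itself appears in this commit.
async fn rebuild_graph_info(
    context: &GlobalBarrierManagerContext,
) -> MetaResult<InflightGraphInfo> {
    // V1 goes through fragment_manager.load_all_actors(), V2 through
    // catalog_controller.load_all_actors(); both end in InflightGraphInfo::resolve.
    context.resolve_actor_info().await
}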