Commit

feat(meta): maintain snapshot of active streaming compute node in worker loop (#15033)

wenym1 authored Feb 7, 2024
1 parent 85f0023 commit 8d0f414
Showing 7 changed files with 394 additions and 144 deletions.
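In short, this commit moves worker-node discovery off the per-barrier path: `GlobalBarrierManager` now owns an `ActiveStreamingWorkerNodes` snapshot, refreshes the barrier state whenever that snapshot changes, and no longer calls `list_active_streaming_compute_nodes()` before every barrier (only a debug-only cross-check still queries the metadata manager). The sketch below illustrates the snapshot-maintenance pattern only; `WorkerNode`, `WorkerChange`, and the event plumbing here are simplified assumptions, not the actual RisingWave types.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the meta-service worker node type (assumption for illustration).
#[derive(Clone, Debug, PartialEq)]
struct WorkerNode {
    id: u32,
    host: String,
}

/// Change events delivered to the barrier worker loop (hypothetical shape).
#[derive(Debug)]
enum WorkerChange {
    Activated(WorkerNode),
    Deleted(u32),
}

/// Local snapshot of active streaming compute nodes, kept up to date inside
/// the worker loop instead of being re-listed from the metadata manager.
#[derive(Default)]
struct ActiveStreamingWorkerNodes {
    nodes: HashMap<u32, WorkerNode>,
}

impl ActiveStreamingWorkerNodes {
    /// Apply one change notification to the snapshot.
    fn apply(&mut self, change: WorkerChange) {
        match change {
            WorkerChange::Activated(node) => {
                self.nodes.insert(node.id, node);
            }
            WorkerChange::Deleted(id) => {
                self.nodes.remove(&id);
            }
        }
    }

    /// Current view of the active nodes, analogous to `current()` in the patch.
    fn current(&self) -> &HashMap<u32, WorkerNode> {
        &self.nodes
    }
}

fn main() {
    let mut active = ActiveStreamingWorkerNodes::default();

    // Pretend these events arrive from the `changed()` branch of the select loop.
    let events = vec![
        WorkerChange::Activated(WorkerNode { id: 1, host: "cn-1".into() }),
        WorkerChange::Activated(WorkerNode { id: 2, host: "cn-2".into() }),
        WorkerChange::Deleted(1),
    ];

    for event in events {
        println!("worker changed: {event:?}");
        active.apply(event);
        // The barrier state is refreshed from the local snapshot, mirroring
        // `self.state.resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned())`.
        let snapshot: Vec<WorkerNode> = active.current().values().cloned().collect();
        println!("resolved {} active compute nodes", snapshot.len());
    }
}
```

In the real loop this runs inside a `select!` branch (`changed_worker = self.active_streaming_nodes.changed()`), so node membership changes and barrier scheduling are handled by the same task.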
9 changes: 8 additions & 1 deletion src/meta/src/barrier/info.rs
@@ -15,6 +15,7 @@
use std::collections::{HashMap, HashSet};

use risingwave_pb::common::PbWorkerNode;
use tracing::warn;

use crate::manager::{ActorInfos, WorkerId};
use crate::model::ActorId;
@@ -87,10 +88,16 @@ impl InflightActorInfo {

/// Update worker nodes snapshot. We need to support incremental updates for it in the future.
pub fn resolve_worker_nodes(&mut self, all_nodes: impl IntoIterator<Item = PbWorkerNode>) {
self.node_map = all_nodes
let new_node_map = all_nodes
.into_iter()
.map(|node| (node.id, node))
.collect::<HashMap<_, _>>();
for (actor_id, location) in &self.actor_location_map {
if !new_node_map.contains_key(location) {
warn!(actor_id, location, node = ?self.node_map.get(location), "node with running actors is deleted");
}
}
self.node_map = new_node_map;
}

/// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
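The `resolve_worker_nodes` change above adds a consistency check before the new node map is installed: every running actor's location is looked up in the incoming map, and a warning is logged when the hosting node has disappeared. A minimal, self-contained sketch of that check, with plain integer ids standing in for `ActorId`/`WorkerId`:

```rust
use std::collections::HashMap;

/// Return the (actor, worker) pairs whose worker is missing from the new node map.
/// This mirrors the loop added to `resolve_worker_nodes` above.
fn find_orphaned_actors(
    actor_location_map: &HashMap<u32, u32>, // actor id -> worker id
    new_node_map: &HashMap<u32, String>,    // worker id -> node description
) -> Vec<(u32, u32)> {
    let mut orphaned = Vec::new();
    for (actor_id, location) in actor_location_map {
        if !new_node_map.contains_key(location) {
            orphaned.push((*actor_id, *location));
        }
    }
    orphaned
}

fn main() {
    let actor_location_map = HashMap::from([(101u32, 1u32), (102, 2)]);
    let new_node_map = HashMap::from([(1u32, "cn-1".to_string())]); // worker 2 is gone
    for (actor_id, location) in find_orphaned_actors(&actor_location_map, &new_node_map) {
        // The real code logs this with `tracing::warn!` and includes the stale node entry.
        eprintln!("warning: actor {actor_id} is still located on deleted node {location}");
    }
}
```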
107 changes: 75 additions & 32 deletions src/meta/src/barrier/mod.rs
@@ -32,6 +32,7 @@ use risingwave_hummock_sdk::table_watermark::{
};
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
@@ -41,7 +42,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::{info, warn, Instrument};

use self::command::CommandContext;
use self::notifier::Notifier;
@@ -54,7 +55,9 @@ use crate::barrier::state::BarrierManagerState;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
use crate::manager::{
ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId,
};
use crate::model::{ActorId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::{ScaleControllerRef, SourceManagerRef};
@@ -183,6 +186,8 @@ pub struct GlobalBarrierManager {
checkpoint_control: CheckpointControl,

rpc_manager: BarrierRpcManager,

active_streaming_nodes: ActiveStreamingWorkerNodes,
}

/// Controls the concurrent execution of commands.
@@ -397,6 +402,8 @@ impl GlobalBarrierManager {
);
let checkpoint_control = CheckpointControl::new(metrics.clone());

let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized();

let tracker = CreateMviewProgressTracker::new();

let context = GlobalBarrierManagerContext {
@@ -423,6 +430,7 @@
state: initial_invalid_state,
checkpoint_control,
rpc_manager,
active_streaming_nodes,
}
}

@@ -447,7 +455,7 @@
.await
.pause_on_next_bootstrap();
if paused {
tracing::warn!(
warn!(
"The cluster will bootstrap with all data sources paused as specified by the system parameter `{}`. \
It will now be reset to `false`. \
To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
Expand Down Expand Up @@ -498,7 +506,7 @@ impl GlobalBarrierManager {
}
}

self.state = {
{
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
assert_eq!(
latest_snapshot.committed_epoch, latest_snapshot.current_epoch,
Expand All @@ -518,11 +526,10 @@ impl GlobalBarrierManager {
let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);

self.context
.recovery(prev_epoch, paused_reason, &self.scheduled_barriers)
self.recovery(prev_epoch, paused_reason)
.instrument(span)
.await
};
.await;
}

self.context.set_status(BarrierManagerStatus::Running).await;

@@ -545,6 +552,59 @@
tracing::info!("Barrier manager is stopped");
break;
}

changed_worker = self.active_streaming_nodes.changed() => {
#[cfg(debug_assertions)]
{
match self
.context
.metadata_manager
.list_active_streaming_compute_nodes()
.await
{
Ok(worker_nodes) => {
let ignore_irrelevant_info = |node: &WorkerNode| {
(
node.id,
WorkerNode {
id: node.id,
r#type: node.r#type,
host: node.host.clone(),
parallel_units: node.parallel_units.clone(),
property: node.property.clone(),
resource: node.resource.clone(),
..Default::default()
},
)
};
let worker_nodes: HashMap<_, _> =
worker_nodes.iter().map(ignore_irrelevant_info).collect();
let curr_worker_nodes: HashMap<_, _> = self
.active_streaming_nodes
.current()
.values()
.map(ignore_irrelevant_info)
.collect();
if worker_nodes != curr_worker_nodes {
warn!(
?worker_nodes,
?curr_worker_nodes,
"different to global snapshot"
);
}
}
Err(e) => {
warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
}
}
}

info!(?changed_worker, "worker changed");

self.state
.resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned());
}

// Checkpoint frequency changes.
notification = local_notification_rx.recv() => {
let notification = notification.unwrap();
@@ -595,13 +655,6 @@
span,
} = self.scheduled_barriers.pop_or_default().await;

let all_nodes = self
.context
.metadata_manager
.list_active_streaming_compute_nodes()
.await
.unwrap();
self.state.resolve_worker_nodes(all_nodes);
let info = self.state.apply_command(&command);

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
@@ -668,7 +721,7 @@
// back to frontend
fail_point!("inject_barrier_err_success");
let fail_node = self.checkpoint_control.barrier_failed();
tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch");
warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch");
self.failure_recovery(err, fail_node).await;
return;
}
@@ -693,7 +746,7 @@
.drain(index..)
.chain(self.checkpoint_control.barrier_failed().into_iter())
.collect_vec();
tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch");
warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch");
self.failure_recovery(err, fail_nodes).await;
}
}
@@ -734,11 +787,7 @@

// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
self.state = self
.context
.recovery(prev_epoch, None, &self.scheduled_barriers)
.instrument(span)
.await;
self.recovery(prev_epoch, None).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running).await;
} else {
panic!("failed to execute barrier: {}", err.as_report());
@@ -914,23 +963,17 @@ impl GlobalBarrierManagerContext {
/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
/// will create or drop before this barrier flow through them.
async fn resolve_actor_info(&self) -> MetaResult<InflightActorInfo> {
async fn resolve_actor_info(
&self,
all_nodes: Vec<WorkerNode>,
) -> MetaResult<InflightActorInfo> {
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let all_nodes = mgr
.cluster_manager
.list_active_streaming_compute_nodes()
.await;
let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
}
MetadataManager::V2(mgr) => {
let all_nodes = mgr
.cluster_controller
.list_active_streaming_workers()
.await
.unwrap();
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
(diffs for the remaining changed files not shown)

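One more signature change worth noting from the `mod.rs` diff: `resolve_actor_info` no longer lists workers itself; the caller passes in the node set it already holds, which in this patch is the worker loop's `ActiveStreamingWorkerNodes` snapshot. A simplified sketch of that shape (stand-in types, not the real meta-service structs):

```rust
use std::collections::HashMap;

/// Stand-in types; the real `WorkerNode` / `InflightActorInfo` live in the meta crate.
#[derive(Clone, Debug)]
struct WorkerNode {
    id: u32,
}

struct InflightActorInfo {
    node_map: HashMap<u32, WorkerNode>,
}

/// After this commit the worker list is an argument: the caller decides which
/// snapshot of the cluster the actor info is resolved against, and the function
/// no longer queries the cluster manager on its own.
fn resolve_actor_info(all_nodes: Vec<WorkerNode>) -> InflightActorInfo {
    InflightActorInfo {
        node_map: all_nodes.into_iter().map(|n| (n.id, n)).collect(),
    }
}

fn main() {
    // A caller would typically pass something like
    // `active_streaming_nodes.current().values().cloned().collect()`.
    let snapshot = vec![WorkerNode { id: 1 }, WorkerNode { id: 2 }];
    let info = resolve_actor_info(snapshot);
    println!("resolved actor info against {} worker nodes", info.node_map.len());
}
```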