feat(meta): maintain snapshot of worker and make inject-barrier non-async in worker loop
wenym1 committed Feb 6, 2024
1 parent 2a98c6e commit 61162e3
Showing 12 changed files with 568 additions and 299 deletions.
4 changes: 2 additions & 2 deletions src/meta/service/src/scale_service.rs
@@ -141,7 +141,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
request: Request<RescheduleRequest>,
) -> Result<Response<RescheduleResponse>, Status> {
self.barrier_manager.check_status_running().await?;
self.barrier_manager.check_status_running()?;

let RescheduleRequest {
reschedules,
@@ -235,7 +235,7 @@ impl ScaleService for ScaleServiceImpl {
&self,
request: Request<GetReschedulePlanRequest>,
) -> Result<Response<GetReschedulePlanResponse>, Status> {
self.barrier_manager.check_status_running().await?;
self.barrier_manager.check_status_running()?;

let req = request.into_inner();

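The handlers above drop the `.await` on `check_status_running` because the barrier manager status can now be read without locking (see the `ArcSwap` change in `src/meta/src/barrier/mod.rs` below). A minimal sketch of the call-site shape, using stand-in types rather than the real service structs:

```rust
// Editorial sketch, not the RisingWave API: it only illustrates that the
// status check is now a plain synchronous call inside an async gRPC handler.
struct BarrierManagerHandle;

impl BarrierManagerHandle {
    // In the real code this is a lock-free read of an `ArcSwap` status.
    fn check_status_running(&self) -> Result<(), String> {
        Ok(())
    }
}

async fn reschedule(handle: &BarrierManagerHandle) -> Result<(), String> {
    handle.check_status_running()?; // no `.await` any more
    // ...build and apply the reschedule plan...
    Ok(())
}
```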
170 changes: 98 additions & 72 deletions src/meta/src/barrier/mod.rs
@@ -19,6 +19,7 @@ use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use arc_swap::ArcSwap;
use fail::fail_point;
use itertools::Itertools;
use prometheus::HistogramTimer;
@@ -32,6 +33,7 @@ use risingwave_hummock_sdk::table_watermark::{
};
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
@@ -41,7 +43,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::{info, warn, Instrument};

use self::command::CommandContext;
use self::notifier::Notifier;
@@ -54,7 +56,9 @@ use crate::barrier::state::BarrierManagerState;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
use crate::manager::{
ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId,
};
use crate::model::{ActorId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::stream::{ScaleController, ScaleControllerRef, SourceManagerRef};
@@ -134,7 +138,7 @@ struct Scheduled {

#[derive(Clone)]
pub struct GlobalBarrierManagerContext {
status: Arc<Mutex<BarrierManagerStatus>>,
status: Arc<ArcSwap<BarrierManagerStatus>>,

tracker: Arc<Mutex<CreateMviewProgressTracker>>,

@@ -183,6 +187,8 @@ pub struct GlobalBarrierManager {
checkpoint_control: CheckpointControl,

rpc_manager: BarrierRpcManager,

active_streaming_nodes: ActiveStreamingWorkerNodes,
}
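The new `active_streaming_nodes` field gives the barrier worker loop its own snapshot of the active streaming compute nodes, with a `current()` accessor and a `changed()` future (both used further down in this diff). The sketch below is an illustrative analogue of that pattern built on `tokio::sync::watch`; it is not the actual `ActiveStreamingWorkerNodes` type, whose definition is outside this excerpt:

```rust
// Editorial analogue of the pattern: the barrier loop owns a snapshot of the
// active workers and awaits a future that resolves when membership changes.
use std::collections::HashMap;
use tokio::sync::watch;

type WorkerId = u32;

#[derive(Clone, Debug, PartialEq)]
struct Worker {
    id: WorkerId,
    host: String,
}

struct ActiveWorkers {
    /// Locally owned snapshot, readable without any await or lock.
    current: HashMap<WorkerId, Worker>,
    /// Full snapshots published by the cluster manager.
    rx: watch::Receiver<HashMap<WorkerId, Worker>>,
}

impl ActiveWorkers {
    fn current(&self) -> &HashMap<WorkerId, Worker> {
        &self.current
    }

    /// Resolves once the published membership differs from the local snapshot,
    /// applies it, and returns the new snapshot. The diff's `changed()` returns
    /// a richer change event, but the shape of the loop is the same.
    async fn changed(&mut self) -> HashMap<WorkerId, Worker> {
        loop {
            self.rx
                .changed()
                .await
                .expect("cluster manager side dropped");
            let new = self.rx.borrow().clone();
            if new != self.current {
                self.current = new.clone();
                return new;
            }
        }
    }
}
```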

/// Controls the concurrent execution of commands.
@@ -396,6 +402,9 @@ impl GlobalBarrierManager {
);
let checkpoint_control = CheckpointControl::new(metrics.clone());

let active_streaming_nodes =
ActiveStreamingWorkerNodes::uninitialized(metadata_manager.clone());

let tracker = CreateMviewProgressTracker::new();

let scale_controller = Arc::new(ScaleController::new(
@@ -406,7 +415,7 @@
));

let context = GlobalBarrierManagerContext {
status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
status: Arc::new(ArcSwap::new(Arc::new(BarrierManagerStatus::Starting))),
metadata_manager,
hummock_manager,
source_manager,
@@ -429,6 +438,7 @@
state: initial_invalid_state,
checkpoint_control,
rpc_manager,
active_streaming_nodes,
}
}

@@ -480,6 +490,7 @@
let interval = Duration::from_millis(
self.env.system_params_reader().await.barrier_interval_ms() as u64,
);
self.scheduled_barriers.set_min_interval(interval);
tracing::info!(
"Starting barrier manager with: interval={:?}, enable_recovery={}, in_flight_barrier_nums={}",
interval,
@@ -504,7 +515,7 @@
}
}

self.state = {
{
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
assert_eq!(
latest_snapshot.committed_epoch, latest_snapshot.current_epoch,
@@ -517,23 +528,19 @@
// Even if there's no actor to recover, we still go through the recovery process to
// inject the first `Initial` barrier.
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap))
.await;
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap));
let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0);

let paused = self.take_pause_on_bootstrap().await.unwrap_or(false);
let paused_reason = paused.then_some(PausedReason::Manual);

self.context
.recovery(prev_epoch, paused_reason, &self.scheduled_barriers)
self.recovery(prev_epoch, paused_reason)
.instrument(span)
.await
};
.await;
}

self.context.set_status(BarrierManagerStatus::Running).await;
self.context.set_status(BarrierManagerStatus::Running);

let mut min_interval = tokio::time::interval(interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
self.env
@@ -551,16 +558,64 @@
tracing::info!("Barrier manager is stopped");
break;
}

changed_worker = self.active_streaming_nodes.changed() => {
#[cfg(debug_assertions)]
{
match self
.context
.metadata_manager
.list_active_streaming_compute_nodes()
.await
{
Ok(worker_nodes) => {
let ignore_irrelevant_info = |node: &WorkerNode| {
(
node.id,
WorkerNode {
id: node.id,
r#type: node.r#type,
host: node.host.clone(),
parallel_units: node.parallel_units.clone(),
property: node.property.clone(),
resource: node.resource.clone(),
..Default::default()
},
)
};
let worker_nodes: HashMap<_, _> =
worker_nodes.iter().map(ignore_irrelevant_info).collect();
let curr_worker_nodes: HashMap<_, _> = self
.active_streaming_nodes
.current()
.values()
.map(ignore_irrelevant_info)
.collect();
if worker_nodes != curr_worker_nodes {
warn!(
?worker_nodes,
?curr_worker_nodes,
"different to global snapshot"
);
}
}
Err(e) => {
warn!(e = ?e.as_report(), "fail to list_active_streaming_compute_nodes to compare with local snapshot");
}
}
}

info!(?changed_worker, "worker changed");

// TODO: may apply the changed worker to state
}

// Checkpoint frequency changes.
notification = local_notification_rx.recv() => {
let notification = notification.unwrap();
// Handle barrier interval and checkpoint frequency changes
if let LocalNotification::SystemParamsChange(p) = &notification {
let new_interval = Duration::from_millis(p.barrier_interval_ms() as u64);
if new_interval != min_interval.period() {
min_interval = tokio::time::interval(new_interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
}
self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
}
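The per-loop `min_interval` timer is gone from the select arms: the barrier interval and checkpoint frequency are now pushed into `ScheduledBarriers` via `set_min_interval` and `set_checkpoint_frequency`, and the loop simply awaits `next_barrier()`. A hypothetical sketch of such a gate, assuming a `tokio` interval internally (the real `ScheduledBarriers` implementation lives in a file not shown in this excerpt):

```rust
// Hypothetical sketch of a min-interval gate inside a scheduler; names and
// internals are assumptions, only the interface mirrors the diff above.
use std::time::Duration;
use tokio::time::{interval, Interval, MissedTickBehavior};

struct Scheduled; // placeholder for the real `Scheduled` command bundle

struct ScheduledBarriersSketch {
    min_interval: Interval,
}

impl ScheduledBarriersSketch {
    fn set_min_interval(&mut self, period: Duration) {
        if period != self.min_interval.period() {
            let mut next = interval(period);
            next.set_missed_tick_behavior(MissedTickBehavior::Delay);
            self.min_interval = next;
        }
    }

    /// Waits for the minimum interval to elapse, then yields the next barrier:
    /// either a queued command or a plain checkpoint barrier by default.
    async fn next_barrier(&mut self) -> Scheduled {
        self.min_interval.tick().await;
        Scheduled
    }
}
```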
@@ -572,42 +627,29 @@
)
.await;
}

// There's barrier scheduled.
_ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
min_interval.reset(); // Reset the interval as we have a new barrier.
self.handle_new_barrier().await;
}
// Minimum interval reached.
_ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
self.handle_new_barrier().await;
scheduled = self.scheduled_barriers.next_barrier(),
if self
.checkpoint_control
.can_inject_barrier(self.in_flight_barrier_nums) => {
self.handle_new_barrier(scheduled);
}
}
self.checkpoint_control.update_barrier_nums_metrics();
}
}
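Putting the arms together, the loop multiplexes shutdown, worker-membership changes, and barrier scheduling in one `select!`, with barrier injection gated by `can_inject_barrier`. The toy below mirrors only that shape; the names and channel types are illustrative, not the meta-service API:

```rust
// Editorial toy of the loop shape: shutdown, membership changes, and new work
// gated by an in-flight limit, all multiplexed in a single `select!`.
use tokio::sync::{mpsc, oneshot};

async fn run_loop(
    mut shutdown_rx: oneshot::Receiver<()>,
    mut membership_rx: mpsc::UnboundedReceiver<String>,
    mut work_rx: mpsc::UnboundedReceiver<u64>,
    in_flight_limit: usize,
) {
    let mut in_flight = 0usize;
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => break,
            Some(change) = membership_rx.recv() => {
                println!("membership changed: {change}");
            }
            // The `if` guard mirrors `can_inject_barrier(...)` in the diff.
            Some(job) = work_rx.recv(), if in_flight < in_flight_limit => {
                in_flight += 1; // a real loop would decrement on completion
                println!("handling job {job}");
            }
        }
    }
}
```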

/// Handle the new barrier from the scheduled queue and inject it.
async fn handle_new_barrier(&mut self) {
assert!(self
.checkpoint_control
.can_inject_barrier(self.in_flight_barrier_nums));

fn handle_new_barrier(&mut self, scheduled: Scheduled) {
let Scheduled {
command,
mut notifiers,
send_latency_timer,
checkpoint,
span,
} = self.scheduled_barriers.pop_or_default().await;
} = scheduled;

let all_nodes = self
.context
.metadata_manager
.list_active_streaming_compute_nodes()
.await
.unwrap();
self.state.resolve_worker_nodes(all_nodes);
self.state
.resolve_worker_nodes(self.active_streaming_nodes.current().values().cloned());
let info = self.state.apply_command(&command);

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
@@ -629,15 +671,12 @@
command,
kind,
self.context.clone(),
span.clone(),
span,
));

send_latency_timer.observe_duration();

self.rpc_manager
.inject_barrier(command_ctx.clone())
.instrument(span)
.await;
self.rpc_manager.inject_barrier(command_ctx.clone());

// Notify about the injection.
let prev_paused_reason = self.state.paused_reason();
Expand All @@ -649,7 +688,7 @@ impl GlobalBarrierManager {
prev_paused_reason,
curr_paused_reason,
};
notifiers.iter_mut().for_each(|n| n.notify_injected(info));
notifiers.iter_mut().for_each(|n| n.notify_started(info));

// Update the paused state after the barrier is injected.
self.state.set_paused_reason(curr_paused_reason);
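`handle_new_barrier` is now a synchronous method: it consumes the `Scheduled` item produced by `next_barrier()`, resolves workers from the local snapshot, and hands the command to `rpc_manager.inject_barrier` without awaiting it. A minimal sketch of what a non-async, fire-and-forget injection can look like (not the real `BarrierRpcManager` API):

```rust
// Minimal sketch of a non-async injection: the call returns immediately and
// the outcome comes back through a channel that the loop polls elsewhere.
use tokio::sync::mpsc;

struct RpcManagerSketch {
    completion_tx: mpsc::UnboundedSender<u64>, // carries the injected epoch
}

impl RpcManagerSketch {
    fn inject_barrier(&self, prev_epoch: u64) {
        let tx = self.completion_tx.clone();
        tokio::spawn(async move {
            // ...send InjectBarrier RPCs to the compute nodes and await
            // collection; any error would be reported the same way...
            let _ = tx.send(prev_epoch);
        });
    }
}
```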
@@ -728,8 +767,7 @@
self.context
.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
err.clone(),
)))
.await;
)));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
let span = tracing::info_span!(
@@ -740,12 +778,8 @@

// No need to clean dirty tables for barrier recovery,
// The foreground stream job should cleanup their own tables.
self.state = self
.context
.recovery(prev_epoch, None, &self.scheduled_barriers)
.instrument(span)
.await;
self.context.set_status(BarrierManagerStatus::Running).await;
self.recovery(prev_epoch, None).instrument(span).await;
self.context.set_status(BarrierManagerStatus::Running);
} else {
panic!("failed to execute barrier: {}", err.as_report());
}
@@ -897,9 +931,9 @@

impl GlobalBarrierManagerContext {
/// Check the status of barrier manager, return error if it is not `Running`.
pub async fn check_status_running(&self) -> MetaResult<()> {
let status = self.status.lock().await;
match &*status {
pub fn check_status_running(&self) -> MetaResult<()> {
let status = self.status.load();
match &**status {
BarrierManagerStatus::Starting
| BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
bail!("The cluster is bootstrapping")
@@ -912,33 +946,25 @@
}

/// Set barrier manager status.
async fn set_status(&self, new_status: BarrierManagerStatus) {
let mut status = self.status.lock().await;
*status = new_status;
fn set_status(&self, new_status: BarrierManagerStatus) {
self.status.store(Arc::new(new_status));
}
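With the status kept in an `ArcSwap<BarrierManagerStatus>` instead of a `Mutex`, both `check_status_running` and `set_status` become synchronous: readers do a lock-free `load`, writers `store` a freshly allocated value. A self-contained sketch of the same pattern, assuming the `arc_swap` crate that this diff adds to the imports (types are simplified stand-ins):

```rust
// Sketch of the Mutex -> ArcSwap change for the status field.
use std::sync::Arc;

use arc_swap::ArcSwap;

#[derive(Debug)]
enum Status {
    Starting,
    Recovering,
    Running,
}

struct Context {
    status: Arc<ArcSwap<Status>>,
}

impl Context {
    // Reading no longer needs `.await`: `load` is a lock-free atomic read.
    fn check_running(&self) -> Result<(), String> {
        match &**self.status.load() {
            Status::Running => Ok(()),
            other => Err(format!("barrier manager not running: {other:?}")),
        }
    }

    // Writers replace the whole value instead of mutating under a lock.
    fn set_status(&self, new_status: Status) {
        self.status.store(Arc::new(new_status));
    }
}
```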

/// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
/// will create or drop before this barrier flow through them.
async fn resolve_actor_info(&self) -> MetaResult<InflightActorInfo> {
async fn resolve_actor_info(
&self,
all_nodes: Vec<WorkerNode>,
) -> MetaResult<InflightActorInfo> {
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let all_nodes = mgr
.cluster_manager
.list_active_streaming_compute_nodes()
.await;
let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
}
MetadataManager::V2(mgr) => {
let all_nodes = mgr
.cluster_controller
.list_active_streaming_workers()
.await
.unwrap();
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightActorInfo::resolve(all_nodes, all_actor_infos)
}
};
8 changes: 4 additions & 4 deletions src/meta/src/barrier/notifier.rs
@@ -31,8 +31,8 @@ pub struct BarrierInfo {
/// Used for notifying the status of a scheduled command/barrier.
#[derive(Debug, Default)]
pub(crate) struct Notifier {
/// Get notified when scheduled barrier is injected to compute nodes.
pub injected: Option<oneshot::Sender<BarrierInfo>>,
/// Get notified when scheduled barrier has started to be handled.
pub started: Option<oneshot::Sender<BarrierInfo>>,

/// Get notified when scheduled barrier is collected or failed.
pub collected: Option<oneshot::Sender<MetaResult<()>>>,
@@ -43,8 +43,8 @@ pub(crate) struct Notifier {

impl Notifier {
/// Notify when we have injected a barrier to compute nodes.
pub fn notify_injected(&mut self, info: BarrierInfo) {
if let Some(tx) = self.injected.take() {
pub fn notify_started(&mut self, info: BarrierInfo) {
if let Some(tx) = self.started.take() {
tx.send(info).ok();
}
}
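The `injected` channel and `notify_injected` are renamed to `started` and `notify_started`: per the updated doc comment, the notification now means the scheduled barrier has started to be handled, which matches injection being queued from the worker loop rather than awaited inline. A small self-contained sketch of the handshake, with simplified types:

```rust
// Editorial sketch of the renamed handshake: callers await `started`
// (formerly `injected`), meaning "the command has started to be handled",
// not "the barrier is already on the compute nodes".
use tokio::sync::oneshot;

struct BarrierInfo;

struct Notifier {
    started: Option<oneshot::Sender<BarrierInfo>>,
}

impl Notifier {
    fn notify_started(&mut self, info: BarrierInfo) {
        if let Some(tx) = self.started.take() {
            let _ = tx.send(info);
        }
    }
}

async fn schedule_and_wait() {
    let (tx, rx) = oneshot::channel();
    let mut notifier = Notifier { started: Some(tx) };
    // ...hand `notifier` to the barrier manager; the worker loop calls
    // `notify_started` when it picks the command up...
    notifier.notify_started(BarrierInfo);
    let _info = rx.await.expect("notifier dropped without notifying");
}
```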