Skip to content

Commit

Permalink
refactor(meta): reorganize code of global barrier manager (part 2) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Nov 12, 2024
1 parent fe65509 commit 022c0a4
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 195 deletions.
4 changes: 2 additions & 2 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,8 @@ message EventLog {
string error = 3;
}
message EventCollectBarrierFail {
uint64 prev_epoch = 1;
uint64 cur_epoch = 2;
reserved 1, 2;
reserved "prev_epoch", "cur_epoch";
string error = 3;
}
message EventWorkerNodePanic {
Expand Down
121 changes: 100 additions & 21 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::PbSubscriptionUpstreamInfo;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{debug, warn};

Expand All @@ -35,22 +37,32 @@ use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{from_partial_graph_id, ControlStreamManager};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{collect_commit_epoch_info, collect_creating_job_commit_epoch_info};
use crate::barrier::utils::collect_creating_job_commit_epoch_info;
use crate::barrier::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
SnapshotBackfillInfo, TracedEpoch,
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::MetaResult;
use crate::{MetaError, MetaResult};

#[derive(Default)]
pub(crate) struct CheckpointControl {
pub(crate) databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
pub(crate) hummock_version_stats: HummockVersionStats,
databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
hummock_version_stats: HummockVersionStats,
}

impl CheckpointControl {
pub(crate) fn new(
databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
hummock_version_stats: HummockVersionStats,
) -> Self {
Self {
databases,
hummock_version_stats,
}
}

pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
self.hummock_version_stats = output.hummock_version_stats;
for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
Expand Down Expand Up @@ -204,23 +216,88 @@ impl CheckpointControl {
.values()
.for_each(|database| database.update_barrier_nums_metrics());
}

/// Collect DDL progress across all databases, keyed by table id.
///
/// Merges two sources:
/// - progress of normal backfill, reported by each database's
///   `create_mview_tracker`;
/// - progress of snapshot backfill, one entry per creating streaming job.
pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
    let mut progress = HashMap::new();
    for database_checkpoint_control in self.databases.values() {
        // Progress of normal backfill
        progress.extend(
            database_checkpoint_control
                .create_mview_tracker
                .gen_ddl_progress(),
        );
        // Progress of snapshot backfill
        for creating_job in database_checkpoint_control
            .creating_streaming_job_controls
            .values()
        {
            // A single pair: `insert` is the idiomatic form of
            // `extend([(k, v)])`.
            progress.insert(
                creating_job.info.table_fragments.table_id().table_id,
                creating_job.gen_ddl_progress(),
            );
        }
    }
    progress
}

/// Whether an error reported by `worker_id` is relevant to any database:
/// a barrier is waiting on that worker, the worker hosts part of an
/// in-flight graph, or a creating streaming job is waiting on it.
pub(crate) fn is_failed_at_worker_err(&self, worker_id: WorkerId) -> bool {
    self.databases.values().any(|database_checkpoint_control| {
        let blocked_barrier = database_checkpoint_control
            .barrier_wait_collect_from_worker(worker_id as _)
            .is_some();
        blocked_barrier
            || database_checkpoint_control
                .state
                .inflight_graph_info
                .contains_worker(worker_id as _)
            || database_checkpoint_control
                .creating_streaming_job_controls
                .values()
                .any(|job| job.is_wait_on_worker(worker_id))
    })
}

/// Fail everything pending on error: notify every queued barrier's
/// notifiers with `err`, stop their latency timers, then abort all tracked
/// materialized-view creations.
pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
    // Phase 1: drain every database's queued barriers and report failure.
    for database in self.databases.values_mut() {
        for (_, node) in take(&mut database.command_ctx_queue) {
            for notifier in node.notifiers {
                notifier.notify_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }
    // Phase 2: abort all in-progress mview creation tracking.
    for database in self.databases.values_mut() {
        database.create_mview_tracker.abort_all();
    }
}

/// Iterate over the subscription upstream info of every database's
/// in-flight state.
pub(crate) fn subscriptions(&self) -> impl Iterator<Item = PbSubscriptionUpstreamInfo> + '_ {
    let per_database = self
        .databases
        .values()
        .map(|database| &database.state.inflight_subscription_info);
    per_database.flatten()
}
}

/// Controls the concurrent execution of commands.
pub(crate) struct DatabaseCheckpointControl {
database_id: DatabaseId,
pub(crate) state: BarrierWorkerState,
state: BarrierWorkerState,

/// Save the state and message of barrier in order.
/// Key is the `prev_epoch`.
pub(crate) command_ctx_queue: BTreeMap<u64, EpochNode>,
command_ctx_queue: BTreeMap<u64, EpochNode>,
/// The barrier that is currently completing.
/// Some((`prev_epoch`, `should_pause_inject_barrier`))
completing_barrier: Option<(u64, bool)>,

pub(crate) creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

pub(crate) create_mview_tracker: CreateMviewProgressTracker,
create_mview_tracker: CreateMviewProgressTracker,
}

impl DatabaseCheckpointControl {
Expand Down Expand Up @@ -531,9 +608,12 @@ impl DatabaseCheckpointControl {
let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
assert!(node.state.creating_jobs_to_wait.is_empty());
assert!(node.state.node_to_collect.is_empty());
let mut finished_jobs = self
.create_mview_tracker
.apply_collected_command(&node, hummock_version_stats);
let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
node.command_ctx.command.as_ref(),
&node.command_ctx.barrier_info,
&node.state.resps,
hummock_version_stats,
);
if !node.command_ctx.barrier_info.kind.is_checkpoint() {
assert!(finished_jobs.is_empty());
node.notifiers.into_iter().for_each(|notifier| {
Expand Down Expand Up @@ -561,10 +641,9 @@ impl DatabaseCheckpointControl {
}));
});
let task = task.get_or_insert_default();
collect_commit_epoch_info(
node.command_ctx.collect_commit_epoch_info(
&mut task.commit_info,
take(&mut node.state.resps),
&node.command_ctx,
self.collect_backfill_pinned_upstream_log_epoch(),
);
self.completing_barrier = Some((
Expand Down Expand Up @@ -630,25 +709,25 @@ impl DatabaseCheckpointControl {
}

/// The state and message of this barrier, a node for concurrent checkpoint.
pub(crate) struct EpochNode {
struct EpochNode {
/// Timer for recording barrier latency, taken after `complete_barriers`.
pub(crate) enqueue_time: HistogramTimer,
enqueue_time: HistogramTimer,

/// Whether this barrier is in-flight or completed.
pub(crate) state: BarrierEpochState,
state: BarrierEpochState,

/// Context of this command to generate barrier and do some post jobs.
pub(crate) command_ctx: CommandContext,
command_ctx: CommandContext,
/// Notifiers of this barrier.
pub(crate) notifiers: Vec<Notifier>,
notifiers: Vec<Notifier>,
}

#[derive(Debug)]
/// The state of barrier.
pub(crate) struct BarrierEpochState {
pub(crate) node_to_collect: HashSet<WorkerId>,
struct BarrierEpochState {
node_to_collect: HashSet<WorkerId>,

pub(crate) resps: Vec<BarrierCompleteResponse>,
resps: Vec<BarrierCompleteResponse>,

creating_jobs_to_wait: HashSet<TableId>,

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ mod control;
mod creating_job;
mod state;

pub(super) use control::{CheckpointControl, DatabaseCheckpointControl, EpochNode};
pub(super) use control::{CheckpointControl, DatabaseCheckpointControl};
pub(super) use state::BarrierWorkerState;
6 changes: 3 additions & 3 deletions src/meta/src/barrier/checkpoint/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ pub(crate) struct BarrierWorkerState {
pending_non_checkpoint_barriers: Vec<u64>,

/// Inflight running actors info.
pub(crate) inflight_graph_info: InflightDatabaseInfo,
pub(super) inflight_graph_info: InflightDatabaseInfo,

pub(crate) inflight_subscription_info: InflightSubscriptionInfo,
pub(super) inflight_subscription_info: InflightSubscriptionInfo,

/// Whether the cluster is paused and the reason.
paused_reason: Option<PausedReason>,
}

impl BarrierWorkerState {
pub fn new() -> Self {
pub(super) fn new() -> Self {
Self {
in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
pending_non_checkpoint_barriers: vec![],
Expand Down
Loading

0 comments on commit 022c0a4

Please sign in to comment.