Skip to content

Commit

Permalink
feat(meta): collect non-checkpoint epoch for checkpoint barrier commit (
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Apr 19, 2024
1 parent edc65a0 commit 8b089f1
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 24 deletions.
36 changes: 35 additions & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_pb::catalog::CreateType;
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier::BarrierKind as PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
Expand Down Expand Up @@ -304,6 +304,40 @@ impl Command {
}
}

/// Barrier kind used on the meta side.
///
/// Each variant maps onto the protobuf `PbBarrierKind` variant of the same name
/// (see [`BarrierKind::to_protobuf`]); unlike the protobuf enum, the checkpoint
/// variant also carries a list of epochs.
#[derive(Debug)]
pub enum BarrierKind {
    /// Converted to `PbBarrierKind::Initial`.
    Initial,
    /// Converted to `PbBarrierKind::Barrier`.
    Barrier,
    /// Converted to `PbBarrierKind::Checkpoint`.
    ///
    /// Holds the `prev_epoch` values of the preceding non-checkpoint barriers
    /// together with the `prev_epoch` of this checkpoint barrier itself.
    Checkpoint(Vec<u64>),
}

impl BarrierKind {
    /// Returns `true` only for [`BarrierKind::Initial`].
    pub fn is_initial(&self) -> bool {
        matches!(*self, Self::Initial)
    }

    /// Returns `true` only for [`BarrierKind::Checkpoint`], whatever epochs it holds.
    pub fn is_checkpoint(&self) -> bool {
        matches!(*self, Self::Checkpoint(..))
    }

    /// Returns the variant name as a static string, without the checkpoint payload.
    pub fn as_str_name(&self) -> &'static str {
        match *self {
            Self::Initial => "Initial",
            Self::Barrier => "Barrier",
            Self::Checkpoint(..) => "Checkpoint",
        }
    }

    /// Converts to the protobuf barrier kind.
    ///
    /// The epochs held by [`BarrierKind::Checkpoint`] are not part of the result.
    pub fn to_protobuf(&self) -> PbBarrierKind {
        match *self {
            Self::Initial => PbBarrierKind::Initial,
            Self::Barrier => PbBarrierKind::Barrier,
            Self::Checkpoint(..) => PbBarrierKind::Checkpoint,
        }
    }
}

/// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
/// [`Command`].
pub struct CommandContext {
Expand Down
44 changes: 24 additions & 20 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use risingwave_pb::catalog::table::TableType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -77,7 +76,7 @@ mod schedule;
mod state;
mod trace;

pub use self::command::{Command, ReplaceTablePlan, Reschedule};
pub use self::command::{BarrierKind, Command, ReplaceTablePlan, Reschedule};
pub use self::rpc::StreamRpcManager;
pub use self::schedule::BarrierScheduler;
pub use self::trace::TracedEpoch;
Expand Down Expand Up @@ -191,6 +190,9 @@ pub struct GlobalBarrierManager {

checkpoint_control: CheckpointControl,

/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,

active_streaming_nodes: ActiveStreamingWorkerNodes,

control_stream_manager: ControlStreamManager,
Expand Down Expand Up @@ -477,6 +479,7 @@ impl GlobalBarrierManager {
env,
state: initial_invalid_state,
checkpoint_control,
pending_non_checkpoint_barriers: Vec::new(),
active_streaming_nodes,
control_stream_manager,
}
Expand Down Expand Up @@ -720,8 +723,11 @@ impl GlobalBarrierManager {
let info = self.state.apply_command(&command);

let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
self.pending_non_checkpoint_barriers
.push(prev_epoch.value().0);
let kind = if checkpoint {
BarrierKind::Checkpoint
let epochs = take(&mut self.pending_non_checkpoint_barriers);
BarrierKind::Checkpoint(epochs)
} else {
BarrierKind::Barrier
};
Expand Down Expand Up @@ -782,6 +788,7 @@ impl GlobalBarrierManager {
async fn failure_recovery(&mut self, err: MetaError) {
self.context.tracker.lock().await.abort_all(&err);
self.checkpoint_control.clear_on_err(&err).await;
self.pending_non_checkpoint_barriers.clear();

if self.enable_recovery {
self.context
Expand Down Expand Up @@ -844,8 +851,11 @@ impl GlobalBarrierManagerContext {
assert!(state.node_to_collect.is_empty());
let resps = state.resps;
let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps, &command_ctx);
if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await {
let create_mview_progress = resps
.iter()
.flat_map(|resp| resp.create_mview_progress.iter().cloned())
.collect();
if let Err(e) = self.update_snapshot(&command_ctx, resps).await {
for notifier in notifiers {
notifier.notify_collection_failed(e.clone());
}
Expand All @@ -872,7 +882,7 @@ impl GlobalBarrierManagerContext {
async fn update_snapshot(
&self,
command_ctx: &CommandContext,
commit_info: CommitEpochInfo,
resps: Vec<BarrierCompleteResponse>,
) -> MetaResult<()> {
{
{
Expand All @@ -881,17 +891,13 @@ impl GlobalBarrierManagerContext {
// because the storage engine will query from new to old in the order in which
// the L0 layer files are generated.
// See https://github.com/risingwave-labs/risingwave/issues/1251
let kind = command_ctx.kind;
// hummock_manager commit epoch.
let mut new_snapshot = None;

match kind {
BarrierKind::Unspecified => unreachable!(),
BarrierKind::Initial => assert!(
commit_info.sstables.is_empty(),
"no sstables should be produced in the first epoch"
),
BarrierKind::Checkpoint => {
match &command_ctx.kind {
BarrierKind::Initial => {}
BarrierKind::Checkpoint(epochs) => {
let commit_info = collect_commit_epoch_info(resps, command_ctx, epochs);
new_snapshot = self
.hummock_manager
.commit_epoch(command_ctx.prev_epoch.value().0, commit_info)
Expand Down Expand Up @@ -1138,11 +1144,11 @@ pub type BarrierManagerRef = GlobalBarrierManagerContext;
fn collect_commit_epoch_info(
resps: Vec<BarrierCompleteResponse>,
command_ctx: &CommandContext,
) -> (CommitEpochInfo, Vec<CreateMviewProgress>) {
_epochs: &Vec<u64>,
) -> CommitEpochInfo {
let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
let mut synced_ssts: Vec<ExtendedSstableInfo> = vec![];
let mut table_watermarks = Vec::with_capacity(resps.len());
let mut progresses = Vec::new();
for resp in resps {
let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
let sst_info = grouped.sst.expect("field not None");
Expand All @@ -1155,7 +1161,6 @@ fn collect_commit_epoch_info(
});
synced_ssts.extend(ssts_iter);
table_watermarks.push(resp.table_watermarks);
progresses.extend(resp.create_mview_progress);
}
let new_table_fragment_info = if let Command::CreateStreamingJob {
table_fragments, ..
Expand All @@ -1174,7 +1179,7 @@ fn collect_commit_epoch_info(
None
};

let info = CommitEpochInfo::new(
CommitEpochInfo::new(
synced_ssts,
merge_multiple_new_table_watermarks(
table_watermarks
Expand All @@ -1194,6 +1199,5 @@ fn collect_commit_epoch_info(
),
sst_to_worker,
new_table_fragment_info,
);
(info, progresses)
)
}
3 changes: 1 addition & 2 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::{PausedReason, Recovery};
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use thiserror_ext::AsReport;
Expand All @@ -41,7 +40,7 @@ use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::schedule::ScheduledBarriers;
use crate::barrier::state::BarrierManagerState;
use crate::barrier::{Command, GlobalBarrierManager, GlobalBarrierManagerContext};
use crate::barrier::{BarrierKind, Command, GlobalBarrierManager, GlobalBarrierManagerContext};
use crate::controller::catalog::ReleaseContext;
use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl ControlStreamManager {
command_context.curr_epoch.span(),
)
.to_protobuf(),
kind: command_context.kind as i32,
kind: command_context.kind.to_protobuf() as i32,
passed_actors: vec![],
};

Expand Down

0 comments on commit 8b089f1

Please sign in to comment.