Skip to content

Commit

Permalink
feat: cherry pick #18798, #18819 and #18893 to release-2.1 (#19000)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Oct 18, 2024
1 parent 77b445f commit 55dc238
Show file tree
Hide file tree
Showing 27 changed files with 1,346 additions and 1,542 deletions.
6 changes: 5 additions & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ message InputLevel {
repeated SstableInfo table_infos = 3;
}

message NewL0SubLevel {
repeated SstableInfo inserted_table_infos = 1;
}

message IntraLevelDelta {
uint32 level_idx = 1;
uint64 l0_sub_level_id = 2;
Expand Down Expand Up @@ -112,6 +116,7 @@ message GroupDelta {
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMerge group_merge = 6;
NewL0SubLevel new_l0_sub_level = 7;
}
}

Expand Down Expand Up @@ -528,7 +533,6 @@ message ReportCompactionTaskResponse {
message ValidationTask {
repeated SstableInfo sst_infos = 1;
map<uint64, uint32> sst_id_to_worker_id = 2;
uint64 epoch = 3;
}

// Delete SSTs in object store
Expand Down
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ message BarrierCompleteResponse {
bool done = 2;
uint64 consumed_epoch = 3;
uint64 consumed_rows = 4;
uint32 pending_barrier_num = 5;
}
string request_id = 1;
common.Status status = 2;
Expand Down
24 changes: 16 additions & 8 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,22 @@ pub async fn print_version_delta_in_archive(
}

fn match_delta(delta: &DeltaType, sst_id: HummockSstableObjectId) -> bool {
let DeltaType::IntraLevel(delta) = delta else {
return false;
};
delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id)
|| delta.removed_table_ids.iter().any(|sst| *sst == sst_id)
match delta {
DeltaType::GroupConstruct(_) | DeltaType::GroupDestroy(_) | DeltaType::GroupMerge(_) => {
false
}
DeltaType::IntraLevel(delta) => {
delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id)
|| delta.removed_table_ids.iter().any(|sst| *sst == sst_id)
}
DeltaType::NewL0SubLevel(delta) => delta
.inserted_table_infos
.iter()
.any(|sst| sst.sst_id == sst_id),
}
}

fn print_delta(delta: &DeltaType) {
Expand Down
108 changes: 28 additions & 80 deletions src/meta/src/barrier/creating_job/barrier_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::Bound::{Excluded, Unbounded};
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::mem::take;
use std::ops::Bound::Unbounded;
use std::ops::{Bound, RangeBounds};
use std::time::Instant;

use prometheus::HistogramTimer;
Expand All @@ -26,54 +27,44 @@ use tracing::debug;

use crate::rpc::metrics::MetaMetrics;

#[derive(Debug)]
pub(super) enum CreatingStreamingJobBarrierType {
Snapshot,
LogStore,
Upstream,
}

#[derive(Debug)]
struct CreatingStreamingJobEpochState {
epoch: u64,
node_to_collect: HashSet<WorkerId>,
resps: Vec<BarrierCompleteResponse>,
upstream_epoch_to_notify: Option<u64>,
is_checkpoint: bool,
enqueue_time: Instant,
barrier_type: CreatingStreamingJobBarrierType,
}

#[derive(Debug)]
pub(super) struct CreatingStreamingJobBarrierControl {
table_id: TableId,
// key is prev_epoch of barrier
inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
backfill_epoch: u64,
initial_epoch: Option<u64>,
max_collected_epoch: Option<u64>,
max_attached_epoch: Option<u64>,
// newer epoch at the front. should all be checkpoint barrier
// newer epoch at the front.
pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,

// metrics
consuming_snapshot_barrier_latency: LabelGuardedHistogram<2>,
consuming_log_store_barrier_latency: LabelGuardedHistogram<2>,
consuming_upstream_barrier_latency: LabelGuardedHistogram<2>,

wait_commit_latency: LabelGuardedHistogram<1>,
inflight_barrier_num: LabelGuardedIntGauge<1>,
}

impl CreatingStreamingJobBarrierControl {
pub(super) fn new(table_id: TableId, metrics: &MetaMetrics) -> Self {
pub(super) fn new(table_id: TableId, backfill_epoch: u64, metrics: &MetaMetrics) -> Self {
let table_id_str = format!("{}", table_id.table_id);
Self {
table_id,
inflight_barrier_queue: Default::default(),
backfill_epoch,
initial_epoch: None,
max_collected_epoch: None,
max_attached_epoch: None,
pending_barriers_to_complete: Default::default(),
completing_barrier: None,

Expand All @@ -83,9 +74,6 @@ impl CreatingStreamingJobBarrierControl {
consuming_log_store_barrier_latency: metrics
.snapshot_backfill_barrier_latency
.with_guarded_label_values(&[&table_id_str, "consuming_log_store"]),
consuming_upstream_barrier_latency: metrics
.snapshot_backfill_barrier_latency
.with_guarded_label_values(&[&table_id_str, "consuming_upstream"]),
wait_commit_latency: metrics
.snapshot_backfill_wait_commit_latency
.with_guarded_label_values(&[&table_id_str]),
Expand Down Expand Up @@ -127,7 +115,6 @@ impl CreatingStreamingJobBarrierControl {
epoch: u64,
node_to_collect: HashSet<WorkerId>,
is_checkpoint: bool,
barrier_type: CreatingStreamingJobBarrierType,
) {
debug!(
epoch,
Expand All @@ -142,17 +129,12 @@ impl CreatingStreamingJobBarrierControl {
if let Some(latest_epoch) = self.latest_epoch() {
assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
}
if let Some(max_attached_epoch) = self.max_attached_epoch {
assert!(epoch > max_attached_epoch);
}
let epoch_state = CreatingStreamingJobEpochState {
epoch,
node_to_collect,
resps: vec![],
upstream_epoch_to_notify: None,
is_checkpoint,
enqueue_time: Instant::now(),
barrier_type,
};
if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
self.add_collected(epoch_state);
Expand All @@ -163,41 +145,6 @@ impl CreatingStreamingJobBarrierControl {
.set(self.inflight_barrier_queue.len() as _);
}

pub(super) fn unattached_epochs(&self) -> impl Iterator<Item = (u64, bool)> + '_ {
let range_start = if let Some(max_attached_epoch) = self.max_attached_epoch {
Excluded(max_attached_epoch)
} else {
Unbounded
};
self.inflight_barrier_queue
.range((range_start, Unbounded))
.map(|(epoch, state)| (*epoch, state.is_checkpoint))
}

/// Attach an `upstream_epoch` to the `epoch` of the creating job.
///
/// The `upstream_epoch` won't be completed until the `epoch` of the creating job is completed so that
/// the `upstream_epoch` should wait for the progress of creating job, and we can ensure that the downstream
/// creating job can eventually catch up with the upstream.
pub(super) fn attach_upstream_epoch(&mut self, epoch: u64, upstream_epoch: u64) {
debug!(
epoch,
upstream_epoch,
table_id = ?self.table_id.table_id,
"attach epoch"
);
if let Some(max_attached_epoch) = self.max_attached_epoch {
assert!(epoch > max_attached_epoch);
}
self.max_attached_epoch = Some(epoch);
let epoch_state = self
.inflight_barrier_queue
.get_mut(&epoch)
.expect("should exist");
assert!(epoch_state.upstream_epoch_to_notify.is_none());
epoch_state.upstream_epoch_to_notify = Some(upstream_epoch);
}

pub(super) fn collect(
&mut self,
epoch: u64,
Expand Down Expand Up @@ -228,46 +175,47 @@ impl CreatingStreamingJobBarrierControl {
.set(self.inflight_barrier_queue.len() as _);
}

#[expect(clippy::type_complexity)]
/// Return (`upstream_epochs_to_notify`, Some((epoch, resps, `is_first_commit`)))
/// Return Some((epoch, resps, `is_first_commit`))
///
/// `upstream_epochs_to_notify` is the upstream epochs of non-checkpoint barriers to be notified about barrier completing.
/// These non-checkpoint barriers does not need to call `commit_epoch` and therefore can be completed as long as collected.
/// Only epoch within the `epoch_end_bound` can be started.
/// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
/// the creating job won't have higher committed epoch than the upstream.
pub(super) fn start_completing(
&mut self,
) -> (Vec<u64>, Option<(u64, Vec<BarrierCompleteResponse>, bool)>) {
if self.completing_barrier.is_some() {
return (vec![], None);
}
let mut upstream_epochs_to_notify = vec![];
while let Some(mut epoch_state) = self.pending_barriers_to_complete.pop_back() {
epoch_end_bound: Bound<u64>,
) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
assert!(self.completing_barrier.is_none());
let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
while let Some(epoch_state) = self.pending_barriers_to_complete.back()
&& epoch_range.contains(&epoch_state.epoch)
{
let mut epoch_state = self
.pending_barriers_to_complete
.pop_back()
.expect("non-empty");
let epoch = epoch_state.epoch;
let is_first = self.initial_epoch.expect("should have set") == epoch;
if is_first {
assert!(epoch_state.is_checkpoint);
} else if !epoch_state.is_checkpoint {
if let Some(upstream_epoch) = epoch_state.upstream_epoch_to_notify {
upstream_epochs_to_notify.push(upstream_epoch);
}
continue;
}

let resps = take(&mut epoch_state.resps);
self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
return (upstream_epochs_to_notify, Some((epoch, resps, is_first)));
return Some((epoch, resps, is_first));
}
(upstream_epochs_to_notify, None)
None
}

/// Ack on completing a checkpoint barrier.
///
/// Return the upstream epoch to be notified when there is any.
pub(super) fn ack_completed(&mut self, completed_epoch: u64) -> Option<u64> {
pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
let (epoch_state, wait_commit_timer) =
self.completing_barrier.take().expect("should exist");
wait_commit_timer.observe_duration();
assert_eq!(epoch_state.epoch, completed_epoch);
epoch_state.upstream_epoch_to_notify
}

fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
Expand All @@ -280,10 +228,10 @@ impl CreatingStreamingJobBarrierControl {
}
self.max_collected_epoch = Some(epoch_state.epoch);
let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
let barrier_latency_metrics = match &epoch_state.barrier_type {
CreatingStreamingJobBarrierType::Snapshot => &self.consuming_snapshot_barrier_latency,
CreatingStreamingJobBarrierType::LogStore => &self.consuming_log_store_barrier_latency,
CreatingStreamingJobBarrierType::Upstream => &self.consuming_upstream_barrier_latency,
let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch {
&self.consuming_snapshot_barrier_latency
} else {
&self.consuming_log_store_barrier_latency
};
barrier_latency_metrics.observe(barrier_latency);
self.pending_barriers_to_complete.push_front(epoch_state);
Expand Down
Loading

0 comments on commit 55dc238

Please sign in to comment.