Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snapshot-backfill): control log store back pressure in backfill executor #18798

Merged
merged 31 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4d58a70
feat(snapshot-backfill): control log store back pressure in backfill …
wenym1 Oct 4, 2024
648a770
merge create mview progress
wenym1 Oct 6, 2024
b6cb330
refactor and refine
wenym1 Oct 7, 2024
1b359b0
refactor
wenym1 Oct 8, 2024
07a33c3
refine
wenym1 Oct 9, 2024
f50cb2b
fix timeout
wenym1 Oct 9, 2024
c2789e8
avoid calling now
wenym1 Oct 9, 2024
f86c5c6
refactor
wenym1 Oct 9, 2024
f8df42d
refactor
wenym1 Oct 9, 2024
d85167e
temp revert to 1b359b0cb7 for test timeout
wenym1 Oct 10, 2024
a734be6
Revert "temp revert to 1b359b0cb7 for test timeout"
wenym1 Oct 10, 2024
9b7a63a
force checkpoint in normal barrier
wenym1 Oct 10, 2024
c613991
Merge branch 'main' into yiming/commit-multi-graph-together
wenym1 Oct 10, 2024
1e82a19
rename
wenym1 Oct 10, 2024
f645580
Merge branch 'main' into yiming/snapshot-backfill-executor-backpressure
wenym1 Oct 11, 2024
62106c4
transit to log store in collect
wenym1 Oct 12, 2024
22a5e3a
add comment
wenym1 Oct 12, 2024
f3c28f9
remove trait
wenym1 Oct 14, 2024
8f54f20
refactor to store pending barriers
wenym1 Oct 14, 2024
604b5b6
Merge branch 'main' into yiming/commit-multi-graph-together
wenym1 Oct 14, 2024
af34fd8
fix
wenym1 Oct 14, 2024
43c944c
Merge branch 'yiming/commit-multi-graph-together' into yiming/temp-me…
wenym1 Oct 15, 2024
1b617b3
Merge branch 'main' into yiming/snapshot-backfill-executor-backpressure
wenym1 Oct 15, 2024
1b013f1
Merge branch 'main' into yiming/commit-multi-graph-together
wenym1 Oct 15, 2024
0b01544
Merge branch 'yiming/snapshot-backfill-executor-backpressure' into yi…
wenym1 Oct 15, 2024
34c9c9b
Merge branch 'yiming/commit-multi-graph-together' into yiming/temp-me…
wenym1 Oct 15, 2024
7d32768
Merge branch 'main' into yiming/commit-multi-graph-together
wenym1 Oct 16, 2024
3a41584
Merge branch 'yiming/commit-multi-graph-together' into yiming/snapsho…
wenym1 Oct 16, 2024
8d2cde2
Merge branch 'main' into yiming/snapshot-backfill-executor-backpressure
wenym1 Oct 17, 2024
80df807
address comment
wenym1 Oct 17, 2024
72274be
Merge branch 'main' into yiming/snapshot-backfill-executor-backpressure
wenym1 Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ message BarrierCompleteResponse {
bool done = 2;
uint64 consumed_epoch = 3;
uint64 consumed_rows = 4;
uint32 pending_barrier_num = 5;
}
string request_id = 1;
common.Status status = 2;
Expand Down
106 changes: 26 additions & 80 deletions src/meta/src/barrier/creating_job/barrier_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::Bound::{Excluded, Unbounded};
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::mem::take;
use std::ops::Bound::Unbounded;
use std::ops::{Bound, RangeBounds};
use std::time::Instant;

use prometheus::HistogramTimer;
Expand All @@ -26,54 +27,44 @@ use tracing::debug;
use crate::manager::WorkerId;
use crate::rpc::metrics::MetaMetrics;

#[derive(Debug)]
pub(super) enum CreatingStreamingJobBarrierType {
Snapshot,
LogStore,
Upstream,
}

#[derive(Debug)]
struct CreatingStreamingJobEpochState {
epoch: u64,
node_to_collect: HashSet<WorkerId>,
resps: Vec<BarrierCompleteResponse>,
upstream_epoch_to_notify: Option<u64>,
is_checkpoint: bool,
enqueue_time: Instant,
barrier_type: CreatingStreamingJobBarrierType,
}

#[derive(Debug)]
pub(super) struct CreatingStreamingJobBarrierControl {
table_id: TableId,
// key is prev_epoch of barrier
inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
backfill_epoch: u64,
initial_epoch: Option<u64>,
max_collected_epoch: Option<u64>,
max_attached_epoch: Option<u64>,
// newer epoch at the front. should all be checkpoint barrier
// newer epoch at the front.
pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,

// metrics
consuming_snapshot_barrier_latency: LabelGuardedHistogram<2>,
consuming_log_store_barrier_latency: LabelGuardedHistogram<2>,
consuming_upstream_barrier_latency: LabelGuardedHistogram<2>,

wait_commit_latency: LabelGuardedHistogram<1>,
inflight_barrier_num: LabelGuardedIntGauge<1>,
}

impl CreatingStreamingJobBarrierControl {
pub(super) fn new(table_id: TableId, metrics: &MetaMetrics) -> Self {
pub(super) fn new(table_id: TableId, backfill_epoch: u64, metrics: &MetaMetrics) -> Self {
let table_id_str = format!("{}", table_id.table_id);
Self {
table_id,
inflight_barrier_queue: Default::default(),
backfill_epoch,
initial_epoch: None,
max_collected_epoch: None,
max_attached_epoch: None,
pending_barriers_to_complete: Default::default(),
completing_barrier: None,

Expand All @@ -83,9 +74,6 @@ impl CreatingStreamingJobBarrierControl {
consuming_log_store_barrier_latency: metrics
.snapshot_backfill_barrier_latency
.with_guarded_label_values(&[&table_id_str, "consuming_log_store"]),
consuming_upstream_barrier_latency: metrics
.snapshot_backfill_barrier_latency
.with_guarded_label_values(&[&table_id_str, "consuming_upstream"]),
wait_commit_latency: metrics
.snapshot_backfill_wait_commit_latency
.with_guarded_label_values(&[&table_id_str]),
Expand Down Expand Up @@ -127,7 +115,6 @@ impl CreatingStreamingJobBarrierControl {
epoch: u64,
node_to_collect: HashSet<WorkerId>,
is_checkpoint: bool,
barrier_type: CreatingStreamingJobBarrierType,
) {
debug!(
epoch,
Expand All @@ -142,17 +129,12 @@ impl CreatingStreamingJobBarrierControl {
if let Some(latest_epoch) = self.latest_epoch() {
assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
}
if let Some(max_attached_epoch) = self.max_attached_epoch {
assert!(epoch > max_attached_epoch);
}
let epoch_state = CreatingStreamingJobEpochState {
epoch,
node_to_collect,
resps: vec![],
upstream_epoch_to_notify: None,
is_checkpoint,
enqueue_time: Instant::now(),
barrier_type,
};
if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
self.add_collected(epoch_state);
Expand All @@ -163,41 +145,6 @@ impl CreatingStreamingJobBarrierControl {
.set(self.inflight_barrier_queue.len() as _);
}

pub(super) fn unattached_epochs(&self) -> impl Iterator<Item = (u64, bool)> + '_ {
let range_start = if let Some(max_attached_epoch) = self.max_attached_epoch {
Excluded(max_attached_epoch)
} else {
Unbounded
};
self.inflight_barrier_queue
.range((range_start, Unbounded))
.map(|(epoch, state)| (*epoch, state.is_checkpoint))
}

/// Attach an `upstream_epoch` to the `epoch` of the creating job.
///
/// The `upstream_epoch` won't be completed until the `epoch` of the creating job is completed so that
/// the `upstream_epoch` should wait for the progress of creating job, and we can ensure that the downstream
/// creating job can eventually catch up with the upstream.
pub(super) fn attach_upstream_epoch(&mut self, epoch: u64, upstream_epoch: u64) {
debug!(
epoch,
upstream_epoch,
table_id = ?self.table_id.table_id,
"attach epoch"
);
if let Some(max_attached_epoch) = self.max_attached_epoch {
assert!(epoch > max_attached_epoch);
}
self.max_attached_epoch = Some(epoch);
let epoch_state = self
.inflight_barrier_queue
.get_mut(&epoch)
.expect("should exist");
assert!(epoch_state.upstream_epoch_to_notify.is_none());
epoch_state.upstream_epoch_to_notify = Some(upstream_epoch);
}

pub(super) fn collect(
&mut self,
epoch: u64,
Expand Down Expand Up @@ -228,46 +175,45 @@ impl CreatingStreamingJobBarrierControl {
.set(self.inflight_barrier_queue.len() as _);
}

#[expect(clippy::type_complexity)]
/// Return (`upstream_epochs_to_notify`, Some((epoch, resps, `is_first_commit`)))
/// Return Some((epoch, resps, `is_first_commit`))
///
/// `upstream_epochs_to_notify` is the upstream epochs of non-checkpoint barriers to be notified about barrier completing.
/// These non-checkpoint barriers does not need to call `commit_epoch` and therefore can be completed as long as collected.
/// Only epoch within the `epoch_end_bound` can be started.
pub(super) fn start_completing(
&mut self,
) -> (Vec<u64>, Option<(u64, Vec<BarrierCompleteResponse>, bool)>) {
if self.completing_barrier.is_some() {
return (vec![], None);
}
let mut upstream_epochs_to_notify = vec![];
while let Some(mut epoch_state) = self.pending_barriers_to_complete.pop_back() {
epoch_end_bound: Bound<u64>,
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
assert!(self.completing_barrier.is_none());
let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
while let Some(epoch_state) = self.pending_barriers_to_complete.back()
&& epoch_range.contains(&epoch_state.epoch)
{
let mut epoch_state = self
.pending_barriers_to_complete
.pop_back()
.expect("non-empty");
let epoch = epoch_state.epoch;
let is_first = self.initial_epoch.expect("should have set") == epoch;
if is_first {
assert!(epoch_state.is_checkpoint);
} else if !epoch_state.is_checkpoint {
if let Some(upstream_epoch) = epoch_state.upstream_epoch_to_notify {
upstream_epochs_to_notify.push(upstream_epoch);
}
continue;
}

let resps = take(&mut epoch_state.resps);
self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
return (upstream_epochs_to_notify, Some((epoch, resps, is_first)));
return Some((epoch, resps, is_first));
}
(upstream_epochs_to_notify, None)
None
}

/// Ack on completing a checkpoint barrier.
///
/// Return the upstream epoch to be notified when there is any.
kwannoel marked this conversation as resolved.
Show resolved Hide resolved
pub(super) fn ack_completed(&mut self, completed_epoch: u64) -> Option<u64> {
pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
let (epoch_state, wait_commit_timer) =
self.completing_barrier.take().expect("should exist");
wait_commit_timer.observe_duration();
assert_eq!(epoch_state.epoch, completed_epoch);
epoch_state.upstream_epoch_to_notify
}

fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
Expand All @@ -280,10 +226,10 @@ impl CreatingStreamingJobBarrierControl {
}
self.max_collected_epoch = Some(epoch_state.epoch);
let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
let barrier_latency_metrics = match &epoch_state.barrier_type {
CreatingStreamingJobBarrierType::Snapshot => &self.consuming_snapshot_barrier_latency,
CreatingStreamingJobBarrierType::LogStore => &self.consuming_log_store_barrier_latency,
CreatingStreamingJobBarrierType::Upstream => &self.consuming_upstream_barrier_latency,
let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch {
&self.consuming_snapshot_barrier_latency
} else {
&self.consuming_log_store_barrier_latency
};
barrier_latency_metrics.observe(barrier_latency);
self.pending_barriers_to_complete.push_front(epoch_state);
Expand Down
Loading
Loading