Skip to content

Commit

Permalink
rename to ConsumingUpstreamTableOrSource to be clearer
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 21, 2024
1 parent 8b4653e commit 5ad1867
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
5 changes: 5 additions & 0 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,11 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
break 'backfill_loop;
}
} else {
self.progress.update_for_source_backfill(
barrier.epoch,
backfill_stage.total_backfilled_rows(),
);
// yield barrier after reporting progress
yield Message::Barrier(barrier);
}
}
Expand Down
38 changes: 23 additions & 15 deletions src/stream/src/task/barrier_manager/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ type ConsumedRows = u64;

#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
ConsumingUpstreamTable(ConsumedEpoch, ConsumedRows),
DoneConsumingUpstreamTable(ConsumedRows),
ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
DoneConsumingUpstreamTableOrSource(ConsumedRows),
ConsumingLogStore { pending_barrier_num: usize },
DoneConsumingLogStore,
}

impl BackfillState {
pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
let (done, consumed_epoch, consumed_rows, pending_barrier_num) = match self {
BackfillState::ConsumingUpstreamTable(consumed_epoch, consumed_rows) => {
BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
(false, consumed_epoch, consumed_rows, 0)
}
BackfillState::DoneConsumingUpstreamTable(consumed_rows) => (true, 0, consumed_rows, 0), /* unused field for done */
BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
(true, 0, consumed_rows, 0)
} // unused field for done
BackfillState::ConsumingLogStore {
pending_barrier_num,
} => (false, 0, 0, pending_barrier_num as _),
Expand All @@ -59,14 +61,14 @@ impl BackfillState {
impl Display for BackfillState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
BackfillState::ConsumingUpstreamTable(epoch, rows) => {
BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
write!(
f,
"ConsumingUpstreamTable(epoch: {}, rows: {})",
epoch, rows
)
}
BackfillState::DoneConsumingUpstreamTable(rows) => {
BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
}
BackfillState::ConsumingLogStore {
Expand Down Expand Up @@ -190,7 +192,7 @@ impl CreateMviewProgressReporter {
current_consumed_rows: ConsumedRows,
) {
match self.state {
Some(BackfillState::ConsumingUpstreamTable(last, last_consumed_rows)) => {
Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
assert!(
last < consumed_epoch,
"last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
Expand All @@ -209,18 +211,23 @@ impl CreateMviewProgressReporter {
};
self.update_inner(
epoch,
BackfillState::ConsumingUpstreamTable(consumed_epoch, current_consumed_rows),
BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
);
}

/// The difference from [`Self::update`] (MV backfill) is that we
/// don't care `ConsumedEpoch` here.
pub fn update_for_source_backfill(
&mut self,
epoch: EpochPair,
current_consumed_rows: ConsumedRows,
) {
match self.state {
Some(BackfillState::ConsumingUpstreamTable(last_epoch, _last_consumed_rows)) => {
debug_assert_eq!(last_epoch, 0);
Some(BackfillState::ConsumingUpstreamTableOrSource(
dummy_last_epoch,
_last_consumed_rows,
)) => {
debug_assert_eq!(dummy_last_epoch, 0);
}
Some(state) => {
panic!(
Expand All @@ -232,19 +239,20 @@ impl CreateMviewProgressReporter {
};
self.update_inner(
epoch,
BackfillState::ConsumingUpstreamTable(0, current_consumed_rows),
// fill a dummy ConsumedEpoch
BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
);
}

/// Finish the progress. If the progress is already finished, then perform no-op.
/// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
if let Some(BackfillState::DoneConsumingUpstreamTable(_)) = self.state {
if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
return;
}
self.update_inner(
epoch,
BackfillState::DoneConsumingUpstreamTable(current_consumed_rows),
BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
);
}

Expand All @@ -255,7 +263,7 @@ impl CreateMviewProgressReporter {
) {
assert_matches!(
self.state,
Some(BackfillState::DoneConsumingUpstreamTable(_))
Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
| Some(BackfillState::ConsumingLogStore { .. }),
"cannot update log store progress at state {:?}",
self.state
Expand All @@ -271,7 +279,7 @@ impl CreateMviewProgressReporter {
pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
assert_matches!(
self.state,
Some(BackfillState::DoneConsumingUpstreamTable(_))
Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
| Some(BackfillState::ConsumingLogStore { .. }),
"cannot finish log store progress at state {:?}",
self.state
Expand Down

0 comments on commit 5ad1867

Please sign in to comment.