Skip to content

Commit

Permalink
rename to ConsumingUpstreamTableOrSource to be clearer
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 21, 2024
1 parent 12b8f3a commit d3ef9bb
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions src/stream/src/task/barrier_manager/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ type ConsumedRows = u64;

#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
ConsumingUpstreamTable(ConsumedEpoch, ConsumedRows),
DoneConsumingUpstreamTable(ConsumedRows),
ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
DoneConsumingUpstreamTableOrSource(ConsumedRows),
ConsumingLogStore { pending_barrier_num: usize },
DoneConsumingLogStore,
}

impl BackfillState {
pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
let (done, consumed_epoch, consumed_rows, pending_barrier_num) = match self {
BackfillState::ConsumingUpstreamTable(consumed_epoch, consumed_rows) => {
BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
(false, consumed_epoch, consumed_rows, 0)
}
BackfillState::DoneConsumingUpstreamTable(consumed_rows) => (true, 0, consumed_rows, 0), /* unused field for done */
BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
(true, 0, consumed_rows, 0)
} // unused field for done
BackfillState::ConsumingLogStore {
pending_barrier_num,
} => (false, 0, 0, pending_barrier_num as _),
Expand All @@ -59,14 +61,14 @@ impl BackfillState {
impl Display for BackfillState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
BackfillState::ConsumingUpstreamTable(epoch, rows) => {
BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
write!(
f,
"ConsumingUpstreamTable(epoch: {}, rows: {})",
epoch, rows
)
}
BackfillState::DoneConsumingUpstreamTable(rows) => {
BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
}
BackfillState::ConsumingLogStore {
Expand Down Expand Up @@ -190,7 +192,7 @@ impl CreateMviewProgressReporter {
current_consumed_rows: ConsumedRows,
) {
match self.state {
Some(BackfillState::ConsumingUpstreamTable(last, last_consumed_rows)) => {
Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
assert!(
last < consumed_epoch,
"last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
Expand All @@ -209,18 +211,23 @@ impl CreateMviewProgressReporter {
};
self.update_inner(
epoch,
BackfillState::ConsumingUpstreamTable(consumed_epoch, current_consumed_rows),
BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
);
}

/// The difference from [`Self::update`] (MV backfill) is that we
/// don't care `ConsumedEpoch` here.
pub fn update_for_source_backfill(
&mut self,
epoch: EpochPair,
current_consumed_rows: ConsumedRows,
) {
match self.state {
Some(BackfillState::ConsumingUpstreamTable(last_epoch, _last_consumed_rows)) => {
debug_assert_eq!(last_epoch, 0);
Some(BackfillState::ConsumingUpstreamTableOrSource(
dummy_last_epoch,
_last_consumed_rows,
)) => {
debug_assert_eq!(dummy_last_epoch, 0);
}
Some(state) => {
panic!(
Expand All @@ -232,19 +239,20 @@ impl CreateMviewProgressReporter {
};
self.update_inner(
epoch,
BackfillState::ConsumingUpstreamTable(0, current_consumed_rows),
// fill a dummy ConsumedEpoch
BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
);
}

/// Finish the progress. If the progress is already finished, then perform no-op.
/// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
if let Some(BackfillState::DoneConsumingUpstreamTable(_)) = self.state {
if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
return;
}
self.update_inner(
epoch,
BackfillState::DoneConsumingUpstreamTable(current_consumed_rows),
BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
);
}

Expand All @@ -255,7 +263,7 @@ impl CreateMviewProgressReporter {
) {
assert_matches!(
self.state,
Some(BackfillState::DoneConsumingUpstreamTable(_))
Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
| Some(BackfillState::ConsumingLogStore { .. }),
"cannot update log store progress at state {:?}",
self.state
Expand All @@ -271,7 +279,7 @@ impl CreateMviewProgressReporter {
pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
assert_matches!(
self.state,
Some(BackfillState::DoneConsumingUpstreamTable(_))
Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
| Some(BackfillState::ConsumingLogStore { .. }),
"cannot finish log store progress at state {:?}",
self.state
Expand Down

0 comments on commit d3ef9bb

Please sign in to comment.