Skip to content

Commit

Permalink
recovered stream job requires checkpoint barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 3, 2023
1 parent 2745d99 commit 058b343
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl CheckpointControl {
async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult<bool> {
for command in self
.finished_commands
.extract_if(|c| checkpoint || c.is_barrier())
.extract_if(|c| checkpoint || !c.is_checkpoint())
{
// The command is ready to finish. We can now call `pre_finish`.
command.pre_finish().await?;
Expand Down
6 changes: 4 additions & 2 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ impl TrackingJob {
}
}

pub(crate) fn is_barrier(&self) -> bool {
pub(crate) fn is_checkpoint(&self) -> bool {

This comment has been minimized.

Copy link
@BugenZhao

BugenZhao Nov 3, 2023

Member

Could you explain more of this method?

This comment has been minimized.

Copy link
@kwannoel

kwannoel Nov 3, 2023

Author Contributor

Added some docs for it. I think is_checkpoint is more declarative then is_barrier, since what we are checking for is whether or not a command is from checkpoint. is_barrier was previously used, because it is the complement of it (i.e. if kind is barrier, then it is not a checkpoint barrier and vice versa), but it makes the purpose unclear, because is_barrier is implicitly the complement, rather than conceptually. So I use is_checkpoint here instead.

match self {
TrackingJob::Recovered(_) => true,
TrackingJob::New(command) => command.context.kind.is_barrier(),
TrackingJob::New(command) => {
command.context.kind.is_initial() || command.context.kind.is_checkpoint()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ async fn test_background_mv_barrier_recovery() -> Result<()> {
.run("SELECT v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)")
.await?;
tracing::debug!(missing_rows);
let missing_rows_with_row_id = session
.run("SELECT _row_id, v1 FROM t WHERE v1 NOT IN (SELECT v1 FROM mv1)")
.await?;
tracing::debug!(missing_rows_with_row_id);

assert_eq!(t_count, mv1_count);

Expand Down

0 comments on commit 058b343

Please sign in to comment.