Skip to content

Commit

Permalink
fix: fix log store truncate offset incorrect check (#18165)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 21, 2024
1 parent ff55fcf commit dd65156
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
7 changes: 7 additions & 0 deletions src/stream/src/common/log_store_impl/kv_log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,13 @@ mod tests {
let chunk_ids = check_reader(&mut reader, data[1..].iter()).await;
assert_eq!(2, chunk_ids.len());

reader
.truncate(TruncateOffset::Chunk {
epoch: epoch2,
chunk_id: chunk_ids[0],
})
.unwrap();

reader
.truncate(TruncateOffset::Barrier { epoch: epoch2 })
.unwrap();
Expand Down
30 changes: 20 additions & 10 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,24 +591,34 @@ impl<S: StateStore> LogReader for KvLogStoreReader<S> {
self.latest_offset
));
}
if let Some(truncate_offset) = &self.truncate_offset {
if offset <= *truncate_offset {
return Err(anyhow!(
"truncate offset {:?} earlier than prev truncate offset {:?}",
offset,
truncate_offset
));
}
}
if offset.epoch() >= self.first_write_epoch.expect("should have init") {
if let Some(truncate_offset) = &self.truncate_offset {
if offset <= *truncate_offset {
return Err(anyhow!(
"truncate offset {:?} earlier than prev truncate offset {:?}",
offset,
truncate_offset
));
}
}
self.rx.truncate_buffer(offset);
self.truncate_offset = Some(offset);
} else {
// For historical data, no need to truncate at seq id level. Only truncate at barrier.
if let TruncateOffset::Barrier { epoch } = &offset {
if let Some(truncate_offset) = &self.truncate_offset {
if offset <= *truncate_offset {
return Err(anyhow!(
"truncate offset {:?} earlier than prev truncate offset {:?}",
offset,
truncate_offset
));
}
}
self.rx.truncate_historical(*epoch);
self.truncate_offset = Some(offset);
}
}
self.truncate_offset = Some(offset);
Ok(())
}

Expand Down

0 comments on commit dd65156

Please sign in to comment.