From b526d159c3dd3bb9309720eadc96464ddb2e9b56 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 29 Dec 2023 01:12:07 +0900 Subject: [PATCH] fix: replay memtable should from `flushed_entry_id + 1` (#3038) * fix: replay memtable should from flushed_entry_id + 1 * chore: apply suggestions from CR --- src/mito2/src/region/opener.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 7b969d578d00..9fd6e36dc898 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -257,8 +257,9 @@ impl RegionOpener { let version_control = Arc::new(VersionControl::new(version)); if !self.skip_wal_replay { info!( - "Start replaying memtable at flushed_entry_id {} for region {}", - flushed_entry_id, region_id + "Start replaying memtable at flushed_entry_id + 1 {} for region {}", + flushed_entry_id + 1, + region_id ); replay_memtable( wal, @@ -380,9 +381,12 @@ pub(crate) async fn replay_memtable( // data in the WAL. let mut last_entry_id = flushed_entry_id; let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone()); - let mut wal_stream = wal.scan(region_id, flushed_entry_id, wal_options)?; + + let replay_from_entry_id = flushed_entry_id + 1; + let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; + debug_assert!(entry_id > flushed_entry_id); last_entry_id = last_entry_id.max(entry_id); for mutation in entry.mutations { rows_replayed += mutation