From a8cb5926db30fcb1db2db13a5bd2806a7c535ff3 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 28 Dec 2023 13:07:24 +0000 Subject: [PATCH 1/2] fix: replay memtable should from flushed_entry_id + 1 --- src/mito2/src/region/opener.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 7b969d578d00..322485256fab 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -380,7 +380,9 @@ pub(crate) async fn replay_memtable( // data in the WAL. let mut last_entry_id = flushed_entry_id; let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone()); - let mut wal_stream = wal.scan(region_id, flushed_entry_id, wal_options)?; + + let replay_from_entry_id = flushed_entry_id + 1; + let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; last_entry_id = last_entry_id.max(entry_id); From 5a9d9e416795efb88234553640df831be1331b5c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 28 Dec 2023 14:54:51 +0000 Subject: [PATCH 2/2] chore: apply suggestions from CR --- src/mito2/src/region/opener.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 322485256fab..9fd6e36dc898 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -257,8 +257,9 @@ impl RegionOpener { let version_control = Arc::new(VersionControl::new(version)); if !self.skip_wal_replay { info!( - "Start replaying memtable at flushed_entry_id {} for region {}", - flushed_entry_id, region_id + "Start replaying memtable at flushed_entry_id + 1 {} for region {}", + flushed_entry_id + 1, + region_id ); replay_memtable( wal, @@ -385,6 +386,7 @@ pub(crate) async fn replay_memtable( let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; + debug_assert!(entry_id > flushed_entry_id); last_entry_id = last_entry_id.max(entry_id); for mutation in entry.mutations { rows_replayed += mutation