From 3cda5df18a1162c4bac081a6674f55d4f018d475 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 28 Dec 2023 13:02:42 +0000 Subject: [PATCH] fix: replay memtable should from flushed_entry_id + 1 --- src/log-store/src/kafka/log_store.rs | 4 +--- src/mito2/src/region/opener.rs | 4 +++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 73b0fe1de2a9..1f3100abfeb1 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -193,10 +193,8 @@ impl LogStore for KafkaLogStore { && entry.ns.region_id == region_id { yield Ok(entries); - } else { - yield Ok(vec![]); } - + // Terminates the stream if the entry with the end offset was read. if record_offset >= end_offset { debug!( diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 7b969d578d00..322485256fab 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -380,7 +380,9 @@ pub(crate) async fn replay_memtable( // data in the WAL. let mut last_entry_id = flushed_entry_id; let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone()); - let mut wal_stream = wal.scan(region_id, flushed_entry_id, wal_options)?; + + let replay_from_entry_id = flushed_entry_id + 1; + let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; last_entry_id = last_entry_id.max(entry_id);