diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index d634deb871f4..3b5ba500bd04 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -398,10 +398,9 @@ pub(crate) async fn replay_memtable( // Last entry id should start from flushed entry id since there might be no // data in the WAL. let mut last_entry_id = flushed_entry_id; - let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone()); - let replay_from_entry_id = flushed_entry_id + 1; let mut stale_entry_found = false; + let mut wal_stream = wal.scan(region_id, replay_from_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; @@ -417,8 +416,10 @@ pub(crate) async fn replay_memtable( } ); } - last_entry_id = last_entry_id.max(entry_id); + + let mut region_write_ctx = + RegionWriteCtx::new(region_id, version_control, wal_options.clone()); for mutation in entry.mutations { rows_replayed += mutation .rows @@ -427,11 +428,11 @@ pub(crate) async fn replay_memtable( .unwrap_or(0); region_write_ctx.push_mutation(mutation.op_type, mutation.rows, OptionOutputTx::none()); } - } - // set next_entry_id and write to memtable. - region_write_ctx.set_next_entry_id(last_entry_id + 1); - region_write_ctx.write_memtable(); + // set next_entry_id and write to memtable. + region_write_ctx.set_next_entry_id(last_entry_id + 1); + region_write_ctx.write_memtable(); + } if allow_stale_entries && stale_entry_found { wal.obsolete(region_id, flushed_entry_id, wal_options)