From 2c766903a863ececa1c66e85ce1a98e9fa0b5298 Mon Sep 17 00:00:00 2001
From: Xinye Tao
Date: Fri, 16 Jun 2023 10:59:32 +0800
Subject: [PATCH] fix data loss caused by atomic group being partially purged (#314)

* remember atomic group in memtable

Signed-off-by: tabokie

* add tests

Signed-off-by: tabokie

* patch changelog

Signed-off-by: tabokie

---------

Signed-off-by: tabokie
---
 CHANGELOG.md                    |  3 +-
 src/engine.rs                   |  3 +-
 src/memtable.rs                 | 69 ++++++++++++++++++++++++++++++---
 src/purge.rs                    | 42 ++++++++++++++++----
 tests/failpoints/test_engine.rs | 48 ++++++++++++++++++++++-
 5 files changed, 150 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 205d4efe..89b97e61 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 
 * `LogBatch::put` returns a `Result<()>` instead of `()`. It errs when the key is reserved for internal use.
 * Possible to specify a permission in `FileSystem::open`.
+* Prometheus counter `raft_engine_log_file_count` no longer includes retired log files that are stashed for recycling. Those files are now tracked by a new counter `raft_engine_recycled_file_count`.
 
 ### Bug Fixes
 
@@ -13,7 +14,7 @@
 
 ### New Features
 
-* Support preparing prefilled logs to enable log recycling when start-up.
+* Support preparing prefilled logs to enable log recycling at start-up. The number of logs to prepare is controlled by `Config::prefill_limit`.
 * Add a new configuration `spill-dir` to allow automatic placement of logs into an auxiliary directory when `dir` is full.
 * Add a new method `Engine::fork` to duplicate an `Engine` to a new place, with a few disk file copies.
 
diff --git a/src/engine.rs b/src/engine.rs
index f9e93648..42e55821 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -2417,6 +2417,7 @@ pub(crate) mod tests {
             dir: dir.path().to_str().unwrap().to_owned(),
             // Make sure each file gets replayed individually.
             recovery_threads: 100,
+            target_file_size: ReadableSize(1),
             ..Default::default()
         };
         let fs = Arc::new(ObfuscatedFileSystem::default());
@@ -2550,7 +2551,7 @@ pub(crate) mod tests {
             assert!(data.remove(&rid), "{}", rid);
             assert_eq!(engine.get(rid, &key).unwrap(), value);
         }
-        assert!(data.is_empty());
+        assert!(data.is_empty(), "data loss {:?}", data);
     }
 
     #[test]
diff --git a/src/memtable.rs b/src/memtable.rs
index 4a9dfd11..d46ba68b 100644
--- a/src/memtable.rs
+++ b/src/memtable.rs
@@ -155,6 +155,16 @@ pub struct MemTable {
     /// A map of active key value pairs.
     kvs: BTreeMap<Vec<u8>, (Vec<u8>, FileId)>,
 
+    /// (start_seq, end_seq).
+    /// If there's an active entry stored before end_seq, it possibly belongs
+    /// to an atomic group. In order not to lose this entry, we cannot delete
+    /// any other entries in that group.
+    /// Only applies to the Rewrite queue. Each Raft Group has at most one
+    /// atomic group at a time. Because we only use atomic groups for the
+    /// rewrite-rewrite operation, a group always contains all the Rewrite
+    /// entries in a Raft Group.
+    atomic_group: Option<(FileSeq, FileSeq)>,
+
     /// Shared statistics.
     global_stats: Arc<GlobalStats>,
 
@@ -183,6 +193,7 @@ impl MemTable {
             first_index: 0,
             rewrite_count: 0,
             kvs: BTreeMap::default(),
+            atomic_group: None,
             global_stats,
             _phantom: PhantomData,
         }
@@ -216,6 +227,11 @@ impl MemTable {
             self.put(key.clone(), value.clone(), *file_id);
         }
 
+        if let Some(g) = rhs.atomic_group.take() {
+            assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
+            self.atomic_group = Some(g);
+        }
+
         let deleted = rhs.global_stats.deleted_rewrite_entries();
         self.global_stats.add(LogQueue::Rewrite, deleted);
         self.global_stats.delete(LogQueue::Rewrite, deleted);
@@ -251,6 +267,8 @@ impl MemTable {
             self.put(key.clone(), value.clone(), *file_id);
         }
 
+        assert!(rhs.atomic_group.is_none());
+
         let deleted = rhs.global_stats.deleted_rewrite_entries();
         self.global_stats.add(LogQueue::Rewrite, deleted);
         self.global_stats.delete(LogQueue::Rewrite, deleted);
@@ -526,6 +544,11 @@ impl MemTable {
         count as u64
     }
 
+    pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
+        assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
+        self.atomic_group = Some((start, end));
+    }
+
     /// Removes all entry indexes with index greater than or equal to `index`.
     /// Assumes `index` <= `last`.
     ///
@@ -719,12 +742,20 @@ impl MemTable {
                 Some(v.1.seq)
             }
         });
-        match (ents_min, kvs_min) {
-            (Some(ents_min), Some(kvs_min)) => Some(std::cmp::min(kvs_min, ents_min)),
-            (Some(ents_min), None) => Some(ents_min),
-            (None, Some(kvs_min)) => Some(kvs_min),
-            (None, None) => None,
+        let res = match (ents_min, kvs_min) {
+            (Some(ents_min), Some(kvs_min)) => std::cmp::min(kvs_min, ents_min),
+            (Some(ents_min), None) => ents_min,
+            (None, Some(kvs_min)) => kvs_min,
+            (None, None) => return None,
+        };
+        if queue == LogQueue::Rewrite {
+            if let Some((start, end)) = self.atomic_group {
+                if res <= end {
+                    return Some(std::cmp::min(start, res));
+                }
+            }
         }
+        Some(res)
     }
 
     #[inline]
@@ -1154,6 +1185,11 @@ impl MemTableAccessor {
         }
     }
 
+    pub fn apply_rewrite_atomic_group(&self, raft: u64, start: FileSeq, end: FileSeq) {
+        let memtable = self.get_or_insert(raft);
+        memtable.write().apply_rewrite_atomic_group(start, end);
+    }
+
     #[inline]
     fn slot_index(id: u64) -> usize {
         debug_assert!(MEMTABLE_SLOT_COUNT.is_power_of_two());
@@ -1171,6 +1207,8 @@ struct PendingAtomicGroup {
     status: AtomicGroupStatus,
     items: Vec<LogItem>,
     tombstone_items: Vec<LogItem>,
+    start: FileSeq,
+    end: FileSeq,
 }
 
 pub struct MemTableRecoverContext {
@@ -1256,20 +1294,39 @@ impl MemTableRecoverContext {
                 | (AtomicGroupStatus::Middle, AtomicGroupStatus::Middle) => {
                     group.items.append(&mut new_group.items);
                     group.tombstone_items.append(&mut new_group.tombstone_items);
+                    assert!(group.end <= new_group.start);
+                    group.end = new_group.end;
                 }
                 (AtomicGroupStatus::Middle, AtomicGroupStatus::End) => {
                     group.items.append(&mut new_group.items);
                     group.tombstone_items.append(&mut new_group.tombstone_items);
                     group.status = new_group.status;
+                    assert!(group.end <= new_group.start);
+                    group.end = new_group.end;
                 }
                 (AtomicGroupStatus::Begin, AtomicGroupStatus::End) => {
                     let mut group = groups.pop().unwrap();
+                    let mut rids = HashSet::with_capacity(1);
+                    for item in group
+                        .items
+                        .iter()
+                        .chain(group.tombstone_items.iter())
+                        .chain(new_group.items.iter())
+                        .chain(new_group.tombstone_items.iter())
+                    {
+                        rids.insert(item.raft_group_id);
+                    }
                     self.tombstone_items.append(&mut group.tombstone_items);
                     self.tombstone_items.append(&mut new_group.tombstone_items);
                     self.memtables
                         .replay_rewrite_writes(group.items.into_iter());
                     self.memtables
                         .replay_rewrite_writes(new_group.items.into_iter());
+                    assert!(group.end <= new_group.start);
+                    for rid in rids {
+                        self.memtables
+                            .apply_rewrite_atomic_group(rid, group.start, new_group.end);
+                    }
                 }
             }
             if groups.is_empty() {
@@ -1329,6 +1386,8 @@ impl ReplayMachine for MemTableRecoverContext {
                     status,
                     items,
                     tombstone_items: new_tombstones,
+                    start: file_id.seq,
+                    end: file_id.seq,
                 },
             );
         } else {
diff --git a/src/purge.rs b/src/purge.rs
index 3e6b73b1..cb76f776 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -167,6 +167,22 @@ where
             .unwrap();
     }
 
+    pub fn must_purge_all_stale(&self) {
+        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
+        self.pipe_log.rotate(LogQueue::Rewrite).unwrap();
+        self.rescan_memtables_and_purge_stale_files(
+            LogQueue::Rewrite,
+            self.pipe_log.file_span(LogQueue::Rewrite).1,
+        )
+        .unwrap();
+        self.pipe_log.rotate(LogQueue::Append).unwrap();
+        self.rescan_memtables_and_purge_stale_files(
+            LogQueue::Append,
+            self.pipe_log.file_span(LogQueue::Append).1,
+        )
+        .unwrap();
+    }
+
     pub(crate) fn needs_rewrite_log_files(&self, queue: LogQueue) -> bool {
         let (first_file, active_file) = self.pipe_log.file_span(queue);
         if active_file == first_file {
@@ -281,7 +297,8 @@ where
             &mut log_batch,
             None, /* rewrite_watermark */
             true, /* sync */
-        )
+        )?;
+        Ok(())
     }
 
     // Exclusive.
@@ -335,6 +352,7 @@ where
 
         let mut previous_size = log_batch.approximate_size();
         let mut atomic_group = None;
+        let mut atomic_group_start = None;
         let mut current_entry_indexes = Vec::new();
         let mut current_entries = Vec::new();
         let mut current_size = 0;
@@ -380,7 +398,10 @@ where
                     )?;
                     current_size = 0;
                     previous_size = 0;
-                    self.rewrite_impl(&mut log_batch, rewrite, false)?;
+                    let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
+                    if needs_atomicity && atomic_group_start.is_none() {
+                        atomic_group_start = Some(handle.id.seq);
+                    }
                 }
             }
             log_batch.add_raw_entries(region_id, current_entry_indexes, current_entries)?;
@@ -389,12 +410,18 @@ where
             }
             if let Some(g) = atomic_group.as_mut() {
                 g.end(&mut log_batch);
-                self.rewrite_impl(&mut log_batch, rewrite, false)?;
+                let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
+                self.memtables.apply_rewrite_atomic_group(
+                    region_id,
+                    atomic_group_start.unwrap(),
+                    handle.id.seq,
+                );
             } else if log_batch.approximate_size() > max_batch_bytes() {
                 self.rewrite_impl(&mut log_batch, rewrite, false)?;
             }
         }
-        self.rewrite_impl(&mut log_batch, rewrite, true)
+        self.rewrite_impl(&mut log_batch, rewrite, true)?;
+        Ok(())
     }
 
     fn rewrite_impl(
@@ -402,10 +429,11 @@ where
         log_batch: &mut LogBatch,
         rewrite_watermark: Option<FileSeq>,
         sync: bool,
-    ) -> Result<()> {
+    ) -> Result<Option<FileBlockHandle>> {
         if log_batch.is_empty() {
             debug_assert!(sync);
-            return self.pipe_log.sync(LogQueue::Rewrite);
+            self.pipe_log.sync(LogQueue::Rewrite)?;
+            return Ok(None);
         }
         log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
         let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
@@ -430,7 +458,7 @@ where
                 .append
                 .observe(file_handle.len as f64);
         }
-        Ok(())
+        Ok(Some(file_handle))
     }
 }
 
diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs
index 5ed24155..4a29e9d8 100644
--- a/tests/failpoints/test_engine.rs
+++ b/tests/failpoints/test_engine.rs
@@ -886,7 +886,7 @@ fn test_split_rewrite_batch_imp(regions: u64, region_size: u64, split_size: u64,
             })
             .unwrap();
         let r = catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue());
-        fail::remove("atomic_group::begin");
+        fail::remove("atomic_group::add");
fail::remove("log_fd::write::err"); if r.is_ok() { break; @@ -972,6 +972,52 @@ fn test_split_rewrite_batch_with_only_kvs() { } } +// issue-315 +#[test] +fn test_split_rewrite_batch_then_delete_some() { + let dir = tempfile::Builder::new() + .prefix("test_split_rewrite_batch_then_delete_some") + .tempdir() + .unwrap(); + let _f = FailGuard::new("max_rewrite_batch_bytes", "return(1)"); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(1), + ..Default::default() + }; + let mut log_batch = LogBatch::default(); + let value = vec![b'y'; 8]; + + let rid = 1; + let engine = Engine::open(cfg.clone()).unwrap(); + for i in 0..=5 { + append(&engine, rid, i * 2, i * 2 + 2, Some(&value)); + engine.purge_manager().must_rewrite_append_queue(None, None); + } + engine.purge_manager().must_rewrite_rewrite_queue(); + log_batch.add_command(rid, Command::Compact { index: 7 }); + log_batch.delete(rid, b"last_index".to_vec()); + engine.write(&mut log_batch, true).unwrap(); + engine.purge_manager().must_purge_all_stale(); + + drop(engine); + let engine = Engine::open(cfg.clone()).unwrap(); + // The Compact mark is dropped during `must_purge_all_stale`. + assert_eq!(engine.first_index(rid).unwrap(), 0); + assert_eq!(engine.last_index(rid).unwrap(), 11); + + // Removes all rewrite entries. + log_batch.add_command(rid, Command::Compact { index: 100 }); + engine.write(&mut log_batch, false).unwrap(); + append(&engine, rid, 5, 11, Some(&value)); + engine.purge_manager().must_rewrite_append_queue(None, None); + engine.purge_manager().must_purge_all_stale(); + drop(engine); + let engine = Engine::open(cfg).unwrap(); + assert_eq!(engine.first_index(rid).unwrap(), 5); + assert_eq!(engine.last_index(rid).unwrap(), 10); +} + #[test] fn test_build_engine_with_recycling_and_multi_dirs() { let dir = tempfile::Builder::new()