From e30cfebb5241dbcdfc29ed6e73ff0971288e35a0 Mon Sep 17 00:00:00 2001 From: Kould <2435992353@qq.com> Date: Wed, 31 Jan 2024 00:58:29 +0800 Subject: [PATCH] refactor: cancel seek return elements and optimize copying of write_buf in transaction iterators --- Cargo.toml | 2 +- src/kernel/lsm/iterator/level_iter.rs | 41 ++-- src/kernel/lsm/iterator/merging_iter.rs | 209 ++++++++++++-------- src/kernel/lsm/iterator/mod.rs | 4 +- src/kernel/lsm/mem_table.rs | 56 +++--- src/kernel/lsm/mvcc.rs | 113 +++++------ src/kernel/lsm/table/btree_table/iter.rs | 25 ++- src/kernel/lsm/table/btree_table/mod.rs | 5 +- src/kernel/lsm/table/mod.rs | 6 +- src/kernel/lsm/table/ss_table/block_iter.rs | 67 ++++--- src/kernel/lsm/table/ss_table/iter.rs | 56 ++++-- src/kernel/lsm/table/ss_table/mod.rs | 8 +- src/kernel/lsm/version/iter.rs | 14 +- 13 files changed, 335 insertions(+), 271 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3bc4f98..2316af4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kip_db" -version = "0.1.2-alpha.23.fix5" +version = "0.1.2-alpha.24" edition = "2021" authors = ["Kould "] description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库" diff --git a/src/kernel/lsm/iterator/level_iter.rs b/src/kernel/lsm/iterator/level_iter.rs index c8f07d9..23d5311 100644 --- a/src/kernel/lsm/iterator/level_iter.rs +++ b/src/kernel/lsm/iterator/level_iter.rs @@ -1,5 +1,5 @@ use crate::kernel::lsm::compactor::LEVEL_0; -use crate::kernel::lsm::iterator::{Iter, Seek}; +use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::version::Version; use crate::kernel::KernelResult; @@ -13,7 +13,7 @@ pub(crate) struct LevelIter<'a> { level_len: usize, offset: usize, - child_iter: Box + 'a + Sync + Send>, + child_iter: Box + 'a + Sync + Send>, } impl<'a> LevelIter<'a> { @@ -32,19 +32,19 @@ impl<'a> LevelIter<'a> { }) } - fn child_iter_seek(&mut self, seek: Seek<'_>, offset: usize) -> KernelResult> { + fn child_iter_seek(&mut self, seek: Seek<'_>, offset: usize) -> KernelResult<()> { self.offset = offset; if self.is_valid() { if let Some(table) = self.version.table(self.level, offset) { self.child_iter = table.iter()?; - return self.child_iter.seek(seek); + self.child_iter.seek(seek)?; } } - Ok(None) + Ok(()) } - fn seek_ward(&mut self, key: &[u8], seek: Seek<'_>) -> KernelResult> { + fn seek_ward(&mut self, key: &[u8], seek: Seek<'_>) -> KernelResult<()> { let level = self.level; if level == LEVEL_0 { @@ -59,7 +59,10 @@ impl<'a> Iter<'a> for LevelIter<'a> { fn try_next(&mut self) -> KernelResult> { match self.child_iter.try_next()? { - None => self.child_iter_seek(Seek::First, self.offset + 1), + None => { + self.child_iter_seek(Seek::First, self.offset + 1)?; + self.child_iter.try_next() + } Some(item) => Ok(Some(item)), } } @@ -67,10 +70,12 @@ impl<'a> Iter<'a> for LevelIter<'a> { fn is_valid(&self) -> bool { self.offset < self.level_len } +} +impl<'a> SeekIter<'a> for LevelIter<'a> { /// Tips: Level 0的LevelIter不支持Seek /// 因为Level 0中的SSTable并非有序排列,其中数据范围是可能交错的 - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> { match seek { Seek::First => self.child_iter_seek(Seek::First, 0), Seek::Last => self.child_iter_seek(Seek::Last, self.level_len - 1), @@ -83,7 +88,7 @@ impl<'a> Iter<'a> for LevelIter<'a> { mod tests { use crate::kernel::io::IoType; use crate::kernel::lsm::iterator::level_iter::LevelIter; - use crate::kernel::lsm::iterator::{Iter, Seek}; + use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::log::LogLoader; use crate::kernel::lsm::mem_table::DEFAULT_WAL_PATH; use crate::kernel::lsm::storage::Config; @@ -161,19 +166,17 @@ mod tests { assert_eq!(iterator.try_next()?.unwrap(), kv.clone()); } - assert_eq!( - iterator.seek(Seek::Backward(&vec_data[114].0))?.unwrap(), - vec_data[114] - ); + iterator.seek(Seek::Backward(&vec_data[114].0))?; + assert_eq!(iterator.try_next()?.unwrap(), vec_data[114]); - assert_eq!( - iterator.seek(Seek::Backward(&vec_data[2048].0))?.unwrap(), - vec_data[2048] - ); + iterator.seek(Seek::Backward(&vec_data[2048].0))?; + assert_eq!(iterator.try_next()?.unwrap(), vec_data[2048]); - assert_eq!(iterator.seek(Seek::First)?.unwrap(), vec_data[0]); + iterator.seek(Seek::First)?; + assert_eq!(iterator.try_next()?.unwrap(), vec_data[0]); - assert_eq!(iterator.seek(Seek::Last)?.unwrap(), vec_data[3999]); + iterator.seek(Seek::Last)?; + assert_eq!(iterator.try_next()?, None); let mut iterator_level_0 = LevelIter::new(&version, 0)?; diff --git a/src/kernel/lsm/iterator/merging_iter.rs b/src/kernel/lsm/iterator/merging_iter.rs index 9298dc9..ab50488 100644 --- a/src/kernel/lsm/iterator/merging_iter.rs +++ b/src/kernel/lsm/iterator/merging_iter.rs @@ -1,10 +1,9 @@ -use crate::kernel::lsm::iterator::{Iter, Seek}; +use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::KernelResult; use bytes::Bytes; use std::cmp::Ordering; use std::collections::BTreeMap; -use std::ops::Bound::{Included, Unbounded}; /// 用于取值以及对应的Iter下标 /// 通过序号进行同值优先获取 @@ -28,14 +27,23 @@ impl Ord for IterKey { } } -pub(crate) struct MergingIter<'a> { - vec_iter: Vec + 'a + Send + Sync>>, +struct InnerIter { map_buf: BTreeMap, pre_key: Option, } +pub(crate) struct MergingIter<'a> { + vec_iter: Vec + 'a + Send + Sync>>, + inner: InnerIter, +} + +pub(crate) struct SeekMergingIter<'a> { + vec_iter: Vec + 'a + Send + Sync>>, + inner: InnerIter, +} + impl<'a> MergingIter<'a> { - #[allow(dead_code, clippy::mutable_key_type)] + #[allow(clippy::mutable_key_type)] pub(crate) fn new( mut vec_iter: Vec + 'a + Send + Sync>>, ) -> KernelResult { @@ -43,86 +51,113 @@ impl<'a> MergingIter<'a> { for (num, iter) in vec_iter.iter_mut().enumerate() { if let Some(item) = iter.try_next()? { - Self::buf_map_insert(&mut map_buf, num, item); + InnerIter::buf_map_insert(&mut map_buf, num, item); } } Ok(MergingIter { vec_iter, - map_buf, - pre_key: None, + inner: InnerIter { + map_buf, + pre_key: None, + }, }) } } -impl<'a> Iter<'a> for MergingIter<'a> { - type Item = KeyValue; - - fn try_next(&mut self) -> KernelResult> { - while let Some((IterKey { num, .. }, old_item)) = self.map_buf.pop_first() { - if let Some(item) = self.vec_iter[num].try_next()? { - Self::buf_map_insert(&mut self.map_buf, num, item); - } +impl<'a> SeekMergingIter<'a> { + #[allow(clippy::mutable_key_type)] + pub(crate) fn new( + mut vec_iter: Vec + 'a + Send + Sync>>, + ) -> KernelResult { + let mut map_buf = BTreeMap::new(); - // 跳过重复元素 - if let Some(key) = &self.pre_key { - if key == &old_item.0 { - continue; - } + for (num, iter) in vec_iter.iter_mut().enumerate() { + if let Some(item) = iter.try_next()? { + InnerIter::buf_map_insert(&mut map_buf, num, item); } - self.pre_key = Some(old_item.0.clone()); - - return Ok(Some(old_item)); } - Ok(None) + Ok(SeekMergingIter { + vec_iter, + inner: InnerIter { + map_buf, + pre_key: None, + }, + }) } +} - fn is_valid(&self) -> bool { - self.vec_iter +macro_rules! is_valid { + ($vec_iter:expr) => { + $vec_iter .iter() .map(|iter| iter.is_valid()) .all(|is_valid| is_valid) + }; +} + +impl<'a> Iter<'a> for MergingIter<'a> { + type Item = KeyValue; + + fn try_next(&mut self) -> KernelResult> { + self.inner.try_next_1(&mut self.vec_iter) } - #[allow(clippy::mutable_key_type)] - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { - let mut seek_map = BTreeMap::new(); + fn is_valid(&self) -> bool { + is_valid!(&self.vec_iter) + } +} - for (num, iter) in self.vec_iter.iter_mut().enumerate() { - if let Some(item) = iter.seek(seek)? { - Self::buf_map_insert(&mut seek_map, num, item); - } - } +impl<'a> Iter<'a> for SeekMergingIter<'a> { + type Item = KeyValue; - if let Seek::Last = seek { - self.map_buf.clear(); - - // 有点复杂不是么 - // 先弹出最小的元素 - // 当多个Iter seek至最尾端后存在最小元素且最小元素的key有重复的情况下,去num(iter序号)最小的元素 - // 当num为0时,则直接选择该最优先的iter的元素 - Ok(seek_map.pop_last().map(|(IterKey { key, num }, item)| { - (num != 0) - .then(|| { - seek_map - .range((Included(&IterKey { num: 0, key }), Unbounded)) - .next() - .map(|(_, range_item)| range_item.clone()) - }) - .flatten() - .unwrap_or(item) - })) - } else { - self.map_buf = seek_map; + fn try_next(&mut self) -> KernelResult> { + self.inner.try_next_2(&mut self.vec_iter) + } - self.try_next() - } + fn is_valid(&self) -> bool { + is_valid!(&self.vec_iter) } } -#[allow(clippy::mutable_key_type)] -impl MergingIter<'_> { +macro_rules! impl_try_next { + ($func:ident, $vec_iter:ty) => { + impl InnerIter { + fn $func(&mut self, vec_iter: &mut [$vec_iter]) -> KernelResult> { + while let Some((IterKey { num, .. }, old_item)) = self.map_buf.pop_first() { + if let Some(item) = vec_iter[num].try_next()? { + Self::buf_map_insert(&mut self.map_buf, num, item); + } + + // 跳过重复元素 + if let Some(key) = &self.pre_key { + if key == &old_item.0 { + continue; + } + } + self.pre_key = Some(old_item.0.clone()); + + return Ok(Some(old_item)); + } + + Ok(None) + } + } + }; +} + +impl_try_next!( + try_next_1, + Box + '_ + Send + Sync> +); +impl_try_next!( + try_next_2, + Box + '_ + Send + Sync> +); + +impl InnerIter { + #[allow(clippy::mutable_key_type)] fn buf_map_insert(seek_map: &mut BTreeMap, num: usize, item: KeyValue) { let _ = seek_map.insert( IterKey { @@ -134,11 +169,37 @@ impl MergingIter<'_> { } } +impl<'a> SeekIter<'a> for SeekMergingIter<'a> { + #[allow(clippy::mutable_key_type)] + fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> { + if let Seek::Last = seek { + self.inner.map_buf.clear(); + } else { + let mut seek_map = BTreeMap::new(); + + for (num, iter) in self.vec_iter.iter_mut().enumerate() { + iter.seek(seek)?; + + if let Some(item) = iter.try_next()? { + InnerIter::buf_map_insert(&mut seek_map, num, item); + } + } + + self.inner.map_buf = seek_map; + } + + Ok(()) + } +} + +#[allow(clippy::mutable_key_type)] +impl MergingIter<'_> {} + #[cfg(test)] mod tests { use crate::kernel::io::{FileExtension, IoFactory, IoType}; - use crate::kernel::lsm::iterator::merging_iter::MergingIter; - use crate::kernel::lsm::iterator::{Iter, Seek}; + use crate::kernel::lsm::iterator::merging_iter::SeekMergingIter; + use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::storage::Config; use crate::kernel::lsm::table::btree_table::iter::BTreeTableIter; @@ -173,7 +234,7 @@ mod tests { Some((Bytes::from(vec![b'7']), Some(Bytes::from(vec![b'1'])))), Some((Bytes::from(vec![b'8']), None)), Some((Bytes::from(vec![b'1']), None)), - Some((Bytes::from(vec![b'8']), None)), + None, Some((Bytes::from(vec![b'6']), None)), ]; @@ -200,7 +261,7 @@ mod tests { Some((Bytes::from(vec![b'7']), Some(Bytes::from(vec![b'1'])))), Some((Bytes::from(vec![b'8']), None)), Some((Bytes::from(vec![b'1']), None)), - Some((Bytes::from(vec![b'8']), None)), + None, Some((Bytes::from(vec![b'6']), None)), ]; @@ -227,7 +288,7 @@ mod tests { None, None, Some((Bytes::from(vec![b'4']), Some(Bytes::from(vec![b'0'])))), - Some((Bytes::from(vec![b'6']), Some(Bytes::from(vec![b'0'])))), + None, Some((Bytes::from(vec![b'5']), None)), ]; @@ -270,7 +331,7 @@ mod tests { let mut sequence_iter = sequence.into_iter(); - let mut merging_iter = MergingIter::new(vec![Box::new(bt_iter), Box::new(sst_iter)])?; + let mut merging_iter = SeekMergingIter::new(vec![Box::new(bt_iter), Box::new(sst_iter)])?; assert_eq!(merging_iter.try_next()?, sequence_iter.next().flatten()); @@ -284,20 +345,14 @@ mod tests { assert_eq!(merging_iter.try_next()?, sequence_iter.next().flatten()); - assert_eq!( - merging_iter.seek(Seek::First)?, - sequence_iter.next().flatten() - ); + merging_iter.seek(Seek::First)?; + assert_eq!(merging_iter.try_next()?, sequence_iter.next().flatten()); - assert_eq!( - merging_iter.seek(Seek::Last)?, - sequence_iter.next().flatten() - ); + merging_iter.seek(Seek::Last)?; + assert_eq!(merging_iter.try_next()?, sequence_iter.next().flatten()); - assert_eq!( - merging_iter.seek(Seek::Backward(&[b'5']))?, - sequence_iter.next().flatten() - ); + merging_iter.seek(Seek::Backward(&[b'5']))?; + assert_eq!(merging_iter.try_next()?, sequence_iter.next().flatten()); Ok(()) } diff --git a/src/kernel/lsm/iterator/mod.rs b/src/kernel/lsm/iterator/mod.rs index 9b71c0d..c76bd0c 100644 --- a/src/kernel/lsm/iterator/mod.rs +++ b/src/kernel/lsm/iterator/mod.rs @@ -21,8 +21,10 @@ pub trait Iter<'a> { fn try_next(&mut self) -> KernelResult>; fn is_valid(&self) -> bool; +} - fn seek(&mut self, seek: Seek<'_>) -> KernelResult>; +pub trait SeekIter<'a>: Iter<'a> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()>; } /// 向前迭代器 diff --git a/src/kernel/lsm/mem_table.rs b/src/kernel/lsm/mem_table.rs index d5b3b48..214154b 100644 --- a/src/kernel/lsm/mem_table.rs +++ b/src/kernel/lsm/mem_table.rs @@ -1,5 +1,5 @@ use crate::kernel::io::IoWriter; -use crate::kernel::lsm::iterator::{Iter, Seek}; +use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::log::{LogLoader, LogWriter}; use crate::kernel::lsm::storage::{Config, Gen, Sequence}; use crate::kernel::lsm::table::ss_table::block::{Entry, Value}; @@ -114,30 +114,29 @@ impl<'a> Iter<'a> for MemMapIter<'a> { fn is_valid(&self) -> bool { true } +} - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { +impl<'a> SeekIter<'a> for MemMapIter<'a> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> { self.prev_item = None; - self.iter = match seek { - Seek::First => Some(self.mem_map.iter()), - Seek::Last => None, - Seek::Backward(seek_key) => Some(self.mem_map.range( - Bound::Included(&InternalKey::new_with_seq( - Bytes::copy_from_slice(seek_key), - 0, - )), - Bound::Unbounded, - )), - }; if let Seek::Last = seek { - Ok(self - .mem_map - .iter() - .last() - .map(|(InternalKey { key, .. }, value)| (key.clone(), value.clone()))) + self.iter = None; } else { - self.try_next() + self.iter = match seek { + Seek::First => Some(self.mem_map.iter()), + Seek::Last => None, + Seek::Backward(seek_key) => Some(self.mem_map.range( + Bound::Included(&InternalKey::new_with_seq( + Bytes::copy_from_slice(seek_key), + 0, + )), + Bound::Unbounded, + )), + }; } + + Ok(()) } } @@ -368,10 +367,7 @@ impl MemTable { let de_dupe_merge_sort_fn = |mem: Vec, immut_mem: Vec| { let fn_push = |results: &mut Vec, item: &mut Option, new_item| { if let Some(item) = mem::replace(item, new_item) { - if !matches!( - results.last().and_then(|(key, _)| Some(key == &item.0)), - Some(true) - ) { + if !matches!(results.last().map(|(key, _)| key == &item.0), Some(true)) { results.push(item) } } @@ -483,7 +479,7 @@ pub(crate) fn data_to_bytes(data: KeyValue) -> KernelResult> { #[cfg(test)] mod tests { - use crate::kernel::lsm::iterator::{Iter, Seek}; + use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::{ data_to_bytes, InternalKey, KeyValue, MemMap, MemMapIter, MemTable, }; @@ -721,16 +717,14 @@ mod tests { assert_eq!(iter.try_next()?, Some((key_4_2.key.clone(), None))); - assert_eq!(iter.seek(Seek::First)?, Some((key_1_2.key.clone(), None))); - - assert_eq!(iter.seek(Seek::Last)?, Some((key_4_2.key.clone(), None))); + iter.seek(Seek::First)?; + assert_eq!(iter.try_next()?, Some((key_1_2.key.clone(), None))); + iter.seek(Seek::Last)?; assert_eq!(iter.try_next()?, None); - assert_eq!( - iter.seek(Seek::Backward(&[b'3']))?, - Some((key_4_2.key.clone(), None)) - ); + iter.seek(Seek::Backward(&[b'3']))?; + assert_eq!(iter.try_next()?, Some((key_4_2.key.clone(), None))); Ok(()) } diff --git a/src/kernel/lsm/mvcc.rs b/src/kernel/lsm/mvcc.rs index 9ba1ea6..bae5ffe 100644 --- a/src/kernel/lsm/mvcc.rs +++ b/src/kernel/lsm/mvcc.rs @@ -1,6 +1,6 @@ use crate::kernel::lsm::compactor::CompactTask; use crate::kernel::lsm::iterator::merging_iter::MergingIter; -use crate::kernel::lsm::iterator::{Iter, Seek}; +use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::{KeyValue, MemTable}; use crate::kernel::lsm::query_and_compaction; use crate::kernel::lsm::storage::{KipStorage, Sequence, StoreInner}; @@ -11,8 +11,8 @@ use crate::KernelError; use bytes::Bytes; use core::slice::SlicePattern; use itertools::Itertools; -use skiplist::SkipMap; -use std::collections::Bound; +use std::collections::btree_map::Range; +use std::collections::{BTreeMap, Bound}; use std::ptr::NonNull; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -36,7 +36,7 @@ pub struct Transaction { seq_id: i64, check_type: CheckType, - write_buf: Option>>, + write_buf: Option>>, } impl Transaction { @@ -54,8 +54,8 @@ impl Transaction { } } - fn write_buf_or_init(&mut self) -> &mut SkipMap> { - self.write_buf.get_or_insert_with(SkipMap::new) + fn write_buf_or_init(&mut self) -> &mut BTreeMap> { + self.write_buf.get_or_insert_with(BTreeMap::new) } /// 通过Key获取对应的Value @@ -140,29 +140,16 @@ impl Transaction { min: Bound<&[u8]>, max: Bound<&[u8]>, ) -> KernelResult { - let mut write_buf_ptr = None; let mut vec_iter: Vec + 'a + Send + Sync>> = Vec::with_capacity(3); - let option_write_buf = self.write_buf.as_ref().map(|buf| { - buf.range( + if let Some(write_buf) = &self.write_buf { + let range = write_buf.range::, Bound<&Bytes>)>(( min.map(Bytes::copy_from_slice).as_ref(), max.map(Bytes::copy_from_slice).as_ref(), - ) - .map(|(key, value)| (key.clone(), value.clone())) - .collect_vec() - }); - if let Some(write_buf) = option_write_buf { - let buf_ptr = BufPtr(Box::leak(Box::new(write_buf)).into()); - let buf_iter = unsafe { - BufIter { - inner: buf_ptr.0.as_ref(), - pos: 0, - } - }; + )); - write_buf_ptr = Some(buf_ptr); - vec_iter.push(Box::new(buf_iter)); + vec_iter.push(Box::new(InnerIter { iter: range })); } let mem_buf = self.mem_table().range_scan(min, max, Some(self.seq_id)); @@ -174,34 +161,23 @@ impl Transaction { pos: 0, } })); - VersionIter::merging_with_version(&self.version, &mut vec_iter)?; - - let mut inner = MergingIter::new(vec_iter)?; - let mut init_buf = None; + let mut ver_iter = VersionIter::new(&self.version)?; match &min { - Bound::Included(key) => { - init_buf = inner - .seek(Seek::Backward(key.as_slice()))? - .and_then(|data| (data.0 >= key).then_some(data)); - } - Bound::Excluded(key) => { - init_buf = inner - .seek(Seek::Backward(key.as_slice()))? - .and_then(|data| (data.0 > key).then_some(data)); + Bound::Included(key) | Bound::Excluded(key) => { + ver_iter.seek(Seek::Backward(key.as_slice()))?; } Bound::Unbounded => (), }; + vec_iter.push(Box::new(ver_iter)); Ok(TransactionIter { - inner, + inner: MergingIter::new(vec_iter)?, max: max.map(Bytes::copy_from_slice), - - write_buf_ptr, mem_buf_ptr, - is_overed: false, - init_buf, + min: min.map(Bytes::copy_from_slice), + is_inited: false, }) } } @@ -219,13 +195,12 @@ unsafe impl Send for TransactionIter<'_> {} pub struct TransactionIter<'a> { inner: MergingIter<'a>, - - write_buf_ptr: Option, mem_buf_ptr: BufPtr, + min: Bound, max: Bound, - init_buf: Option, is_overed: bool, + is_inited: bool, } impl<'a> Iter<'a> for TransactionIter<'a> { @@ -236,11 +211,17 @@ impl<'a> Iter<'a> for TransactionIter<'a> { if self.is_overed { return Ok(None); } - let mut item = self.init_buf.take(); + let mut item = self.inner.try_next()?; - if item.is_none() { - item = self.inner.try_next()?; + if !self.is_inited { + item = match &self.min { + Bound::Included(key) => item.and_then(|data| (data.0 >= key).then_some(data)), + Bound::Excluded(key) => item.and_then(|data| (data.0 > key).then_some(data)), + Bound::Unbounded => item, + }; + self.is_inited = true } + let option = match &self.max { Bound::Included(key) => item.and_then(|data| (data.0 <= key).then_some(data)), Bound::Excluded(key) => item.and_then(|data| (data.0 < key).then_some(data)), @@ -255,21 +236,11 @@ impl<'a> Iter<'a> for TransactionIter<'a> { fn is_valid(&self) -> bool { self.inner.is_valid() } - - #[inline] - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { - self.init_buf = None; - - self.inner.seek(seek) - } } impl Drop for TransactionIter<'_> { #[inline] fn drop(&mut self) { - if let Some(buf_prt) = &self.write_buf_ptr { - unsafe { drop(Box::from_raw(buf_prt.0.as_ptr())) } - } unsafe { drop(Box::from_raw(self.mem_buf_ptr.0.as_ptr())) } } } @@ -279,6 +250,10 @@ struct BufIter<'a> { pos: usize, } +struct InnerIter<'a> { + iter: Range<'a, Bytes, Option>, +} + impl<'a> Iter<'a> for BufIter<'a> { type Item = KeyValue; @@ -293,20 +268,20 @@ impl<'a> Iter<'a> for BufIter<'a> { fn is_valid(&self) -> bool { self.pos < self.inner.len() } +} - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { - match seek { - Seek::First => self.pos = 0, - Seek::Last => self.pos = self.inner.len() - 1, - Seek::Backward(seek_key) => { - self.pos = self - .inner - .binary_search_by(|(key, _)| seek_key.cmp(key).reverse()) - .unwrap_or_else(|i| i); - } - }; +impl<'a> Iter<'a> for InnerIter<'a> { + type Item = KeyValue; + + fn try_next(&mut self) -> KernelResult> { + Ok(self + .iter + .next() + .map(|(key, value)| (key.clone(), value.clone()))) + } - self.try_next() + fn is_valid(&self) -> bool { + true } } diff --git a/src/kernel/lsm/table/btree_table/iter.rs b/src/kernel/lsm/table/btree_table/iter.rs index a71e0d8..26b11b5 100644 --- a/src/kernel/lsm/table/btree_table/iter.rs +++ b/src/kernel/lsm/table/btree_table/iter.rs @@ -1,4 +1,4 @@ -use crate::kernel::lsm::iterator::{Iter, Seek}; +use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::table::btree_table::BTreeTable; use bytes::Bytes; @@ -54,19 +54,13 @@ impl<'a> Iter<'a> for BTreeTableIter<'a> { fn is_valid(&self) -> bool { true } +} - fn seek(&mut self, seek: Seek<'_>) -> crate::kernel::KernelResult> { +impl<'a> SeekIter<'a> for BTreeTableIter<'a> { + fn seek(&mut self, seek: Seek<'_>) -> crate::kernel::KernelResult<()> { self._seek(seek); - if let Seek::Last = seek { - return Ok(self.table.inner.last_key_value().map(item_clone)); - } - - if let Some(iter) = self.inner.as_mut() { - Ok(iter.next().map(item_clone)) - } else { - Ok(None) - } + Ok(()) } } @@ -101,11 +95,14 @@ mod tests { assert_eq!(iter.try_next()?, None); - assert_eq!(iter.seek(Seek::First)?, Some(vec[0].clone())); + iter.seek(Seek::First)?; + assert_eq!(iter.try_next()?, Some(vec[0].clone())); - assert_eq!(iter.seek(Seek::Backward(&[b'3']))?, Some(vec[2].clone())); + iter.seek(Seek::Backward(&[b'3']))?; + assert_eq!(iter.try_next()?, Some(vec[2].clone())); - assert_eq!(iter.seek(Seek::Last)?, Some(vec[5].clone())); + iter.seek(Seek::Last)?; + assert_eq!(iter.try_next()?, None); Ok(()) } diff --git a/src/kernel/lsm/table/btree_table/mod.rs b/src/kernel/lsm/table/btree_table/mod.rs index 73c966c..c79b2e3 100644 --- a/src/kernel/lsm/table/btree_table/mod.rs +++ b/src/kernel/lsm/table/btree_table/mod.rs @@ -1,6 +1,6 @@ pub(crate) mod iter; -use crate::kernel::lsm::iterator::Iter; +use crate::kernel::lsm::iterator::SeekIter; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::table::btree_table::iter::BTreeTableIter; use crate::kernel::lsm::table::Table; @@ -55,7 +55,8 @@ impl Table for BTreeTable { #[allow(clippy::todo)] fn iter<'a>( &'a self, - ) -> crate::kernel::KernelResult + 'a + Send + Sync>> { + ) -> crate::kernel::KernelResult + 'a + Send + Sync>> + { Ok(Box::new(BTreeTableIter::new(self))) } } diff --git a/src/kernel/lsm/table/mod.rs b/src/kernel/lsm/table/mod.rs index 231af77..959a01e 100644 --- a/src/kernel/lsm/table/mod.rs +++ b/src/kernel/lsm/table/mod.rs @@ -1,4 +1,4 @@ -use crate::kernel::lsm::iterator::Iter; +use crate::kernel::lsm::iterator::SeekIter; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::table::meta::TableMeta; use crate::kernel::KernelResult; @@ -29,7 +29,9 @@ pub(crate) trait Table: Sync + Send { fn level(&self) -> usize; - fn iter<'a>(&'a self) -> KernelResult + 'a + Sync + Send>>; + fn iter<'a>( + &'a self, + ) -> KernelResult + 'a + Sync + Send>>; } /// 通过一组SSTable收集对应的Gen diff --git a/src/kernel/lsm/table/ss_table/block_iter.rs b/src/kernel/lsm/table/ss_table/block_iter.rs index 577318a..9ab7e55 100644 --- a/src/kernel/lsm/table/ss_table/block_iter.rs +++ b/src/kernel/lsm/table/ss_table/block_iter.rs @@ -1,4 +1,4 @@ -use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek}; +use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek, SeekIter}; use crate::kernel::lsm::table::ss_table::block::{Block, BlockItem, Entry}; use crate::kernel::KernelResult; use bytes::Bytes; @@ -41,21 +41,23 @@ where (item_key, item.clone()) } - fn offset_move(&mut self, offset: usize) -> Option<(Bytes, T)> { + fn offset_move(&mut self, offset: usize, is_seek: bool) -> Option<(Bytes, T)> { let block = self.block; let restart_interval = block.restart_interval(); let old_offset = self.offset; self.offset = offset; - (offset > 0).then(|| { - let real_offset = offset - 1; - if old_offset - 1 / restart_interval != real_offset / restart_interval { - self.buf_shared_key = - block.shared_key_prefix(real_offset, block.restart_shared_len(real_offset)); - } - self.item() - }) + (offset > 0 && offset < self.entry_len + 1) + .then(|| { + let real_offset = offset - 1; + if old_offset - 1 / restart_interval != real_offset / restart_interval { + self.buf_shared_key = + block.shared_key_prefix(real_offset, block.restart_shared_len(real_offset)); + } + (!is_seek).then(|| self.item()) + }) + .flatten() } } @@ -64,8 +66,8 @@ where V: Sync + Send + BlockItem, { fn try_prev(&mut self) -> KernelResult> { - Ok((self.is_valid() || self.offset == self.entry_len) - .then(|| self.offset_move(self.offset - 1)) + Ok((self.is_valid() || self.offset == self.entry_len + 1) + .then(|| self.offset_move(self.offset - 1, false)) .flatten()) } } @@ -78,30 +80,37 @@ where fn try_next(&mut self) -> KernelResult> { Ok((self.is_valid() || self.offset == 0) - .then(|| self.offset_move(self.offset + 1)) + .then(|| self.offset_move(self.offset + 1, false)) .flatten()) } fn is_valid(&self) -> bool { - self.offset > 0 && self.offset < self.entry_len + self.offset > 0 && self.offset <= self.entry_len } +} - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { - Ok(match seek { +impl<'a, V> SeekIter<'a> for BlockIter<'a, V> +where + V: Sync + Send + BlockItem, +{ + fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> { + match seek { Seek::First => Some(0), - Seek::Last => Some(self.entry_len - 1), + Seek::Last => Some(self.entry_len + 1), Seek::Backward(key) => match self.block.binary_search(key) { Ok(index) => Some(index), Err(index) => (index < self.entry_len).then_some(index), }, } - .and_then(|index| self.offset_move(index + 1))) + .and_then(|index| self.offset_move(index, true)); + + Ok(()) } } #[cfg(test)] mod tests { - use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek}; + use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek, SeekIter}; use crate::kernel::lsm::table::ss_table::block::{Block, Value, DEFAULT_DATA_RESTART_INTERVAL}; use crate::kernel::lsm::table::ss_table::block_iter::BlockIter; use crate::kernel::KernelResult; @@ -145,6 +154,11 @@ mod tests { assert_eq!(iterator.try_next()?, None); + assert_eq!( + iterator.try_prev()?, + Some((Bytes::from(vec![b'4']), Value::from(None))) + ); + assert_eq!( iterator.try_prev()?, Some(( @@ -160,26 +174,27 @@ mod tests { assert_eq!(iterator.try_prev()?, None); + iterator.seek(Seek::First)?; assert_eq!( - iterator.seek(Seek::First)?, + iterator.try_next()?, Some((Bytes::from(vec![b'1']), Value::from(None))) ); - assert_eq!( - iterator.seek(Seek::Last)?, - Some((Bytes::from(vec![b'4']), Value::from(None))) - ); + iterator.seek(Seek::Last)?; + assert_eq!(iterator.try_next()?, None); + iterator.seek(Seek::Backward(&[b'2']))?; assert_eq!( - iterator.seek(Seek::Backward(&[b'2']))?, + iterator.try_next()?, Some(( Bytes::from(vec![b'2']), Value::from(Some(Bytes::from(vec![b'0']))) )) ); + iterator.seek(Seek::Backward(&[b'3']))?; assert_eq!( - iterator.seek(Seek::Backward(&[b'3']))?, + iterator.try_next()?, Some((Bytes::from(vec![b'4']), Value::from(None))) ); diff --git a/src/kernel/lsm/table/ss_table/iter.rs b/src/kernel/lsm/table/ss_table/iter.rs index aa80bc2..a2e99a1 100644 --- a/src/kernel/lsm/table/ss_table/iter.rs +++ b/src/kernel/lsm/table/ss_table/iter.rs @@ -1,4 +1,4 @@ -use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek}; +use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::table::ss_table::block::{BlockType, Index, Value}; use crate::kernel::lsm::table::ss_table::block_iter::BlockIter; @@ -44,12 +44,11 @@ impl<'a> SSTableIter<'a> { Ok(BlockIter::new(block)) } - fn data_iter_seek(&mut self, seek: Seek<'_>, index: Index) -> KernelResult> { + fn data_iter_seek(&mut self, seek: Seek<'_>, index: Index) -> KernelResult<()> { self.data_iter = Self::data_iter_init(self.ss_table, index)?; - Ok(self - .data_iter - .seek(seek)? - .map(|(key, value)| (key, value.bytes))) + self.data_iter.seek(seek)?; + + Ok(()) } } @@ -58,7 +57,12 @@ impl<'a> ForwardIter<'a> for SSTableIter<'a> { match self.data_iter.try_prev()? { None => { if let Some((_, index)) = self.index_iter.try_prev()? { - self.data_iter_seek(Seek::Last, index) + self.data_iter_seek(Seek::Last, index)?; + + Ok(self + .data_iter + .try_prev()? + .map(|(key, value)| (key, value.bytes))) } else { Ok(None) } @@ -75,7 +79,12 @@ impl<'a> Iter<'a> for SSTableIter<'a> { match self.data_iter.try_next()? { None => { if let Some((_, index)) = self.index_iter.try_next()? { - self.data_iter_seek(Seek::First, index) + self.data_iter_seek(Seek::First, index)?; + + Ok(self + .data_iter + .try_next()? + .map(|(key, value)| (key, value.bytes))) } else { Ok(None) } @@ -87,20 +96,27 @@ impl<'a> Iter<'a> for SSTableIter<'a> { fn is_valid(&self) -> bool { self.data_iter.is_valid() } +} - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { - if let Some((_, index)) = self.index_iter.seek(seek)? { - self.data_iter_seek(seek, index) - } else { - Ok(None) +impl<'a> SeekIter<'a> for SSTableIter<'a> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> { + self.index_iter.seek(seek)?; + + if let Some((_, index)) = self.index_iter.try_next()? { + self.data_iter_seek(seek, index)?; + } + if matches!(seek, Seek::Last) { + self.data_iter.seek(Seek::Last)?; } + + Ok(()) } } #[cfg(test)] mod tests { use crate::kernel::io::{FileExtension, IoFactory, IoType}; - use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek}; + use crate::kernel::lsm::iterator::{ForwardIter, Iter, Seek, SeekIter}; use crate::kernel::lsm::storage::Config; use crate::kernel::lsm::table::ss_table::iter::SSTableIter; use crate::kernel::lsm::table::ss_table::SSTable; @@ -163,14 +179,14 @@ mod tests { assert_eq!(iterator.try_prev()?.unwrap(), vec_data[i]); } - assert_eq!( - iterator.seek(Seek::Backward(&vec_data[114].0))?.unwrap(), - vec_data[114] - ); + iterator.seek(Seek::Backward(&vec_data[114].0))?; + assert_eq!(iterator.try_next()?.unwrap(), vec_data[114]); - assert_eq!(iterator.seek(Seek::First)?.unwrap(), vec_data[0]); + iterator.seek(Seek::First)?; + assert_eq!(iterator.try_next()?.unwrap(), vec_data[0]); - assert_eq!(iterator.seek(Seek::Last)?.unwrap(), vec_data[times - 1]); + iterator.seek(Seek::Last)?; + assert_eq!(iterator.try_next()?, None); Ok(()) } diff --git a/src/kernel/lsm/table/ss_table/mod.rs b/src/kernel/lsm/table/ss_table/mod.rs index c53b22a..668ed13 100644 --- a/src/kernel/lsm/table/ss_table/mod.rs +++ b/src/kernel/lsm/table/ss_table/mod.rs @@ -1,5 +1,5 @@ use crate::kernel::io::{IoFactory, IoReader, IoType}; -use crate::kernel::lsm::iterator::Iter; +use crate::kernel::lsm::iterator::SeekIter; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::storage::Config; use crate::kernel::lsm::table::ss_table::block::{ @@ -86,7 +86,7 @@ impl SSTable { footer.to_raw(&mut bytes)?; let mut writer = io_factory.writer(gen, io_type)?; - writer.write_all(&mut bytes)?; + writer.write_all(&bytes)?; writer.flush()?; info!("[SsTable: {}][create][MetaBlock]: {:?}", gen, meta); @@ -224,7 +224,9 @@ impl Table for SSTable { self.footer.level as usize } - fn iter<'a>(&'a self) -> KernelResult + 'a + Send + Sync>> { + fn iter<'a>( + &'a self, + ) -> KernelResult + 'a + Send + Sync>> { Ok(SSTableIter::new(self).map(Box::new)?) } } diff --git a/src/kernel/lsm/version/iter.rs b/src/kernel/lsm/version/iter.rs index 7f01dd0..9667901 100644 --- a/src/kernel/lsm/version/iter.rs +++ b/src/kernel/lsm/version/iter.rs @@ -1,6 +1,6 @@ use crate::kernel::lsm::iterator::level_iter::LevelIter; -use crate::kernel::lsm::iterator::merging_iter::MergingIter; -use crate::kernel::lsm::iterator::{Iter, Seek}; +use crate::kernel::lsm::iterator::merging_iter::SeekMergingIter; +use crate::kernel::lsm::iterator::{Iter, Seek, SeekIter}; use crate::kernel::lsm::mem_table::KeyValue; use crate::kernel::lsm::version::Version; use crate::kernel::lsm::MAX_LEVEL; @@ -8,7 +8,7 @@ use crate::kernel::KernelResult; /// Version键值对迭代器 pub struct VersionIter<'a> { - merge_iter: MergingIter<'a>, + merge_iter: SeekMergingIter<'a>, } impl<'a> VersionIter<'a> { @@ -17,13 +17,13 @@ impl<'a> VersionIter<'a> { Self::merging_with_version(version, &mut vec_iter)?; Ok(Self { - merge_iter: MergingIter::new(vec_iter)?, + merge_iter: SeekMergingIter::new(vec_iter)?, }) } pub(crate) fn merging_with_version( version: &'a Version, - iter_vec: &mut Vec + 'a + Send + Sync>>, + iter_vec: &mut Vec + 'a + Send + Sync>>, ) -> KernelResult<()> { for table in version.tables_by_level_0() { iter_vec.push(table.iter()?); @@ -49,8 +49,10 @@ impl<'a> Iter<'a> for VersionIter<'a> { fn is_valid(&self) -> bool { self.merge_iter.is_valid() } +} - fn seek(&mut self, seek: Seek<'_>) -> KernelResult> { +impl<'a> SeekIter<'a> for VersionIter<'a> { + fn seek(&mut self, seek: Seek<'_>) -> KernelResult<()> { self.merge_iter.seek(seek) } }