diff --git a/Cargo.toml b/Cargo.toml index 1cf6895..9aa010b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kip_db" -version = "0.1.2-alpha.21" +version = "0.1.2-alpha.22" edition = "2021" authors = ["Kould "] description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库" @@ -60,6 +60,7 @@ parking_lot = "0.12.1" crc32fast = "1.3.2" skiplist = "0.5.1" fslock = "0.2.1" +once_cell = "1.19.0" # grpc tonic = { version = "0.10.2", optional = true } prost = { version = "0.12", optional = true } diff --git a/src/kernel/lsm/mem_table.rs b/src/kernel/lsm/mem_table.rs index 5aec558..e7db2c6 100644 --- a/src/kernel/lsm/mem_table.rs +++ b/src/kernel/lsm/mem_table.rs @@ -355,20 +355,65 @@ impl MemTable { option_seq: Option, ) -> Vec { let inner = self.inner.lock(); + let de_dupe_merge_sort_fn = |mem: Vec, immut_mem: Vec| { + assert!(mem.is_sorted_by_key(|(k, _)| k)); + assert!(mem.iter().all_unique()); + assert!(immut_mem.is_sorted_by_key(|(k, _)| k)); + assert!(immut_mem.iter().all_unique()); + + let mut merged = Vec::with_capacity(mem.len() + immut_mem.len()); + let (mut mem_iter, mut immut_mem_iter) = (mem.into_iter(), immut_mem.into_iter()); + let (mut mem_current, mut immut_mem_current) = (mem_iter.next(), immut_mem_iter.next()); + + while let (Some(mem_item), Some(immut_mem_item)) = + (mem_current.take(), immut_mem_current.take()) + { + if mem_item.0 < immut_mem_item.0 { + merged.push(mem_item); + immut_mem_current = Some(immut_mem_item); - inner - ._immut - .as_ref() - .map(|mem_map| Self::_range_scan(mem_map, min, max, option_seq)) - .unwrap_or_default() - .into_iter() - .chain(Self::_range_scan(&inner._mem, min, max, option_seq)) - .rev() - .unique_by(|(key, _)| key.clone()) - .collect_vec() + mem_current = mem_iter.next(); + if mem_current.is_none() { + break; + } + } else if mem_item.0 > immut_mem_item.0 { + merged.push(immut_mem_item); + mem_current = Some(mem_item); + + immut_mem_current = immut_mem_iter.next(); + if immut_mem_current.is_none() { + break; + } + } else { + merged.push(mem_item); + } + } + + if let Some(kv) = mem_current { + merged.push(kv) + } + if let Some(kv) = immut_mem_current { + merged.push(kv) + } + // one of the two is empty + mem_iter + .chain(immut_mem_iter) + .for_each(|kv| merged.push(kv)); + + assert!(merged.is_sorted_by_key(|(k, _)| k)); + assert!(merged.iter().all_unique()); + merged + }; + let mut mem_scan = Self::_range_scan(&inner._mem, min, max, option_seq); + + if let Some(immut) = &inner._immut { + let immut_scan = Self::_range_scan(immut, min, max, option_seq); + + mem_scan = de_dupe_merge_sort_fn(mem_scan, immut_scan); + } + mem_scan } - /// Tips: 返回的数据为倒序 fn _range_scan( mem_map: &MemMap, min: Bound<&[u8]>, @@ -395,7 +440,7 @@ impl MemTable { let min_key = to_internal_key(&min, i64::MIN, i64::MAX); let max_key = to_internal_key(&max, i64::MAX, i64::MIN); - mem_map + let mut scan = mem_map .range(min_key.as_ref(), max_key.as_ref()) .rev() .filter(|(InternalKey { seq_id, .. }, _)| { @@ -403,7 +448,12 @@ impl MemTable { }) .unique_by(|(internal_key, _)| &internal_key.key) .map(|(key, value)| (key.key.clone(), value.clone())) - .collect_vec() + .collect_vec(); + scan.reverse(); + + assert!(scan.is_sorted_by_key(|(k, _)| k)); + assert!(scan.iter().all_unique()); + scan } } diff --git a/src/kernel/lsm/mvcc.rs b/src/kernel/lsm/mvcc.rs index d9c9457..2df43b7 100644 --- a/src/kernel/lsm/mvcc.rs +++ b/src/kernel/lsm/mvcc.rs @@ -3,27 +3,23 @@ use crate::kernel::lsm::iterator::merging_iter::MergingIter; use crate::kernel::lsm::iterator::{Iter, Seek}; use crate::kernel::lsm::mem_table::{KeyValue, MemTable}; use crate::kernel::lsm::query_and_compaction; -use crate::kernel::lsm::storage::{Sequence, StoreInner}; +use crate::kernel::lsm::storage::{KipStorage, Sequence, StoreInner}; use crate::kernel::lsm::version::iter::VersionIter; use crate::kernel::lsm::version::Version; use crate::kernel::KernelResult; use crate::KernelError; use bytes::Bytes; +use core::slice::SlicePattern; use itertools::Itertools; +use once_cell::sync::OnceCell; use skiplist::SkipMap; use std::collections::Bound; -use std::iter::Map; use std::ptr::NonNull; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::Sender; -type MapIter<'a> = Map< - skiplist::skipmap::Iter<'a, Bytes, Option>, - fn((&Bytes, &Option)) -> KeyValue, ->; - unsafe impl Send for BufPtr {} unsafe impl Sync for BufPtr {} @@ -34,16 +30,33 @@ pub enum CheckType { } pub struct Transaction { - pub(crate) store_inner: Arc, - pub(crate) compactor_tx: Sender, + store_inner: Arc, + compactor_tx: Sender, + + version: Arc, + seq_id: i64, + check_type: CheckType, - pub(crate) version: Arc, - pub(crate) write_buf: Option>>, - pub(crate) seq_id: i64, - pub(crate) check_type: CheckType, + write_buf: Option>>, + mem_buf: OnceCell, } impl Transaction { + pub(crate) async fn new(storage: &KipStorage, check_type: CheckType) -> Self { + let _ = storage.mem_table().tx_count.fetch_add(1, Ordering::Release); + + Transaction { + store_inner: Arc::clone(&storage.inner), + version: storage.current_version().await, + compactor_tx: storage.compactor_tx.clone(), + + seq_id: Sequence::create(), + write_buf: None, + check_type, + mem_buf: OnceCell::new(), + } + } + fn write_buf_or_init(&mut self) -> &mut SkipMap> { self.write_buf.get_or_insert_with(SkipMap::new) } @@ -115,34 +128,6 @@ impl Transaction { Ok(()) } - #[inline] - pub fn mem_range(&self, min: Bound<&[u8]>, max: Bound<&[u8]>) -> Vec { - let mem_table_range = self.mem_table().range_scan(min, max, Some(self.seq_id)); - - if let Some(buf_iter) = self._mem_range(min, max) { - buf_iter - .chain(mem_table_range) - .unique_by(|(key, _)| key.clone()) - .sorted_by_key(|(key, _)| key.clone()) - .collect_vec() - } else { - mem_table_range - } - } - - fn _mem_range(&self, min: Bound<&[u8]>, max: Bound<&[u8]>) -> Option { - #[allow(clippy::option_map_or_none)] - self.write_buf.as_ref().map_or(None, |buf| { - Some( - buf.range( - min.map(Bytes::copy_from_slice).as_ref(), - max.map(Bytes::copy_from_slice).as_ref(), - ) - .map(|(key, value)| (key.clone(), value.clone())), - ) - }) - } - fn mem_table(&self) -> &MemTable { &self.store_inner.mem_table } @@ -158,50 +143,52 @@ impl Transaction { min: Bound<&[u8]>, max: Bound<&[u8]>, ) -> KernelResult { - let range_buf = self.mem_range(min, max); - let ptr = BufPtr(Box::leak(Box::new(range_buf)).into()); + let option_write_buf = self.write_buf.as_ref().map(|buf| { + buf.range( + min.map(Bytes::copy_from_slice).as_ref(), + max.map(Bytes::copy_from_slice).as_ref(), + ) + .map(|(key, value)| (key.clone(), value.clone())) + .collect_vec() + }); + + let mem_ptr = self.mem_buf.get_or_init(|| { + let kvs = self.mem_table().range_scan(min, max, Some(self.seq_id)); + + BufPtr(Box::leak(Box::new(kvs)).into()) + }); + let mut write_buf_ptr = None; + let mut vec_iter: Vec + 'a + Send + Sync>> = + Vec::with_capacity(3); + + if let Some(write_buf) = option_write_buf { + let buf_ptr = BufPtr(Box::leak(Box::new(write_buf)).into()); + let buf_iter = unsafe { + BufIter { + inner: buf_ptr.0.as_ref(), + pos: 0, + } + }; - let mem_iter = unsafe { + write_buf_ptr = Some(buf_ptr); + vec_iter.push(Box::new(buf_iter)); + } + vec_iter.push(Box::new(unsafe { BufIter { - inner: ptr.0.as_ref(), + inner: mem_ptr.0.as_ref(), pos: 0, } - }; - - let mut version_iter = VersionIter::new(&self.version)?; - let mut seek_buf = None; - - match min { - Bound::Included(key) => { - let ver_seek_option = version_iter.seek(Seek::Backward(key))?; - unsafe { - let op = |disk_option: Option<&KeyValue>, mem_option: Option<&KeyValue>| match ( - disk_option, - mem_option, - ) { - (Some(disk), Some(mem)) => disk.0 >= mem.0, - _ => false, - }; - - if !op(ver_seek_option.as_ref(), ptr.0.as_ref().first()) { - seek_buf = ver_seek_option; - } - } - } - Bound::Excluded(key) => { - let _ = version_iter.seek(Seek::Backward(key))?; - } - Bound::Unbounded => (), - } + })); + vec_iter.push(Box::new(VersionIter::new(&self.version)?)); - let vec_iter: Vec + 'a + Send + Sync>> = - vec![Box::new(mem_iter), Box::new(version_iter)]; + let inner = MergingIter::new(vec_iter)?; Ok(TransactionIter { - inner: MergingIter::new(vec_iter)?, + inner, + min: min.map(Bytes::copy_from_slice), max: max.map(Bytes::copy_from_slice), - ptr, - seek_buf, + write_buf_ptr, + is_seeked: false, }) } } @@ -210,6 +197,10 @@ impl Drop for Transaction { #[inline] fn drop(&mut self) { let _ = self.mem_table().tx_count.fetch_sub(1, Ordering::Release); + + if let Some(mem_ptr) = self.mem_buf.take() { + unsafe { drop(Box::from_raw(mem_ptr.0.as_ptr())) } + } } } @@ -219,9 +210,10 @@ unsafe impl Send for TransactionIter<'_> {} pub struct TransactionIter<'a> { inner: MergingIter<'a>, - ptr: BufPtr, + write_buf_ptr: Option, + min: Bound, max: Bound, - seek_buf: Option, + is_seeked: bool, } impl<'a> Iter<'a> for TransactionIter<'a> { @@ -229,8 +221,22 @@ impl<'a> Iter<'a> for TransactionIter<'a> { #[inline] fn try_next(&mut self) -> KernelResult> { - if let Some(item) = self.seek_buf.take() { - return Ok(Some(item)); + if !self.is_seeked { + self.is_seeked = true; + + match &self.min { + Bound::Included(key) => return self.inner.seek(Seek::Backward(key.as_slice())), + Bound::Excluded(key) => { + if let Some(kv) = self.inner.seek(Seek::Backward(key.as_slice()))? { + if kv.0 != key { + return Ok(Some(kv)); + } + } else { + return Ok(None); + } + } + Bound::Unbounded => (), + }; } let option = match &self.max { @@ -262,7 +268,9 @@ impl<'a> Iter<'a> for TransactionIter<'a> { impl Drop for TransactionIter<'_> { #[inline] fn drop(&mut self) { - unsafe { drop(Box::from_raw(self.ptr.0.as_ptr())) } + if let Some(buf_prt) = &self.write_buf_ptr { + unsafe { drop(Box::from_raw(buf_prt.0.as_ptr())) } + } } } diff --git a/src/kernel/lsm/storage.rs b/src/kernel/lsm/storage.rs index 31adc5a..94e8cd9 100644 --- a/src/kernel/lsm/storage.rs +++ b/src/kernel/lsm/storage.rs @@ -63,12 +63,12 @@ static GEN_BUF: AtomicI64 = AtomicI64::new(0); /// 基于LSM的KV Store存储内核 /// Leveled Compaction压缩算法 pub struct KipStorage { - inner: Arc, + pub(crate) inner: Arc, /// 多进程文件锁 /// 避免多进程进行数据读写 lock_file: LockFile, /// Compactor 通信器 - compactor_tx: Sender, + pub(crate) compactor_tx: Sender, } pub(crate) struct StoreInner { @@ -233,7 +233,7 @@ impl KipStorage { }) } - fn mem_table(&self) -> &MemTable { + pub(crate) fn mem_table(&self) -> &MemTable { &self.inner.mem_table } @@ -244,17 +244,7 @@ impl KipStorage { /// 创建事务 #[inline] pub async fn new_transaction(&self, check_type: CheckType) -> Transaction { - let _ = self.mem_table().tx_count.fetch_add(1, Ordering::Release); - - Transaction { - store_inner: Arc::clone(&self.inner), - version: self.current_version().await, - compactor_tx: self.compactor_tx.clone(), - - seq_id: Sequence::create(), - write_buf: None, - check_type, - } + Transaction::new(self, check_type).await } #[inline] diff --git a/src/lib.rs b/src/lib.rs index 5da3a4f..3005182 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ #![feature(cursor_remaining)] #![feature(slice_pattern)] #![feature(bound_map)] +#![feature(is_sorted)] extern crate core;