Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

Commit

Permalink
refactor(mvcc): Implementing iterators within a transaction (#42)
Browse files Browse the repository at this point in the history
* refactor(mvcc): Implementing iterators within a transaction

* fix(mvcc): fix `BufIter` pos miss

* doc: version up

* doc: version up

* doc: version up

* feat(mvcc): impl `Sync` + `Send` for `TransactionIter`

* doc: version up

* style: open lru_cache.rs

* fix: ignore some test about SkipTable

* fix: `Compaction` when drop

* fix: `Transaction` blocking

* fix: `VersionLog` overwrites data after restarting

* fix: `test_writer`

* style: code format

* fix: `VersionIter` data overwriting
mistaken

* fix: `TransactionIter` seek duplicate value

* fix: `TransactionIter` seek data coverage

* style: code format

* feat: adjust the parameters of `Storage::set`

* pref: optimize the structure of write buf to avoid frequent copying of keys

* fix: test

* fix
  • Loading branch information
KKould authored Sep 10, 2023
1 parent 596a90c commit d790f06
Show file tree
Hide file tree
Showing 33 changed files with 666 additions and 642 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kip_db"
version = "0.1.2-alpha.0"
version = "0.1.2-alpha.15"
edition = "2021"
authors = ["Kould <[email protected]>"]
description = "轻量级、异步 基于LSM Leveled Compaction K-V数据库"
Expand Down
10 changes: 8 additions & 2 deletions examples/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ async fn main() -> Result<(), KernelError> {

println!("Set KeyValue after the transaction -> (key_1, value_1)");
kip_storage
.set(b"key_1", Bytes::copy_from_slice(b"value_1"))
.set(
Bytes::copy_from_slice(b"key_1"),
Bytes::copy_from_slice(b"value_1"),
)
.await?;

println!("Read key_1 on the transaction: {:?}", tx.get(b"key_1")?);

println!("Set KeyValue on the transaction -> (key_2, value_2)");
tx.set(b"key_2", Bytes::copy_from_slice(b"value_2"));
tx.set(
Bytes::copy_from_slice(b"key_2"),
Bytes::copy_from_slice(b"value_2"),
);

println!("Read key_2 on the transaction: {:?}", tx.get(b"key_2")?);

Expand Down
26 changes: 19 additions & 7 deletions examples/scan_read.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::Bytes;
use kip_db::kernel::lsm::iterator::Iter;
use kip_db::kernel::lsm::storage::{Config, KipStorage};
use kip_db::kernel::Storage;
use kip_db::KernelError;
Expand All @@ -13,24 +14,35 @@ async fn main() -> Result<(), KernelError> {

println!("Set KeyValue -> (key_1, value_1)");
kip_storage
.set(b"key_1", Bytes::copy_from_slice(b"value_1"))
.set(
Bytes::copy_from_slice(b"key_1"),
Bytes::copy_from_slice(b"value_1"),
)
.await?;
println!("Set KeyValue -> (key_2, value_2)");
kip_storage
.set(b"key_2", Bytes::copy_from_slice(b"value_2"))
.set(
Bytes::copy_from_slice(b"key_2"),
Bytes::copy_from_slice(b"value_2"),
)
.await?;
println!("Set KeyValue -> (key_3, value_3)");
kip_storage
.set(b"key_3", Bytes::copy_from_slice(b"value_3"))
.set(
Bytes::copy_from_slice(b"key_3"),
Bytes::copy_from_slice(b"value_3"),
)
.await?;

println!("New Transaction");
let tx = kip_storage.new_transaction().await;

println!(
"RangeScan without key_3 By Transaction: {:?}",
tx.range_scan(Bound::Unbounded, Bound::Excluded(b"key_3"))?
);
println!("Iter without key_3 By Transaction:");
let mut iter = tx.iter(Bound::Unbounded, Bound::Excluded(b"key_3"))?;

while let Some(item) = iter.try_next()? {
println!("Item: {:?}", item);
}

Ok(())
}
5 changes: 4 additions & 1 deletion examples/simple_crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ async fn main() -> Result<(), KernelError> {

println!("Set KeyValue -> (apple, banana)");
kip_storage
.set(b"apple", Bytes::copy_from_slice(b"banana"))
.set(
Bytes::copy_from_slice(b"apple"),
Bytes::copy_from_slice(b"banana"),
)
.await?;

println!(
Expand Down
13 changes: 8 additions & 5 deletions src/bench/kernel_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn bulk_load<T: Storage>(c: &mut Criterion) {
),
|b| {
b.to_async(&rt).iter(|| async {
db.set(&bytes(key_len), Bytes::from(bytes(val_len)))
db.set(Bytes::from(bytes(key_len)), Bytes::from(bytes(val_len)))
.await
.unwrap();
})
Expand Down Expand Up @@ -108,9 +108,12 @@ fn monotonic_crud<T: Storage>(c: &mut Criterion) {
c.bench_function(&format!("Store: {}, monotonic inserts", T::name()), |b| {
let count = AtomicU32::new(0_u32);
b.iter(|| async {
db.set(&count.fetch_add(1, Relaxed).to_be_bytes(), Bytes::new())
.await
.unwrap();
db.set(
Bytes::from(count.fetch_add(1, Relaxed).to_be_bytes()),
Bytes::new(),
)
.await
.unwrap();
})
});

Expand Down Expand Up @@ -147,7 +150,7 @@ fn random_crud<T: Storage>(c: &mut Criterion) {

c.bench_function(&format!("Store: {}, random inserts", T::name()), |b| {
b.iter(|| async {
db.set(&random(SIZE).to_be_bytes(), Bytes::new())
db.set(Bytes::from(random(SIZE).to_be_bytes()), Bytes::new())
.await
.unwrap();
})
Expand Down
6 changes: 6 additions & 0 deletions src/kernel/io/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ impl Write for BufIoWriter {
}
}

impl Seek for BufIoWriter {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.writer.seek(pos)
}
}

impl IoWriter for BufIoWriter {
fn current_pos(&mut self) -> Result<u64> {
Ok(self.writer.pos)
Expand Down
6 changes: 6 additions & 0 deletions src/kernel/io/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ impl Write for DirectIoWriter {
}
}

impl Seek for DirectIoWriter {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
self.fs.seek(pos)
}
}

impl IoWriter for DirectIoWriter {
fn current_pos(&mut self) -> Result<u64> {
Ok(self.fs.stream_position()?)
Expand Down
2 changes: 1 addition & 1 deletion src/kernel/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,6 @@ pub trait IoReader: Send + Sync + 'static + Read + Seek {
fn get_type(&self) -> IoType;
}

pub trait IoWriter: Send + Sync + 'static + Write {
pub trait IoWriter: Send + Sync + 'static + Write + Seek {
fn current_pos(&mut self) -> Result<u64>;
}
128 changes: 68 additions & 60 deletions src/kernel/lsm/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl Compactor {
{
let mut iter = table.iter()?;
let mut vec_cmd = Vec::with_capacity(table.len());
while let Some(item) = iter.next_err()? {
while let Some(item) = iter.try_next()? {
if fn_is_filter(&item.0) {
vec_cmd.push(item)
}
Expand Down Expand Up @@ -321,81 +321,89 @@ mod tests {
use std::time::Instant;
use tempfile::TempDir;

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_lsm_major_compactor() -> Result<()> {
#[test]
fn test_lsm_major_compactor() -> Result<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");

let times = 30_000;
let value = b"Stray birds of summer come to my window to sing and fly away.
tokio_test::block_on(async move {
let times = 30_000;

let value = b"Stray birds of summer come to my window to sing and fly away.
And yellow leaves of autumn, which have no songs, flutter and fall
there with a sign.";

// Tips: 此处由于倍率为1且阈值固定为4,因此容易导致Level 1高出阈值时候导致归并转移到Level 2时,
// 重复触发阈值,导致迁移到Level6之中,此情况是理想之中的
// 普通场景下每个Level之间的阈值数量是有倍数递增的,因此除了极限情况以外,不会发送这种逐级转移的现象
let config = Config::new(temp_dir.path().to_str().unwrap())
.major_threshold_with_sst_size(4)
.level_sst_magnification(1)
.minor_trigger_with_threshold(TriggerType::Count, 1000);
let kv_store = KipStorage::open_with_config(config).await?;
let mut vec_kv = Vec::new();

for i in 0..times {
let vec_u8 = bincode::serialize(&i)?;
vec_kv.push((
Bytes::from(vec_u8.clone()),
Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()),
));
}
// Tips: 此处由于倍率为1且阈值固定为4,因此容易导致Level 1高出阈值时候导致归并转移到Level 2时,
// 重复触发阈值,导致迁移到Level6之中,此情况是理想之中的
// 普通场景下每个Level之间的阈值数量是有倍数递增的,因此除了极限情况以外,不会发送这种逐级转移的现象
let config = Config::new(temp_dir.path().to_str().unwrap())
.major_threshold_with_sst_size(4)
.level_sst_magnification(1)
.minor_trigger_with_threshold(TriggerType::Count, 1000);
let kv_store = KipStorage::open_with_config(config).await?;
let mut vec_kv = Vec::new();

for i in 0..times {
let vec_u8 = bincode::serialize(&i)?;
vec_kv.push((
Bytes::from(vec_u8.clone()),
Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()),
));
}

let start = Instant::now();
let start = Instant::now();

assert_eq!(times % 1000, 0);
assert_eq!(times % 1000, 0);

for i in 0..times / 1000 {
for j in 0..1000 {
kv_store
.set(&vec_kv[i * 1000 + j].0, vec_kv[i * 1000 + j].1.clone())
.await?;
for i in 0..times / 1000 {
for j in 0..1000 {
kv_store
.set(
vec_kv[i * 1000 + j].0.clone(),
vec_kv[i * 1000 + j].1.clone(),
)
.await?;
}
kv_store.flush().await?;
}
kv_store.flush().await?;
}
println!("[set_for][Time: {:?}]", start.elapsed());

let version = kv_store.current_version().await;
let level_slice = &version.level_slice;
println!("MajorCompaction Test: {:#?}", level_slice);
assert!(!level_slice[0].is_empty());
assert!(
!level_slice[1].is_empty()
|| !level_slice[2].is_empty()
|| !level_slice[3].is_empty()
|| !level_slice[4].is_empty()
|| !level_slice[5].is_empty()
|| !level_slice[6].is_empty()
);
println!("[set_for][Time: {:?}]", start.elapsed());

let version = kv_store.current_version().await;
let level_slice = &version.level_slice;
println!("MajorCompaction Test: {:#?}", level_slice);
assert!(!level_slice[0].is_empty());
assert!(
!level_slice[1].is_empty()
|| !level_slice[2].is_empty()
|| !level_slice[3].is_empty()
|| !level_slice[4].is_empty()
|| !level_slice[5].is_empty()
|| !level_slice[6].is_empty()
);

for (level, slice) in level_slice.into_iter().enumerate() {
if !slice.is_empty() && level != LEVEL_0 {
let mut tmp_scope: Option<&Scope> = None;
for (level, slice) in level_slice.into_iter().enumerate() {
if !slice.is_empty() && level != LEVEL_0 {
let mut tmp_scope: Option<&Scope> = None;

for scope in slice {
if let Some(last_scope) = tmp_scope {
assert!(last_scope.end < scope.start);
for scope in slice {
if let Some(last_scope) = tmp_scope {
assert!(last_scope.end < scope.start);
}
tmp_scope = Some(scope);
}
tmp_scope = Some(scope);
}
}
}

let start = Instant::now();
for i in 0..times {
assert_eq!(kv_store.get(&vec_kv[i].0).await?, Some(vec_kv[i].1.clone()));
}
println!("[get_for][Time: {:?}]", start.elapsed());
kv_store.flush().await?;
assert_eq!(kv_store.len().await?, times);

Ok(())
let start = Instant::now();
for i in 0..times {
assert_eq!(kv_store.get(&vec_kv[i].0).await?, Some(vec_kv[i].1.clone()));
}
println!("[get_for][Time: {:?}]", start.elapsed());
kv_store.flush().await?;

Ok(())
})
}

#[test]
Expand Down
Loading

0 comments on commit d790f06

Please sign in to comment.