Skip to content
This repository has been archived by the owner on Aug 4, 2024. It is now read-only.

Commit

Permalink
feat: support check_key_conflict for mvcc
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 30, 2023
1 parent a8b17ff commit 4cf9071
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 67 deletions.
3 changes: 2 additions & 1 deletion examples/mvcc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::Bytes;
use kip_db::kernel::lsm::mvcc::CheckType;
use kip_db::kernel::lsm::storage::{Config, KipStorage};
use kip_db::kernel::Storage;
use kip_db::KernelError;
Expand All @@ -11,7 +12,7 @@ async fn main() -> Result<(), KernelError> {
let kip_storage = KipStorage::open_with_config(config).await?;

println!("New Transaction");
let mut tx = kip_storage.new_transaction().await;
let mut tx = kip_storage.new_transaction(CheckType::None).await;

println!("Set KeyValue after the transaction -> (key_1, value_1)");
kip_storage
Expand Down
3 changes: 2 additions & 1 deletion examples/scan_read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytes::Bytes;
use kip_db::kernel::lsm::iterator::Iter;
use kip_db::kernel::lsm::mvcc::CheckType;
use kip_db::kernel::lsm::storage::{Config, KipStorage};
use kip_db::kernel::Storage;
use kip_db::KernelError;
Expand Down Expand Up @@ -35,7 +36,7 @@ async fn main() -> Result<(), KernelError> {
.await?;

println!("New Transaction");
let tx = kip_storage.new_transaction().await;
let tx = kip_storage.new_transaction(CheckType::None).await;

println!("Iter without key_3 By Transaction:");
let mut iter = tx.iter(Bound::Unbounded, Bound::Excluded(b"key_3"))?;
Expand Down
5 changes: 4 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,17 @@ pub enum KernelError {
#[error("Process already exists")]
ProcessExists,

#[error("channel is closed")]
#[error("Channel is closed")]
ChannelClose,

#[error("{0}")]
NotSupport(&'static str),

#[error("The number of caches cannot be divisible by the number of shards")]
ShardingNotAlign,

#[error("Same write in different transactions")]
RepeatedWrite,
}

#[derive(Error, Debug)]
Expand Down
44 changes: 44 additions & 0 deletions src/kernel/lsm/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,24 @@ impl MemTable {
})
}

pub(crate) fn check_key_conflict(&self, kvs: &[KeyValue], seq_id: i64) -> bool {
let inner = self.inner.lock();

for (key, _) in kvs {
let internal_key = InternalKey::new_with_seq(key.clone(), seq_id);

if let Some(true) = inner
._mem
.lower_bound(Bound::Excluded(&internal_key))
.map(|(lower_key, _)| lower_key.key == key)
{
return true;
}
}

false
}

/// 插入并判断是否溢出
///
/// 插入时不会去除重复键值,而是进行追加
Expand Down Expand Up @@ -506,6 +524,32 @@ mod tests {
Ok(())
}

#[test]
fn test_mem_table_check_key_conflict() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");

let mem_table = MemTable::new(&Config::new(temp_dir.path()))?;

let key1 = vec![b'k', b'1'];
let bytes_key1 = Bytes::copy_from_slice(&key1);
let kv_1 = (bytes_key1.clone(), Some(bytes_key1.clone()));

let key2 = vec![b'k', b'2'];
let bytes_key2 = Bytes::copy_from_slice(&key2);
let kv_2 = (bytes_key2.clone(), Some(bytes_key2.clone()));

let _ = mem_table.insert_data_with_seq(kv_1.clone(), 0)?;
let _ = mem_table.insert_data_with_seq(kv_1.clone(), 1)?;
let _ = mem_table.insert_data_with_seq(kv_1.clone(), 2)?;
let _ = mem_table.insert_data_with_seq(kv_2.clone(), 3)?;

assert!(mem_table.check_key_conflict(&vec![kv_1.clone()], 1));

assert!(!mem_table.check_key_conflict(&vec![kv_1.clone()], 2));

Ok(())
}

#[test]
fn test_mem_table_range_scan() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
Expand Down
155 changes: 97 additions & 58 deletions src/kernel/lsm/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@ unsafe impl Sync for BufPtr {}

struct BufPtr(NonNull<Vec<KeyValue>>);

pub enum CheckType {
None,
Optimistic,
}

pub struct Transaction {
pub(crate) store_inner: Arc<StoreInner>,
pub(crate) compactor_tx: Sender<CompactTask>,

pub(crate) version: Arc<Version>,
pub(crate) write_buf: Option<SkipMap<Bytes, Option<Bytes>>>,
pub(crate) seq_id: i64,
pub(crate) check_type: CheckType,
}

impl Transaction {
Expand Down Expand Up @@ -80,9 +86,19 @@ impl Transaction {
#[inline]
pub async fn commit(mut self) -> KernelResult<()> {
if let Some(buf) = self.write_buf.take() {
let batch_data = buf
.into_iter()
.collect_vec();
let batch_data = buf.into_iter().collect_vec();

match self.check_type {
CheckType::None => (),
CheckType::Optimistic => {
if self
.mem_table()
.check_key_conflict(&batch_data, self.seq_id)
{
return Err(KernelError::RepeatedWrite);
}
}
}

let is_exceeds = self
.store_inner
Expand Down Expand Up @@ -292,90 +308,113 @@ impl<'a> Iter<'a> for BufIter<'a> {
#[cfg(test)]
mod tests {
use crate::kernel::lsm::iterator::Iter;
use crate::kernel::lsm::mvcc::CheckType;
use crate::kernel::lsm::storage::{Config, KipStorage};
use crate::kernel::{KernelResult, Storage};
use crate::KernelError;
use bincode::Options;
use bytes::Bytes;
use itertools::Itertools;
use std::collections::Bound;
use tempfile::TempDir;

#[test]
fn test_transaction() -> KernelResult<()> {
#[tokio::test]
async fn test_transaction() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");

tokio_test::block_on(async move {
let times = 5000;
let times = 5000;

let value = b"0";
let value = b"0";

let config = Config::new(temp_dir.into_path()).major_threshold_with_sst_size(4);
let kv_store = KipStorage::open_with_config(config).await?;
let config = Config::new(temp_dir.into_path()).major_threshold_with_sst_size(4);
let kv_store = KipStorage::open_with_config(config).await?;

let mut vec_kv = Vec::new();
let mut vec_kv = Vec::new();

for i in 0..times {
let vec_u8 = bincode::options().with_big_endian().serialize(&i)?;
vec_kv.push((
Bytes::from(vec_u8.clone()),
Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()),
));
}
for i in 0..times {
let vec_u8 = bincode::options().with_big_endian().serialize(&i)?;
vec_kv.push((
Bytes::from(vec_u8.clone()),
Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()),
));
}

// 模拟数据分布在MemTable以及SSTable中
for kv in vec_kv.iter().take(50) {
kv_store.set(kv.0.clone(), kv.1.clone()).await?;
}
// 模拟数据分布在MemTable以及SSTable中
for kv in vec_kv.iter().take(50) {
kv_store.set(kv.0.clone(), kv.1.clone()).await?;
}

kv_store.flush().await?;
kv_store.flush().await?;

for kv in vec_kv.iter().take(100).skip(50) {
kv_store.set(kv.0.clone(), kv.1.clone()).await?;
}
for kv in vec_kv.iter().take(100).skip(50) {
kv_store.set(kv.0.clone(), kv.1.clone()).await?;
}

let mut tx_1 = kv_store.new_transaction().await;
let mut tx_1 = kv_store.new_transaction(CheckType::None).await;

for kv in vec_kv.iter().take(times).skip(100) {
tx_1.set(kv.0.clone(), kv.1.clone());
}
for kv in vec_kv.iter().take(times).skip(100) {
tx_1.set(kv.0.clone(), kv.1.clone());
}

tx_1.remove(&vec_kv[times - 1].0)?;
tx_1.remove(&vec_kv[times - 1].0)?;

// 事务在提交前事务可以读取到自身以及Store已写入的数据
for kv in vec_kv.iter().take(times - 1) {
assert_eq!(tx_1.get(&kv.0)?, Some(kv.1.clone()));
}
// 事务在提交前事务可以读取到自身以及Store已写入的数据
for kv in vec_kv.iter().take(times - 1) {
assert_eq!(tx_1.get(&kv.0)?, Some(kv.1.clone()));
}

assert_eq!(tx_1.get(&vec_kv[times - 1].0)?, None);
assert_eq!(tx_1.get(&vec_kv[times - 1].0)?, None);

// 事务在提交前Store不应该读取到事务中的数据
for kv in vec_kv.iter().take(times).skip(100) {
assert_eq!(kv_store.get(&kv.0).await?, None);
}
// 事务在提交前Store不应该读取到事务中的数据
for kv in vec_kv.iter().take(times).skip(100) {
assert_eq!(kv_store.get(&kv.0).await?, None);
}

let vec_test = vec_kv[25..]
.iter()
.cloned()
.map(|(key, value)| (key, Some(value)))
.collect_vec();
let vec_test = vec_kv[25..]
.iter()
.cloned()
.map(|(key, value)| (key, Some(value)))
.collect_vec();

let mut iter = tx_1.iter(Bound::Included(&vec_kv[25].0), Bound::Unbounded)?;
let mut iter = tx_1.iter(Bound::Included(&vec_kv[25].0), Bound::Unbounded)?;

// -1是因为最后一个元素在之前tx中删除了,因此为None
for kv in vec_test.iter().take(vec_test.len() - 1) {
// 元素太多,因此这里就单个对比,否则会导致报错时日志过多
assert_eq!(iter.try_next()?.unwrap(), kv.clone());
}
// -1是因为最后一个元素在之前tx中删除了,因此为None
for kv in vec_test.iter().take(vec_test.len() - 1) {
// 元素太多,因此这里就单个对比,否则会导致报错时日志过多
assert_eq!(iter.try_next()?.unwrap(), kv.clone());
}

drop(iter);
drop(iter);

tx_1.commit().await?;
tx_1.commit().await?;

for kv in vec_kv.iter().take(times - 1) {
assert_eq!(kv_store.get(&kv.0).await?, Some(kv.1.clone()));
}
for kv in vec_kv.iter().take(times - 1) {
assert_eq!(kv_store.get(&kv.0).await?, Some(kv.1.clone()));
}

Ok(())
})
Ok(())
}

#[tokio::test]
async fn test_transaction_check_optimistic() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");

let config = Config::new(temp_dir.into_path()).major_threshold_with_sst_size(4);
let kv_store = KipStorage::open_with_config(config).await?;

let mut tx_1 = kv_store.new_transaction(CheckType::None).await;
let mut tx_2 = kv_store.new_transaction(CheckType::Optimistic).await;

tx_1.set(Bytes::from("same_key"), Bytes::new());
tx_2.set(Bytes::from("same_key"), Bytes::new());

tx_1.commit().await?;

assert!(matches!(
tx_2.commit().await,
Err(KernelError::RepeatedWrite)
));

Ok(())
}
}
5 changes: 3 additions & 2 deletions src/kernel/lsm/storage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::kernel::io::IoType;
use crate::kernel::lsm::compactor::{CompactTask, Compactor};
use crate::kernel::lsm::mem_table::{KeyValue, MemTable};
use crate::kernel::lsm::mvcc::Transaction;
use crate::kernel::lsm::mvcc::{CheckType, Transaction};
use crate::kernel::lsm::table::scope::Scope;
use crate::kernel::lsm::table::ss_table::block;
use crate::kernel::lsm::table::TableType;
Expand Down Expand Up @@ -243,7 +243,7 @@ impl KipStorage {

/// 创建事务
#[inline]
pub async fn new_transaction(&self) -> Transaction {
pub async fn new_transaction(&self, check_type: CheckType) -> Transaction {
let _ = self.mem_table().tx_count.fetch_add(1, Ordering::Release);

Transaction {
Expand All @@ -253,6 +253,7 @@ impl KipStorage {

seq_id: Sequence::create(),
write_buf: None,
check_type,
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/kernel/lsm/table/btree_table/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ mod tests {

assert_eq!(iter.seek(Seek::First)?, Some(vec[0].clone()));

assert_eq!(
iter.seek(Seek::Backward(&[b'3']))?,
Some(vec[2].clone())
);
assert_eq!(iter.seek(Seek::Backward(&[b'3']))?, Some(vec[2].clone()));

assert_eq!(iter.seek(Seek::Last)?, Some(vec[5].clone()));

Expand Down

0 comments on commit 4cf9071

Please sign in to comment.