
Add rocksdb as new kernel
lewiszlw committed Oct 21, 2023
1 parent 9399f64 commit 6dac4ca
Showing 8 changed files with 93 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -61,6 +61,7 @@ tonic = "0.10.2"
 prost = "0.12"
 # Other database kernels
 sled = "0.34.7"
+rocksdb = "0.21.0"

 [dev-dependencies]
 assert_cmd = "0.11.0"
16 changes: 12 additions & 4 deletions src/bench/kernel_bench.rs
@@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU32;
 use std::sync::atomic::Ordering::Relaxed;

 use kip_db::kernel::lsm::storage::KipStorage;
+use kip_db::kernel::rocksdb_storage::RocksdbStorage;
 use kip_db::kernel::sled_storage::SledStorage;
 use kip_db::kernel::Storage;

@@ -109,7 +110,7 @@ fn monotonic_crud<T: Storage>(c: &mut Criterion) {
         let count = AtomicU32::new(0_u32);
         b.iter(|| async {
             db.set(
-                Bytes::from(count.fetch_add(1, Relaxed).to_be_bytes()),
+                Bytes::from(count.fetch_add(1, Relaxed).to_be_bytes().to_vec()),
                 Bytes::new(),
             )
             .await
@@ -150,9 +151,12 @@ fn random_crud<T: Storage>(c: &mut Criterion) {

     c.bench_function(&format!("Store: {}, random inserts", T::name()), |b| {
         b.iter(|| async {
-            db.set(Bytes::from(random(SIZE).to_be_bytes()), Bytes::new())
-                .await
-                .unwrap();
+            db.set(
+                Bytes::from(random(SIZE).to_be_bytes().to_vec()),
+                Bytes::new(),
+            )
+            .await
+            .unwrap();
         })
     });

@@ -191,21 +195,25 @@ fn empty_opens<T: Storage>(c: &mut Criterion) {
 fn kv_bulk_load(c: &mut Criterion) {
     bulk_load::<KipStorage>(c);
     bulk_load::<SledStorage>(c);
+    bulk_load::<RocksdbStorage>(c);
 }

 fn kv_monotonic_crud(c: &mut Criterion) {
     monotonic_crud::<KipStorage>(c);
     monotonic_crud::<SledStorage>(c);
+    monotonic_crud::<RocksdbStorage>(c);
 }

 fn kv_random_crud(c: &mut Criterion) {
     random_crud::<KipStorage>(c);
     random_crud::<SledStorage>(c);
+    random_crud::<RocksdbStorage>(c);
 }

 fn kv_empty_opens(c: &mut Criterion) {
     empty_opens::<KipStorage>(c);
     empty_opens::<SledStorage>(c);
+    empty_opens::<RocksdbStorage>(c);
 }

 criterion_group!(
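The registration tail of this file is truncated in this view. For reference, a minimal self-contained sketch of what one of these benchmarks does for the new backend, driving the async Storage API through an explicit Tokio runtime (the bench name, the on-disk path, and the use of tokio/criterion in this form are illustrative assumptions, not part of the commit):

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use kip_db::kernel::rocksdb_storage::RocksdbStorage;
use kip_db::kernel::Storage;

// Illustrative sketch: open the RocksDB-backed Storage once, then measure repeated `set` calls.
fn rocksdb_set_bench(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // Hypothetical scratch directory for the benchmark database.
    let db = rt.block_on(RocksdbStorage::open("_bench_rocksdb")).unwrap();

    c.bench_function("Store: Rocksdb, single set", |b| {
        b.iter(|| {
            rt.block_on(async {
                db.set(Bytes::from(vec![1u8, 2, 3]), Bytes::new()).await.unwrap();
            })
        })
    });
}

criterion_group!(rocksdb_sketch, rocksdb_set_bench);
criterion_main!(rocksdb_sketch);
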
3 changes: 3 additions & 0 deletions src/error.rs
@@ -45,6 +45,9 @@ pub enum KernelError {
     #[error(transparent)]
     SledErr(#[from] sled::Error),

+    #[error(transparent)]
+    RocksdbErr(#[from] rocksdb::Error),
+
     #[error("Cache size overflow")]
     CacheSizeOverFlow,

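With the #[from] conversion in place, rocksdb errors propagate through `?` as KernelError::RocksdbErr anywhere a KernelResult is returned. A small hypothetical sketch of that pattern (the function and path below are illustrative, not from this commit):

use crate::kernel::KernelResult;

// Illustrative only: `?` converts rocksdb::Error into KernelError::RocksdbErr
// via the #[from] attribute added above.
fn open_raw_db(path: &str) -> KernelResult<rocksdb::DB> {
    let db = rocksdb::DB::open_default(path)?;
    Ok(db)
}
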
4 changes: 2 additions & 2 deletions src/kernel/lsm/compactor.rs
@@ -26,11 +26,11 @@ pub(crate) type MergeShardingVec = Vec<(i64, Vec<KeyValue>)>;
 pub(crate) type DelNode = (Vec<i64>, TableMeta);
 /// Wrapper of the Gens to delete during a Major compaction (N is the Level compacted by this Major); the first element is Level N, the second is Level N+1
 pub(crate) type DelNodeTuple = (DelNode, DelNode);
-pub(crate) type SeekScope = (Scope, usize);
+pub type SeekScope = (Scope, usize);

 /// Messages exchanged between the Store and the Compactor
 #[derive(Debug)]
-pub(crate) enum CompactTask {
+pub enum CompactTask {
     Seek(SeekScope),
     Flush(Option<oneshot::Sender<()>>),
 }
2 changes: 1 addition & 1 deletion src/kernel/lsm/table/scope.rs
@@ -14,7 +14,7 @@ const SEEK_COMPACTION_COUNT: u32 = 100;
 /// Caches the Key of the first and the last entry of all data in an SSTable
 /// Marks the data range to allow fast region lookup
 #[derive(Serialize, Deserialize, Debug, Clone)]
-pub(crate) struct Scope {
+pub struct Scope {
     pub(crate) start: Bytes,
     pub(crate) end: Bytes,
     gen: i64,
2 changes: 1 addition & 1 deletion src/kernel/lsm/version/cleaner.rs
@@ -5,7 +5,7 @@ use tokio::sync::mpsc::UnboundedReceiver;
 use tracing::error;

 #[derive(Debug)]
-pub(crate) enum CleanTag {
+pub enum CleanTag {
     Clean(u64),
     Add { version: u64, gens: Vec<i64> },
 }
1 change: 1 addition & 0 deletions src/kernel/mod.rs
@@ -13,6 +13,7 @@ use crate::KernelError;

 pub mod io;
 pub mod lsm;
+pub mod rocksdb_storage;
 pub mod sled_storage;
 pub mod utils;

72 changes: 72 additions & 0 deletions src/kernel/rocksdb_storage.rs
@@ -0,0 +1,72 @@
+use crate::kernel::Storage;
+use crate::KernelError;
+use async_trait::async_trait;
+use bytes::Bytes;
+use core::slice::SlicePattern;
+use std::path::PathBuf;
+
+#[derive(Debug)]
+pub struct RocksdbStorage {
+    data_base: rocksdb::DB,
+}
+
+#[async_trait]
+impl Storage for RocksdbStorage {
+    #[inline]
+    fn name() -> &'static str
+    where
+        Self: Sized,
+    {
+        "Rocksdb"
+    }
+
+    #[inline]
+    async fn open(path: impl Into<PathBuf> + Send) -> crate::kernel::KernelResult<Self> {
+        let db = rocksdb::DB::open_default(path.into())?;
+
+        Ok(RocksdbStorage { data_base: db })
+    }
+
+    #[inline]
+    async fn flush(&self) -> crate::kernel::KernelResult<()> {
+        let _ignore = self.data_base.flush()?;
+        Ok(())
+    }
+
+    #[inline]
+    async fn set(&self, key: Bytes, value: Bytes) -> crate::kernel::KernelResult<()> {
+        let _ignore = self.data_base.put(key.as_slice(), value.to_vec())?;
+        Ok(())
+    }
+
+    #[inline]
+    async fn get(&self, key: &[u8]) -> crate::kernel::KernelResult<Option<Bytes>> {
+        match self.data_base.get(key)? {
+            None => Ok(None),
+            Some(i_vec) => Ok(Some(Bytes::from(i_vec.to_vec()))),
+        }
+    }
+
+    #[inline]
+    async fn remove(&self, key: &[u8]) -> crate::kernel::KernelResult<()> {
+        match self.data_base.delete(key) {
+            Ok(_) => Ok(()),
+            Err(e) => Err(KernelError::RocksdbErr(e)),
+        }
+    }
+
+    #[inline]
+    async fn size_of_disk(&self) -> crate::kernel::KernelResult<u64> {
+        unimplemented!("Rocksdb does not support size_of_disk()")
+    }
+
+    #[inline]
+    async fn len(&self) -> crate::kernel::KernelResult<usize> {
+        unimplemented!("Rocksdb does not support len()")
+    }
+
+    #[inline]
+    async fn is_empty(&self) -> bool {
+        unimplemented!("Rocksdb does not support is_empty()")
+    }
+}
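A quick usage sketch of the new backend through the Storage trait (the data directory, the #[tokio::main] runtime, and the error boxing are assumptions for illustration, not part of the commit):

use bytes::Bytes;
use kip_db::kernel::rocksdb_storage::RocksdbStorage;
use kip_db::kernel::Storage;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical data directory; any writable path works.
    let db = RocksdbStorage::open("./rocksdb_demo").await?;

    db.set(Bytes::from("key"), Bytes::from("value")).await?;
    assert_eq!(db.get(b"key").await?, Some(Bytes::from("value")));

    db.remove(b"key").await?;
    assert_eq!(db.get(b"key").await?, None);

    db.flush().await?;
    Ok(())
}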
