Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 26, 2024
1 parent 31d1650 commit ba530f9
Show file tree
Hide file tree
Showing 9 changed files with 609 additions and 380 deletions.
418 changes: 202 additions & 216 deletions src/compaction/mod.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use arrow::array::RecordBatch;
use crossbeam_skiplist::SkipMap;
use parquet::arrow::ProjectionMask;

use super::mutable::Mutable;
Expand All @@ -13,6 +14,7 @@ use crate::{
stream::record_batch::RecordBatchEntry,
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
};
use crate::fs::FileProvider;

pub trait ArrowArrays: Sized {
type Record: Record;
Expand Down Expand Up @@ -53,12 +55,12 @@ where
index: BTreeMap<Timestamped<<A::Record as Record>::Key>, u32>,
}

impl<A> From<Mutable<A::Record>> for Immutable<A>
impl<A> From<SkipMap<Timestamped<<A::Record as Record>::Key>, Option<A::Record>>> for Immutable<A>
where
A: ArrowArrays,
A::Record: Send,
{
fn from(mutable: Mutable<A::Record>) -> Self {
fn from(mutable: SkipMap<Timestamped<<A::Record as Record>::Key>, Option<A::Record>>) -> Self {
let mut index = BTreeMap::new();
let mut builder = A::builder(mutable.len());

Expand Down
135 changes: 81 additions & 54 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use std::collections::BTreeMap;
use std::intrinsics::transmute;
use std::ops::Bound;

use async_lock::Mutex;
use crossbeam_skiplist::{
map::{Entry, Range},
SkipMap,
};

use crate::{
record::{KeyRef, Record},
timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
},
};
use futures_util::io;
use ulid::Ulid;
use crate::{DbOption, record::{KeyRef, Record}, timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
}, WriteError};
use crate::fs::{FileId, FileProvider};
use crate::inmem::immutable::{ArrowArrays, Builder, Immutable};
use crate::record::Key;
use crate::wal::log::{Log, LogType};
use crate::wal::record_entry::RecordEntry;
use crate::wal::WalFile;

pub(crate) type MutableScan<'scan, R> = Range<
'scan,
Expand All @@ -25,45 +31,57 @@ pub(crate) type MutableScan<'scan, R> = Range<
>;

#[derive(Debug)]
pub struct Mutable<R>
pub struct Mutable<R, FP>
where
R: Record,
FP: FileProvider,
{
data: SkipMap<Timestamped<R::Key>, Option<R>>,
pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
wal: Mutex<WalFile<FP::File, R>>
}

impl<R> Default for Mutable<R>
impl<R, FP> Mutable<R, FP>
where
FP: FileProvider,
R: Record,
{
fn default() -> Self {
Mutable {
pub async fn new(option: &DbOption) -> io::Result<Self> {
let file_id = Ulid::new();
let file = FP::open(option.wal_path(&file_id)).await?;

Ok(Self {
data: Default::default(),
}
wal: Mutex::new(WalFile::new(file, file_id)),
})
}
}

impl<R> Mutable<R>
impl<R, FP> Mutable<R, FP>
where
R: Record,
R: Record + Send,
FP: FileProvider,
{
pub fn new() -> Self {
Mutable::default()
pub(crate) async fn insert(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result<usize, WriteError<R>> {
self.append(log_ty, record.key().to_key(), ts, Some(record), false).await
}
}

impl<R> Mutable<R>
where
R: Record + Send,
{
pub(crate) fn insert(&self, record: R, ts: Timestamp) {
self.data
// TODO: remove key cloning
.insert(Timestamped::new(record.key().to_key(), ts), Some(record));
pub(crate) async fn remove(&self, log_ty: LogType, key: R::Key, ts: Timestamp) -> Result<usize, WriteError<R>> {
self.append(log_ty, key, ts, None, false).await
}

pub(crate) fn remove(&self, key: R::Key, ts: Timestamp) {
self.data.insert(Timestamped::new(key, ts), None);
async fn append(&self, log_ty: LogType, key: R::Key, ts: Timestamp, value: Option<R>, is_recover: bool) -> Result<usize, WriteError<R>> {
let timestamped_key = Timestamped::new(key, ts);

if !is_recover {
let mut wal_guard = self.wal.lock().await;

wal_guard
.write(log_ty, timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }), value.as_ref().map(R::as_record_ref))
.await.unwrap();
}
self.data.insert(timestamped_key, value);

Ok(self.data.len())
}

fn get(
Expand All @@ -86,10 +104,6 @@ where
})
}

pub(crate) fn into_iter(self) -> impl Iterator<Item = (Timestamped<R::Key>, Option<R>)> {
self.data.into_iter()
}

pub(crate) fn scan<'scan>(
&'scan self,
range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
Expand All @@ -110,11 +124,22 @@ where
.next()
.is_some()
}

pub(crate) async fn to_immutable(self) -> io::Result<(FileId, Immutable<R::Columns>)> {
let file_id = {
let mut wal_guard = self.wal.lock().await;
wal_guard.flush().await?;
wal_guard.file_id()
};

Ok((file_id, Immutable::from(self.data)))
}
}

impl<R> Mutable<R>
impl<R, FP> Mutable<R, FP>
where
R: Record,
FP: FileProvider,
{
pub(crate) fn len(&self) -> usize {
self.data.len()
Expand All @@ -126,35 +151,36 @@ mod tests {
use std::ops::Bound;

use super::Mutable;
use crate::{
record::Record,
tests::{Test, TestRef},
timestamp::Timestamped,
};

#[test]
fn insert_and_get() {
use crate::{DbOption, record::Record, tests::{Test, TestRef}, timestamp::Timestamped};
use crate::executor::tokio::TokioExecutor;
use crate::wal::log::LogType;

#[tokio::test]
async fn insert_and_get() {
let key_1 = "key_1".to_owned();
let key_2 = "key_2".to_owned();

let mem_table = Mutable::default();
let temp_dir = tempfile::tempdir().unwrap();
let mem_table = Mutable::<Test, TokioExecutor>::new(&DbOption::from(temp_dir.path())).await.unwrap();

mem_table.insert(
LogType::Full,
Test {
vstring: key_1.clone(),
vu32: 1,
vbool: Some(true),
},
0_u32.into(),
);
).await.unwrap();
mem_table.insert(
LogType::Full,
Test {
vstring: key_2.clone(),
vu32: 2,
vbool: None,
},
1_u32.into(),
);
).await.unwrap();

let entry = mem_table.get(&key_1, 0_u32.into()).unwrap();
assert_eq!(
Expand All @@ -167,15 +193,16 @@ mod tests {
)
}

#[test]
fn range() {
let mutable = Mutable::<String>::new();
#[tokio::test]
async fn range() {
let temp_dir = tempfile::tempdir().unwrap();
let mutable = Mutable::<String, TokioExecutor>::new(&DbOption::from(temp_dir.path())).await.unwrap();

mutable.insert("1".into(), 0_u32.into());
mutable.insert("2".into(), 0_u32.into());
mutable.insert("2".into(), 1_u32.into());
mutable.insert("3".into(), 1_u32.into());
mutable.insert("4".into(), 0_u32.into());
mutable.insert(LogType::Full,"1".into(), 0_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"2".into(), 0_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"2".into(), 1_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"3".into(), 1_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"4".into(), 0_u32.into()).await.unwrap();

let mut scan = mutable.scan((Bound::Unbounded, Bound::Unbounded), 0_u32.into());

Expand Down
Loading

0 comments on commit ba530f9

Please sign in to comment.