Skip to content

Commit

Permalink
fix: add sync or send in all places
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe authored and KKould committed Jul 30, 2024
1 parent 347beab commit 8598e6c
Show file tree
Hide file tree
Showing 21 changed files with 619 additions and 375 deletions.
413 changes: 247 additions & 166 deletions src/compaction/mod.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub trait FileProvider {

fn create_dir_all(path: impl AsRef<Path>) -> impl Future<Output = io::Result<()>>;

fn open(path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
fn open(path: impl AsRef<Path> + Send) -> impl Future<Output = io::Result<Self::File>> + Send;

fn remove(path: impl AsRef<Path>) -> impl Future<Output = io::Result<()>>;
fn remove(path: impl AsRef<Path> + Send) -> impl Future<Output = io::Result<()>> + Send;
}

impl Display for FileType {
Expand Down
4 changes: 2 additions & 2 deletions src/fs/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl FileProvider for TokioExecutor {
create_dir_all(path).await
}

async fn open(path: impl AsRef<Path>) -> io::Result<Self::File> {
async fn open(path: impl AsRef<Path> + Send) -> io::Result<Self::File> {
OpenOptions::new()
.truncate(false)
.create(true)
Expand All @@ -24,7 +24,7 @@ impl FileProvider for TokioExecutor {
.map(TokioAsyncReadCompatExt::compat)
}

async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
async fn remove(path: impl AsRef<Path> + Send) -> io::Result<()> {
remove_file(path).await
}
}
4 changes: 2 additions & 2 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use parquet::arrow::ProjectionMask;

use super::mutable::Mutable;
use crate::{
fs::FileProvider,
record::{internal::InternalRecordRef, Key, Record, RecordRef},
stream::record_batch::RecordBatchEntry,
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
};
use crate::fs::FileProvider;

pub trait ArrowArrays: Sized {
type Record: Record;
Expand All @@ -32,7 +32,7 @@ pub trait ArrowArrays: Sized {
fn as_record_batch(&self) -> &RecordBatch;
}

pub trait Builder<S>
pub trait Builder<S>: Send
where
S: ArrowArrays,
{
Expand Down
154 changes: 106 additions & 48 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
use std::collections::BTreeMap;
use std::intrinsics::transmute;
use std::ops::Bound;
use std::{collections::BTreeMap, intrinsics::transmute, ops::Bound};

use async_lock::Mutex;
use crossbeam_skiplist::{
map::{Entry, Range},
SkipMap,
};
use futures_util::io;
use ulid::Ulid;
use crate::{DbOption, record::{KeyRef, Record}, timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
}, WriteError};
use crate::fs::{FileId, FileProvider};
use crate::inmem::immutable::{ArrowArrays, Builder, Immutable};
use crate::record::Key;
use crate::wal::log::{Log, LogType};
use crate::wal::record_entry::RecordEntry;
use crate::wal::WalFile;

use crate::{
fs::{FileId, FileProvider},
inmem::immutable::{ArrowArrays, Builder, Immutable},
record::{Key, KeyRef, Record},
timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
},
wal::{
log::{Log, LogType},
record_entry::RecordEntry,
WalFile,
},
DbOption, WriteError,
};

pub(crate) type MutableScan<'scan, R> = Range<
'scan,
Expand All @@ -37,7 +42,7 @@ where
FP: FileProvider,
{
pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
wal: Mutex<WalFile<FP::File, R>>
wal: Mutex<WalFile<FP::File, R>>,
}

impl<R, FP> Mutable<R, FP>
Expand All @@ -61,23 +66,46 @@ where
R: Record + Send,
FP: FileProvider,
{
pub(crate) async fn insert(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result<usize, WriteError<R>> {
self.append(log_ty, record.key().to_key(), ts, Some(record), false).await
pub(crate) async fn insert(
&self,
log_ty: LogType,
record: R,
ts: Timestamp,
) -> Result<usize, WriteError<R>> {
self.append(log_ty, record.key().to_key(), ts, Some(record), false)
.await
}

pub(crate) async fn remove(&self, log_ty: LogType, key: R::Key, ts: Timestamp) -> Result<usize, WriteError<R>> {
pub(crate) async fn remove(
&self,
log_ty: LogType,
key: R::Key,
ts: Timestamp,
) -> Result<usize, WriteError<R>> {
self.append(log_ty, key, ts, None, false).await
}

async fn append(&self, log_ty: LogType, key: R::Key, ts: Timestamp, value: Option<R>, is_recover: bool) -> Result<usize, WriteError<R>> {
async fn append(
&self,
log_ty: LogType,
key: R::Key,
ts: Timestamp,
value: Option<R>,
is_recover: bool,
) -> Result<usize, WriteError<R>> {
let timestamped_key = Timestamped::new(key, ts);

if !is_recover {
let mut wal_guard = self.wal.lock().await;

wal_guard
.write(log_ty, timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }), value.as_ref().map(R::as_record_ref))
.await.unwrap();
.write(
log_ty,
timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }),
value.as_ref().map(R::as_record_ref),
)
.await
.unwrap();
}
self.data.insert(timestamped_key, value);

Expand Down Expand Up @@ -151,36 +179,49 @@ mod tests {
use std::ops::Bound;

use super::Mutable;
use crate::{DbOption, record::Record, tests::{Test, TestRef}, timestamp::Timestamped};
use crate::executor::tokio::TokioExecutor;
use crate::wal::log::LogType;
use crate::{
executor::tokio::TokioExecutor,
record::Record,
tests::{Test, TestRef},
timestamp::Timestamped,
wal::log::LogType,
DbOption,
};

#[tokio::test]
async fn insert_and_get() {
let key_1 = "key_1".to_owned();
let key_2 = "key_2".to_owned();

let temp_dir = tempfile::tempdir().unwrap();
let mem_table = Mutable::<Test, TokioExecutor>::new(&DbOption::from(temp_dir.path())).await.unwrap();
let mem_table = Mutable::<Test, TokioExecutor>::new(&DbOption::from(temp_dir.path()))
.await
.unwrap();

mem_table.insert(
LogType::Full,
Test {
vstring: key_1.clone(),
vu32: 1,
vbool: Some(true),
},
0_u32.into(),
).await.unwrap();
mem_table.insert(
LogType::Full,
Test {
vstring: key_2.clone(),
vu32: 2,
vbool: None,
},
1_u32.into(),
).await.unwrap();
mem_table
.insert(
LogType::Full,
Test {
vstring: key_1.clone(),
vu32: 1,
vbool: Some(true),
},
0_u32.into(),
)
.await
.unwrap();
mem_table
.insert(
LogType::Full,
Test {
vstring: key_2.clone(),
vu32: 2,
vbool: None,
},
1_u32.into(),
)
.await
.unwrap();

let entry = mem_table.get(&key_1, 0_u32.into()).unwrap();
assert_eq!(
Expand All @@ -196,13 +237,30 @@ mod tests {
#[tokio::test]
async fn range() {
let temp_dir = tempfile::tempdir().unwrap();
let mutable = Mutable::<String, TokioExecutor>::new(&DbOption::from(temp_dir.path())).await.unwrap();
let mutable = Mutable::<String, TokioExecutor>::new(&DbOption::from(temp_dir.path()))
.await
.unwrap();

mutable.insert(LogType::Full,"1".into(), 0_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"2".into(), 0_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"2".into(), 1_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"3".into(), 1_u32.into()).await.unwrap();
mutable.insert(LogType::Full,"4".into(), 0_u32.into()).await.unwrap();
mutable
.insert(LogType::Full, "1".into(), 0_u32.into())
.await
.unwrap();
mutable
.insert(LogType::Full, "2".into(), 0_u32.into())
.await
.unwrap();
mutable
.insert(LogType::Full, "2".into(), 1_u32.into())
.await
.unwrap();
mutable
.insert(LogType::Full, "3".into(), 1_u32.into())
.await
.unwrap();
mutable
.insert(LogType::Full, "4".into(), 0_u32.into())
.await
.unwrap();

let mut scan = mutable.scan((Bound::Unbounded, Bound::Unbounded), 0_u32.into());

Expand Down
Loading

0 comments on commit 8598e6c

Please sign in to comment.