Skip to content

Commit

Permalink
chore: codefmt
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 30, 2024
1 parent 023eb80 commit 9dcd0ee
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 48 deletions.
8 changes: 5 additions & 3 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use ulid::Ulid;

use crate::{
fs::{FileId, FileProvider},
inmem::immutable::{ArrowArrays, Builder, Immutable},
inmem::{
immutable::{ArrowArrays, Builder, Immutable},
mutable::Mutable,
},
ondisk::sstable::SsTable,
record::{KeyRef, Record},
scope::Scope,
Expand All @@ -27,7 +30,6 @@ use crate::{
},
DbOption, Schema,
};
use crate::inmem::mutable::Mutable;

#[derive(Debug)]
pub(crate) enum CompactTask {
Expand Down Expand Up @@ -69,7 +71,7 @@ where
let mut guard = self.schema.write().await;

if guard.mutable.is_empty() {
return Ok(())
return Ok(());
}
let mutable = mem::replace(&mut guard.mutable, Mutable::new(&self.option).await?);
let (file_id, immutable) = mutable.to_immutable().await?;
Expand Down
2 changes: 0 additions & 2 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use arrow::array::RecordBatch;
use crossbeam_skiplist::SkipMap;
use parquet::arrow::ProjectionMask;

use super::mutable::Mutable;
use crate::{
fs::FileProvider,
record::{internal::InternalRecordRef, Key, Record, RecordRef},
stream::record_batch::RecordBatchEntry,
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
Expand Down
10 changes: 3 additions & 7 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, intrinsics::transmute, ops::Bound};
use std::{intrinsics::transmute, ops::Bound};

use async_lock::Mutex;
use crossbeam_skiplist::{
Expand All @@ -10,17 +10,13 @@ use ulid::Ulid;

use crate::{
fs::{FileId, FileProvider},
inmem::immutable::{ArrowArrays, Builder, Immutable},
inmem::immutable::Immutable,
record::{Key, KeyRef, Record},
timestamp::{
timestamped::{Timestamped, TimestampedRef},
Timestamp, EPOCH,
},
wal::{
log::{Log, LogType},
record_entry::RecordEntry,
WalFile,
},
wal::{log::LogType, WalFile},
DbOption, WriteError,
};

Expand Down
48 changes: 25 additions & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(dead_code)]
mod arrows;
mod compaction;
pub mod executor;
pub mod fs;
Expand All @@ -17,7 +15,7 @@ mod wal;

use std::{collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, sync::Arc};

use async_lock::{Mutex, RwLock, RwLockReadGuard};
use async_lock::{RwLock, RwLockReadGuard};
use flume::{bounded, Sender};
use fs::FileProvider;
use futures_core::Stream;
Expand Down Expand Up @@ -65,10 +63,10 @@ where
let option = Arc::new(option);
E::create_dir_all(&option.path).await?;

let (task_tx, mut task_rx) = bounded(1);
let (task_tx, task_rx) = bounded(1);
let schema = Arc::new(RwLock::new(Schema::new(option.clone(), task_tx).await?));

let (mut cleaner, clean_sender) = Cleaner::new(option.clone());
let (mut cleaner, clean_sender) = Cleaner::<E>::new(option.clone());

let version_set = VersionSet::new(clean_sender, option.clone()).await?;
let mut compactor =
Expand Down Expand Up @@ -350,23 +348,25 @@ pub(crate) mod tests {
array::{Array, AsArray, RecordBatch},
datatypes::{DataType, Field, Schema, UInt32Type},
};
use async_lock::{Mutex, RwLock};
use async_lock::RwLock;
use flume::{bounded, Receiver};
use futures_util::io;
use once_cell::sync::Lazy;
use parquet::arrow::ProjectionMask;
use tempfile::TempDir;
use tracing::error;

use crate::{compaction::{CompactTask, Compactor}, executor::{tokio::TokioExecutor, Executor}, fs::FileId, inmem::{
immutable::{ArrowArrays, Builder},
mutable::Mutable,
}, record::{internal::InternalRecordRef, Key, RecordRef}, serdes::{
option::{DecodeError, EncodeError},
Decode, Encode,
}, timestamp::Timestamped, version::{cleaner::Cleaner, set::tests::build_version_set, Version}, wal::log::LogType, DbOption, Immutable, Record, WriteError, DB, Projection};
use crate::inmem::immutable::tests::TestImmutableArrays;
use crate::record::{RecordDecodeError, RecordEncodeError};
use crate::{
compaction::{CompactTask, Compactor},
executor::{tokio::TokioExecutor, Executor},
fs::FileId,
inmem::{immutable::tests::TestImmutableArrays, mutable::Mutable},
record::{internal::InternalRecordRef, RecordDecodeError, RecordEncodeError, RecordRef},
serdes::{Decode, Encode},
version::{cleaner::Cleaner, set::tests::build_version_set, Version},
wal::log::LogType,
DbOption, Immutable, Projection, Record, WriteError, DB,
};

#[derive(Debug, PartialEq, Eq)]
pub struct Test {
Expand Down Expand Up @@ -554,11 +554,11 @@ pub(crate) mod tests {
}
}

pub(crate) async fn get_test_record_batch<E: Executor>(
pub(crate) async fn get_test_record_batch<E: Executor + Send + Sync + 'static>(
option: DbOption,
executor: E,
) -> RecordBatch {
let db: DB<Test, E> = DB::new(option, executor).await.unwrap();
let db: DB<Test, E> = DB::new(option.clone(), executor).await.unwrap();

db.write(
Test {
Expand Down Expand Up @@ -590,7 +590,9 @@ pub(crate) mod tests {
.clone()
}

pub(crate) async fn build_schema(option: DbOption) -> io::Result<(crate::Schema<Test, TokioExecutor>, Receiver<CompactTask>)> {
pub(crate) async fn build_schema(
option: Arc<DbOption>,
) -> io::Result<(crate::Schema<Test, TokioExecutor>, Receiver<CompactTask>)> {
let mutable = Mutable::new(&option).await?;

mutable
Expand Down Expand Up @@ -702,7 +704,7 @@ pub(crate) mod tests {

let schema = Arc::new(RwLock::new(schema));

let (mut cleaner, clean_sender) = Cleaner::new(option.clone());
let (mut cleaner, clean_sender) = Cleaner::<E>::new(option.clone());
let version_set = build_version_set(version, clean_sender, option.clone()).await?;
let mut compactor =
Compactor::<R, E>::new(schema.clone(), option.clone(), version_set.clone());
Expand Down Expand Up @@ -948,7 +950,7 @@ pub(crate) mod tests {
option.level_sst_magnification = 10;
option.max_sst_file_size = 2 * 1024 * 1024;

let db: DB<Test, TokioExecutor> = DB::new(Arc::new(option), TokioExecutor::new()).await.unwrap();
let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();

for item in test_items() {
db.write(item, 0.into()).await.unwrap();
Expand All @@ -958,9 +960,9 @@ pub(crate) mod tests {
let key = 20.to_string();
let option1 = tx.get(&key, Projection::All).await.unwrap().unwrap();

assert_eq!(option1.get().map(|test_ref| test_ref.vstring), Some("20"));
assert_eq!(option1.get().map(|test_ref| test_ref.vu32), Some(Some(0)));
assert_eq!(option1.get().map(|test_ref| test_ref.vbool), Some(Some(true)));
assert_eq!(option1.get().vstring, "20");
assert_eq!(option1.get().vu32, Some(0));
assert_eq!(option1.get().vbool, Some(true));

dbg!(db.version_set.current().await);
}
Expand Down
2 changes: 1 addition & 1 deletion src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where

#[cfg(test)]
pub(crate) mod tests {
use std::{borrow::Borrow, ops::Bound, path::PathBuf, sync::Arc};
use std::{borrow::Borrow, ops::Bound, path::PathBuf};

use futures_util::StreamExt;
use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
Expand Down
2 changes: 1 addition & 1 deletion src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
version::Version,
};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DbOption {
pub(crate) path: PathBuf,
pub(crate) max_mem_table_size: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/timestamp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod timestamped;

use std::{future::Future, io};
use std::io;

use arrow::{
array::{PrimitiveArray, Scalar},
Expand Down
2 changes: 1 addition & 1 deletion src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use parquet::errors::ParquetError;
use thiserror::Error;

use crate::{
compaction::CompactTask,
fs::FileProvider,
record::{Key, KeyRef},
stream,
Expand All @@ -21,7 +22,6 @@ use crate::{
wal::log::LogType,
LockMap, Projection, Record, Scan, Schema, WriteError,
};
use crate::compaction::CompactTask;

pub(crate) struct TransactionScan<'scan, R: Record> {
inner: Range<'scan, R::Key, Option<R>>,
Expand Down
21 changes: 15 additions & 6 deletions src/version/cleaner.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
use std::{collections::BTreeMap, io, sync::Arc};
use std::{collections::BTreeMap, io, marker::PhantomData, sync::Arc};

use flume::{Receiver, Sender};
use tokio::fs;
use crate::{fs::FileId, timestamp::Timestamp, DbOption};

use crate::{
fs::{FileId, FileProvider},
timestamp::Timestamp,
DbOption,
};

pub enum CleanTag {
Add { ts: Timestamp, gens: Vec<FileId> },
Clean { ts: Timestamp },
}

pub(crate) struct Cleaner {
pub(crate) struct Cleaner<FP: FileProvider> {
tag_recv: Receiver<CleanTag>,
gens_map: BTreeMap<Timestamp, (Vec<FileId>, bool)>,
option: Arc<DbOption>,
_p: PhantomData<FP>,
}

impl Cleaner {
impl<FP> Cleaner<FP>
where
FP: FileProvider,
{
pub(crate) fn new(option: Arc<DbOption>) -> (Self, Sender<CleanTag>) {
let (tag_send, tag_recv) = flume::bounded(option.clean_channel_buffer);

Expand All @@ -24,6 +32,7 @@ impl Cleaner {
tag_recv,
gens_map: Default::default(),
option,
_p: Default::default(),
},
tag_send,
)
Expand All @@ -45,7 +54,7 @@ impl Cleaner {
break;
}
for gen in gens {
fs::remove_file(self.option.table_path(&gen)).await?;
FP::remove(self.option.table_path(&gen)).await?;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ mod tests {
use futures_util::StreamExt;
use tokio_util::compat::TokioAsyncReadCompatExt;

use super::{log::LogType, FileId, Log, WalFile};
use crate::{timestamp::Timestamped, wal::record_entry::RecordEntry};
use super::{log::LogType, FileId, WalFile};
use crate::timestamp::Timestamped;

#[tokio::test]
async fn write_and_recover() {
Expand All @@ -146,7 +146,7 @@ mod tests {

{
let mut stream = pin!(wal.recover());
let (log_ty, key, value) = stream.next().await.unwrap().unwrap();
let (_, key, value) = stream.next().await.unwrap().unwrap();
assert_eq!(key.ts, 0.into());
assert_eq!(value, Some("hello".to_string()));
}
Expand Down

0 comments on commit 9dcd0ee

Please sign in to comment.