diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 265e850b..219a7b37 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -27,10 +27,11 @@ use crate::{ }, DbOption, Schema, }; +use crate::inmem::mutable::Mutable; #[derive(Debug)] pub(crate) enum CompactTask { - Flush, + Freeze, } pub(crate) struct Compactor @@ -67,6 +68,14 @@ where ) -> Result<(), CompactionError> { let mut guard = self.schema.write().await; + if guard.mutable.is_empty() { + println!("WWWWWW"); + return Ok(()) + } + let mutable = mem::replace(&mut guard.mutable, Mutable::new(&self.option).await?); + let (file_id, immutable) = mutable.to_immutable().await?; + + guard.immutables.push_front((file_id, immutable)); if guard.immutables.len() > self.option.immutable_chunk_num { let excess = guard.immutables.split_off(self.option.immutable_chunk_num); diff --git a/src/executor.rs b/src/executor.rs index cafcac49..52678425 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -16,6 +16,7 @@ pub mod tokio { use super::Executor; + #[derive(Debug)] pub struct TokioExecutor { handle: Handle, } diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index fe03bbc4..b53382fe 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -143,6 +143,10 @@ where self.data.range((lower, upper)) } + pub(crate) fn is_empty(&self) -> bool { + self.data.is_empty() + } + pub(crate) fn check_conflict(&self, key: &R::Key, ts: Timestamp) -> bool { self.data .range::::Key>, _>(( diff --git a/src/lib.rs b/src/lib.rs index 806c9eb9..f018d691 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,15 +82,16 @@ where executor.spawn(async move { while let Ok(task) = task_rx.recv_async().await { match task { - CompactTask::Flush => { + CompactTask::Freeze => { if let Err(err) = compactor.check_then_compaction().await { - todo!(); - // error!("[Compaction Error]: {}", err) + // todo!(); + error!("[Compaction Error]: {}", err) } } } } }); + // TODO: Recover Ok(Self { schema, @@ -109,13 +110,10 @@ where } pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), WriteError> { - let is_excess = { - let schema = self.schema.read().await; - schema.write(LogType::Full, record, ts).await? - }; - if is_excess { - let mut schema = self.schema.write().await; - schema.freeze().await?; + let schema = self.schema.read().await; + + if schema.write(LogType::Full, record, ts).await? { + let _ = schema.compaction_tx.try_send(CompactTask::Freeze); } Ok(()) @@ -129,7 +127,7 @@ where let schema = self.schema.read().await; if let Some(first) = records.next() { - if let Some(record) = records.next() { + let is_excess = if let Some(record) = records.next() { schema.write(LogType::First, first, ts).await?; let mut last_buf = record; @@ -139,12 +137,14 @@ where .write(LogType::Middle, mem::replace(&mut last_buf, record), ts) .await?; } - schema.write(LogType::Last, last_buf, ts).await?; + schema.write(LogType::Last, last_buf, ts).await? } else { - schema.write(LogType::Full, first, ts).await?; + schema.write(LogType::Full, first, ts).await? + }; + if is_excess { + let _ = schema.compaction_tx.try_send(CompactTask::Freeze); } - } - // TODO: is_excess + }; Ok(()) } @@ -228,18 +228,6 @@ where .iter() .any(|(_, immutable)| immutable.check_conflict(key, ts)) } - - async fn freeze(&mut self) -> Result<(), WriteError> { - let mutable = mem::replace(&mut self.mutable, Mutable::new(&self.option).await?); - let (file_id, immutable) = mutable.to_immutable().await?; - - self.immutables.push_front((file_id, immutable)); - if self.immutables.len() > self.option.immutable_chunk_num { - let _ = self.compaction_tx.try_send(CompactTask::Flush); - } - - Ok(()) - } } pub struct Scan<'scan, R, FP> @@ -367,26 +355,18 @@ pub(crate) mod tests { use futures_util::io; use once_cell::sync::Lazy; use parquet::arrow::ProjectionMask; + use tempfile::TempDir; use tracing::error; - use crate::{ - compaction::{CompactTask, Compactor}, - executor::{tokio::TokioExecutor, Executor}, - fs::FileId, - inmem::{ - immutable::{ArrowArrays, Builder}, - mutable::Mutable, - }, - record::{internal::InternalRecordRef, Key, RecordRef}, - serdes::{ - option::{DecodeError, EncodeError}, - Decode, Encode, - }, - timestamp::Timestamped, - version::{cleaner::Cleaner, set::tests::build_version_set, Version}, - wal::log::LogType, - DbOption, Immutable, Record, WriteError, DB, - }; + use crate::{compaction::{CompactTask, Compactor}, executor::{tokio::TokioExecutor, Executor}, fs::FileId, inmem::{ + immutable::{ArrowArrays, Builder}, + mutable::Mutable, + }, record::{internal::InternalRecordRef, Key, RecordRef}, serdes::{ + option::{DecodeError, EncodeError}, + Decode, Encode, + }, timestamp::Timestamped, version::{cleaner::Cleaner, set::tests::build_version_set, Version}, wal::log::LogType, DbOption, Immutable, Record, WriteError, DB, Projection}; + use crate::inmem::immutable::tests::TestImmutableArrays; + use crate::record::{RecordDecodeError, RecordEncodeError}; #[derive(Debug, PartialEq, Eq)] pub struct Test { @@ -484,7 +464,7 @@ pub(crate) mod tests { async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: io::AsyncWrite + Unpin, + W: io::AsyncWrite + Unpin + Send, { self.vstring .encode(writer) @@ -735,7 +715,7 @@ pub(crate) mod tests { executor.spawn(async move { while let Ok(task) = compaction_rx.recv_async().await { match task { - CompactTask::Flush => { + CompactTask::Freeze => { if let Err(err) = compactor.check_then_compaction().await { error!("[Compaction Error]: {}", err) } @@ -751,4 +731,237 @@ pub(crate) mod tests { _p: Default::default(), }) } + + fn test_items() -> Vec { + vec![ + Test { + vstring: 0.to_string(), + vu32: 0, + vbool: Some(true), + }, + Test { + vstring: 1.to_string(), + vu32: 1, + vbool: Some(true), + }, + Test { + vstring: 2.to_string(), + vu32: 2, + vbool: Some(true), + }, + Test { + vstring: 3.to_string(), + vu32: 3, + vbool: Some(true), + }, + Test { + vstring: 4.to_string(), + vu32: 4, + vbool: Some(true), + }, + Test { + vstring: 5.to_string(), + vu32: 5, + vbool: Some(true), + }, + Test { + vstring: 6.to_string(), + vu32: 6, + vbool: Some(true), + }, + Test { + vstring: 7.to_string(), + vu32: 7, + vbool: Some(true), + }, + Test { + vstring: 8.to_string(), + vu32: 8, + vbool: Some(true), + }, + Test { + vstring: 9.to_string(), + vu32: 9, + vbool: Some(true), + }, + Test { + vstring: 10.to_string(), + vu32: 0, + vbool: Some(true), + }, + Test { + vstring: 11.to_string(), + vu32: 1, + vbool: Some(true), + }, + Test { + vstring: 12.to_string(), + vu32: 2, + vbool: Some(true), + }, + Test { + vstring: 13.to_string(), + vu32: 3, + vbool: Some(true), + }, + Test { + vstring: 14.to_string(), + vu32: 4, + vbool: Some(true), + }, + Test { + vstring: 15.to_string(), + vu32: 5, + vbool: Some(true), + }, + Test { + vstring: 16.to_string(), + vu32: 6, + vbool: Some(true), + }, + Test { + vstring: 17.to_string(), + vu32: 7, + vbool: Some(true), + }, + Test { + vstring: 18.to_string(), + vu32: 8, + vbool: Some(true), + }, + Test { + vstring: 19.to_string(), + vu32: 9, + vbool: Some(true), + }, + Test { + vstring: 20.to_string(), + vu32: 0, + vbool: Some(true), + }, + Test { + vstring: 21.to_string(), + vu32: 1, + vbool: Some(true), + }, + Test { + vstring: 22.to_string(), + vu32: 2, + vbool: Some(true), + }, + Test { + vstring: 23.to_string(), + vu32: 3, + vbool: Some(true), + }, + Test { + vstring: 24.to_string(), + vu32: 4, + vbool: Some(true), + }, + Test { + vstring: 25.to_string(), + vu32: 5, + vbool: Some(true), + }, + Test { + vstring: 26.to_string(), + vu32: 6, + vbool: Some(true), + }, + Test { + vstring: 27.to_string(), + vu32: 7, + vbool: Some(true), + }, + Test { + vstring: 28.to_string(), + vu32: 8, + vbool: Some(true), + }, + Test { + vstring: 29.to_string(), + vu32: 9, + vbool: Some(true), + }, + Test { + vstring: 30.to_string(), + vu32: 0, + vbool: Some(true), + }, + Test { + vstring: 31.to_string(), + vu32: 1, + vbool: Some(true), + }, + Test { + vstring: 32.to_string(), + vu32: 2, + vbool: Some(true), + }, + Test { + vstring: 33.to_string(), + vu32: 3, + vbool: Some(true), + }, + Test { + vstring: 34.to_string(), + vu32: 4, + vbool: Some(true), + }, + Test { + vstring: 35.to_string(), + vu32: 5, + vbool: Some(true), + }, + Test { + vstring: 36.to_string(), + vu32: 6, + vbool: Some(true), + }, + Test { + vstring: 37.to_string(), + vu32: 7, + vbool: Some(true), + }, + Test { + vstring: 38.to_string(), + vu32: 8, + vbool: Some(true), + }, + Test { + vstring: 39.to_string(), + vu32: 9, + vbool: Some(true), + }, + ] + } + + #[tokio::test] + async fn read_from_disk() { + let temp_dir = TempDir::new().unwrap(); + + let mut option = DbOption::from(temp_dir.path()); + option.max_mem_table_size = 5; + option.immutable_chunk_num = 1; + option.major_threshold_with_sst_size = 5; + option.level_sst_magnification = 10; + option.max_sst_file_size = 2 * 1024 * 1024; + + let db: DB = DB::new(Arc::new(option), TokioExecutor::new()).await.unwrap(); + + for item in test_items() { + db.write(item, 0.into()).await.unwrap(); + } + + // let tx = db.transaction().await; + // let key = 20.to_string(); + // let option1 = tx.get(&key, Projection::All).await.unwrap().unwrap(); + // + // assert_eq!(option1.get().map(|test_ref| test_ref.vstring), Some("20")); + // assert_eq!(option1.get().map(|test_ref| test_ref.vu32), Some(Some(0))); + // assert_eq!(option1.get().map(|test_ref| test_ref.vbool), Some(Some(true))); + + // dbg!(db.version_set.current().await); + } } diff --git a/src/morseldb_marco/src/lib.rs b/src/morseldb_marco/src/lib.rs index ae126b69..597d4751 100644 --- a/src/morseldb_marco/src/lib.rs +++ b/src/morseldb_marco/src/lib.rs @@ -480,7 +480,7 @@ pub fn morsel_record(_args: TokenStream, input: TokenStream) -> TokenStream { async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: ::futures_io::AsyncWrite + Unpin, + W: ::futures_io::AsyncWrite + Unpin + Send, { #(#encode_method_fields)* diff --git a/src/option.rs b/src/option.rs index 5823b40d..caae8916 100644 --- a/src/option.rs +++ b/src/option.rs @@ -29,7 +29,7 @@ where fn from(path: P) -> Self { DbOption { path: path.into(), - max_mem_table_size: 8 * 1024 * 1024, + max_mem_table_size: 3000, immutable_chunk_num: 3, major_threshold_with_sst_size: 10, level_sst_magnification: 10, diff --git a/src/transaction.rs b/src/transaction.rs index 78dd794e..cfc815cc 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -21,6 +21,7 @@ use crate::{ wal::log::LogType, LockMap, Projection, Record, Scan, Schema, WriteError, }; +use crate::compaction::CompactTask; pub(crate) struct TransactionScan<'scan, R: Record> { inner: Range<'scan, R::Key, Option>, @@ -161,8 +162,9 @@ where Self::append(&self.share, LogType::Last, key, record, new_ts).await? } }; - - // TODO: is excess + if is_excess { + let _ = self.share.compaction_tx.try_send(CompactTask::Freeze); + } Ok(()) } diff --git a/src/version/edit.rs b/src/version/edit.rs index 889711a1..b7025258 100644 --- a/src/version/edit.rs +++ b/src/version/edit.rs @@ -14,7 +14,6 @@ use crate::{ pub(crate) enum VersionEdit { Add { level: u8, scope: Scope }, Remove { level: u8, gen: FileId }, - // TODO: on compaction LatestTimeStamp { ts: Timestamp }, } diff --git a/src/version/mod.rs b/src/version/mod.rs index b15a3872..9c188f83 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -25,6 +25,7 @@ pub(crate) const MAX_LEVEL: usize = 7; pub(crate) type VersionRef = Arc>; +#[derive(Debug)] pub(crate) struct Version where R: Record,