diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 3036c244..41b0b7f5 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -28,6 +28,11 @@ use crate::{ DbOption, Schema, }; +#[derive(Debug)] +pub(crate) enum CompactTask { + Flush, +} + pub(crate) struct Compactor where R: Record, @@ -103,15 +108,14 @@ where pub(crate) async fn minor_compaction( option: &DbOption, - batches: VecDeque>, + batches: VecDeque<(FileId, Immutable)>, ) -> Result>, CompactionError> { if !batches.is_empty() { let mut min = None; let mut max = None; let gen = FileId::new(); - // TODO: WAL CLEAN - // let mut wal_ids = Vec::with_capacity(batches.len()); + let mut wal_ids = Vec::with_capacity(batches.len()); let mut writer = AsyncArrowWriter::try_new( FP::open(option.table_path(&gen)).await?.compat(), @@ -119,7 +123,7 @@ where option.write_parquet_option.clone(), )?; - for batch in batches { + for (file_id, batch) in batches { if let (Some(batch_min), Some(batch_max)) = batch.scope() { if matches!(min.as_ref().map(|min| min > batch_min), Some(true) | None) { min = Some(batch_min.clone()) @@ -129,17 +133,14 @@ where } } writer.write(batch.as_record_batch()).await?; - // TODO: WAL CLEAN - // wal_ids.push(wal_id); + wal_ids.push(file_id); } writer.close().await?; return Ok(Some(Scope { min: min.ok_or(CompactionError::EmptyLevel)?, max: max.ok_or(CompactionError::EmptyLevel)?, gen, - // TODO: WAL CLEAN - wal_ids: None, - // wal_ids: Some(wal_ids), + wal_ids: Some(wal_ids), })); } Ok(None) @@ -420,33 +421,37 @@ pub(crate) mod tests { use tempfile::TempDir; use tokio_util::compat::FuturesAsyncReadCompatExt; - use crate::{ - compaction::Compactor, - executor::{tokio::TokioExecutor, Executor}, - fs::FileId, - inmem::{immutable::Immutable, mutable::Mutable}, - record::Record, - scope::Scope, - tests::Test, - version::{edit::VersionEdit, Version}, - DbOption, - }; - - fn build_immutable( - fn_mutable: impl FnOnce(&mut Mutable), - ) -> Immutable { - let mut mutable = Mutable::new(); - - fn_mutable(&mut mutable); - Immutable::from(mutable) + use crate::{compaction::Compactor, executor::{tokio::TokioExecutor, Executor}, fs::FileId, inmem::{immutable::Immutable, mutable::Mutable}, record::Record, scope::Scope, tests::Test, version::{edit::VersionEdit, Version}, DbOption, WriteError}; + use crate::fs::FileProvider; + use crate::timestamp::Timestamp; + use crate::wal::log::LogType; + + async fn build_immutable( + option: &DbOption, + records: Vec<(LogType, R, Timestamp)>, + ) -> Result, WriteError> + where + R: Record + Send, + FP: FileProvider, + { + let mutable: Mutable = Mutable::new(option).await?; + + for (log_ty, record, ts) in records { + let _ = mutable.insert(log_ty, record, ts).await?; + } + Ok(Immutable::from(mutable.data)) } - pub(crate) async fn build_parquet_table( + pub(crate) async fn build_parquet_table( option: &DbOption, gen: FileId, - fn_mutable: impl FnOnce(&mut Mutable), - ) -> Result<(), ParquetError> { - let immutable = build_immutable(fn_mutable); + records: Vec<(LogType, R, Timestamp)>, + ) -> Result<(), WriteError> + where + R: Record + Send, + FP: Executor, + { + let immutable = build_immutable::(option, records).await?; let mut writer = AsyncArrowWriter::try_new( FP::open(option.table_path(&gen)) .await @@ -464,63 +469,59 @@ pub(crate) mod tests { #[tokio::test] async fn minor_compaction() { let temp_dir = tempfile::tempdir().unwrap(); - - let batch_1 = build_immutable(|mutable| { - mutable.insert( - Test { - vstring: 3.to_string(), - vu32: 0, - vbool: None, - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 5.to_string(), - vu32: 0, - vbool: None, - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 6.to_string(), - vu32: 0, - vbool: None, - }, - 0.into(), - ); - }); - let batch_2 = build_immutable(|mutable| { - mutable.insert( - Test { - vstring: 4.to_string(), - vu32: 0, - vbool: None, - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 2.to_string(), - vu32: 0, - vbool: None, - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 1.to_string(), - vu32: 0, - vbool: None, - }, - 0.into(), - ); - }); + let option = DbOption::from(temp_dir.path()); + + let batch_1 = build_immutable::(&option, vec![ + (LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + vbool: None, + }, + 0.into()), + (LogType::Full, + Test { + vstring: 5.to_string(), + vu32: 0, + vbool: None, + }, + 0.into()), + (LogType::Full, + Test { + vstring: 6.to_string(), + vu32: 0, + vbool: None, + }, + 0.into()), + ]).await.unwrap(); + + let batch_2 = build_immutable::(&option, vec![ + (LogType::Full, + Test { + vstring: 4.to_string(), + vu32: 0, + vbool: None, + }, + 0.into()), + (LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: None, + }, + 0.into()), + (LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: None, + }, + 0.into()), + ]).await.unwrap(); let scope = Compactor::::minor_compaction( &DbOption::from(temp_dir.path()), - VecDeque::from(vec![batch_2, batch_1]), + VecDeque::from(vec![(FileId::new(), batch_2), (FileId::new(), batch_1)]), ) .await .unwrap() @@ -591,60 +592,54 @@ pub(crate) mod tests { // level 0 let table_gen_1 = FileId::new(); let table_gen_2 = FileId::new(); - build_parquet_table::(option, table_gen_1, |mutable| { - mutable.insert( - Test { - vstring: 1.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into(), - ); - mutable.insert( - Test { - vstring: 2.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into(), - ); - mutable.insert( - Test { - vstring: 3.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - }) + build_parquet_table::(option, table_gen_1, vec![ + (LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into()), + (LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into()), + (LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + ]) .await .unwrap(); - build_parquet_table::(option, table_gen_2, |mutable| { - mutable.insert( - Test { - vstring: 4.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into(), - ); - mutable.insert( - Test { - vstring: 5.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into(), - ); - mutable.insert( - Test { - vstring: 6.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - }) + build_parquet_table::(option, table_gen_2, vec![ + (LogType::Full, + Test { + vstring: 4.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into()), + (LogType::Full, + Test { + vstring: 5.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into()), + (LogType::Full, + Test { + vstring: 6.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + ]) .await .unwrap(); @@ -652,88 +647,79 @@ pub(crate) mod tests { let table_gen_3 = FileId::new(); let table_gen_4 = FileId::new(); let table_gen_5 = FileId::new(); - build_parquet_table::(option, table_gen_3, |mutable| { - mutable.insert( - Test { - vstring: 1.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 2.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 3.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - }) + build_parquet_table::(option, table_gen_3, vec![ + (LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + (LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + (LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + ]) .await .unwrap(); - build_parquet_table::(option, table_gen_4, |mutable| { - mutable.insert( - Test { - vstring: 4.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 5.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 6.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - }) + build_parquet_table::(option, table_gen_4, vec![ + (LogType::Full, + Test { + vstring: 4.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + (LogType::Full, + Test { + vstring: 5.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + (LogType::Full, + Test { + vstring: 6.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + ]) .await .unwrap(); - build_parquet_table::(option, table_gen_5, |mutable| { - mutable.insert( - Test { - vstring: 7.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 8.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - mutable.insert( - Test { - vstring: 9.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into(), - ); - }) + build_parquet_table::(option, table_gen_5, vec![ + (LogType::Full, + Test { + vstring: 7.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + (LogType::Full, + Test { + vstring: 8.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + (LogType::Full, + Test { + vstring: 9.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into()), + ]) .await .unwrap(); diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index df3a42e2..7440da5a 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -5,6 +5,7 @@ use std::{ }; use arrow::array::RecordBatch; +use crossbeam_skiplist::SkipMap; use parquet::arrow::ProjectionMask; use super::mutable::Mutable; @@ -13,6 +14,7 @@ use crate::{ stream::record_batch::RecordBatchEntry, timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH}, }; +use crate::fs::FileProvider; pub trait ArrowArrays: Sized { type Record: Record; @@ -53,12 +55,12 @@ where index: BTreeMap::Key>, u32>, } -impl From> for Immutable +impl From::Key>, Option>> for Immutable where A: ArrowArrays, A::Record: Send, { - fn from(mutable: Mutable) -> Self { + fn from(mutable: SkipMap::Key>, Option>) -> Self { let mut index = BTreeMap::new(); let mut builder = A::builder(mutable.len()); diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 146fd4b2..dad68d9e 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -1,17 +1,23 @@ +use std::collections::BTreeMap; +use std::intrinsics::transmute; use std::ops::Bound; - +use async_lock::Mutex; use crossbeam_skiplist::{ map::{Entry, Range}, SkipMap, }; - -use crate::{ - record::{KeyRef, Record}, - timestamp::{ - timestamped::{Timestamped, TimestampedRef}, - Timestamp, EPOCH, - }, -}; +use futures_util::io; +use ulid::Ulid; +use crate::{DbOption, record::{KeyRef, Record}, timestamp::{ + timestamped::{Timestamped, TimestampedRef}, + Timestamp, EPOCH, +}, WriteError}; +use crate::fs::{FileId, FileProvider}; +use crate::inmem::immutable::{ArrowArrays, Builder, Immutable}; +use crate::record::Key; +use crate::wal::log::{Log, LogType}; +use crate::wal::record_entry::RecordEntry; +use crate::wal::WalFile; pub(crate) type MutableScan<'scan, R> = Range< 'scan, @@ -25,45 +31,57 @@ pub(crate) type MutableScan<'scan, R> = Range< >; #[derive(Debug)] -pub struct Mutable +pub struct Mutable where R: Record, + FP: FileProvider, { - data: SkipMap, Option>, + pub(crate) data: SkipMap, Option>, + wal: Mutex> } -impl Default for Mutable +impl Mutable where + FP: FileProvider, R: Record, { - fn default() -> Self { - Mutable { + pub async fn new(option: &DbOption) -> io::Result { + let file_id = Ulid::new(); + let file = FP::open(option.wal_path(&file_id)).await?; + + Ok(Self { data: Default::default(), - } + wal: Mutex::new(WalFile::new(file, file_id)), + }) } } -impl Mutable +impl Mutable where - R: Record, + R: Record + Send, + FP: FileProvider, { - pub fn new() -> Self { - Mutable::default() + pub(crate) async fn insert(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result> { + self.append(log_ty, record.key().to_key(), ts, Some(record), false).await } -} -impl Mutable -where - R: Record + Send, -{ - pub(crate) fn insert(&self, record: R, ts: Timestamp) { - self.data - // TODO: remove key cloning - .insert(Timestamped::new(record.key().to_key(), ts), Some(record)); + pub(crate) async fn remove(&self, log_ty: LogType, key: R::Key, ts: Timestamp) -> Result> { + self.append(log_ty, key, ts, None, false).await } - pub(crate) fn remove(&self, key: R::Key, ts: Timestamp) { - self.data.insert(Timestamped::new(key, ts), None); + async fn append(&self, log_ty: LogType, key: R::Key, ts: Timestamp, value: Option, is_recover: bool) -> Result> { + let timestamped_key = Timestamped::new(key, ts); + + if !is_recover { + let mut wal_guard = self.wal.lock().await; + + wal_guard + .write(log_ty, timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }), value.as_ref().map(R::as_record_ref)) + .await.unwrap(); + } + self.data.insert(timestamped_key, value); + + Ok(self.data.len()) } fn get( @@ -86,10 +104,6 @@ where }) } - pub(crate) fn into_iter(self) -> impl Iterator, Option)> { - self.data.into_iter() - } - pub(crate) fn scan<'scan>( &'scan self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), @@ -110,11 +124,22 @@ where .next() .is_some() } + + pub(crate) async fn to_immutable(self) -> io::Result<(FileId, Immutable)> { + let file_id = { + let mut wal_guard = self.wal.lock().await; + wal_guard.flush().await?; + wal_guard.file_id() + }; + + Ok((file_id, Immutable::from(self.data))) + } } -impl Mutable +impl Mutable where R: Record, + FP: FileProvider, { pub(crate) fn len(&self) -> usize { self.data.len() @@ -126,35 +151,36 @@ mod tests { use std::ops::Bound; use super::Mutable; - use crate::{ - record::Record, - tests::{Test, TestRef}, - timestamp::Timestamped, - }; - - #[test] - fn insert_and_get() { + use crate::{DbOption, record::Record, tests::{Test, TestRef}, timestamp::Timestamped}; + use crate::executor::tokio::TokioExecutor; + use crate::wal::log::LogType; + + #[tokio::test] + async fn insert_and_get() { let key_1 = "key_1".to_owned(); let key_2 = "key_2".to_owned(); - let mem_table = Mutable::default(); + let temp_dir = tempfile::tempdir().unwrap(); + let mem_table = Mutable::::new(&DbOption::from(temp_dir.path())).await.unwrap(); mem_table.insert( + LogType::Full, Test { vstring: key_1.clone(), vu32: 1, vbool: Some(true), }, 0_u32.into(), - ); + ).await.unwrap(); mem_table.insert( + LogType::Full, Test { vstring: key_2.clone(), vu32: 2, vbool: None, }, 1_u32.into(), - ); + ).await.unwrap(); let entry = mem_table.get(&key_1, 0_u32.into()).unwrap(); assert_eq!( @@ -167,15 +193,16 @@ mod tests { ) } - #[test] - fn range() { - let mutable = Mutable::::new(); + #[tokio::test] + async fn range() { + let temp_dir = tempfile::tempdir().unwrap(); + let mutable = Mutable::::new(&DbOption::from(temp_dir.path())).await.unwrap(); - mutable.insert("1".into(), 0_u32.into()); - mutable.insert("2".into(), 0_u32.into()); - mutable.insert("2".into(), 1_u32.into()); - mutable.insert("3".into(), 1_u32.into()); - mutable.insert("4".into(), 0_u32.into()); + mutable.insert(LogType::Full,"1".into(), 0_u32.into()).await.unwrap(); + mutable.insert(LogType::Full,"2".into(), 0_u32.into()).await.unwrap(); + mutable.insert(LogType::Full,"2".into(), 1_u32.into()).await.unwrap(); + mutable.insert(LogType::Full,"3".into(), 1_u32.into()).await.unwrap(); + mutable.insert(LogType::Full,"4".into(), 0_u32.into()).await.unwrap(); let mut scan = mutable.scan((Bound::Unbounded, Bound::Unbounded), 0_u32.into()); diff --git a/src/lib.rs b/src/lib.rs index 64ad2e9f..46259f56 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,8 @@ mod wal; use std::{collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, sync::Arc}; -use async_lock::{RwLock, RwLockReadGuard}; +use async_lock::{Mutex, RwLock, RwLockReadGuard}; +use flume::{bounded, Sender}; use fs::FileProvider; use futures_core::Stream; use futures_util::StreamExt; @@ -39,6 +40,9 @@ use crate::{ stream::{merge::MergeStream, Entry, ScanStream}, version::{cleaner::Cleaner, set::VersionSet, Version, VersionError}, }; +use crate::compaction::{Compactor, CompactTask}; +use crate::fs::FileId; +use crate::wal::log::LogType; pub struct DB where @@ -53,23 +57,41 @@ where impl DB where - R: Record + Send, - E: Executor, + R: Record + Send + Sync, + R::Columns: Send + Sync, + E: Executor +Send + Sync + 'static, { pub async fn new(option: Arc, executor: E) -> Result> { E::create_dir_all(&option.path).await?; - let schema = Arc::new(RwLock::new(Schema::default())); + let (task_tx, mut task_rx) = bounded(1); + let schema = Arc::new(RwLock::new(Schema::new(option.clone(), task_tx).await?)); let (mut cleaner, clean_sender) = Cleaner::new(option.clone()); let version_set = VersionSet::new(clean_sender, option.clone()).await?; + let mut compactor = Compactor::::new( + schema.clone(), + option.clone(), + version_set.clone(), + ); executor.spawn(async move { if let Err(err) = cleaner.listen().await { error!("[Cleaner Error]: {}", err) } }); + executor.spawn(async move { + while let Ok(task) = task_rx.recv_async().await { + match task { + CompactTask::Flush => { + if let Err(err) = compactor.check_then_compaction().await { + error!("[Compaction Error]: {}", err) + } + } + } + } + }); Ok(Self { schema, @@ -87,20 +109,42 @@ where ) } - pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> { - let schema = self.schema.read().await; - schema.write(record, ts).await + pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), WriteError> { + let is_excess = { + let schema = self.schema.read().await; + schema.write(LogType::Full, record, ts).await? + }; + if is_excess { + let mut schema = self.schema.write().await; + schema.freeze().await?; + } + + Ok(()) } pub(crate) async fn write_batch( &self, - records: impl Iterator, + mut records: impl ExactSizeIterator, ts: Timestamp, - ) -> io::Result<()> { - let columns = self.schema.read().await; - for record in records { - columns.write(record, ts).await?; + ) -> Result<(), WriteError> { + let schema = self.schema.read().await; + + if let Some(first) = records.next() { + if let Some(record) = records.next() { + schema.write(LogType::First, first, ts).await?; + + let mut last_buf = record; + + while let Some(record) = records.next() { + schema.write(LogType::Middle, mem::replace(&mut last_buf, record), ts).await?; + } + schema.write(LogType::Last, last_buf, ts).await?; + } else { + schema.write(LogType::Full, first, ts).await?; + } } + // TODO: is_excess + Ok(()) } @@ -112,23 +156,12 @@ where pub(crate) struct Schema where R: Record, + FP: FileProvider, { - mutable: Mutable, - immutables: VecDeque>, - _marker: PhantomData, -} - -impl Default for Schema -where - R: Record, -{ - fn default() -> Self { - Self { - mutable: Mutable::default(), - immutables: VecDeque::default(), - _marker: Default::default(), - } - } + mutable: Mutable, + immutables: VecDeque<(FileId, Immutable)>, + compaction_tx: Sender, + option: Arc, } impl Schema @@ -136,14 +169,21 @@ where R: Record + Send, FP: FileProvider, { - async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> { - self.mutable.insert(record, ts); - Ok(()) + async fn new(option: Arc, compaction_tx: Sender) -> io::Result { + Ok(Schema { + mutable: Mutable::new(&option).await?, + immutables: Default::default(), + compaction_tx, + option, + }) } - async fn remove(&self, key: R::Key, ts: Timestamp) -> io::Result<()> { - self.mutable.remove(key, ts); - Ok(()) + async fn write(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result> { + Ok(self.mutable.insert(log_ty, record, ts).await? > self.option.max_mem_table_size) + } + + async fn remove(&self, log_ty: LogType, key: R::Key, ts: Timestamp) -> Result> { + Ok(self.mutable.remove(log_ty, key, ts).await? > self.option.max_mem_table_size) } async fn get<'get>( @@ -175,13 +215,19 @@ where || self .immutables .iter() - .any(|immutable| immutable.check_conflict(key, ts)) + .any(|(_, immutable)| immutable.check_conflict(key, ts)) } - fn freeze(&mut self) { - let mutable = mem::replace(&mut self.mutable, Mutable::new()); - let immutable = Immutable::from(mutable); - self.immutables.push_front(immutable); + async fn freeze(&mut self) -> Result<(), WriteError> { + let mutable = mem::replace(&mut self.mutable, Mutable::new(&self.option).await?); + let (file_id, immutable) = mutable.to_immutable().await?; + + self.immutables.push_front((file_id, immutable)); + if self.immutables.len() > self.option.immutable_chunk_num { + let _ = self.compaction_tx.try_send(CompactTask::Flush); + } + + Ok(()) } } @@ -256,7 +302,7 @@ where .scan((self.lower, self.upper), self.ts) .into(), ); - for immutable in &self.schema.immutables { + for (_, immutable) in &self.schema.immutables { self.streams.push( immutable .scan((self.lower, self.upper), self.ts, self.projection.clone()) @@ -299,7 +345,7 @@ pub enum Projection { #[cfg(test)] pub(crate) mod tests { - use std::{collections::VecDeque, sync::Arc}; + use std::{collections::VecDeque, mem, sync::Arc}; use arrow::{ array::{ @@ -308,7 +354,8 @@ pub(crate) mod tests { }, datatypes::{DataType, Field, Schema, UInt32Type}, }; - use async_lock::RwLock; + use async_lock::{Mutex, RwLock}; + use flume::{bounded, Receiver}; use futures_util::io; use morseldb_marco::morsel_record; use once_cell::sync::Lazy; @@ -330,6 +377,9 @@ pub(crate) mod tests { version::{cleaner::Cleaner, set::tests::build_version_set, Version}, DbOption, Immutable, Record, WriteError, DB, }; + use crate::compaction::{Compactor, CompactTask}; + use crate::fs::FileId; + use crate::wal::log::LogType; #[morsel_record] pub struct Test { @@ -339,11 +389,11 @@ pub(crate) mod tests { pub vbool: Option, } - pub(crate) async fn get_test_record_batch( + pub(crate) async fn get_test_record_batch( option: Arc, executor: E, ) -> RecordBatch { - let db: DB = DB::new(option, executor).await.unwrap(); + let db: DB = DB::new(option.clone(), executor).await.unwrap(); db.write( Test { @@ -368,86 +418,97 @@ pub(crate) mod tests { let mut schema = db.schema.write().await; - schema.freeze(); + let mutable = mem::replace(&mut schema.mutable, Mutable::new(&option).await.unwrap()); - schema.immutables[0].as_record_batch().clone() + Immutable::<::Columns>::from(mutable.data).as_record_batch().clone() } - pub(crate) async fn build_schema() -> crate::Schema { - let mutable = Mutable::new(); + pub(crate) async fn build_schema(option: Arc) -> io::Result<(crate::Schema, Receiver)> { + let mutable = Mutable::new(&option).await?; mutable.insert( + LogType::Full, Test { vstring: "alice".to_string(), vu32: 1, vbool: Some(true), }, 1_u32.into(), - ); + ).await.unwrap(); mutable.insert( + LogType::Full, Test { vstring: "ben".to_string(), vu32: 2, vbool: Some(true), }, 1_u32.into(), - ); + ).await.unwrap(); mutable.insert( + LogType::Full, Test { vstring: "carl".to_string(), vu32: 3, vbool: Some(true), }, 1_u32.into(), - ); + ).await.unwrap(); let immutables = { - let mutable = Mutable::new(); + let mutable: Mutable = Mutable::new(&option).await?; mutable.insert( + LogType::Full, Test { vstring: "dice".to_string(), vu32: 4, vbool: Some(true), }, 1_u32.into(), - ); + ).await.unwrap(); mutable.insert( + LogType::Full, Test { vstring: "erika".to_string(), vu32: 5, vbool: Some(true), }, 1_u32.into(), - ); + ).await.unwrap(); mutable.insert( + LogType::Full, Test { vstring: "funk".to_string(), vu32: 6, vbool: Some(true), }, 1_u32.into(), - ); + ).await.unwrap(); - VecDeque::from(vec![Immutable::from(mutable)]) + VecDeque::from(vec![(FileId::new(), Immutable::from(mutable.data))]) }; - crate::Schema { + let (compaction_tx, compaction_rx) = bounded(1); + + Ok((crate::Schema { mutable, immutables, - _marker: Default::default(), - } + compaction_tx, + option, + }, compaction_rx)) } pub(crate) async fn build_db( option: Arc, + compaction_rx: Receiver, executor: E, schema: crate::Schema, version: Version, ) -> Result, WriteError> where - R: Record, - E: Executor, + R: Record + Send + Sync, + R::Columns: Send + Sync, + E: Executor +Send + Sync + 'static, { E::create_dir_all(&option.path).await?; @@ -455,12 +516,28 @@ pub(crate) mod tests { let (mut cleaner, clean_sender) = Cleaner::new(option.clone()); let version_set = build_version_set(version, clean_sender, option.clone()).await?; + let mut compactor = Compactor::::new( + schema.clone(), + option.clone(), + version_set.clone(), + ); executor.spawn(async move { if let Err(err) = cleaner.listen().await { error!("[Cleaner Error]: {}", err) } }); + executor.spawn(async move { + while let Ok(task) = compaction_rx.recv_async().await { + match task { + CompactTask::Flush => { + if let Err(err) = compactor.check_then_compaction().await { + error!("[Compaction Error]: {}", err) + } + } + } + } + }); Ok(DB { schema, diff --git a/src/record/mod.rs b/src/record/mod.rs index 00c7e1a9..f59ba682 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -33,7 +33,7 @@ pub trait KeyRef<'r>: Clone + Encode + PartialEq + Ord { fn to_key(&self) -> Self::Key; } -pub trait Record: 'static + Sized + Decode { +pub trait Record: 'static + Sized + Decode + Debug { type Columns: ArrowArrays; type Key: Key; diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 705ce95f..19e43b89 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -138,22 +138,26 @@ mod tests { use futures_util::StreamExt; use super::MergeStream; - use crate::{executor::tokio::TokioExecutor, inmem::mutable::Mutable}; + use crate::{DbOption, executor::tokio::TokioExecutor, inmem::mutable::Mutable}; + use crate::wal::log::LogType; #[tokio::test] async fn merge_mutable() { - let m1 = Mutable::::new(); - m1.remove("b".into(), 3.into()); - m1.insert("c".into(), 4.into()); - m1.insert("d".into(), 5.into()); + let temp_dir = tempfile::tempdir().unwrap(); + let option = DbOption::from(temp_dir.path()); - let m2 = Mutable::::new(); - m2.insert("a".into(), 1.into()); - m2.insert("b".into(), 2.into()); - m2.insert("c".into(), 3.into()); + let m1 = Mutable::::new(&option).await.unwrap(); + m1.remove(LogType::Full,"b".into(), 3.into()).await.unwrap(); + m1.insert(LogType::Full,"c".into(), 4.into()).await.unwrap(); + m1.insert(LogType::Full,"d".into(), 5.into()).await.unwrap(); - let m3 = Mutable::::new(); - m3.insert("e".into(), 4.into()); + let m2 = Mutable::::new(&option).await.unwrap(); + m2.insert(LogType::Full,"a".into(), 1.into()).await.unwrap(); + m2.insert(LogType::Full,"b".into(), 2.into()).await.unwrap(); + m2.insert(LogType::Full,"c".into(), 3.into()).await.unwrap(); + + let m3 = Mutable::::new(&option).await.unwrap(); + m3.insert(LogType::Full,"e".into(), 4.into()).await.unwrap(); let lower = "a".to_string(); let upper = "e".to_string(); @@ -176,12 +180,15 @@ mod tests { #[tokio::test] async fn merge_mutable_remove_duplicates() { - let m1 = Mutable::::new(); - m1.insert("1".into(), 0_u32.into()); - m1.insert("2".into(), 0_u32.into()); - m1.insert("2".into(), 1_u32.into()); - m1.insert("3".into(), 1_u32.into()); - m1.insert("4".into(), 0_u32.into()); + let temp_dir = tempfile::tempdir().unwrap(); + let option = DbOption::from(temp_dir.path()); + + let m1 = Mutable::::new(&option).await.unwrap(); + m1.insert(LogType::Full,"1".into(), 0_u32.into()).await.unwrap(); + m1.insert(LogType::Full,"2".into(), 0_u32.into()).await.unwrap(); + m1.insert(LogType::Full,"2".into(), 1_u32.into()).await.unwrap(); + m1.insert(LogType::Full,"3".into(), 1_u32.into()).await.unwrap(); + m1.insert(LogType::Full,"4".into(), 0_u32.into()).await.unwrap(); let lower = "1".to_string(); let upper = "4".to_string(); diff --git a/src/transaction.rs b/src/transaction.rs index 542cf062..57971942 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -20,6 +20,7 @@ use crate::{ version::{set::transaction_ts, VersionRef}, LockMap, Projection, Record, Scan, Schema, WriteError, }; +use crate::wal::log::LogType; pub(crate) struct TransactionScan<'scan, R: Record> { inner: Range<'scan, R::Key, Option>, @@ -120,7 +121,7 @@ where } } - pub async fn commit(self) -> Result<(), CommitError> { + pub async fn commit(mut self) -> Result<(), CommitError> { let mut _key_guards = Vec::new(); for (key, _) in self.local.iter() { @@ -136,15 +137,41 @@ where return Err(CommitError::WriteConflict(key.clone())); } } - for (key, record) in self.local { - let new_ts = transaction_ts(); - match record { - Some(record) => self.share.write(record, new_ts).await?, - None => self.share.remove(key, new_ts).await?, + + let len = self.local.len(); + let is_excess = match len { + 0 => false, + 1 => { + let new_ts = transaction_ts(); + let (key ,record) = self.local.pop_first().unwrap(); + Self::append(&self.share, LogType::Full, key, record, new_ts).await? } - } + _ => { + let new_ts = transaction_ts(); + let mut iter = self.local.into_iter(); + + let (key, record) = iter.next().unwrap(); + Self::append(&self.share, LogType::First, key, record, new_ts).await?; + + for (key, record) in (&mut iter).take(len - 2) { + Self::append(&self.share, LogType::Middle, key, record, new_ts).await?; + } + + let (key, record) = iter.next().unwrap(); + Self::append(&self.share, LogType::Last, key, record, new_ts).await? + } + }; + + // TODO: is excess Ok(()) } + + async fn append(schema: &Schema, log_ty: LogType, key: ::Key, record: Option, new_ts: Timestamp) -> Result> { + Ok(match record { + Some(record) => schema.write(log_ty, record, new_ts).await?, + None => schema.remove(log_ty, key, new_ts).await?, + }) + } } pub enum TransactionEntry<'entry, R> @@ -179,6 +206,8 @@ where Io(#[from] io::Error), #[error("transaction parquet error {:?}", .0)] Parquet(#[from] ParquetError), + #[error("transaction write error {:?}", .0)] + Write(#[from] WriteError), #[error("transaction write conflict: {:?}", .0)] WriteConflict(R::Key), } @@ -304,8 +333,8 @@ mod tests { let option = Arc::new(DbOption::from(temp_dir.path())); let (_, version) = build_version(&option).await; - let schema = build_schema().await; - let db = build_db(option, TokioExecutor::new(), schema, version) + let (schema, compaction_rx) = build_schema(option.clone()).await.unwrap(); + let db = build_db(option, compaction_rx, TokioExecutor::new(), schema, version) .await .unwrap(); diff --git a/src/wal/mod.rs b/src/wal/mod.rs index 78b47e3f..71174899 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -1,5 +1,6 @@ mod checksum; -mod log; +pub(crate) mod log; +pub(crate) mod record_entry; use std::{io, marker::PhantomData}; @@ -20,6 +21,9 @@ use crate::{ serdes::{Decode, Encode}, timestamp::Timestamped, }; +use crate::record::Key; +use crate::wal::log::LogType; +use crate::wal::record_entry::RecordEntry; #[derive(Debug)] pub(crate) struct WalFile { @@ -47,17 +51,19 @@ where F: AsyncWrite + Unpin, R: Record, { - async fn write<'r>( + pub(crate) async fn write<'r>( &mut self, - log: Log>>, + log_ty: LogType, + key: Timestamped<::Ref<'r>>, + value: Option>, ) -> Result<(), as Encode>::Error> { let mut writer = HashWriter::new(&mut self.file); - log.encode(&mut writer).await?; + Log::new(log_ty, RecordEntry::::Encode((key, value))).encode(&mut writer).await?; writer.eol().await?; Ok(()) } - async fn flush(&mut self) -> io::Result<()> { + pub(crate) async fn flush(&mut self) -> io::Result<()> { self.file.flush().await } } @@ -69,7 +75,7 @@ where { fn recover( &mut self, - ) -> impl Stream>, RecoverError<::Error>>> + '_ + ) -> impl Stream, Option), RecoverError<::Error>>> + '_ { stream! { let mut file = BufReader::new(&mut self.file); @@ -81,14 +87,17 @@ where let mut reader = HashReader::new(&mut file); - let record = Log::decode(&mut reader).await.map_err(RecoverError::Decode)?; + let record = Log::>::decode(&mut reader).await.map_err(RecoverError::Io)?; if !reader.checksum().await? { yield Err(RecoverError::Checksum); return; } - - yield Ok(record); + if let RecordEntry::Decode((key, value)) = record.record { + yield Ok((record.log_type, key, value)); + } else { + unreachable!() + } } } } @@ -113,13 +122,14 @@ mod tests { use super::{log::LogType, FileId, Log, WalFile}; use crate::timestamp::Timestamped; + use crate::wal::record_entry::RecordEntry; #[tokio::test] async fn write_and_recover() { let mut file = Vec::new(); { let mut wal = WalFile::<_, String>::new(Cursor::new(&mut file).compat(), FileId::new()); - wal.write(Log::new(LogType::Full, Timestamped::new("hello", 0.into()))) + wal.write(LogType::Full, Timestamped::new("hello", 0.into()), Some("hello")) .await .unwrap(); wal.flush().await.unwrap(); @@ -129,12 +139,12 @@ mod tests { { let mut stream = pin!(wal.recover()); - let log = stream.next().await.unwrap().unwrap(); - assert_eq!(log.record.ts, 0.into()); - assert_eq!(log.record.value, "hello".to_string()); + let (log_ty, key, value) = stream.next().await.unwrap().unwrap(); + assert_eq!(key.ts, 0.into()); + assert_eq!(value, Some("hello".to_string())); } - wal.write(Log::new(LogType::Full, Timestamped::new("world", 1.into()))) + wal.write(LogType::Full, Timestamped::new("world", 1.into()), Some("world")) .await .unwrap(); } @@ -144,12 +154,12 @@ mod tests { { let mut stream = pin!(wal.recover()); - let log = stream.next().await.unwrap().unwrap(); - assert_eq!(log.record.ts, 0.into()); - assert_eq!(log.record.value, "hello".to_string()); - let log = stream.next().await.unwrap().unwrap(); - assert_eq!(log.record.ts, 1.into()); - assert_eq!(log.record.value, "world".to_string()); + let (_, key, value) = stream.next().await.unwrap().unwrap(); + assert_eq!(key.ts, 0.into()); + assert_eq!(value, Some("hello".to_string())); + let (_, key, value) = stream.next().await.unwrap().unwrap(); + assert_eq!(key.ts, 1.into()); + assert_eq!(value, Some("world".to_string())); } } } diff --git a/src/wal/record_entry.rs b/src/wal/record_entry.rs new file mode 100644 index 00000000..f284ee0c --- /dev/null +++ b/src/wal/record_entry.rs @@ -0,0 +1,91 @@ +use std::io; +use futures_io::{AsyncRead, AsyncWrite}; +use crate::record::{Key, Record}; +use crate::serdes::{Decode, Encode}; +use crate::timestamp::Timestamped; + +pub(crate) enum RecordEntry<'r, R> +where + R: Record +{ + Encode((Timestamped<::Ref<'r>>, Option>)), + Decode((Timestamped, Option)), +} + +impl Encode for RecordEntry<'_, R> +where + R: Record, +{ + type Error = io::Error; + + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> + where + W: AsyncWrite + Unpin + { + if let RecordEntry::Encode((key, recode_ref)) = self { + key.encode(writer).await.unwrap(); + recode_ref.encode(writer).await.unwrap(); + + return Ok(()) + } + unreachable!() + } + + fn size(&self) -> usize { + if let RecordEntry::Encode((key, recode_ref)) = self { + return key.size() + recode_ref.size() + } + unreachable!() + } +} + +impl Decode for RecordEntry<'_, Re> +where + Re: Record, +{ + type Error = io::Error; + + async fn decode(reader: &mut R) -> Result + where + R: AsyncRead + Unpin + { + let key = Timestamped::::decode(reader).await.unwrap(); + let record = Option::::decode(reader).await.unwrap(); + + Ok(RecordEntry::Decode((key, record))) + } +} + +#[cfg(test)] +mod tests { + use futures_util::io::Cursor; + use crate::serdes::{Decode, Encode}; + use crate::timestamp::Timestamped; + use crate::wal::record_entry::RecordEntry; + + #[tokio::test] + async fn encode_and_decode() { + let entry: RecordEntry<'static, String> = RecordEntry::Encode((Timestamped::new("hello", 0.into()), Some("hello"))); + let bytes = { + let mut cursor = Cursor::new(vec![]); + + entry.encode(&mut cursor).await.unwrap(); + cursor.into_inner() + }; + + let decode_entry = { + let mut cursor = Cursor::new(bytes); + + RecordEntry::<'static, String>::decode(&mut cursor).await.unwrap() + }; + + if let (RecordEntry::Encode((key_1, value_1)), RecordEntry::Decode((key_2, value_2))) = (entry, decode_entry) { + assert_eq!(key_1.value, key_2.value.as_str()); + assert_eq!(key_1.ts, key_2.ts); + assert_eq!(value_1, value_2.as_ref().map(String::as_str)); + + return; + } + unreachable!() + } +} \ No newline at end of file