diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 41b0b7f5..265e850b 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -421,10 +421,19 @@ pub(crate) mod tests { use tempfile::TempDir; use tokio_util::compat::FuturesAsyncReadCompatExt; - use crate::{compaction::Compactor, executor::{tokio::TokioExecutor, Executor}, fs::FileId, inmem::{immutable::Immutable, mutable::Mutable}, record::Record, scope::Scope, tests::Test, version::{edit::VersionEdit, Version}, DbOption, WriteError}; - use crate::fs::FileProvider; - use crate::timestamp::Timestamp; - use crate::wal::log::LogType; + use crate::{ + compaction::Compactor, + executor::{tokio::TokioExecutor, Executor}, + fs::{FileId, FileProvider}, + inmem::{immutable::Immutable, mutable::Mutable}, + record::Record, + scope::Scope, + tests::Test, + timestamp::Timestamp, + version::{edit::VersionEdit, Version}, + wal::log::LogType, + DbOption, WriteError, + }; async fn build_immutable( option: &DbOption, @@ -471,53 +480,75 @@ pub(crate) mod tests { let temp_dir = tempfile::tempdir().unwrap(); let option = DbOption::from(temp_dir.path()); - let batch_1 = build_immutable::(&option, vec![ - (LogType::Full, - Test { - vstring: 3.to_string(), - vu32: 0, - vbool: None, - }, - 0.into()), - (LogType::Full, - Test { - vstring: 5.to_string(), - vu32: 0, - vbool: None, - }, - 0.into()), - (LogType::Full, - Test { - vstring: 6.to_string(), - vu32: 0, - vbool: None, - }, - 0.into()), - ]).await.unwrap(); - - let batch_2 = build_immutable::(&option, vec![ - (LogType::Full, - Test { - vstring: 4.to_string(), - vu32: 0, - vbool: None, - }, - 0.into()), - (LogType::Full, - Test { - vstring: 2.to_string(), - vu32: 0, - vbool: None, - }, - 0.into()), - (LogType::Full, - Test { - vstring: 1.to_string(), - vu32: 0, - vbool: None, - }, - 0.into()), - ]).await.unwrap(); + let batch_1 = build_immutable::( + &option, + vec![ + ( + LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 5.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 6.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ], + ) + .await + .unwrap(); + + let batch_2 = build_immutable::( + &option, + vec![ + ( + LogType::Full, + Test { + vstring: 4.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: None, + }, + 0.into(), + ), + ], + ) + .await + .unwrap(); let scope = Compactor::::minor_compaction( &DbOption::from(temp_dir.path()), @@ -592,54 +623,74 @@ pub(crate) mod tests { // level 0 let table_gen_1 = FileId::new(); let table_gen_2 = FileId::new(); - build_parquet_table::(option, table_gen_1, vec![ - (LogType::Full, - Test { - vstring: 1.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into()), - (LogType::Full, - Test { - vstring: 2.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into()), - (LogType::Full, - Test { - vstring: 3.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - ]) + build_parquet_table::( + option, + table_gen_1, + vec![ + ( + LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into(), + ), + ( + LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into(), + ), + ( + LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + 
vbool: Some(true), + }, + 0.into(), + ), + ], + ) .await .unwrap(); - build_parquet_table::(option, table_gen_2, vec![ - (LogType::Full, - Test { - vstring: 4.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into()), - (LogType::Full, - Test { - vstring: 5.to_string(), - vu32: 0, - vbool: Some(true), - }, - 1.into()), - (LogType::Full, - Test { - vstring: 6.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - ]) + build_parquet_table::( + option, + table_gen_2, + vec![ + ( + LogType::Full, + Test { + vstring: 4.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into(), + ), + ( + LogType::Full, + Test { + vstring: 5.to_string(), + vu32: 0, + vbool: Some(true), + }, + 1.into(), + ), + ( + LogType::Full, + Test { + vstring: 6.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ], + ) .await .unwrap(); @@ -647,79 +698,109 @@ pub(crate) mod tests { let table_gen_3 = FileId::new(); let table_gen_4 = FileId::new(); let table_gen_5 = FileId::new(); - build_parquet_table::(option, table_gen_3, vec![ - (LogType::Full, - Test { - vstring: 1.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - (LogType::Full, - Test { - vstring: 2.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - (LogType::Full, - Test { - vstring: 3.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - ]) + build_parquet_table::( + option, + table_gen_3, + vec![ + ( + LogType::Full, + Test { + vstring: 1.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 2.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 3.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ], + ) .await .unwrap(); - build_parquet_table::(option, table_gen_4, vec![ - (LogType::Full, - Test { - vstring: 4.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - (LogType::Full, - Test { - vstring: 5.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - (LogType::Full, - Test { - vstring: 6.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - ]) + build_parquet_table::( + option, + table_gen_4, + vec![ + ( + LogType::Full, + Test { + vstring: 4.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 5.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 6.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ], + ) .await .unwrap(); - build_parquet_table::(option, table_gen_5, vec![ - (LogType::Full, - Test { - vstring: 7.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - (LogType::Full, - Test { - vstring: 8.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - (LogType::Full, - Test { - vstring: 9.to_string(), - vu32: 0, - vbool: Some(true), - }, - 0.into()), - ]) + build_parquet_table::( + option, + table_gen_5, + vec![ + ( + LogType::Full, + Test { + vstring: 7.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 8.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ( + LogType::Full, + Test { + vstring: 9.to_string(), + vu32: 0, + vbool: Some(true), + }, + 0.into(), + ), + ], + ) .await .unwrap(); diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 1dfbd846..c9634c70 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -28,9 +28,9 @@ pub trait FileProvider { fn create_dir_all(path: impl AsRef) -> impl Future>; - fn 
open(path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
+    fn open(path: impl AsRef<Path> + Send) -> impl Future<Output = io::Result<Self::File>> + Send;
 
-    fn remove(path: impl AsRef<Path>) -> impl Future<Output = io::Result<()>>;
+    fn remove(path: impl AsRef<Path> + Send) -> impl Future<Output = io::Result<()>> + Send;
 }
 
 impl Display for FileType {
diff --git a/src/fs/tokio.rs b/src/fs/tokio.rs
index 117df44e..a3e7110c 100644
--- a/src/fs/tokio.rs
+++ b/src/fs/tokio.rs
@@ -13,7 +13,7 @@ impl FileProvider for TokioExecutor {
         create_dir_all(path).await
     }
 
-    async fn open(path: impl AsRef<Path>) -> io::Result<Self::File> {
+    async fn open(path: impl AsRef<Path> + Send) -> io::Result<Self::File> {
         OpenOptions::new()
             .truncate(false)
             .create(true)
@@ -24,7 +24,7 @@
             .map(TokioAsyncReadCompatExt::compat)
     }
 
-    async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
+    async fn remove(path: impl AsRef<Path> + Send) -> io::Result<()> {
         remove_file(path).await
     }
 }
diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs
index 42a44ca3..8925019e 100644
--- a/src/inmem/immutable.rs
+++ b/src/inmem/immutable.rs
@@ -10,11 +10,11 @@ use parquet::arrow::ProjectionMask;
 
 use super::mutable::Mutable;
 use crate::{
+    fs::FileProvider,
     record::{internal::InternalRecordRef, Key, Record, RecordRef},
     stream::record_batch::RecordBatchEntry,
     timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
 };
-use crate::fs::FileProvider;
 
 pub trait ArrowArrays: Sized {
     type Record: Record;
@@ -32,7 +32,7 @@ pub trait ArrowArrays: Sized {
     fn as_record_batch(&self) -> &RecordBatch;
 }
 
-pub trait Builder<S>
+pub trait Builder<S>: Send
 where
     S: ArrowArrays,
 {
diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs
index dad68d9e..fe03bbc4 100644
--- a/src/inmem/mutable.rs
+++ b/src/inmem/mutable.rs
@@ -1,6 +1,5 @@
-use std::collections::BTreeMap;
-use std::intrinsics::transmute;
-use std::ops::Bound;
+use std::{collections::BTreeMap, intrinsics::transmute, ops::Bound};
+
 use async_lock::Mutex;
 use crossbeam_skiplist::{
     map::{Entry, Range},
@@ -8,16 +7,22 @@ use crossbeam_skiplist::{
 };
 use futures_util::io;
 use ulid::Ulid;
-use crate::{DbOption, record::{KeyRef, Record}, timestamp::{
-    timestamped::{Timestamped, TimestampedRef},
-    Timestamp, EPOCH,
-}, WriteError};
-use crate::fs::{FileId, FileProvider};
-use crate::inmem::immutable::{ArrowArrays, Builder, Immutable};
-use crate::record::Key;
-use crate::wal::log::{Log, LogType};
-use crate::wal::record_entry::RecordEntry;
-use crate::wal::WalFile;
+
+use crate::{
+    fs::{FileId, FileProvider},
+    inmem::immutable::{ArrowArrays, Builder, Immutable},
+    record::{Key, KeyRef, Record},
+    timestamp::{
+        timestamped::{Timestamped, TimestampedRef},
+        Timestamp, EPOCH,
+    },
+    wal::{
+        log::{Log, LogType},
+        record_entry::RecordEntry,
+        WalFile,
+    },
+    DbOption, WriteError,
+};
 
 pub(crate) type MutableScan<'scan, R> = Range<
     'scan,
@@ -37,7 +42,7 @@ where
     FP: FileProvider,
 {
     pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
-    wal: Mutex<WalFile<FP::File, R>>
+    wal: Mutex<WalFile<FP::File, R>>,
 }
 
 impl<R, FP> Mutable<R, FP>
@@ -61,23 +66,46 @@ where
     R: Record + Send,
     FP: FileProvider,
 {
-    pub(crate) async fn insert(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result<usize, WriteError<R>> {
-        self.append(log_ty, record.key().to_key(), ts, Some(record), false).await
+    pub(crate) async fn insert(
+        &self,
+        log_ty: LogType,
+        record: R,
+        ts: Timestamp,
+    ) -> Result<usize, WriteError<R>> {
+        self.append(log_ty, record.key().to_key(), ts, Some(record), false)
+            .await
     }
 
-    pub(crate) async fn remove(&self, log_ty: LogType, key: R::Key, ts: Timestamp) -> Result<usize, WriteError<R>> {
+    pub(crate) async fn remove(
+        &self,
+        log_ty: LogType,
+        key: R::Key,
+        ts: Timestamp,
+    ) -> Result<usize, WriteError<R>> {
        self.append(log_ty, key, ts, None, false).await
     }
 
-
async fn append(&self, log_ty: LogType, key: R::Key, ts: Timestamp, value: Option, is_recover: bool) -> Result> { + async fn append( + &self, + log_ty: LogType, + key: R::Key, + ts: Timestamp, + value: Option, + is_recover: bool, + ) -> Result> { let timestamped_key = Timestamped::new(key, ts); if !is_recover { let mut wal_guard = self.wal.lock().await; wal_guard - .write(log_ty, timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }), value.as_ref().map(R::as_record_ref)) - .await.unwrap(); + .write( + log_ty, + timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }), + value.as_ref().map(R::as_record_ref), + ) + .await + .unwrap(); } self.data.insert(timestamped_key, value); @@ -151,9 +179,14 @@ mod tests { use std::ops::Bound; use super::Mutable; - use crate::{DbOption, record::Record, tests::{Test, TestRef}, timestamp::Timestamped}; - use crate::executor::tokio::TokioExecutor; - use crate::wal::log::LogType; + use crate::{ + executor::tokio::TokioExecutor, + record::Record, + tests::{Test, TestRef}, + timestamp::Timestamped, + wal::log::LogType, + DbOption, + }; #[tokio::test] async fn insert_and_get() { @@ -161,26 +194,34 @@ mod tests { let key_2 = "key_2".to_owned(); let temp_dir = tempfile::tempdir().unwrap(); - let mem_table = Mutable::::new(&DbOption::from(temp_dir.path())).await.unwrap(); + let mem_table = Mutable::::new(&DbOption::from(temp_dir.path())) + .await + .unwrap(); - mem_table.insert( - LogType::Full, - Test { - vstring: key_1.clone(), - vu32: 1, - vbool: Some(true), - }, - 0_u32.into(), - ).await.unwrap(); - mem_table.insert( - LogType::Full, - Test { - vstring: key_2.clone(), - vu32: 2, - vbool: None, - }, - 1_u32.into(), - ).await.unwrap(); + mem_table + .insert( + LogType::Full, + Test { + vstring: key_1.clone(), + vu32: 1, + vbool: Some(true), + }, + 0_u32.into(), + ) + .await + .unwrap(); + mem_table + .insert( + LogType::Full, + Test { + vstring: key_2.clone(), + vu32: 2, + vbool: None, + }, + 1_u32.into(), + ) + .await + .unwrap(); let entry = mem_table.get(&key_1, 0_u32.into()).unwrap(); assert_eq!( @@ -196,13 +237,30 @@ mod tests { #[tokio::test] async fn range() { let temp_dir = tempfile::tempdir().unwrap(); - let mutable = Mutable::::new(&DbOption::from(temp_dir.path())).await.unwrap(); + let mutable = Mutable::::new(&DbOption::from(temp_dir.path())) + .await + .unwrap(); - mutable.insert(LogType::Full,"1".into(), 0_u32.into()).await.unwrap(); - mutable.insert(LogType::Full,"2".into(), 0_u32.into()).await.unwrap(); - mutable.insert(LogType::Full,"2".into(), 1_u32.into()).await.unwrap(); - mutable.insert(LogType::Full,"3".into(), 1_u32.into()).await.unwrap(); - mutable.insert(LogType::Full,"4".into(), 0_u32.into()).await.unwrap(); + mutable + .insert(LogType::Full, "1".into(), 0_u32.into()) + .await + .unwrap(); + mutable + .insert(LogType::Full, "2".into(), 0_u32.into()) + .await + .unwrap(); + mutable + .insert(LogType::Full, "2".into(), 1_u32.into()) + .await + .unwrap(); + mutable + .insert(LogType::Full, "3".into(), 1_u32.into()) + .await + .unwrap(); + mutable + .insert(LogType::Full, "4".into(), 0_u32.into()) + .await + .unwrap(); let mut scan = mutable.scan((Bound::Unbounded, Bound::Unbounded), 0_u32.into()); diff --git a/src/lib.rs b/src/lib.rs index 2ddfeee5..806c9eb9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,13 +36,13 @@ use transaction::Transaction; pub use crate::option::*; use crate::{ + compaction::{CompactTask, Compactor}, executor::Executor, + fs::FileId, stream::{merge::MergeStream, Entry, ScanStream}, 
version::{cleaner::Cleaner, set::VersionSet, Version, VersionError}, + wal::log::LogType, }; -use crate::compaction::{Compactor, CompactTask}; -use crate::fs::FileId; -use crate::wal::log::LogType; pub struct DB where @@ -59,7 +59,7 @@ impl DB where R: Record + Send + Sync, R::Columns: Send + Sync, - E: Executor +Send + Sync + 'static, + E: Executor + Send + Sync + 'static, { pub async fn new(option: DbOption, executor: E) -> Result> { let option = Arc::new(option); @@ -71,11 +71,8 @@ where let (mut cleaner, clean_sender) = Cleaner::new(option.clone()); let version_set = VersionSet::new(clean_sender, option.clone()).await?; - let mut compactor = Compactor::::new( - schema.clone(), - option.clone(), - version_set.clone(), - ); + let mut compactor = + Compactor::::new(schema.clone(), option.clone(), version_set.clone()); executor.spawn(async move { if let Err(err) = cleaner.listen().await { @@ -85,9 +82,10 @@ where executor.spawn(async move { while let Ok(task) = task_rx.recv_async().await { match task { - CompactTask::Flush => { + CompactTask::Flush => { if let Err(err) = compactor.check_then_compaction().await { - error!("[Compaction Error]: {}", err) + todo!(); + // error!("[Compaction Error]: {}", err) } } } @@ -137,7 +135,9 @@ where let mut last_buf = record; while let Some(record) = records.next() { - schema.write(LogType::Middle, mem::replace(&mut last_buf, record), ts).await?; + schema + .write(LogType::Middle, mem::replace(&mut last_buf, record), ts) + .await?; } schema.write(LogType::Last, last_buf, ts).await?; } else { @@ -171,19 +171,29 @@ where FP: FileProvider, { async fn new(option: Arc, compaction_tx: Sender) -> io::Result { - Ok(Schema { - mutable: Mutable::new(&option).await?, - immutables: Default::default(), - compaction_tx, - option, - }) + Ok(Schema { + mutable: Mutable::new(&option).await?, + immutables: Default::default(), + compaction_tx, + option, + }) } - async fn write(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result> { + async fn write( + &self, + log_ty: LogType, + record: R, + ts: Timestamp, + ) -> Result> { Ok(self.mutable.insert(log_ty, record, ts).await? > self.option.max_mem_table_size) } - async fn remove(&self, log_ty: LogType, key: R::Key, ts: Timestamp) -> Result> { + async fn remove( + &self, + log_ty: LogType, + key: R::Key, + ts: Timestamp, + ) -> Result> { Ok(self.mutable.remove(log_ty, key, ts).await? 
> self.option.max_mem_table_size) } @@ -360,16 +370,23 @@ pub(crate) mod tests { use tracing::error; use crate::{ + compaction::{CompactTask, Compactor}, executor::{tokio::TokioExecutor, Executor}, - inmem::{immutable::tests::TestImmutableArrays, mutable::Mutable}, - record::{internal::InternalRecordRef, RecordDecodeError, RecordEncodeError, RecordRef}, - serdes::{Decode, Encode}, + fs::FileId, + inmem::{ + immutable::{ArrowArrays, Builder}, + mutable::Mutable, + }, + record::{internal::InternalRecordRef, Key, RecordRef}, + serdes::{ + option::{DecodeError, EncodeError}, + Decode, Encode, + }, + timestamp::Timestamped, version::{cleaner::Cleaner, set::tests::build_version_set, Version}, + wal::log::LogType, DbOption, Immutable, Record, WriteError, DB, }; - use crate::compaction::{Compactor, CompactTask}; - use crate::fs::FileId; - use crate::wal::log::LogType; #[derive(Debug, PartialEq, Eq)] pub struct Test { @@ -588,82 +605,105 @@ pub(crate) mod tests { let mutable = mem::replace(&mut schema.mutable, Mutable::new(&option).await.unwrap()); - Immutable::<::Columns>::from(mutable.data).as_record_batch().clone() + Immutable::<::Columns>::from(mutable.data) + .as_record_batch() + .clone() } pub(crate) async fn build_schema(option: DbOption) -> io::Result<(crate::Schema, Receiver)> { let mutable = Mutable::new(&option).await?; - mutable.insert( - LogType::Full, - Test { - vstring: "alice".to_string(), - vu32: 1, - vbool: Some(true), - }, - 1_u32.into(), - ).await.unwrap(); - mutable.insert( - LogType::Full, - Test { - vstring: "ben".to_string(), - vu32: 2, - vbool: Some(true), - }, - 1_u32.into(), - ).await.unwrap(); - mutable.insert( - LogType::Full, - Test { - vstring: "carl".to_string(), - vu32: 3, - vbool: Some(true), - }, - 1_u32.into(), - ).await.unwrap(); - - let immutables = { - let mutable: Mutable = Mutable::new(&option).await?; - - mutable.insert( + mutable + .insert( LogType::Full, Test { - vstring: "dice".to_string(), - vu32: 4, + vstring: "alice".to_string(), + vu32: 1, vbool: Some(true), }, 1_u32.into(), - ).await.unwrap(); - mutable.insert( + ) + .await + .unwrap(); + mutable + .insert( LogType::Full, Test { - vstring: "erika".to_string(), - vu32: 5, + vstring: "ben".to_string(), + vu32: 2, vbool: Some(true), }, 1_u32.into(), - ).await.unwrap(); - mutable.insert( + ) + .await + .unwrap(); + mutable + .insert( LogType::Full, Test { - vstring: "funk".to_string(), - vu32: 6, + vstring: "carl".to_string(), + vu32: 3, vbool: Some(true), }, 1_u32.into(), - ).await.unwrap(); + ) + .await + .unwrap(); + + let immutables = { + let mutable: Mutable = Mutable::new(&option).await?; + + mutable + .insert( + LogType::Full, + Test { + vstring: "dice".to_string(), + vu32: 4, + vbool: Some(true), + }, + 1_u32.into(), + ) + .await + .unwrap(); + mutable + .insert( + LogType::Full, + Test { + vstring: "erika".to_string(), + vu32: 5, + vbool: Some(true), + }, + 1_u32.into(), + ) + .await + .unwrap(); + mutable + .insert( + LogType::Full, + Test { + vstring: "funk".to_string(), + vu32: 6, + vbool: Some(true), + }, + 1_u32.into(), + ) + .await + .unwrap(); VecDeque::from(vec![(FileId::new(), Immutable::from(mutable.data))]) }; let (compaction_tx, compaction_rx) = bounded(1); - Ok((crate::Schema { - mutable, - immutables, - compaction_tx, - option, - }, compaction_rx)) + Ok(( + crate::Schema { + mutable, + immutables, + compaction_tx, + option, + }, + compaction_rx, + )) } pub(crate) async fn build_db( @@ -676,7 +716,7 @@ pub(crate) mod tests { where R: Record + Send + Sync, R::Columns: Send + Sync, - 
E: Executor +Send + Sync + 'static,
+        E: Executor + Send + Sync + 'static,
     {
         E::create_dir_all(&option.path).await?;
 
@@ -684,11 +724,8 @@
         let (mut cleaner, clean_sender) = Cleaner::new(option.clone());
         let version_set = build_version_set(version, clean_sender, option.clone()).await?;
 
-        let mut compactor = Compactor::<R, E>::new(
-            schema.clone(),
-            option.clone(),
-            version_set.clone(),
-        );
+        let mut compactor =
+            Compactor::<R, E>::new(schema.clone(), option.clone(), version_set.clone());
 
         executor.spawn(async move {
             if let Err(err) = cleaner.listen().await {
@@ -698,7 +735,7 @@
         executor.spawn(async move {
             while let Ok(task) = compaction_rx.recv_async().await {
                 match task {
-                    CompactTask::Flush =>  {
+                    CompactTask::Flush => {
                         if let Err(err) = compactor.check_then_compaction().await {
                             error!("[Compaction Error]: {}", err)
                         }
diff --git a/src/record/mod.rs b/src/record/mod.rs
index ca17f5a3..062bfe12 100644
--- a/src/record/mod.rs
+++ b/src/record/mod.rs
@@ -34,7 +34,7 @@ pub trait KeyRef<'r>: Clone + Encode + PartialEq + Ord {
     fn to_key(&self) -> Self::Key;
 }
 
-pub trait Record: 'static + Sized + Decode + Debug {
+pub trait Record: 'static + Sized + Decode + Debug + Send + Sync {
     type Columns: ArrowArrays<Record = Self>;
 
     type Key: Key;
@@ -54,7 +54,7 @@ pub trait Record: 'static + Sized + Decode + Debug {
     fn arrow_schema() -> &'static Arc<Schema>;
 }
 
-pub trait RecordRef<'r>: Clone + Sized + Copy + Encode {
+pub trait RecordRef<'r>: Clone + Sized + Copy + Encode + Send + Sync {
     type Record: Record;
 
     fn key(self) -> <<Self::Record as Record>::Key as Key>::Ref<'r>;
diff --git a/src/scope.rs b/src/scope.rs
index 42b426f6..9824d340 100644
--- a/src/scope.rs
+++ b/src/scope.rs
@@ -53,7 +53,7 @@ where
 
     async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
     where
-        W: AsyncWrite + Unpin,
+        W: AsyncWrite + Unpin + Send,
     {
         self.min.encode(writer).await?;
         self.max.encode(writer).await?;
diff --git a/src/serdes/arc.rs b/src/serdes/arc.rs
index 06ba9730..2b7a0809 100644
--- a/src/serdes/arc.rs
+++ b/src/serdes/arc.rs
@@ -26,7 +26,7 @@ where
 
     async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
    where
-        W: AsyncWrite + Unpin,
+        W: AsyncWrite + Unpin + Send,
     {
         self.as_ref().encode(writer).await
     }
diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs
index 6ee5a4a1..e70c9192 100644
--- a/src/serdes/mod.rs
+++ b/src/serdes/mod.rs
@@ -8,12 +8,12 @@ use std::{future::Future, io};
 
 use futures_io::{AsyncRead, AsyncWrite};
 
-pub trait Encode {
+pub trait Encode: Send + Sync {
     type Error: From<io::Error> + std::error::Error + Send + Sync + 'static;
 
-    fn encode<W>(&self, writer: &mut W) -> impl Future<Output = Result<(), Self::Error>>
+    fn encode<W>(&self, writer: &mut W) -> impl Future<Output = Result<(), Self::Error>> + Send
     where
-        W: AsyncWrite + Unpin;
+        W: AsyncWrite + Unpin + Send;
 
     fn size(&self) -> usize;
 }
@@ -23,7 +23,7 @@ impl Encode for &T {
 
     async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
     where
-        W: AsyncWrite + Unpin,
+        W: AsyncWrite + Unpin + Send,
     {
         Encode::encode(*self, writer).await
     }
diff --git a/src/serdes/option.rs b/src/serdes/option.rs
index 4e482d1e..987168ca 100644
--- a/src/serdes/option.rs
+++ b/src/serdes/option.rs
@@ -38,7 +38,7 @@ where
 
     async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
     where
-        W: AsyncWrite + Unpin,
+        W: AsyncWrite + Unpin + Send,
     {
         match self {
             None => writer.write_all(&[0]).await?,
diff --git a/src/serdes/string.rs b/src/serdes/string.rs
index 2a7abaa1..36f9c9d6 100644
--- a/src/serdes/string.rs
+++ b/src/serdes/string.rs
@@ -26,7 +26,7 @@ impl Encode for String {
 
     async fn encode<W>(&self, writer: &mut W) ->
Result<(), Self::Error> where - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + Send, { self.as_str().encode(writer).await } diff --git a/src/stream/level.rs b/src/stream/level.rs index 531a07d5..eb5452ee 100644 --- a/src/stream/level.rs +++ b/src/stream/level.rs @@ -28,9 +28,15 @@ where { Init(FileId), Ready(SsTableScan<'level, R, FP>), - OpenFile(Pin> + 'level>>), + OpenFile(Pin> + Send + 'level>>), LoadStream( - Pin, ParquetError>> + 'level>>, + Pin< + Box< + dyn Future, ParquetError>> + + Send + + 'level, + >, + >, ), } diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 19e43b89..c38619d8 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -138,26 +138,47 @@ mod tests { use futures_util::StreamExt; use super::MergeStream; - use crate::{DbOption, executor::tokio::TokioExecutor, inmem::mutable::Mutable}; - use crate::wal::log::LogType; + use crate::{ + executor::tokio::TokioExecutor, inmem::mutable::Mutable, wal::log::LogType, DbOption, + }; #[tokio::test] async fn merge_mutable() { let temp_dir = tempfile::tempdir().unwrap(); let option = DbOption::from(temp_dir.path()); - let m1 = Mutable::::new(&option).await.unwrap(); - m1.remove(LogType::Full,"b".into(), 3.into()).await.unwrap(); - m1.insert(LogType::Full,"c".into(), 4.into()).await.unwrap(); - m1.insert(LogType::Full,"d".into(), 5.into()).await.unwrap(); - - let m2 = Mutable::::new(&option).await.unwrap(); - m2.insert(LogType::Full,"a".into(), 1.into()).await.unwrap(); - m2.insert(LogType::Full,"b".into(), 2.into()).await.unwrap(); - m2.insert(LogType::Full,"c".into(), 3.into()).await.unwrap(); - - let m3 = Mutable::::new(&option).await.unwrap(); - m3.insert(LogType::Full,"e".into(), 4.into()).await.unwrap(); + let m1 = Mutable::::new(&option) + .await + .unwrap(); + m1.remove(LogType::Full, "b".into(), 3.into()) + .await + .unwrap(); + m1.insert(LogType::Full, "c".into(), 4.into()) + .await + .unwrap(); + m1.insert(LogType::Full, "d".into(), 5.into()) + .await + .unwrap(); + + let m2 = Mutable::::new(&option) + .await + .unwrap(); + m2.insert(LogType::Full, "a".into(), 1.into()) + .await + .unwrap(); + m2.insert(LogType::Full, "b".into(), 2.into()) + .await + .unwrap(); + m2.insert(LogType::Full, "c".into(), 3.into()) + .await + .unwrap(); + + let m3 = Mutable::::new(&option) + .await + .unwrap(); + m3.insert(LogType::Full, "e".into(), 4.into()) + .await + .unwrap(); let lower = "a".to_string(); let upper = "e".to_string(); @@ -183,12 +204,24 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let option = DbOption::from(temp_dir.path()); - let m1 = Mutable::::new(&option).await.unwrap(); - m1.insert(LogType::Full,"1".into(), 0_u32.into()).await.unwrap(); - m1.insert(LogType::Full,"2".into(), 0_u32.into()).await.unwrap(); - m1.insert(LogType::Full,"2".into(), 1_u32.into()).await.unwrap(); - m1.insert(LogType::Full,"3".into(), 1_u32.into()).await.unwrap(); - m1.insert(LogType::Full,"4".into(), 0_u32.into()).await.unwrap(); + let m1 = Mutable::::new(&option) + .await + .unwrap(); + m1.insert(LogType::Full, "1".into(), 0_u32.into()) + .await + .unwrap(); + m1.insert(LogType::Full, "2".into(), 0_u32.into()) + .await + .unwrap(); + m1.insert(LogType::Full, "2".into(), 1_u32.into()) + .await + .unwrap(); + m1.insert(LogType::Full, "3".into(), 1_u32.into()) + .await + .unwrap(); + m1.insert(LogType::Full, "4".into(), 0_u32.into()) + .await + .unwrap(); let lower = "1".to_string(); let upper = "4".to_string(); diff --git a/src/timestamp/mod.rs b/src/timestamp/mod.rs index 76d457b0..7827ac1f 100644 --- 
a/src/timestamp/mod.rs +++ b/src/timestamp/mod.rs @@ -37,11 +37,11 @@ impl Timestamp { impl Encode for Timestamp { type Error = io::Error; - fn encode(&self, writer: &mut W) -> impl Future> + async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + Send, { - self.0.encode(writer) + self.0.encode(writer).await } fn size(&self) -> usize { self.0.size() diff --git a/src/timestamp/timestamped.rs b/src/timestamp/timestamped.rs index 8785bbb4..af0a3e79 100644 --- a/src/timestamp/timestamped.rs +++ b/src/timestamp/timestamped.rs @@ -145,7 +145,7 @@ where async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + Send, { self.ts.encode(writer).await?; self.value.encode(writer).await diff --git a/src/transaction.rs b/src/transaction.rs index 80c9274d..78dd794e 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -18,9 +18,9 @@ use crate::{ stream, timestamp::{Timestamp, Timestamped}, version::{set::transaction_ts, VersionRef}, + wal::log::LogType, LockMap, Projection, Record, Scan, Schema, WriteError, }; -use crate::wal::log::LogType; pub(crate) struct TransactionScan<'scan, R: Record> { inner: Range<'scan, R::Key, Option>, @@ -143,7 +143,7 @@ where 0 => false, 1 => { let new_ts = transaction_ts(); - let (key ,record) = self.local.pop_first().unwrap(); + let (key, record) = self.local.pop_first().unwrap(); Self::append(&self.share, LogType::Full, key, record, new_ts).await? } _ => { @@ -166,7 +166,13 @@ where Ok(()) } - async fn append(schema: &Schema, log_ty: LogType, key: ::Key, record: Option, new_ts: Timestamp) -> Result> { + async fn append( + schema: &Schema, + log_ty: LogType, + key: ::Key, + record: Option, + new_ts: Timestamp, + ) -> Result> { Ok(match record { Some(record) => schema.write(log_ty, record, new_ts).await?, None => schema.remove(log_ty, key, new_ts).await?, diff --git a/src/version/edit.rs b/src/version/edit.rs index 357b5ce1..889711a1 100644 --- a/src/version/edit.rs +++ b/src/version/edit.rs @@ -40,7 +40,7 @@ where async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + Send, { match self { VersionEdit::Add { scope, level } => { diff --git a/src/wal/log.rs b/src/wal/log.rs index 2b1a43bc..7309297d 100644 --- a/src/wal/log.rs +++ b/src/wal/log.rs @@ -46,7 +46,7 @@ where async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where - W: AsyncWrite + Unpin, + W: AsyncWrite + Unpin + Send, { writer.write_all(&[self.log_type as u8]).await?; self.record.encode(writer).await diff --git a/src/wal/mod.rs b/src/wal/mod.rs index 71174899..26d80bbd 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -17,13 +17,11 @@ use thiserror::Error; use crate::{ fs::FileId, - record::Record, + record::{Key, Record}, serdes::{Decode, Encode}, timestamp::Timestamped, + wal::{log::LogType, record_entry::RecordEntry}, }; -use crate::record::Key; -use crate::wal::log::LogType; -use crate::wal::record_entry::RecordEntry; #[derive(Debug)] pub(crate) struct WalFile { @@ -48,7 +46,7 @@ impl WalFile { impl WalFile where - F: AsyncWrite + Unpin, + F: AsyncWrite + Unpin + Send, R: Record, { pub(crate) async fn write<'r>( @@ -58,7 +56,9 @@ where value: Option>, ) -> Result<(), as Encode>::Error> { let mut writer = HashWriter::new(&mut self.file); - Log::new(log_ty, RecordEntry::::Encode((key, value))).encode(&mut writer).await?; + Log::new(log_ty, RecordEntry::::Encode((key, value))) + .encode(&mut 
writer)
+            .await?;
         writer.eol().await?;
         Ok(())
     }
@@ -75,8 +75,12 @@ where
 {
     fn recover(
         &mut self,
-    ) -> impl Stream<Item = Result<(LogType, Timestamped<R::Key>, Option<R>), RecoverError<<R as Decode>::Error>>> + '_
-    {
+    ) -> impl Stream<
+        Item = Result<
+            (LogType, Timestamped<R::Key>, Option<R>),
+            RecoverError<<R as Decode>::Error>,
+        >,
+    > + '_ {
         stream! {
             let mut file = BufReader::new(&mut self.file);
 
@@ -121,17 +125,20 @@ mod tests {
     use tokio_util::compat::TokioAsyncReadCompatExt;
 
     use super::{log::LogType, FileId, Log, WalFile};
-    use crate::timestamp::Timestamped;
-    use crate::wal::record_entry::RecordEntry;
+    use crate::{timestamp::Timestamped, wal::record_entry::RecordEntry};
 
     #[tokio::test]
     async fn write_and_recover() {
         let mut file = Vec::new();
         {
             let mut wal = WalFile::<_, String>::new(Cursor::new(&mut file).compat(), FileId::new());
-            wal.write(LogType::Full, Timestamped::new("hello", 0.into()), Some("hello"))
-                .await
-                .unwrap();
+            wal.write(
+                LogType::Full,
+                Timestamped::new("hello", 0.into()),
+                Some("hello"),
+            )
+            .await
+            .unwrap();
             wal.flush().await.unwrap();
         }
         {
@@ -144,9 +151,13 @@
                 assert_eq!(value, Some("hello".to_string()));
             }
 
-            wal.write(LogType::Full, Timestamped::new("world", 1.into()), Some("world"))
-                .await
-                .unwrap();
+            wal.write(
+                LogType::Full,
+                Timestamped::new("world", 1.into()),
+                Some("world"),
+            )
+            .await
+            .unwrap();
         }
 
         {
diff --git a/src/wal/record_entry.rs b/src/wal/record_entry.rs
index f284ee0c..a63e0880 100644
--- a/src/wal/record_entry.rs
+++ b/src/wal/record_entry.rs
@@ -1,12 +1,16 @@
 use std::io;
+
 use futures_io::{AsyncRead, AsyncWrite};
-use crate::record::{Key, Record};
-use crate::serdes::{Decode, Encode};
-use crate::timestamp::Timestamped;
+
+use crate::{
+    record::{Key, Record},
+    serdes::{Decode, Encode},
+    timestamp::Timestamped,
+};
 
 pub(crate) enum RecordEntry<'r, R>
 where
-    R: Record
+    R: Record,
 {
     Encode((Timestamped<<R::Key as Key>::Ref<'r>>, Option<R::Ref<'r>>)),
     Decode((Timestamped<R::Key>, Option<R>)),
@@ -20,20 +24,20 @@ where
 
     async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
     where
-        W: AsyncWrite + Unpin
+        W: AsyncWrite + Unpin + Send,
     {
         if let RecordEntry::Encode((key, recode_ref)) = self {
             key.encode(writer).await.unwrap();
             recode_ref.encode(writer).await.unwrap();
-            return Ok(())
+            return Ok(());
         }
 
         unreachable!()
     }
 
     fn size(&self) -> usize {
         if let RecordEntry::Encode((key, recode_ref)) = self {
-            return key.size() + recode_ref.size()
+            return key.size() + recode_ref.size();
         }
         unreachable!()
     }
@@ -47,7 +51,7 @@ where
 
     async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
     where
-        R: AsyncRead + Unpin
+        R: AsyncRead + Unpin,
     {
         let key = Timestamped::<Re::Key>::decode(reader).await.unwrap();
         let record = Option::<Re>::decode(reader).await.unwrap();
@@ -59,13 +63,17 @@ where
 #[cfg(test)]
 mod tests {
     use futures_util::io::Cursor;
-    use crate::serdes::{Decode, Encode};
-    use crate::timestamp::Timestamped;
-    use crate::wal::record_entry::RecordEntry;
+
+    use crate::{
+        serdes::{Decode, Encode},
+        timestamp::Timestamped,
+        wal::record_entry::RecordEntry,
+    };
 
     #[tokio::test]
     async fn encode_and_decode() {
-        let entry: RecordEntry<'static, String> = RecordEntry::Encode((Timestamped::new("hello", 0.into()), Some("hello")));
+        let entry: RecordEntry<'static, String> =
+            RecordEntry::Encode((Timestamped::new("hello", 0.into()), Some("hello")));
 
         let bytes = {
             let mut cursor = Cursor::new(vec![]);
@@ -76,10 +84,14 @@
 
         let decode_entry = {
             let mut cursor = Cursor::new(bytes);
-            RecordEntry::<'static, String>::decode(&mut cursor).await.unwrap()
+            RecordEntry::<'static, String>::decode(&mut cursor)
+                .await
+                .unwrap()
         };
 
-        if let (RecordEntry::Encode((key_1, value_1)), RecordEntry::Decode((key_2, value_2))) = (entry, decode_entry) {
+        if let (RecordEntry::Encode((key_1, value_1)), RecordEntry::Decode((key_2, value_2))) =
+            (entry, decode_entry)
+        {
             assert_eq!(key_1.value, key_2.value.as_str());
             assert_eq!(key_1.ts, key_2.ts);
             assert_eq!(value_1, value_2.as_ref().map(String::as_str));
@@ -88,4 +100,4 @@
         }
         unreachable!()
     }
-}
\ No newline at end of file
+}
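
Reviewer note: a minimal sketch of what a downstream `Encode` implementation looks like under the tightened trait in `src/serdes/mod.rs` above. `Meta` is a made-up type for illustration only and is not part of this diff; the signature follows the trait as changed here, written as if inside this crate.

```rust
use std::{io, mem::size_of};

use futures_io::AsyncWrite;
use futures_util::io::AsyncWriteExt;

use crate::serdes::Encode;

// Hypothetical metadata type, used only to illustrate the new bounds.
// It must itself be `Send + Sync` because `Encode: Send + Sync` now.
struct Meta {
    id: u32,
}

impl Encode for Meta {
    type Error = io::Error;

    // An `async fn` still satisfies `impl Future<...> + Send`: everything the
    // future captures (`&self`, `writer`) is `Send` under the new bounds.
    async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
    where
        W: AsyncWrite + Unpin + Send,
    {
        // Write the id as little-endian bytes.
        writer.write_all(&self.id.to_le_bytes()).await
    }

    fn size(&self) -> usize {
        size_of::<u32>()
    }
}
```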
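The point of the `+ Send` additions on `FileProvider` (and on the encode futures) is that the returned futures can now be moved across threads, for example when flush or compaction work is spawned onto a multi-threaded runtime. A rough, hypothetical crate-internal check, assuming a Tokio multi-thread runtime; `open_on_worker` is an assumption for illustration and does not exist in this PR:

```rust
use std::path::PathBuf;

use tokio::task::JoinError;

use crate::{executor::tokio::TokioExecutor, fs::FileProvider};

// Moves the `open` future onto another worker thread. This only compiles
// because `FileProvider::open` now returns a `Send` future and takes a
// `Send` path argument.
async fn open_on_worker(path: PathBuf) -> Result<std::io::Result<()>, JoinError> {
    tokio::spawn(async move {
        let _file = TokioExecutor::open(path).await?;
        Ok(())
    })
    .await
}
```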
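The same reasoning applies to the boxed futures in `src/stream/level.rs`: a `Pin<Box<dyn Future<...>>>` stored in an enum is only `Send` if the trait object is declared `+ Send`, otherwise any future holding that enum across an `.await` becomes `!Send`. A stripped-down illustration; `FileFuture`, `assert_send`, and `check` are placeholders, not the real types in this diff:

```rust
use std::{future::Future, io, pin::Pin};

// Simplified stand-in for the enum state that holds an in-flight file open.
enum FileFuture<'a, F> {
    OpenFile(Pin<Box<dyn Future<Output = io::Result<F>> + Send + 'a>>),
    Ready(F),
}

// Compile-time probe: accepts only `Send` values.
fn assert_send<T: Send>(_: T) {}

fn check<F: Send + 'static>(state: FileFuture<'static, F>) {
    // Compiles only because the boxed future is declared `dyn Future + Send`;
    // dropping `+ Send` from the enum makes this line a type error.
    assert_send(state);
}
```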