diff --git a/src/lib.rs b/src/lib.rs index 94435155..b8fb68e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,25 +208,26 @@ where async fn get<'get>( &'get self, + version: &'get Version, key: &'get R::Key, ts: Timestamp, projection: Projection, - ) -> Result>, ParquetError> { - let mut scan = self.scan(Bound::Included(key), Bound::Unbounded, ts); + ) -> Result>, WriteError> + where + FP: FileProvider, + { + let mut scan = Scan::new( + self, + (Bound::Included(key), Bound::Unbounded), + ts, + version, + vec![], + ); - if let Projection::Parts(projection) = projection { - scan = scan.projection(projection) + if let Projection::Parts(mask) = projection { + scan = scan.projection(mask); } - scan.take().await?.next().await.transpose() - } - - fn scan<'scan>( - &'scan self, - lower: Bound<&'scan R::Key>, - uppwer: Bound<&'scan R::Key>, - ts: Timestamp, - ) -> Scan<'scan, R, FP> { - Scan::new(self, lower, uppwer, ts) + Ok(scan.take().await?.next().await.transpose()?) } fn check_conflict(&self, key: &R::Key, ts: Timestamp) -> bool { @@ -251,9 +252,13 @@ where { schema: &'scan Schema, lower: Bound<&'scan R::Key>, - uppwer: Bound<&'scan R::Key>, + upper: Bound<&'scan R::Key>, ts: Timestamp, + version: &'scan Version, + streams: Vec>, + + limit: Option, projection: ProjectionMask, } @@ -264,25 +269,33 @@ where { fn new( schema: &'scan Schema, - lower: Bound<&'scan R::Key>, - uppwer: Bound<&'scan R::Key>, + (lower, upper): (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, + version: &'scan Version, + streams: Vec>, ) -> Self { Self { schema, lower, - uppwer, + upper, ts, + version, + streams, + limit: None, projection: ProjectionMask::all(), } } + pub fn limit(self, limit: Option) -> Self { + Self { limit, ..self } + } + pub fn projection(self, mut projection: Vec) -> Self { // skip two columns: _null and _ts for p in &mut projection { *p += 2; } - + projection.extend([0, 1, 2]); let mask = ProjectionMask::roots( &arrow_to_parquet_schema(R::arrow_schema()).unwrap(), projection, @@ -295,25 +308,32 @@ where } pub async fn take( - self, - ) -> Result, ParquetError>>, ParquetError> { - let mut streams = Vec::>::with_capacity(self.schema.immutables.len() + 1); - streams.push( + mut self, + ) -> Result, ParquetError>>, WriteError> { + self.streams.push( self.schema .mutable - .scan((self.lower, self.uppwer), self.ts) + .scan((self.lower, self.upper), self.ts) .into(), ); for immutable in &self.schema.immutables { - streams.push( + self.streams.push( immutable - .scan((self.lower, self.uppwer), self.ts, self.projection.clone()) + .scan((self.lower, self.upper), self.ts, self.projection.clone()) .into(), ); } - // TODO: sstable scan - - MergeStream::from_vec(streams).await + self.version + .streams( + &mut self.streams, + (self.lower, self.upper), + self.ts, + self.limit, + self.projection, + ) + .await?; + + Ok(MergeStream::from_vec(self.streams).await?) } } @@ -326,24 +346,32 @@ where Io(#[from] io::Error), #[error("write version error: {0}")] Version(#[from] VersionError), + #[error("write parquet error: {0}")] + Parquet(#[from] ParquetError), } #[cfg(test)] pub(crate) mod tests { - use std::sync::Arc; + use std::{collections::VecDeque, sync::Arc}; use arrow::{ array::{Array, AsArray, RecordBatch}, datatypes::{DataType, Field, Schema, UInt32Type}, }; + use async_lock::RwLock; use once_cell::sync::Lazy; use parquet::arrow::ProjectionMask; + use tracing::error; use crate::{ - executor::Executor, - inmem::immutable::tests::TestImmutableArrays, + executor::{tokio::TokioExecutor, Executor}, + inmem::{ + immutable::{tests::TestImmutableArrays, Immutable}, + mutable::Mutable, + }, record::{internal::InternalRecordRef, RecordRef}, - DbOption, Record, DB, + version::{cleaner::Cleaner, set::tests::build_version_set, Version}, + DbOption, Record, WriteError, DB, }; #[derive(Debug, PartialEq, Eq)] @@ -438,6 +466,7 @@ pub(crate) mod tests { if !vbool_array.is_null(offset) { vbool = Some(vbool_array.value(offset)); } + column_i += 1; } let record = TestRef { @@ -482,4 +511,101 @@ pub(crate) mod tests { schema.immutables[0].as_record_batch().clone() } + + pub(crate) async fn build_schema() -> crate::Schema { + let mutable = Mutable::new(); + + mutable.insert( + Test { + vstring: "alice".to_string(), + vu32: 1, + vobool: Some(true), + }, + 1_u32.into(), + ); + mutable.insert( + Test { + vstring: "ben".to_string(), + vu32: 2, + vobool: Some(true), + }, + 1_u32.into(), + ); + mutable.insert( + Test { + vstring: "carl".to_string(), + vu32: 3, + vobool: Some(true), + }, + 1_u32.into(), + ); + + let immutables = { + let mutable = Mutable::new(); + + mutable.insert( + Test { + vstring: "dice".to_string(), + vu32: 4, + vobool: Some(true), + }, + 1_u32.into(), + ); + mutable.insert( + Test { + vstring: "erika".to_string(), + vu32: 5, + vobool: Some(true), + }, + 1_u32.into(), + ); + mutable.insert( + Test { + vstring: "funk".to_string(), + vu32: 6, + vobool: Some(true), + }, + 1_u32.into(), + ); + + VecDeque::from(vec![Immutable::from(mutable)]) + }; + + crate::Schema { + mutable, + immutables, + _marker: Default::default(), + } + } + + pub(crate) async fn build_db( + option: Arc, + executor: E, + schema: crate::Schema, + version: Version, + ) -> Result, WriteError> + where + R: Record, + E: Executor, + { + E::create_dir_all(&option.path).await?; + + let schema = Arc::new(RwLock::new(schema)); + + let (mut cleaner, clean_sender) = Cleaner::new(option.clone()); + let version_set = build_version_set(version, clean_sender, option.clone()).await?; + + executor.spawn(async move { + if let Err(err) = cleaner.listen().await { + error!("[Cleaner Error]: {}", err) + } + }); + + Ok(DB { + schema, + version_set, + lock_map: Arc::new(Default::default()), + _p: Default::default(), + }) + } } diff --git a/src/record/mod.rs b/src/record/mod.rs index a0d8da92..53c326d9 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -1,7 +1,7 @@ pub(crate) mod internal; mod str; -use std::{hash::Hash, sync::Arc}; +use std::{fmt::Debug, hash::Hash, sync::Arc}; use arrow::{ array::{Datum, RecordBatch}, @@ -16,7 +16,7 @@ use crate::{ }; pub trait Key: 'static + Encode + Decode + Ord + Clone + Send + Hash + std::fmt::Debug { - type Ref<'r>: KeyRef<'r, Key = Self> + Copy + type Ref<'r>: KeyRef<'r, Key = Self> + Copy + Debug where Self: 'r; diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 739ad469..705ce95f 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -13,7 +13,7 @@ use super::{Entry, ScanStream}; use crate::{fs::FileProvider, record::Record}; pin_project! { - pub(crate) struct MergeStream<'merge, R, FP> + pub struct MergeStream<'merge, R, FP> where R: Record, FP: FileProvider, diff --git a/src/stream/mod.rs b/src/stream/mod.rs index b021c700..ed2b50a4 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -21,12 +21,14 @@ use crate::{ record::{Key, Record}, stream::level::LevelStream, timestamp::Timestamped, + transaction::TransactionScan, }; pub enum Entry<'entry, R> where R: Record, { + Transaction((Timestamped<::Ref<'entry>>, &'entry Option)), Mutable(crossbeam_skiplist::map::Entry<'entry, Timestamped, Option>), Immutable(RecordBatchEntry), SsTable(RecordBatchEntry), @@ -39,6 +41,10 @@ where { pub(crate) fn key(&self) -> Timestamped<::Ref<'_>> { match self { + Entry::Transaction((key, _)) => { + // Safety: shorter lifetime must be safe + unsafe { transmute(*key) } + } Entry::Mutable(entry) => entry.key().map(|key| { // Safety: shorter lifetime must be safe unsafe { transmute(key.as_key_ref()) } @@ -51,6 +57,7 @@ where pub(crate) fn value(&self) -> R::Ref<'_> { match self { + Entry::Transaction((_, value)) => value.as_ref().map(R::as_record_ref).unwrap(), Entry::Mutable(entry) => entry.value().as_ref().map(R::as_record_ref).unwrap(), Entry::SsTable(entry) => entry.get(), Entry::Immutable(entry) => entry.get(), @@ -66,6 +73,9 @@ where { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { + Entry::Transaction((key, value)) => { + write!(f, "Entry::Transaction({:?} -> {:?})", key, value) + } Entry::Mutable(mutable) => write!( f, "Entry::Mutable({:?} -> {:?})", @@ -86,6 +96,10 @@ pin_project! { R: Record, FP: FileProvider, { + Transaction { + #[pin] + inner: stream::Iter>, + }, Mutable { #[pin] inner: stream::Iter>, @@ -105,6 +119,18 @@ pin_project! { } } +impl<'scan, R, FP> From> for ScanStream<'scan, R, FP> +where + R: Record, + FP: FileProvider, +{ + fn from(inner: TransactionScan<'scan, R>) -> Self { + ScanStream::Transaction { + inner: stream::iter(inner), + } + } +} + impl<'scan, R, FP> From> for ScanStream<'scan, R, FP> where R: Record, @@ -146,6 +172,7 @@ where { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { + ScanStream::Transaction { .. } => write!(f, "ScanStream::Transaction"), ScanStream::Mutable { .. } => write!(f, "ScanStream::Mutable"), ScanStream::SsTable { .. } => write!(f, "ScanStream::SsTable"), ScanStream::Immutable { .. } => write!(f, "ScanStream::Immutable"), @@ -163,6 +190,9 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { + ScanStreamProject::Transaction { inner } => { + Poll::Ready(ready!(inner.poll_next(cx)).map(Entry::Transaction).map(Ok)) + } ScanStreamProject::Mutable { inner } => { Poll::Ready(ready!(inner.poll_next(cx)).map(Entry::Mutable).map(Ok)) } diff --git a/src/transaction.rs b/src/transaction.rs index 84008ae9..83114d05 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -1,5 +1,8 @@ use std::{ - collections::{btree_map::Entry, BTreeMap}, + collections::{ + btree_map::{Entry, Range}, + BTreeMap, Bound, + }, io, mem::transmute, }; @@ -11,13 +14,31 @@ use thiserror::Error; use crate::{ fs::FileProvider, - record::KeyRef, + record::{Key, KeyRef}, stream, - timestamp::Timestamp, + timestamp::{Timestamp, Timestamped}, version::{set::transaction_ts, VersionRef}, - LockMap, Projection, Record, Schema, + LockMap, Projection, Record, Scan, Schema, WriteError, }; +pub(crate) struct TransactionScan<'scan, R: Record> { + inner: Range<'scan, R::Key, Option>, + ts: Timestamp, +} + +impl<'scan, R> Iterator for TransactionScan<'scan, R> +where + R: Record, +{ + type Item = (Timestamped<::Ref<'scan>>, &'scan Option); + + fn next(&mut self) -> Option { + self.inner + .next() + .map(|(key, value)| (Timestamped::new(key.as_key_ref(), self.ts), value)) + } +} + pub struct Transaction<'txn, R, FP> where R: Record, @@ -53,17 +74,29 @@ where &'get self, key: &'get R::Key, projection: Projection, - ) -> Result>, ParquetError> { + ) -> Result>, WriteError> { Ok(match self.local.get(key).and_then(|v| v.as_ref()) { Some(v) => Some(TransactionEntry::Local(v.as_record_ref())), None => self .share - .get(key, self.ts, projection) + .get(&self.version, key, self.ts, projection) .await? .map(TransactionEntry::Stream), }) } + pub async fn scan<'scan>( + &'scan self, + range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), + ) -> Scan<'scan, R, FP> { + let streams = vec![TransactionScan { + inner: self.local.range(range), + ts: self.ts, + } + .into()]; + Scan::new(&self.share, range, self.ts, &self.version, streams) + } + pub fn set(&mut self, value: R) { self.entry(value.key().to_key(), Some(value)) } @@ -146,13 +179,17 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{collections::Bound, sync::Arc}; + use futures_util::StreamExt; use tempfile::TempDir; use crate::{ - executor::tokio::TokioExecutor, tests::Test, transaction::CommitError, DbOption, - Projection, DB, + compaction::tests::build_version, + executor::tokio::TokioExecutor, + tests::{build_db, build_schema, Test}, + transaction::CommitError, + DbOption, Projection, DB, }; #[tokio::test] @@ -254,4 +291,73 @@ mod tests { txn1.commit().await.unwrap(); } + + #[tokio::test] + async fn transaction_scan() { + let temp_dir = TempDir::new().unwrap(); + let option = Arc::new(DbOption::new(temp_dir.path())); + + let (_, version) = build_version(&option).await; + let schema = build_schema().await; + let db = build_db(option, TokioExecutor::new(), schema, version) + .await + .unwrap(); + + let mut txn = db.transaction().await; + txn.set(Test { + vstring: "king".to_string(), + vu32: 8, + vobool: Some(true), + }); + + let mut stream = txn + .scan((Bound::Unbounded, Bound::Unbounded)) + .await + .projection(vec![1]) + .take() + .await + .unwrap(); + + let entry_0 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_0.key().value, "1"); + assert!(entry_0.value().vbool.is_none()); + let entry_1 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_1.key().value, "2"); + assert!(entry_1.value().vbool.is_none()); + let entry_2 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_2.key().value, "3"); + assert!(entry_2.value().vbool.is_none()); + let entry_3 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_3.key().value, "4"); + assert!(entry_3.value().vbool.is_none()); + let entry_4 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_4.key().value, "5"); + assert!(entry_4.value().vbool.is_none()); + let entry_5 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_5.key().value, "6"); + assert!(entry_5.value().vbool.is_none()); + let entry_6 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_6.key().value, "7"); + assert!(entry_6.value().vbool.is_none()); + let entry_7 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_7.key().value, "8"); + assert!(entry_7.value().vbool.is_none()); + let entry_8 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_8.key().value, "9"); + assert!(entry_8.value().vbool.is_none()); + let entry_9 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_9.key().value, "alice"); + let entry_10 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_10.key().value, "ben"); + let entry_11 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_11.key().value, "carl"); + let entry_12 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_12.key().value, "dice"); + let entry_13 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_13.key().value, "erika"); + let entry_14 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_14.key().value, "funk"); + let entry_15 = stream.next().await.unwrap().unwrap(); + assert_eq!(entry_15.key().value, "king"); + } } diff --git a/src/version/mod.rs b/src/version/mod.rs index 2cdf2a8e..b15a3872 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -15,7 +15,7 @@ use crate::{ record::Record, scope::Scope, serdes::Encode, - stream::{record_batch::RecordBatchEntry, ScanStream}, + stream::{level::LevelStream, record_batch::RecordBatchEntry, ScanStream}, timestamp::{Timestamp, TimestampedRef}, version::cleaner::CleanTag, DbOption, @@ -144,10 +144,10 @@ where self.level_slice[level].len() } - pub(crate) async fn iters<'iters>( + pub(crate) async fn streams<'streams>( &self, - iters: &mut Vec>, - range: (Bound<&'iters R::Key>, Bound<&'iters R::Key>), + streams: &mut Vec>, + range: (Bound<&'streams R::Key>, Bound<&'streams R::Key>), ts: Timestamp, limit: Option, projection_mask: ProjectionMask, @@ -158,22 +158,31 @@ where .map_err(VersionError::Io)?; let table = SsTable::open(file); - iters.push(ScanStream::SsTable { + streams.push(ScanStream::SsTable { inner: table .scan(range, ts, limit, projection_mask.clone()) .await .map_err(VersionError::Parquet)?, }) } - for scopes in self.level_slice[1..].iter() { + for (i, scopes) in self.level_slice[1..].iter().enumerate() { if scopes.is_empty() { continue; } - let _gens = scopes.iter().map(|scope| scope.gen).collect::>(); - todo!("level stream") - // iters.push(EStreamImpl::Level( - // LevelStream::new(option, gens, lower, upper).await?, - // )); + streams.push(ScanStream::Level { + // SAFETY: checked scopes no empty + inner: LevelStream::new( + self, + i + 1, + 0, + scopes.len() - 1, + range, + ts, + limit, + projection_mask.clone(), + ) + .unwrap(), + }); } Ok(()) } diff --git a/src/version/set.rs b/src/version/set.rs index 218fce0d..8a208c53 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -157,20 +157,51 @@ pub(crate) fn transaction_ts() -> Timestamp { #[cfg(test)] pub(crate) mod tests { - use std::sync::{atomic::Ordering, Arc}; + use std::{ + io::SeekFrom, + sync::{atomic::Ordering, Arc}, + }; - use flume::bounded; + use async_lock::RwLock; + use flume::{bounded, Sender}; + use futures_util::AsyncSeekExt; use tempfile::TempDir; use crate::{ executor::tokio::TokioExecutor, + fs::FileProvider, + record::Record, version::{ + cleaner::CleanTag, edit::VersionEdit, - set::{transaction_ts, VersionSet, GLOBAL_TIMESTAMP}, + set::{transaction_ts, VersionSet, VersionSetInner, GLOBAL_TIMESTAMP}, + Version, VersionError, }, DbOption, }; + pub(crate) async fn build_version_set( + version: Version, + clean_sender: Sender, + option: Arc, + ) -> Result, VersionError> + where + R: Record, + FP: FileProvider, + { + let mut log = FP::open(option.version_path()).await?; + log.seek(SeekFrom::End(0)).await?; + + Ok(VersionSet:: { + inner: Arc::new(RwLock::new(VersionSetInner { + current: Arc::new(version), + log, + })), + clean_sender, + option, + }) + } + #[tokio::test] async fn timestamp_persistence() { let temp_dir = TempDir::new().unwrap();