diff --git a/.gitignore b/.gitignore index 96ef6c0b..d3611fa8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -Cargo.lock +/.idea +Cargo.lock \ No newline at end of file diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 888c8725..0ca17174 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -14,19 +14,12 @@ use ulid::Ulid; pub(crate) type FileId = Ulid; pub(crate) enum FileType { - WAL, - PARQUET, - LOG, + Wal, + Parquet, + Log, } -pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static { - fn boxed(self) -> Box - where - Self: Sized, - { - Box::new(self) as Box - } -} +pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {} impl AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {} @@ -41,9 +34,9 @@ pub trait FileProvider { impl Display for FileType { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - FileType::WAL => write!(f, "wal"), - FileType::PARQUET => write!(f, "parquet"), - FileType::LOG => write!(f, "log"), + FileType::Wal => write!(f, "wal"), + FileType::Parquet => write!(f, "parquet"), + FileType::Log => write!(f, "log"), } } } diff --git a/src/lib.rs b/src/lib.rs index ba3701f7..e40e5877 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ use record::Record; use crate::{ executor::Executor, fs::{FileId, FileType}, - stream::{merge::MergeStream, Entry}, + stream::{merge::MergeStream, Entry, ScanStream}, version::Version, }; @@ -65,15 +65,15 @@ impl DbOption { } pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf { - self.path.join(format!("{}.{}", gen, FileType::PARQUET)) + self.path.join(format!("{}.{}", gen, FileType::Parquet)) } pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf { - self.path.join(format!("{}.{}", gen, FileType::WAL)) + self.path.join(format!("{}.{}", gen, FileType::Wal)) } pub(crate) fn version_path(&self) -> PathBuf { - self.path.join(format!("version.{}", FileType::LOG)) + self.path.join(format!("version.{}", FileType::Log)) } pub(crate) fn is_threshold_exceeded_major( @@ -166,25 +166,31 @@ where Ok(()) } - async fn get<'get>( + async fn get<'get, E>( &'get self, key: &'get R::Key, ts: Timestamp, - ) -> Result>, ParquetError> { - self.scan(Bound::Included(key), Bound::Unbounded, ts) + ) -> Result>, ParquetError> + where + E: Executor, + { + self.scan::(Bound::Included(key), Bound::Unbounded, ts) .await? .next() .await .transpose() } - async fn scan<'scan>( + async fn scan<'scan, E>( &'scan self, lower: Bound<&'scan R::Key>, uppwer: Bound<&'scan R::Key>, ts: Timestamp, - ) -> Result, ParquetError>>, ParquetError> { - let mut streams = Vec::with_capacity(self.immutables.len() + 1); + ) -> Result, ParquetError>>, ParquetError> + where + E: Executor, + { + let mut streams = Vec::>::with_capacity(self.immutables.len() + 1); streams.push(self.mutable.scan((lower, uppwer), ts).into()); for immutable in &self.immutables { streams.push(immutable.scan((lower, uppwer), ts).into()); diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs index 0705c232..4616e738 100644 --- a/src/ondisk/scan.rs +++ b/src/ondisk/scan.rs @@ -9,30 +9,36 @@ use pin_project_lite::pin_project; use tokio_util::compat::Compat; use crate::{ - fs::AsyncFile, + executor::Executor, record::Record, stream::record_batch::{RecordBatchEntry, RecordBatchIterator}, }; pin_project! { #[derive(Debug)] - pub struct SsTableScan + pub struct SsTableScan + where + E: Executor, { #[pin] - stream: ParquetRecordBatchStream>>, + stream: ParquetRecordBatchStream>, iter: Option>, } } -impl SsTableScan { - pub fn new(stream: ParquetRecordBatchStream>>) -> Self { +impl SsTableScan +where + E: Executor, +{ + pub fn new(stream: ParquetRecordBatchStream>) -> Self { SsTableScan { stream, iter: None } } } -impl Stream for SsTableScan +impl Stream for SsTableScan where R: Record, + E: Executor, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs index 0c2ddce0..59b2cb83 100644 --- a/src/ondisk/sstable.rs +++ b/src/ondisk/sstable.rs @@ -17,25 +17,28 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; use super::scan::SsTableScan; use crate::{ arrows::get_range_filter, + executor::Executor, fs::AsyncFile, oracle::{timestamp::TimestampedRef, Timestamp}, record::Record, stream::record_batch::RecordBatchEntry, }; -pub(crate) struct SsTable +pub(crate) struct SsTable where R: Record, + E: Executor, { - file: Box, + file: E::File, _marker: PhantomData, } -impl SsTable +impl SsTable where R: Record, + E: Executor, { - pub(crate) fn open(file: Box) -> Self { + pub(crate) fn open(file: E::File) -> Self { SsTable { file, _marker: PhantomData, @@ -73,7 +76,7 @@ where async fn into_parquet_builder( self, limit: usize, - ) -> parquet::errors::Result>>>> { + ) -> parquet::errors::Result>>> { Ok(ParquetRecordBatchStreamBuilder::new_with_options( self.file.compat(), ArrowReaderOptions::default().with_page_index(true), @@ -97,7 +100,7 @@ where self, range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), ts: Timestamp, - ) -> Result, parquet::errors::ParquetError> { + ) -> Result, parquet::errors::ParquetError> { let builder = self.into_parquet_builder(1).await?; let schema_descriptor = builder.metadata().file_metadata().schema_descr(); @@ -114,7 +117,7 @@ mod tests { use super::SsTable; use crate::{ executor::tokio::TokioExecutor, - fs::{AsyncFile, FileProvider}, + fs::FileProvider, oracle::timestamp::Timestamped, tests::{get_test_record_batch, Test}, }; @@ -125,9 +128,8 @@ mod tests { let record_batch = get_test_record_batch::().await; let file = TokioExecutor::open(&temp_dir.path().join("test.parquet")) .await - .unwrap() - .boxed(); - let mut sstable = SsTable::::open(file); + .unwrap(); + let mut sstable = SsTable::::open(file); sstable.write(record_batch).await.unwrap(); diff --git a/src/stream/level.rs b/src/stream/level.rs new file mode 100644 index 00000000..8294cc1d --- /dev/null +++ b/src/stream/level.rs @@ -0,0 +1,99 @@ +use std::{ + collections::{Bound, VecDeque}, + future::Future, + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures_core::Stream; +use pin_project_lite::pin_project; + +use crate::{ + executor::Executor, + fs::FileId, + ondisk::{scan::SsTableScan, sstable::SsTable}, + oracle::Timestamp, + record::Record, + stream::record_batch::RecordBatchEntry, + DbOption, +}; + +enum FutureStatus<'level, R, E> +where + R: Record, + E: Executor, +{ + Ready(Pin>>), + OpenFile(Pin> + 'level>>), + LoadStream( + Pin< + Box< + dyn Future, parquet::errors::ParquetError>> + + 'level, + >, + >, + ), +} + +pin_project! { + pub(crate) struct LevelStream<'level, R, E> + where + R: Record, + E: Executor, + { + lower: Bound<&'level R::Key>, + upper: Bound<&'level R::Key>, + ts: Timestamp, + option: Arc, + gens: VecDeque, + statue: FutureStatus<'level, R, E>, + } +} + +impl<'level, R, E> Stream for LevelStream<'level, R, E> +where + R: Record, + E: Executor + 'level, +{ + type Item = Result, parquet::errors::ParquetError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut self.statue { + FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) { + Poll::Ready(None) => match self.gens.pop_front() { + None => Poll::Ready(None), + Some(gen) => { + self.statue = + FutureStatus::OpenFile(Box::pin(E::open(self.option.table_path(&gen)))); + self.poll_next(cx) + } + }, + poll => poll, + }, + FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) { + Poll::Ready(Ok(file)) => { + self.statue = FutureStatus::LoadStream(Box::pin( + SsTable::open(file).scan((self.lower, self.upper), self.ts), + )); + self.poll_next(cx) + } + Poll::Ready(Err(err)) => { + Poll::Ready(Some(Err(parquet::errors::ParquetError::from(err)))) + } + Poll::Pending => Poll::Pending, + }, + FutureStatus::LoadStream(stream_future) => match Pin::new(stream_future).poll(cx) { + Poll::Ready(Ok(scan)) => { + self.statue = FutureStatus::Ready(Box::pin(scan)); + self.poll_next(cx) + } + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + }, + } + } +} + +// TODO: Test Case after `Compaction` diff --git a/src/stream/merge.rs b/src/stream/merge.rs index bb1d8de3..2734572b 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -10,25 +10,27 @@ use futures_util::stream::StreamExt; use pin_project_lite::pin_project; use super::{Entry, ScanStream}; -use crate::record::Record; +use crate::{executor::Executor, record::Record}; pin_project! { - pub(crate) struct MergeStream<'merge, R> + pub(crate) struct MergeStream<'merge, R, E> where R: Record, + E: Executor, { - streams: Vec>, + streams: Vec>, peeked: BinaryHeap>, buf: Option>, } } -impl<'merge, R> MergeStream<'merge, R> +impl<'merge, R, E> MergeStream<'merge, R, E> where R: Record, + E: Executor, { pub(crate) async fn from_vec( - mut streams: Vec>, + mut streams: Vec>, ) -> Result { let mut peeked = BinaryHeap::with_capacity(streams.len()); @@ -49,9 +51,10 @@ where } } -impl<'merge, R> Stream for MergeStream<'merge, R> +impl<'merge, R, E> Stream for MergeStream<'merge, R, E> where R: Record, + E: Executor, { type Item = Result, parquet::errors::ParquetError>; @@ -135,7 +138,7 @@ mod tests { use futures_util::StreamExt; use super::MergeStream; - use crate::inmem::mutable::Mutable; + use crate::{executor::tokio::TokioExecutor, inmem::mutable::Mutable}; #[tokio::test] async fn merge_mutable() { @@ -155,7 +158,7 @@ mod tests { let lower = "a".to_string(); let upper = "e".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::from_vec(vec![ + let mut merge = MergeStream::::from_vec(vec![ m1.scan(bound, 6.into()).into(), m2.scan(bound, 6.into()).into(), m3.scan(bound, 6.into()).into(), @@ -183,9 +186,10 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 0.into()).into()]) - .await - .unwrap(); + let mut merge = + MergeStream::::from_vec(vec![m1.scan(bound, 0.into()).into()]) + .await + .unwrap(); dbg!(merge.next().await); dbg!(merge.next().await); @@ -194,9 +198,10 @@ mod tests { let lower = "1".to_string(); let upper = "4".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 1.into()).into()]) - .await - .unwrap(); + let mut merge = + MergeStream::::from_vec(vec![m1.scan(bound, 1.into()).into()]) + .await + .unwrap(); dbg!(merge.next().await); dbg!(merge.next().await); diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 819601aa..9ff276e1 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod level; pub(crate) mod merge; pub(crate) mod record_batch; @@ -14,6 +15,7 @@ use pin_project_lite::pin_project; use record_batch::RecordBatchEntry; use crate::{ + executor::Executor, inmem::{immutable::ImmutableScan, mutable::MutableScan}, ondisk::scan::SsTableScan, oracle::timestamp::Timestamped, @@ -65,9 +67,10 @@ where pin_project! { #[project = ScanStreamProject] - pub enum ScanStream<'scan, R> + pub enum ScanStream<'scan, R, E> where R: Record, + E: Executor, { Mutable { #[pin] @@ -79,14 +82,15 @@ pin_project! { }, SsTable { #[pin] - inner: SsTableScan, + inner: SsTableScan, }, } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, E> From> for ScanStream<'scan, R, E> where R: Record, + E: Executor, { fn from(inner: MutableScan<'scan, R>) -> Self { ScanStream::Mutable { @@ -95,9 +99,10 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, E> From> for ScanStream<'scan, R, E> where R: Record, + E: Executor, { fn from(inner: ImmutableScan<'scan, R>) -> Self { ScanStream::Immutable { @@ -106,18 +111,20 @@ where } } -impl<'scan, R> From> for ScanStream<'scan, R> +impl<'scan, R, E> From> for ScanStream<'scan, R, E> where R: Record, + E: Executor, { - fn from(inner: SsTableScan) -> Self { + fn from(inner: SsTableScan) -> Self { ScanStream::SsTable { inner } } } -impl fmt::Debug for ScanStream<'_, R> +impl fmt::Debug for ScanStream<'_, R, E> where R: Record, + E: Executor, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { @@ -128,9 +135,10 @@ where } } -impl<'scan, R> Stream for ScanStream<'scan, R> +impl<'scan, R, E> Stream for ScanStream<'scan, R, E> where R: Record, + E: Executor, { type Item = Result, parquet::errors::ParquetError>; diff --git a/src/version/mod.rs b/src/version/mod.rs index 8fbd2945..fa077e26 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -10,7 +10,7 @@ use tracing::error; use crate::{ executor::Executor, - fs::{AsyncFile, FileId}, + fs::FileId, ondisk::sstable::SsTable, oracle::{timestamp::TimestampedRef, Timestamp}, record::Record, @@ -98,10 +98,11 @@ where ) -> Result>, VersionError> { let file = E::open(self.option.table_path(gen)) .await - .map_err(VersionError::Io)? - .boxed(); - let table = SsTable::open(file); - table.get(key).await.map_err(VersionError::Parquet) + .map_err(VersionError::Io)?; + SsTable::::open(file) + .get(key) + .await + .map_err(VersionError::Parquet) } pub(crate) fn scope_search(key: &R::Key, level: &[Scope]) -> usize { @@ -116,15 +117,14 @@ where pub(crate) async fn iters<'iters>( &self, - iters: &mut Vec>, + iters: &mut Vec>, range: (Bound<&'iters R::Key>, Bound<&'iters R::Key>), ts: Timestamp, ) -> Result<(), VersionError> { for scope in self.level_slice[0].iter() { let file = E::open(self.option.table_path(&scope.gen)) .await - .map_err(VersionError::Io)? - .boxed(); + .map_err(VersionError::Io)?; let table = SsTable::open(file); iters.push(ScanStream::SsTable {