Skip to content

Commit

Permalink
feat: impl LevelStream
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 17, 2024
1 parent 7c20250 commit 55455af
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 71 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
Cargo.lock
/.idea
Cargo.lock
21 changes: 7 additions & 14 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@ use ulid::Ulid;
pub(crate) type FileId = Ulid;

pub(crate) enum FileType {
WAL,
PARQUET,
LOG,
Wal,
Parquet,
Log,
}

pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {
fn boxed(self) -> Box<dyn AsyncFile>
where
Self: Sized,
{
Box::new(self) as Box<dyn AsyncFile>
}
}
pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

Expand All @@ -41,9 +34,9 @@ pub trait FileProvider {
impl Display for FileType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
FileType::WAL => write!(f, "wal"),
FileType::PARQUET => write!(f, "parquet"),
FileType::LOG => write!(f, "log"),
FileType::Wal => write!(f, "wal"),
FileType::Parquet => write!(f, "parquet"),
FileType::Log => write!(f, "log"),
}
}
}
26 changes: 16 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use record::Record;
use crate::{
executor::Executor,
fs::{FileId, FileType},
stream::{merge::MergeStream, Entry},
stream::{merge::MergeStream, Entry, ScanStream},
version::Version,
};

Expand Down Expand Up @@ -65,15 +65,15 @@ impl DbOption {
}

pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::PARQUET))
self.path.join(format!("{}.{}", gen, FileType::Parquet))
}

pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::WAL))
self.path.join(format!("{}.{}", gen, FileType::Wal))
}

pub(crate) fn version_path(&self) -> PathBuf {
self.path.join(format!("version.{}", FileType::LOG))
self.path.join(format!("version.{}", FileType::Log))
}

pub(crate) fn is_threshold_exceeded_major<R, E>(
Expand Down Expand Up @@ -166,25 +166,31 @@ where
Ok(())
}

async fn get<'get>(
async fn get<'get, E>(
&'get self,
key: &'get R::Key,
ts: Timestamp,
) -> Result<Option<Entry<'get, R>>, ParquetError> {
self.scan(Bound::Included(key), Bound::Unbounded, ts)
) -> Result<Option<Entry<'get, R>>, ParquetError>
where
E: Executor,
{
self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
.await?
.next()
.await
.transpose()
}

async fn scan<'scan>(
async fn scan<'scan, E>(
&'scan self,
lower: Bound<&'scan R::Key>,
uppwer: Bound<&'scan R::Key>,
ts: Timestamp,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError> {
let mut streams = Vec::with_capacity(self.immutables.len() + 1);
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
where
E: Executor,
{
let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
streams.push(self.mutable.scan((lower, uppwer), ts).into());
for immutable in &self.immutables {
streams.push(immutable.scan((lower, uppwer), ts).into());
Expand Down
18 changes: 12 additions & 6 deletions src/ondisk/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,36 @@ use pin_project_lite::pin_project;
use tokio_util::compat::Compat;

use crate::{
fs::AsyncFile,
executor::Executor,
record::Record,
stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
};

pin_project! {
#[derive(Debug)]
pub struct SsTableScan<R>
pub struct SsTableScan<R, E>
where
E: Executor,
{
#[pin]
stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>,
stream: ParquetRecordBatchStream<Compat<E::File>>,
iter: Option<RecordBatchIterator<R>>,
}
}

impl<R> SsTableScan<R> {
pub fn new(stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>) -> Self {
impl<R, E> SsTableScan<R, E>
where
E: Executor,
{
pub fn new(stream: ParquetRecordBatchStream<Compat<E::File>>) -> Self {
SsTableScan { stream, iter: None }
}
}

impl<R> Stream for SsTableScan<R>
impl<R, E> Stream for SsTableScan<R, E>
where
R: Record,
E: Executor,
{
type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;

Expand Down
22 changes: 12 additions & 10 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,28 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
use super::scan::SsTableScan;
use crate::{
arrows::get_range_filter,
executor::Executor,
fs::AsyncFile,
oracle::{timestamp::TimestampedRef, Timestamp},
record::Record,
stream::record_batch::RecordBatchEntry,
};

pub(crate) struct SsTable<R>
pub(crate) struct SsTable<R, E>
where
R: Record,
E: Executor,
{
file: Box<dyn AsyncFile>,
file: E::File,
_marker: PhantomData<R>,
}

impl<R> SsTable<R>
impl<R, E> SsTable<R, E>
where
R: Record,
E: Executor,
{
pub(crate) fn open(file: Box<dyn AsyncFile>) -> Self {
pub(crate) fn open(file: E::File) -> Self {
SsTable {
file,
_marker: PhantomData,
Expand Down Expand Up @@ -73,7 +76,7 @@ where
async fn into_parquet_builder(
self,
limit: usize,
) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<Box<dyn AsyncFile>>>>> {
) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<E::File>>>> {
Ok(ParquetRecordBatchStreamBuilder::new_with_options(
self.file.compat(),
ArrowReaderOptions::default().with_page_index(true),
Expand All @@ -97,7 +100,7 @@ where
self,
range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
ts: Timestamp,
) -> Result<SsTableScan<R>, parquet::errors::ParquetError> {
) -> Result<SsTableScan<R, E>, parquet::errors::ParquetError> {
let builder = self.into_parquet_builder(1).await?;

let schema_descriptor = builder.metadata().file_metadata().schema_descr();
Expand All @@ -114,7 +117,7 @@ mod tests {
use super::SsTable;
use crate::{
executor::tokio::TokioExecutor,
fs::{AsyncFile, FileProvider},
fs::FileProvider,
oracle::timestamp::Timestamped,
tests::{get_test_record_batch, Test},
};
Expand All @@ -125,9 +128,8 @@ mod tests {
let record_batch = get_test_record_batch::<TokioExecutor>().await;
let file = TokioExecutor::open(&temp_dir.path().join("test.parquet"))
.await
.unwrap()
.boxed();
let mut sstable = SsTable::<Test>::open(file);
.unwrap();
let mut sstable = SsTable::<Test, TokioExecutor>::open(file);

sstable.write(record_batch).await.unwrap();

Expand Down
99 changes: 99 additions & 0 deletions src/stream/level.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::{
collections::{Bound, VecDeque},
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use futures_core::Stream;
use pin_project_lite::pin_project;

use crate::{
executor::Executor,
fs::FileId,
ondisk::{scan::SsTableScan, sstable::SsTable},
oracle::Timestamp,
record::Record,
stream::record_batch::RecordBatchEntry,
DbOption,
};

/// State machine for `LevelStream`: which asynchronous step of the
/// open-file → build-scan → drain-scan cycle is currently being driven.
enum FutureStatus<'level, R, E>
where
    R: Record,
    E: Executor,
{
    /// An SSTable scan is built and is being polled for entries.
    Ready(Pin<Box<SsTableScan<R, E>>>),
    /// Waiting on the executor to open the next SSTable file.
    OpenFile(Pin<Box<dyn Future<Output = io::Result<E::File>> + 'level>>),
    /// File is open; waiting for the parquet scan stream to be constructed.
    LoadStream(
        Pin<
            Box<
                dyn Future<Output = Result<SsTableScan<R, E>, parquet::errors::ParquetError>>
                    + 'level,
            >,
        >,
    ),
}

pin_project! {
    /// Stream over the SSTables of one LSM level, yielding entries within
    /// the `[lower, upper]` bounds at read timestamp `ts`, one table at a time.
    pub(crate) struct LevelStream<'level, R, E>
    where
        R: Record,
        E: Executor,
    {
        // Scan bounds applied to every SSTable in this level.
        lower: Bound<&'level R::Key>,
        upper: Bound<&'level R::Key>,
        // Read timestamp passed to each table scan.
        ts: Timestamp,
        // Used to resolve each file id to its on-disk parquet path.
        option: Arc<DbOption>,
        // Remaining SSTable file ids to scan, consumed front-to-back.
        gens: VecDeque<FileId>,
        // NOTE(review): `statue` looks like a typo for `status` — current step
        // of the open-file / build-scan / drain-scan state machine. Renaming
        // requires updating any struct-literal construction sites elsewhere.
        statue: FutureStatus<'level, R, E>,
    }
}

impl<'level, R, E> Stream for LevelStream<'level, R, E>
where
    R: Record,
    E: Executor + 'level,
{
    type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;

    /// Drives the per-level state machine: drain the current table's scan,
    /// then open the next file id from `gens` and build its scan, finishing
    /// only when every table in the level is exhausted.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut self.statue {
            // Draining the currently open SSTable scan.
            FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) {
                // Current table exhausted: advance to the next file id, or
                // end the whole level stream when none remain.
                Poll::Ready(None) => match self.gens.pop_front() {
                    None => Poll::Ready(None),
                    Some(gen) => {
                        self.statue =
                            FutureStatus::OpenFile(Box::pin(E::open(self.option.table_path(&gen))));
                        // Re-enter to poll the freshly created open-file future
                        // within this same wakeup.
                        self.poll_next(cx)
                    }
                },
                // An entry or Pending: forward unchanged.
                poll => poll,
            },
            // Waiting for the executor to open the next SSTable file.
            FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) {
                Poll::Ready(Ok(file)) => {
                    self.statue = FutureStatus::LoadStream(Box::pin(
                        SsTable::open(file).scan((self.lower, self.upper), self.ts),
                    ));
                    self.poll_next(cx)
                }
                // Surface the I/O failure converted to a ParquetError so it
                // matches `Self::Item`.
                Poll::Ready(Err(err)) => {
                    Poll::Ready(Some(Err(parquet::errors::ParquetError::from(err))))
                }
                Poll::Pending => Poll::Pending,
            },
            // Waiting for the parquet scan stream to finish building.
            FutureStatus::LoadStream(stream_future) => match Pin::new(stream_future).poll(cx) {
                Poll::Ready(Ok(scan)) => {
                    self.statue = FutureStatus::Ready(Box::pin(scan));
                    self.poll_next(cx)
                }
                Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
                Poll::Pending => Poll::Pending,
            },
        }
    }
}

// TODO: Test Case after `Compaction`
Loading

0 comments on commit 55455af

Please sign in to comment.