use file as trait object
ethe committed Jul 16, 2024
1 parent b0f18dc commit cf55a25
Showing 9 changed files with 74 additions and 84 deletions.
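
Note: the thread running through this commit is type erasure — SsTable, SsTableScan, and MergeStream stop being generic over an Executor, and the opened file is stored behind a trait object instead. Condensed from the hunks below (where-clauses inlined for brevity):

    // Before: every container is generic over the executor's file type.
    pub(crate) struct SsTable<R, E: Executor> {
        file: E::File,
        _marker: PhantomData<(R, E)>,
    }

    // After: one erased file type, so `E` disappears from the whole read path.
    pub(crate) struct SsTable<R> {
        file: Box<dyn AsyncFile>,
        _marker: PhantomData<R>,
    }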
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -5,7 +5,7 @@ resolver = "2"
 version = "0.1.0"
 
 [features]
-tokio = ["dep:tokio", "dep:tokio-util"]
+tokio = ["dep:tokio"]
 
 [dependencies]
 arrow = "52"
@@ -21,7 +21,7 @@ parquet = { version = "52", features = ["async"] }
 pin-project-lite = "0.2"
 thiserror = "1"
 tokio = { version = "1", optional = true }
-tokio-util = { version = "0.7", features = ["compat"], optional = true }
+tokio-util = { version = "0.7", features = ["compat"] }
 tracing = "0.1"
 ulid = "1"
6 changes: 6 additions & 0 deletions src/executor.rs
@@ -20,6 +20,12 @@ pub mod tokio {
         handle: Handle,
     }
 
+    impl Default for TokioExecutor {
+        fn default() -> Self {
+            Self::new()
+        }
+    }
+
     impl TokioExecutor {
         pub fn new() -> Self {
             Self {
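
Note: the new Default impl just forwards to new(); it quiets clippy's new_without_default lint and lets generic code construct the executor through a Default bound. A minimal sketch of why that matters (Demo stands in for TokioExecutor):

    #[derive(Default)]
    struct Demo;

    // Generic construction only needs the `Default` bound:
    fn build<E: Default>() -> E {
        E::default()
    }

    fn main() {
        let _exec: Demo = build(); // with this diff, TokioExecutor fits here too
    }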
9 changes: 8 additions & 1 deletion src/fs/mod.rs
@@ -19,7 +19,14 @@ pub enum FileType {
     LOG,
 }
 
-pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}
+pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {
+    fn to_file(self) -> Box<dyn AsyncFile>
+    where
+        Self: Sized,
+    {
+        Box::new(self) as Box<dyn AsyncFile>
+    }
+}
 
 impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}
 
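Note: to_file is the type-erasure hook of this commit. The blanket impl means every type meeting the bounds gets it for free, and the `where Self: Sized` clause keeps the method out of the vtable, so `dyn AsyncFile` itself stays object-safe. A minimal sketch, assuming the trait above is in scope:

    // Erase a concrete file type so files opened by different executors
    // can share one field type, as SsTable<R> does further down.
    fn open_boxed(file: impl AsyncFile) -> Box<dyn AsyncFile> {
        // `to_file` resolves through the blanket impl; `Self: Sized`
        // holds because `impl AsyncFile` is a concrete, sized type here.
        file.to_file()
    }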
22 changes: 8 additions & 14 deletions src/lib.rs
@@ -27,7 +27,7 @@ use record::Record;
 use crate::{
     executor::Executor,
     fs::{FileId, FileType},
-    stream::{merge::MergeStream, Entry, ScanStream},
+    stream::{merge::MergeStream, Entry},
     version::Version,
 };
 
@@ -106,7 +106,7 @@
 impl<R, E> DB<R, E>
 where
     R: Record + Send + Sync,
-    R::Key: Send + Sync,
+    R::Key: Send,
     E: Executor,
 {
     pub fn empty() -> Self {
@@ -168,31 +168,25 @@
         Ok(())
     }
 
-    async fn get<'get, E>(
+    async fn get<'get>(
         &'get self,
         key: &'get R::Key,
         ts: Timestamp,
-    ) -> Result<Option<Entry<'get, R>>, ParquetError>
-    where
-        E: Executor,
-    {
-        self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
+    ) -> Result<Option<Entry<'get, R>>, ParquetError> {
+        self.scan(Bound::Included(key), Bound::Unbounded, ts)
             .await?
             .next()
             .await
             .transpose()
     }
 
-    async fn scan<'scan, E>(
+    async fn scan<'scan>(
         &'scan self,
         lower: Bound<&'scan R::Key>,
         upper: Bound<&'scan R::Key>,
         ts: Timestamp,
-    ) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
-    where
-        E: Executor,
-    {
-        let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
+    ) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError> {
+        let mut streams = Vec::with_capacity(self.immutables.len() + 1);
         streams.push(self.mutable.scan((lower, upper), ts).into());
         for immutable in &self.immutables {
             streams.push(immutable.scan((lower, upper), ts).into());
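
Note: with scan no longer generic over an executor, get drops its turbofish (self.scan::<E> becomes self.scan) and the whole read path is inferred. The body also leans on Option::transpose — .next().await yields Option<Result<Entry, _>>, and transpose flips it into the Result<Option<Entry>, _> the signature promises. In miniature:

    fn transpose_demo() {
        let next: Option<Result<i32, String>> = Some(Ok(7));
        // Option<Result<T, E>> -> Result<Option<T>, E>
        let flipped: Result<Option<i32>, String> = next.transpose();
        assert_eq!(flipped, Ok(Some(7)));
    }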
18 changes: 6 additions & 12 deletions src/ondisk/scan.rs
@@ -9,36 +9,30 @@ use pin_project_lite::pin_project;
 use tokio_util::compat::Compat;
 
 use crate::{
-    executor::Executor,
+    fs::AsyncFile,
     record::Record,
     stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
 };
 
 pin_project! {
     #[derive(Debug)]
-    pub struct SsTableScan<R, E>
-    where
-        E: Executor
+    pub struct SsTableScan<R>
     {
         #[pin]
-        stream: ParquetRecordBatchStream<Compat<E::File>>,
+        stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>,
         iter: Option<RecordBatchIterator<R>>,
     }
 }
 
-impl<R, E> SsTableScan<R, E>
-where
-    E: Executor,
-{
-    pub fn new(stream: ParquetRecordBatchStream<Compat<E::File>>) -> Self {
+impl<R> SsTableScan<R> {
+    pub fn new(stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>) -> Self {
         SsTableScan { stream, iter: None }
     }
 }
 
-impl<R, E> Stream for SsTableScan<R, E>
+impl<R> Stream for SsTableScan<R>
 where
     R: Record,
-    E: Executor,
 {
     type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;
 
29 changes: 14 additions & 15 deletions src/ondisk/sstable.rs
@@ -17,34 +17,32 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
 use super::scan::SsTableScan;
 use crate::{
     arrows::get_range_filter,
-    executor::Executor,
+    fs::AsyncFile,
     oracle::{timestamp::TimestampedRef, Timestamp},
     record::Record,
     stream::record_batch::RecordBatchEntry,
 };
 
-pub(crate) struct SsTable<R, E>
+pub(crate) struct SsTable<R>
 where
     R: Record,
-    E: Executor,
 {
-    file: E::File,
-    _marker: PhantomData<(R, E)>,
+    file: Box<dyn AsyncFile>,
+    _marker: PhantomData<R>,
 }
 
-impl<R, E> SsTable<R, E>
+impl<R> SsTable<R>
 where
     R: Record,
-    E: Executor,
 {
-    pub(crate) fn open(file: E::File) -> Self {
+    pub(crate) fn open(file: Box<dyn AsyncFile>) -> Self {
         SsTable {
             file,
             _marker: PhantomData,
         }
     }
 
-    fn create_writer(&mut self) -> AsyncArrowWriter<Compat<&mut E::File>> {
+    fn create_writer(&mut self) -> AsyncArrowWriter<Compat<&mut dyn AsyncFile>> {
         // TODO: expose writer options
         let options = ArrowWriterOptions::new().with_properties(
             WriterProperties::builder()
@@ -53,7 +51,7 @@ where
                 .build(),
         );
         AsyncArrowWriter::try_new_with_options(
-            (&mut self.file).compat(),
+            (&mut self.file as &mut dyn AsyncFile).compat(),
             R::arrow_schema().clone(),
             options,
         )
@@ -75,7 +73,7 @@ where
     async fn into_parquet_builder(
         self,
         limit: usize,
-    ) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<E::File>>>> {
+    ) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<Box<dyn AsyncFile>>>>> {
         Ok(ParquetRecordBatchStreamBuilder::new_with_options(
             self.file.compat(),
             ArrowReaderOptions::default().with_page_index(true),
@@ -99,7 +97,7 @@ where
         self,
         range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
         ts: Timestamp,
-    ) -> Result<SsTableScan<R, E>, parquet::errors::ParquetError> {
+    ) -> Result<SsTableScan<R>, parquet::errors::ParquetError> {
         let builder = self.into_parquet_builder(1).await?;
 
         let schema_descriptor = builder.metadata().file_metadata().schema_descr();
@@ -116,7 +114,7 @@ mod tests {
     use super::SsTable;
     use crate::{
         executor::tokio::TokioExecutor,
-        fs::Fs,
+        fs::{AsyncFile, Fs},
        oracle::timestamp::Timestamped,
        tests::{get_test_record_batch, Test},
    };
@@ -127,8 +125,9 @@
         let record_batch = get_test_record_batch::<TokioExecutor>().await;
         let file = TokioExecutor::open(&temp_dir.path().join("test.parquet"))
             .await
-            .unwrap();
-        let mut sstable = SsTable::<Test, TokioExecutor>::open(file);
+            .unwrap()
+            .to_file();
+        let mut sstable = SsTable::<Test>::open(file);
 
         sstable.write(record_batch).await.unwrap();
 
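Note: parquet's async reader and writer expect tokio-flavoured I/O traits, while AsyncFile is defined over the futures-io traits, so every file is bridged through tokio_util::compat — which is also why tokio-util stops being optional in Cargo.toml above. The same bridge in isolation, as a minimal runnable sketch (the Cursor stands in for an opened file):

    use futures_util::io::Cursor;
    use tokio_util::compat::FuturesAsyncReadCompatExt;

    fn bridge_example() {
        // A futures-io reader standing in for an opened file.
        let file = Cursor::new(vec![0u8; 16]);
        // `.compat()` adapts it to tokio::io::AsyncRead, the flavour
        // parquet's ParquetRecordBatchStreamBuilder expects.
        let _tokio_reader = file.compat();
    }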
33 changes: 14 additions & 19 deletions src/stream/merge.rs
@@ -10,27 +10,25 @@ use futures_util::stream::StreamExt;
 use pin_project_lite::pin_project;
 
 use super::{Entry, ScanStream};
-use crate::{executor::Executor, record::Record};
+use crate::record::Record;
 
 pin_project! {
-    pub(crate) struct MergeStream<'merge, R, E>
+    pub(crate) struct MergeStream<'merge, R>
     where
         R: Record,
-        E: Executor,
     {
-        streams: Vec<ScanStream<'merge, R, E>>,
+        streams: Vec<ScanStream<'merge, R>>,
         peeked: BinaryHeap<CmpEntry<'merge, R>>,
         buf: Option<Entry<'merge, R>>,
     }
 }
 
-impl<'merge, R, E> MergeStream<'merge, R, E>
+impl<'merge, R> MergeStream<'merge, R>
 where
     R: Record,
-    E: Executor,
 {
     pub(crate) async fn from_vec(
-        mut streams: Vec<ScanStream<'merge, R, E>>,
+        mut streams: Vec<ScanStream<'merge, R>>,
     ) -> Result<Self, parquet::errors::ParquetError> {
         let mut peeked = BinaryHeap::with_capacity(streams.len());
 
@@ -51,10 +49,9 @@
     }
 }
 
-impl<'merge, R, E> Stream for MergeStream<'merge, R, E>
+impl<'merge, R> Stream for MergeStream<'merge, R>
 where
     R: Record,
-    E: Executor,
 {
     type Item = Result<Entry<'merge, R>, parquet::errors::ParquetError>;
 
@@ -138,7 +135,7 @@ mod tests {
     use futures_util::StreamExt;
 
     use super::MergeStream;
-    use crate::{executor::tokio::TokioExecutor, inmem::mutable::Mutable};
+    use crate::inmem::mutable::Mutable;
 
     #[tokio::test]
     async fn merge_mutable() {
@@ -158,7 +155,7 @@
         let lower = "a".to_string();
         let upper = "e".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::<String, TokioExecutor>::from_vec(vec![
+        let mut merge = MergeStream::from_vec(vec![
             m1.scan(bound, 6.into()).into(),
             m2.scan(bound, 6.into()).into(),
             m3.scan(bound, 6.into()).into(),
@@ -186,10 +183,9 @@
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge =
-            MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 0.into()).into()])
-                .await
-                .unwrap();
+        let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 0.into()).into()])
+            .await
+            .unwrap();
 
         dbg!(merge.next().await);
         dbg!(merge.next().await);
@@ -198,10 +194,9 @@
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge =
-            MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 1.into()).into()])
-                .await
-                .unwrap();
+        let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 1.into()).into()])
+            .await
+            .unwrap();
 
         dbg!(merge.next().await);
         dbg!(merge.next().await);
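
Note: judging by the peeked BinaryHeap above, MergeStream is a k-way merge — it holds the head entry of each scan stream in a heap and repeatedly yields the smallest; dropping E also lets the tests call from_vec without the old turbofish, since the element type alone pins down R. The same shape over plain iterators, for illustration only:

    use std::{cmp::Reverse, collections::BinaryHeap};

    fn k_way_merge<I: Iterator<Item = i32>>(mut iters: Vec<I>) -> Vec<i32> {
        let mut heap = BinaryHeap::new();
        for (idx, it) in iters.iter_mut().enumerate() {
            if let Some(v) = it.next() {
                heap.push(Reverse((v, idx))); // min-heap via Reverse
            }
        }
        let mut out = Vec::new();
        while let Some(Reverse((v, idx))) = heap.pop() {
            out.push(v);
            if let Some(next) = iters[idx].next() {
                heap.push(Reverse((next, idx)));
            }
        }
        out
    }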
[Diff for the remaining 2 changed files not loaded.]
