Skip to content

Commit

Permalink
use file as trait object
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 16, 2024
1 parent b0f18dc commit 8a2764d
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 76 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ resolver = "2"
version = "0.1.0"

[features]
tokio = ["dep:tokio", "dep:tokio-util"]
tokio = ["dep:tokio"]

[dependencies]
arrow = "52"
Expand All @@ -21,7 +21,7 @@ parquet = { version = "52", features = ["async"] }
pin-project-lite = "0.2"
thiserror = "1"
tokio = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
tokio-util = { version = "0.7", features = ["compat"] }
tracing = "0.1"
ulid = "1"

Expand Down
6 changes: 6 additions & 0 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ pub mod tokio {
handle: Handle,
}

/// `Default` support for `TokioExecutor`, so it can be built through
/// `Default::default()` as well as through the explicit constructor.
impl Default for TokioExecutor {
/// Returns the same value as `TokioExecutor::new()`; this impl only delegates.
fn default() -> Self {
Self::new()
}
}

impl TokioExecutor {
pub fn new() -> Self {
Self {
Expand Down
9 changes: 8 additions & 1 deletion src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ pub enum FileType {
LOG,
}

pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}
/// Umbrella trait for an asynchronous file handle.
///
/// It bundles the `AsyncRead`, `AsyncWrite` and `AsyncSeek` capabilities with
/// the `Send + Sync + Unpin + 'static` bounds, so a file can be named by a
/// single trait and stored as a `Box<dyn AsyncFile>` trait object.
pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {
/// Consumes `self` and returns it type-erased as a boxed `AsyncFile`
/// trait object.
///
/// The `Self: Sized` bound restricts this provided method to concrete
/// types; it is not callable through `dyn AsyncFile` itself.
fn to_file(self) -> Box<dyn AsyncFile>
where
Self: Sized,
{
Box::new(self) as Box<dyn AsyncFile>
}
}

/// Blanket implementation: every type that satisfies all of the supertrait
/// bounds is automatically an `AsyncFile`; no manual impl is required.
impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

Expand Down
13 changes: 5 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use record::Record;
use crate::{
executor::Executor,
fs::{FileId, FileType},
stream::{merge::MergeStream, Entry, ScanStream},
stream::{merge::MergeStream, Entry},
version::Version,
};

Expand Down Expand Up @@ -176,23 +176,20 @@ where
where
E: Executor,
{
self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
self.scan(Bound::Included(key), Bound::Unbounded, ts)
.await?
.next()
.await
.transpose()
}

async fn scan<'scan, E>(
async fn scan<'scan>(
&'scan self,
lower: Bound<&'scan R::Key>,
uppwer: Bound<&'scan R::Key>,
ts: Timestamp,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
where
E: Executor,
{
let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError> {
let mut streams = Vec::with_capacity(self.immutables.len() + 1);
streams.push(self.mutable.scan((lower, uppwer), ts).into());
for immutable in &self.immutables {
streams.push(immutable.scan((lower, uppwer), ts).into());
Expand Down
18 changes: 6 additions & 12 deletions src/ondisk/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,31 @@ use parquet::arrow::async_reader::ParquetRecordBatchStream;
use pin_project_lite::pin_project;
use tokio_util::compat::Compat;

use crate::fs::AsyncFile;
use crate::{
executor::Executor,
record::Record,
stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
};

pin_project! {
#[derive(Debug)]
pub struct SsTableScan<R, E>
where
E: Executor
pub struct SsTableScan<R>
{
#[pin]
stream: ParquetRecordBatchStream<Compat<E::File>>,
stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>,
iter: Option<RecordBatchIterator<R>>,
}
}

impl<R, E> SsTableScan<R, E>
where
E: Executor,
{
pub fn new(stream: ParquetRecordBatchStream<Compat<E::File>>) -> Self {
impl<R> SsTableScan<R> {
pub fn new(stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>) -> Self {
SsTableScan { stream, iter: None }
}
}

impl<R, E> Stream for SsTableScan<R, E>
impl<R> Stream for SsTableScan<R>
where
R: Record,
E: Executor,
{
type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;

Expand Down
28 changes: 14 additions & 14 deletions src/ondisk/sstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,34 @@ use parquet::{
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};

use super::scan::SsTableScan;
use crate::fs::AsyncFile;
use crate::{
arrows::get_range_filter,
executor::Executor,
oracle::{timestamp::TimestampedRef, Timestamp},
record::Record,
stream::record_batch::RecordBatchEntry,
};

pub(crate) struct SsTable<R, E>
pub(crate) struct SsTable<R>
where
R: Record,
E: Executor,
{
file: E::File,
_marker: PhantomData<(R, E)>,
file: Box<dyn AsyncFile>,
_marker: PhantomData<R>,
}

impl<R, E> SsTable<R, E>
impl<R> SsTable<R>
where
R: Record,
E: Executor,
{
pub(crate) fn open(file: E::File) -> Self {
pub(crate) fn open(file: Box<dyn AsyncFile>) -> Self {
SsTable {
file,
_marker: PhantomData,
}
}

fn create_writer(&mut self) -> AsyncArrowWriter<Compat<&mut E::File>> {
fn create_writer(&mut self) -> AsyncArrowWriter<Compat<&mut dyn AsyncFile>> {
// TODO: expose writer options
let options = ArrowWriterOptions::new().with_properties(
WriterProperties::builder()
Expand All @@ -53,7 +51,7 @@ where
.build(),
);
AsyncArrowWriter::try_new_with_options(
(&mut self.file).compat(),
(&mut self.file as &mut dyn AsyncFile).compat(),
R::arrow_schema().clone(),
options,
)
Expand All @@ -75,7 +73,7 @@ where
async fn into_parquet_builder(
self,
limit: usize,
) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<E::File>>>> {
) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<Box<dyn AsyncFile>>>>> {
Ok(ParquetRecordBatchStreamBuilder::new_with_options(
self.file.compat(),
ArrowReaderOptions::default().with_page_index(true),
Expand All @@ -99,7 +97,7 @@ where
self,
range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
ts: Timestamp,
) -> Result<SsTableScan<R, E>, parquet::errors::ParquetError> {
) -> Result<SsTableScan<R>, parquet::errors::ParquetError> {
let builder = self.into_parquet_builder(1).await?;

let schema_descriptor = builder.metadata().file_metadata().schema_descr();
Expand All @@ -114,6 +112,7 @@ mod tests {
use std::borrow::Borrow;

use super::SsTable;
use crate::fs::AsyncFile;
use crate::{
executor::tokio::TokioExecutor,
fs::Fs,
Expand All @@ -127,8 +126,9 @@ mod tests {
let record_batch = get_test_record_batch::<TokioExecutor>().await;
let file = TokioExecutor::open(&temp_dir.path().join("test.parquet"))
.await
.unwrap();
let mut sstable = SsTable::<Test, TokioExecutor>::open(file);
.unwrap()
.to_file();
let mut sstable = SsTable::<Test>::open(file);

sstable.write(record_batch).await.unwrap();

Expand Down
33 changes: 14 additions & 19 deletions src/stream/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,25 @@ use futures_util::stream::StreamExt;
use pin_project_lite::pin_project;

use super::{Entry, ScanStream};
use crate::{executor::Executor, record::Record};
use crate::record::Record;

pin_project! {
pub(crate) struct MergeStream<'merge, R, E>
pub(crate) struct MergeStream<'merge, R>
where
R: Record,
E: Executor,
{
streams: Vec<ScanStream<'merge, R, E>>,
streams: Vec<ScanStream<'merge, R>>,
peeked: BinaryHeap<CmpEntry<'merge, R>>,
buf: Option<Entry<'merge, R>>,
}
}

impl<'merge, R, E> MergeStream<'merge, R, E>
impl<'merge, R> MergeStream<'merge, R>
where
R: Record,
E: Executor,
{
pub(crate) async fn from_vec(
mut streams: Vec<ScanStream<'merge, R, E>>,
mut streams: Vec<ScanStream<'merge, R>>,
) -> Result<Self, parquet::errors::ParquetError> {
let mut peeked = BinaryHeap::with_capacity(streams.len());

Expand All @@ -51,10 +49,9 @@ where
}
}

impl<'merge, R, E> Stream for MergeStream<'merge, R, E>
impl<'merge, R> Stream for MergeStream<'merge, R>
where
R: Record,
E: Executor,
{
type Item = Result<Entry<'merge, R>, parquet::errors::ParquetError>;

Expand Down Expand Up @@ -138,7 +135,7 @@ mod tests {
use futures_util::StreamExt;

use super::MergeStream;
use crate::{executor::tokio::TokioExecutor, inmem::mutable::Mutable};
use crate::{inmem::mutable::Mutable};

#[tokio::test]
async fn merge_mutable() {
Expand All @@ -158,7 +155,7 @@ mod tests {
let lower = "a".to_string();
let upper = "e".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge = MergeStream::<String, TokioExecutor>::from_vec(vec![
let mut merge = MergeStream::from_vec(vec![
m1.scan(bound, 6.into()).into(),
m2.scan(bound, 6.into()).into(),
m3.scan(bound, 6.into()).into(),
Expand Down Expand Up @@ -186,10 +183,9 @@ mod tests {
let lower = "1".to_string();
let upper = "4".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge =
MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 0.into()).into()])
.await
.unwrap();
let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 0.into()).into()])
.await
.unwrap();

dbg!(merge.next().await);
dbg!(merge.next().await);
Expand All @@ -198,10 +194,9 @@ mod tests {
let lower = "1".to_string();
let upper = "4".to_string();
let bound = (Bound::Included(&lower), Bound::Included(&upper));
let mut merge =
MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 1.into()).into()])
.await
.unwrap();
let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 1.into()).into()])
.await
.unwrap();

dbg!(merge.next().await);
dbg!(merge.next().await);
Expand Down
23 changes: 8 additions & 15 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use pin_project_lite::pin_project;
use record_batch::RecordBatchEntry;

use crate::{
executor::Executor,
inmem::{immutable::ImmutableScan, mutable::MutableScan},
ondisk::scan::SsTableScan,
oracle::timestamp::Timestamped,
Expand Down Expand Up @@ -66,10 +65,9 @@ where

pin_project! {
#[project = ScanStreamProject]
pub enum ScanStream<'scan, R, E>
pub enum ScanStream<'scan, R>
where
R: Record,
E: Executor,
{
Mutable {
#[pin]
Expand All @@ -81,15 +79,14 @@ pin_project! {
},
SsTable {
#[pin]
inner: SsTableScan<R, E>,
inner: SsTableScan<R>,
},
}
}

impl<'scan, R, E> From<MutableScan<'scan, R>> for ScanStream<'scan, R, E>
impl<'scan, R> From<MutableScan<'scan, R>> for ScanStream<'scan, R>
where
R: Record,
E: Executor,
{
fn from(inner: MutableScan<'scan, R>) -> Self {
ScanStream::Mutable {
Expand All @@ -98,10 +95,9 @@ where
}
}

impl<'scan, R, E> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R, E>
impl<'scan, R> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R>
where
R: Record,
E: Executor,
{
fn from(inner: ImmutableScan<'scan, R>) -> Self {
ScanStream::Immutable {
Expand All @@ -110,20 +106,18 @@ where
}
}

impl<'scan, R, E> From<SsTableScan<R, E>> for ScanStream<'scan, R, E>
impl<'scan, R> From<SsTableScan<R>> for ScanStream<'scan, R>
where
R: Record,
E: Executor,
{
fn from(inner: SsTableScan<R, E>) -> Self {
fn from(inner: SsTableScan<R>) -> Self {
ScanStream::SsTable { inner }
}
}

impl<R, E> fmt::Debug for ScanStream<'_, R, E>
impl<R> fmt::Debug for ScanStream<'_, R>
where
R: Record,
E: Executor,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Expand All @@ -134,10 +128,9 @@ where
}
}

impl<'scan, R, E> Stream for ScanStream<'scan, R, E>
impl<'scan, R> Stream for ScanStream<'scan, R>
where
R: Record,
E: Executor,
{
type Item = Result<Entry<'scan, R>, parquet::errors::ParquetError>;

Expand Down
Loading

0 comments on commit 8a2764d

Please sign in to comment.