Skip to content

Commit

Permalink
chore: init
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 15, 2024
0 parents commit 596f286
Show file tree
Hide file tree
Showing 25 changed files with 2,120 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
Cargo.lock
26 changes: 26 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
edition = "2021"
name = "seren"
resolver = "2"
version = "0.1.0"

[features]
tokio = ["dep:tokio"]

[dependencies]
arrow = "52"
async-lock = "3"
crossbeam-skiplist = "0.1"
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
once_cell = "1"
parquet = { version = "52", features = ["async"] }
pin-project-lite = "0.2"
thiserror = "1"
tokio = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"] }

[dev-dependencies]
tempfile = "3"
tokio = { version = "1", features = ["full"] }
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
type-complexity-threshold = 900
2 changes: 2 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "stable"
10 changes: 10 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
comment_width = 100
edition = "2021"
format_code_in_doc_comments = true
format_strings = true
group_imports = "StdExternalCrate"
imports_granularity = "Crate"
max_width = 100
normalize_comments = true
normalize_doc_attributes = true
wrap_comments = true
91 changes: 91 additions & 0 deletions src/arrows/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::ops::Bound;

use arrow::{
array::{BooleanArray, Datum},
buffer::BooleanBuffer,
compute::kernels::cmp::{gt, gt_eq, lt_eq},
error::ArrowError,
};
use parquet::{
arrow::{
arrow_reader::{ArrowPredicate, ArrowPredicateFn, RowFilter},
ProjectionMask,
},
schema::types::SchemaDescriptor,
};

use crate::{
oracle::Timestamp,
record::{Key, Record},
};

unsafe fn get_range_bound_fn<R>(
range: Bound<&R::Key>,
) -> (
Option<&'static R::Key>,
&'static (dyn Fn(&dyn Datum, &dyn Datum) -> Result<BooleanArray, ArrowError> + Sync),
)
where
R: Record,
{
let cmp: &'static (dyn Fn(&dyn Datum, &dyn Datum) -> Result<BooleanArray, ArrowError> + Sync);
let key = match range {
Bound::Included(key) => {
cmp = &gt_eq;
Some(unsafe { &*(key as *const _) })
}
Bound::Excluded(key) => {
cmp = &gt;
Some(unsafe { &*(key as *const _) })
}
Bound::Unbounded => {
cmp = &|this, _| {
let len = this.get().0.len();
Ok(BooleanArray::new(
BooleanBuffer::collect_bool(len, |_| true),
None,
))
};
None
}
};
(key, cmp)
}

pub(crate) unsafe fn get_range_filter<R>(
schema_descriptor: &SchemaDescriptor,
range: (Bound<&R::Key>, Bound<&R::Key>),
ts: Timestamp,
) -> RowFilter
where
R: Record,
{
let (lower_key, lower_cmp) = get_range_bound_fn::<R>(range.0);
let (upper_key, upper_cmp) = get_range_bound_fn::<R>(range.0);

let predictions: Vec<Box<dyn ArrowPredicate>> = vec![
Box::new(ArrowPredicateFn::new(
ProjectionMask::roots(schema_descriptor, [2]),
move |record_batch| {
lower_cmp(
record_batch.column(0),
&lower_key.unwrap().to_arrow_datum() as &dyn Datum,
)
},
)),
Box::new(ArrowPredicateFn::new(
ProjectionMask::roots(schema_descriptor, [2]),
move |record_batch| {
upper_cmp(
record_batch.column(0),
&upper_key.unwrap().to_arrow_datum() as &dyn Datum,
)
},
)),
Box::new(ArrowPredicateFn::new(
ProjectionMask::roots(schema_descriptor, [1]),
move |record_batch| lt_eq(record_batch.column(0), &ts.to_arrow_scalar() as &dyn Datum),
)),
];
RowFilter::new(predictions)
}
33 changes: 33 additions & 0 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::future::Future;

pub trait Executor {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;
}

#[cfg(any(feature = "tokio", test))]
pub mod tokio {
use std::future::Future;

use super::Executor;

pub struct TokioExecutor {
tokio: tokio::runtime::Runtime,
}

impl TokioExecutor {
pub fn new(tokio: tokio::runtime::Runtime) -> Self {
Self { tokio }
}
}

impl Executor for TokioExecutor {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tokio.spawn(future);
}
}
}
16 changes: 16 additions & 0 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#[cfg(any(feature = "tokio", test))]
pub mod tokio;

use std::{future::Future, io, path::Path};

use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};

pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}

impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}

pub trait Fs {
type File: AsyncFile;

fn open(&self, path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
}
17 changes: 17 additions & 0 deletions src/fs/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::{io, path::Path};

use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

use super::Fs;

pub struct TokioFs;

impl Fs for TokioFs {
type File = Compat<tokio::fs::File>;

async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
tokio::fs::File::create_new(path)
.await
.map(TokioAsyncReadCompatExt::compat)
}
}
Loading

0 comments on commit 596f286

Please sign in to comment.