Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl Version & VersionSet #5

Merged
merged 10 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ tokio = ["dep:tokio"]
arrow = "52"
async-lock = "3"
crossbeam-skiplist = "0.1"
flume = { version = "0.11.0", features = ["async"] }
KKould marked this conversation as resolved.
Show resolved Hide resolved
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand All @@ -20,6 +21,8 @@ pin-project-lite = "0.2"
thiserror = "1"
tokio = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"] }
tracing = "0.1"
ulid = "1"

[dev-dependencies]
tempfile = "3"
Expand Down
22 changes: 17 additions & 5 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::future::Future;

pub trait Executor {
use crate::fs::Fs;

pub trait Executor: Fs {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;
Expand All @@ -10,15 +12,25 @@ pub trait Executor {
pub mod tokio {
use std::future::Future;

use tokio::runtime::Handle;

use super::Executor;

pub struct TokioExecutor {
tokio: tokio::runtime::Runtime,
handle: Handle,
}

impl Default for TokioExecutor {
fn default() -> Self {
Self::new()
}
}

impl TokioExecutor {
pub fn new(tokio: tokio::runtime::Runtime) -> Self {
Self { tokio }
pub fn new() -> Self {
Self {
handle: Handle::current(),
}
}
}

Expand All @@ -27,7 +39,7 @@ pub mod tokio {
where
F: Future<Output = ()> + Send + 'static,
{
self.tokio.spawn(future);
self.handle.spawn(future);
}
}
}
41 changes: 37 additions & 4 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,49 @@
#[cfg(any(feature = "tokio", test))]
pub mod tokio;

use std::{future::Future, io, path::Path};
use std::{
fmt::{Display, Formatter},
future::Future,
io,
path::Path,
};

use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use ulid::Ulid;

pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
pub(crate) type FileId = Ulid;

impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
pub enum FileType {
KKould marked this conversation as resolved.
Show resolved Hide resolved
WAL,
PARQUET,
LOG,
}

pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {
fn to_file(self) -> Box<dyn AsyncFile>
where
Self: Sized,
{
Box::new(self) as Box<dyn AsyncFile>
}
}

impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

pub trait Fs {
type File: AsyncFile;

fn open(&self, path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
fn open(path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;

fn remove(path: impl AsRef<Path>) -> impl Future<Output = io::Result<()>>;
}

impl Display for FileType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
FileType::WAL => write!(f, "wal"),
FileType::PARQUET => write!(f, "parquet"),
FileType::LOG => write!(f, "log"),
}
}
}
16 changes: 10 additions & 6 deletions src/fs/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
use std::{io, path::Path};

use tokio::fs::{remove_file, File};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

use super::Fs;
use crate::executor::tokio::TokioExecutor;

pub struct TokioFs;
impl Fs for TokioExecutor {
type File = Compat<File>;

impl Fs for TokioFs {
type File = Compat<tokio::fs::File>;

async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
tokio::fs::File::create_new(path)
async fn open(path: impl AsRef<Path>) -> io::Result<Self::File> {
File::create_new(path)
.await
.map(TokioAsyncReadCompatExt::compat)
}

async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
remove_file(path).await
}
}
8 changes: 4 additions & 4 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::{
pub trait ArrowArrays: Sized {
type Record: Record;

type Buider: Builder<Self>;
type Builder: Builder<Self>;

fn builder(capacity: usize) -> Self::Buider;
fn builder(capacity: usize) -> Self::Builder;

fn get(&self, offset: u32) -> Option<Option<<Self::Record as Record>::Ref<'_>>>;

Expand Down Expand Up @@ -167,9 +167,9 @@ pub(crate) mod tests {
impl ArrowArrays for TestImmutableArrays {
type Record = Test;

type Buider = TestBuilder;
type Builder = TestBuilder;

fn builder(capacity: usize) -> Self::Buider {
fn builder(capacity: usize) -> Self::Builder {
TestBuilder {
vstring: StringBuilder::with_capacity(capacity, 0),
vu32: PrimitiveBuilder::<UInt32Type>::with_capacity(capacity),
Expand Down
2 changes: 1 addition & 1 deletion src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where

#[cfg(test)]
mod tests {
use std::collections::Bound;
use std::ops::Bound;

use super::Mutable;
use crate::{
Expand Down
94 changes: 84 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
#![allow(dead_code)]
pub(crate) mod arrows;
mod executor;
pub mod executor;
pub mod fs;
mod inmem;
mod ondisk;
mod oracle;
mod record;
mod scope;
pub mod serdes;
mod stream;
mod transaction;
mod version;

use std::{collections::VecDeque, io, mem, ops::Bound, sync::Arc};
use std::{
collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
};

use async_lock::{RwLock, RwLockReadGuard};
use futures_core::Stream;
Expand All @@ -18,31 +23,99 @@ use inmem::{immutable::Immutable, mutable::Mutable};
use oracle::Timestamp;
use parquet::errors::ParquetError;
use record::Record;
use stream::{merge::MergeStream, Entry, ScanStream};

pub struct DB<R>
use crate::{
executor::Executor,
fs::{FileId, FileType},
stream::{merge::MergeStream, Entry},
version::Version,
};

#[derive(Debug)]
pub struct DbOption {
pub path: PathBuf,
pub max_mem_table_size: usize,
pub immutable_chunk_num: usize,
pub major_threshold_with_sst_size: usize,
pub level_sst_magnification: usize,
pub max_sst_file_size: usize,
pub clean_channel_buffer: usize,
}

pub struct DB<R, E>
where
R: Record,
E: Executor,
{
schema: Arc<RwLock<Schema<R>>>,
_p: PhantomData<E>,
}

impl<R> Default for DB<R>
impl DbOption {
pub fn new(path: impl Into<PathBuf> + Send) -> Self {
DbOption {
path: path.into(),
max_mem_table_size: 8 * 1024 * 1024,
immutable_chunk_num: 3,
major_threshold_with_sst_size: 10,
level_sst_magnification: 10,
max_sst_file_size: 24 * 1024 * 1024,
clean_channel_buffer: 10,
}
}

pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::PARQUET))
}

pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf {
self.path.join(format!("{}.{}", gen, FileType::WAL))
}

pub(crate) fn version_path(&self) -> PathBuf {
self.path.join(format!("version.{}", FileType::LOG))
}

pub(crate) fn is_threshold_exceeded_major<R, E>(
&self,
version: &Version<R, E>,
level: usize,
) -> bool
where
R: Record,
E: Executor,
{
Version::<R, E>::tables_len(version, level)
>= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32))
}
}

impl<R, E> Default for DB<R, E>
where
R: Record,
E: Executor,
{
fn default() -> Self {
Self {
schema: Arc::new(RwLock::new(Schema::default())),
_p: Default::default(),
}
}
}

impl<R> DB<R>
impl<R, E> DB<R, E>
where
R: Record + Send + Sync,
R::Key: Send,
E: Executor,
{
pub fn empty() -> Self {
Self {
schema: Arc::new(RwLock::new(Schema::default())),
_p: Default::default(),
}
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
let columns = self.schema.read().await;
columns.write(record, ts).await
Expand Down Expand Up @@ -88,7 +161,7 @@ where
impl<R> Schema<R>
where
R: Record + Send + Sync,
R::Key: Send + Sync,
R::Key: Send,
{
async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
self.mutable.insert(record, ts);
Expand All @@ -113,7 +186,7 @@ where
uppwer: Bound<&'scan R::Key>,
ts: Timestamp,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError> {
let mut streams = Vec::<ScanStream<R>>::with_capacity(self.immutables.len() + 1);
let mut streams = Vec::with_capacity(self.immutables.len() + 1);
streams.push(self.mutable.scan((lower, uppwer), ts).into());
for immutable in &self.immutables {
streams.push(immutable.scan((lower, uppwer), ts).into());
Expand Down Expand Up @@ -141,6 +214,7 @@ pub(crate) mod tests {
use once_cell::sync::Lazy;

use crate::{
executor::Executor,
inmem::immutable::tests::TestImmutableArrays,
record::{internal::InternalRecordRef, RecordRef},
Record, DB,
Expand Down Expand Up @@ -227,8 +301,8 @@ pub(crate) mod tests {
}
}

pub(crate) async fn get_test_record_batch() -> RecordBatch {
let db = DB::default();
pub(crate) async fn get_test_record_batch<E: Executor>() -> RecordBatch {
let db: DB<Test, E> = DB::empty();

db.write(
Test {
Expand Down
8 changes: 3 additions & 5 deletions src/ondisk/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@ use crate::{

pin_project! {
#[derive(Debug)]
pub struct SsTableScan<R> {
pub struct SsTableScan<R>
{
#[pin]
stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>,
iter: Option<RecordBatchIterator<R>>,
}
}

impl<R> SsTableScan<R>
where
R: Record,
{
impl<R> SsTableScan<R> {
pub fn new(stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>) -> Self {
SsTableScan { stream, iter: None }
}
Expand Down
Loading
Loading