Skip to content

Commit

Permalink
feat: impl Version & VersionSet
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould authored and ethe committed Jul 16, 2024
1 parent 2601443 commit 9dcb157
Show file tree
Hide file tree
Showing 25 changed files with 1,187 additions and 90 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ arrow = "52"
async-lock = "3"
crossbeam-skiplist = "0.1"
futures-core = "0.3"
futures-channel = "0.3"
futures-executor = "0.3"
futures-io = "0.3"
futures-util = "0.3"
once_cell = "1"
Expand All @@ -20,6 +22,8 @@ pin-project-lite = "0.2"
thiserror = "1"
tokio = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"] }
tracing = "0.1"
ulid = "1"

[dev-dependencies]
tempfile = "3"
Expand Down
16 changes: 11 additions & 5 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::future::Future;

pub trait Executor {
use crate::fs::Fs;

pub trait Executor: Fs {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;
Expand All @@ -10,15 +12,19 @@ pub trait Executor {
pub mod tokio {
use std::future::Future;

use tokio::runtime::Handle;

use super::Executor;

pub struct TokioExecutor {
tokio: tokio::runtime::Runtime,
handle: Handle,
}

impl TokioExecutor {
pub fn new(tokio: tokio::runtime::Runtime) -> Self {
Self { tokio }
pub fn new() -> Self {
Self {
handle: Handle::current(),
}
}
}

Expand All @@ -27,7 +33,7 @@ pub mod tokio {
where
F: Future<Output = ()> + Send + 'static,
{
self.tokio.spawn(future);
self.handle.spawn(future);
}
}
}
34 changes: 30 additions & 4 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,42 @@
#[cfg(any(feature = "tokio", test))]
pub mod tokio;

use std::{future::Future, io, path::Path};
use std::{
fmt::{Display, Formatter},
future::Future,
io,
path::Path,
};

use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use ulid::Ulid;

pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
pub(crate) type FileId = Ulid;

impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
/// Kinds of files the engine writes to disk. The variant maps to the
/// on-disk file extension through this type's `Display` impl (e.g. used by
/// `DbOption::table_path` to build `<gen>.parquet`).
///
/// Derives added: a public, fieldless enum should be `Debug` for
/// diagnostics and is trivially `Copy`/`Eq`/`Hash` — all additive and
/// backward-compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Write-ahead log file (`.wal`).
    WAL,
    /// SSTable data file in Apache Parquet format (`.parquet`).
    PARQUET,
    /// Version/manifest log file (`.log`).
    LOG,
}

/// Marker trait for asynchronous file handles: anything that is async
/// readable, writable and seekable, and safe to move/share across threads
/// (`Send + Sync`) for the executor's spawned tasks.
pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

/// Blanket impl: any type meeting the super-trait bounds is an `AsyncFile`,
/// so concrete file types never implement this trait by hand.
impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

pub trait Fs {
type File: AsyncFile;

fn open(&self, path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
fn open(path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;

fn remove(path: impl AsRef<Path>) -> impl Future<Output = io::Result<()>>;
}

/// Renders the file-type as its lowercase on-disk extension
/// (`wal` / `parquet` / `log`).
impl Display for FileType {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Resolve the extension first, then emit it in one call; no
        // formatting machinery is needed for a fixed string.
        let ext = match self {
            FileType::WAL => "wal",
            FileType::PARQUET => "parquet",
            FileType::LOG => "log",
        };
        f.write_str(ext)
    }
}
16 changes: 10 additions & 6 deletions src/fs/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
use std::{io, path::Path};

use tokio::fs::{remove_file, File};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

use super::Fs;
use crate::executor::tokio::TokioExecutor;

pub struct TokioFs;
impl Fs for TokioExecutor {
type File = Compat<File>;

impl Fs for TokioFs {
type File = Compat<tokio::fs::File>;

async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
tokio::fs::File::create_new(path)
async fn open(path: impl AsRef<Path>) -> io::Result<Self::File> {
File::create_new(path)
.await
.map(TokioAsyncReadCompatExt::compat)
}

async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
remove_file(path).await
}
}
8 changes: 4 additions & 4 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::{
pub trait ArrowArrays: Sized {
type Record: Record;

type Buider: Builder<Self>;
type Builder: Builder<Self>;

fn builder(capacity: usize) -> Self::Buider;
fn builder(capacity: usize) -> Self::Builder;

fn get(&self, offset: u32) -> Option<Option<<Self::Record as Record>::Ref<'_>>>;

Expand Down Expand Up @@ -167,9 +167,9 @@ pub(crate) mod tests {
impl ArrowArrays for TestImmutableArrays {
type Record = Test;

type Buider = TestBuilder;
type Builder = TestBuilder;

fn builder(capacity: usize) -> Self::Buider {
fn builder(capacity: usize) -> Self::Builder {
TestBuilder {
vstring: StringBuilder::with_capacity(capacity, 0),
vu32: PrimitiveBuilder::<UInt32Type>::with_capacity(capacity),
Expand Down
2 changes: 1 addition & 1 deletion src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where

#[cfg(test)]
mod tests {
use std::collections::Bound;
use std::ops::Bound;

use super::Mutable;
use crate::{
Expand Down
110 changes: 95 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
#![allow(dead_code)]
pub(crate) mod arrows;
mod executor;
pub mod executor;
pub mod fs;
mod inmem;
mod ondisk;
mod oracle;
mod record;
mod scope;
pub mod serdes;
mod stream;
mod transaction;
mod version;

use std::{collections::VecDeque, io, mem, ops::Bound, sync::Arc};
use std::{
collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
};

use async_lock::{RwLock, RwLockReadGuard};
use futures_core::Stream;
Expand All @@ -18,31 +23,99 @@ use inmem::{immutable::Immutable, mutable::Mutable};
use oracle::Timestamp;
use parquet::errors::ParquetError;
use record::Record;
use stream::{merge::MergeStream, Entry, ScanStream};

pub struct DB<R>
use crate::{
executor::Executor,
fs::{FileId, FileType},
stream::{merge::MergeStream, Entry, ScanStream},
version::Version,
};

/// Tunable configuration for a database instance; construct via
/// `DbOption::new`, which fills in the defaults shown below.
#[derive(Debug)]
pub struct DbOption {
    // Directory that holds every on-disk file: WAL, parquet tables and the
    // version log (see `table_path` / `wal_path` / `version_path`).
    pub path: PathBuf,
    // Mutable memtable size limit before it is frozen (default 8 * 1024 * 1024;
    // presumably bytes — TODO confirm against the flush logic).
    pub max_mem_table_size: usize,
    // Number of immutable chunks accumulated before a flush (default 3 —
    // assumption from the name; verify against the compaction code).
    pub immutable_chunk_num: usize,
    // Base table count that triggers a major compaction; scaled per level by
    // `level_sst_magnification` in `is_threshold_exceeded_major` (default 10).
    pub major_threshold_with_sst_size: usize,
    // Geometric growth factor applied per level to the major-compaction
    // threshold (default 10).
    pub level_sst_magnification: usize,
    // Upper bound per SSTable file (default 24 * 1024 * 1024; presumably
    // bytes — TODO confirm).
    pub max_sst_file_size: usize,
    // Buffer size of the cleaner channel (default 10).
    pub clean_channel_buffer: usize,
}

pub struct DB<R, E>
where
R: Record,
E: Executor,
{
schema: Arc<RwLock<Schema<R>>>,
_p: PhantomData<E>,
}

impl<R> Default for DB<R>
impl DbOption {
    /// Creates options rooted at `path` with the default tuning values.
    ///
    /// The original `impl Into<PathBuf> + Send` bound is relaxed to
    /// `impl Into<PathBuf>`: the value is converted immediately on the
    /// calling thread, so `Send` served no purpose. Dropping a bound only
    /// widens the accepted types — existing callers are unaffected.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        DbOption {
            path: path.into(),
            // 8 MiB memtable before freezing.
            max_mem_table_size: 8 * 1024 * 1024,
            immutable_chunk_num: 3,
            major_threshold_with_sst_size: 10,
            level_sst_magnification: 10,
            // 24 MiB cap per SSTable file.
            max_sst_file_size: 24 * 1024 * 1024,
            clean_channel_buffer: 10,
        }
    }

    /// Path of the parquet table file for SSTable `gen`: `<path>/<gen>.parquet`.
    pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf {
        self.path.join(format!("{}.{}", gen, FileType::PARQUET))
    }

    /// Path of the write-ahead-log file for `gen`: `<path>/<gen>.wal`.
    pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf {
        self.path.join(format!("{}.{}", gen, FileType::WAL))
    }

    /// Path of the single version/manifest log: `<path>/version.log`.
    pub(crate) fn version_path(&self) -> PathBuf {
        self.path.join(format!("version.{}", FileType::LOG))
    }

    /// Returns `true` when `level` holds enough tables to warrant a major
    /// compaction. The threshold grows geometrically with depth:
    /// `major_threshold_with_sst_size * level_sst_magnification^level`.
    pub(crate) fn is_threshold_exceeded_major<R, E>(
        &self,
        version: &Version<R, E>,
        level: usize,
    ) -> bool
    where
        R: Record,
        E: Executor,
    {
        Version::<R, E>::tables_len(version, level)
            >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32))
    }
}

impl<R, E> Default for DB<R, E>
where
R: Record,
E: Executor,
{
fn default() -> Self {
Self {
schema: Arc::new(RwLock::new(Schema::default())),
_p: Default::default(),
}
}
}

impl<R> DB<R>
impl<R, E> DB<R, E>
where
R: Record + Send + Sync,
R::Key: Send,
R::Key: Send + Sync,
E: Executor,
{
pub fn empty() -> Self {
Self {
schema: Arc::new(RwLock::new(Schema::default())),
_p: Default::default(),
}
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
let columns = self.schema.read().await;
columns.write(record, ts).await
Expand Down Expand Up @@ -95,25 +168,31 @@ where
Ok(())
}

async fn get<'get>(
async fn get<'get, E>(
&'get self,
key: &'get R::Key,
ts: Timestamp,
) -> Result<Option<Entry<'get, R>>, ParquetError> {
self.scan(Bound::Included(key), Bound::Unbounded, ts)
) -> Result<Option<Entry<'get, R>>, ParquetError>
where
E: Executor,
{
self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
.await?
.next()
.await
.transpose()
}

async fn scan<'scan>(
async fn scan<'scan, E>(
&'scan self,
lower: Bound<&'scan R::Key>,
uppwer: Bound<&'scan R::Key>,
ts: Timestamp,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError> {
let mut streams = Vec::<ScanStream<R>>::with_capacity(self.immutables.len() + 1);
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
where
E: Executor,
{
let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
streams.push(self.mutable.scan((lower, uppwer), ts).into());
for immutable in &self.immutables {
streams.push(immutable.scan((lower, uppwer), ts).into());
Expand Down Expand Up @@ -141,6 +220,7 @@ pub(crate) mod tests {
use once_cell::sync::Lazy;

use crate::{
executor::Executor,
inmem::immutable::tests::TestImmutableArrays,
record::{internal::InternalRecordRef, RecordRef},
Record, DB,
Expand Down Expand Up @@ -227,8 +307,8 @@ pub(crate) mod tests {
}
}

pub(crate) async fn get_test_record_batch() -> RecordBatch {
let db = DB::default();
pub(crate) async fn get_test_record_batch<E: Executor>() -> RecordBatch {
let db: DB<Test, E> = DB::empty();

db.write(
Test {
Expand Down
18 changes: 11 additions & 7 deletions src/ondisk/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,36 @@ use pin_project_lite::pin_project;
use tokio_util::compat::Compat;

use crate::{
fs::AsyncFile,
executor::Executor,
record::Record,
stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
};

pin_project! {
#[derive(Debug)]
pub struct SsTableScan<R> {
pub struct SsTableScan<R, E>
where
E: Executor
{
#[pin]
stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>,
stream: ParquetRecordBatchStream<Compat<E::File>>,
iter: Option<RecordBatchIterator<R>>,
}
}

impl<R> SsTableScan<R>
impl<R, E> SsTableScan<R, E>
where
R: Record,
E: Executor,
{
pub fn new(stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>) -> Self {
pub fn new(stream: ParquetRecordBatchStream<Compat<E::File>>) -> Self {
SsTableScan { stream, iter: None }
}
}

impl<R> Stream for SsTableScan<R>
impl<R, E> Stream for SsTableScan<R, E>
where
R: Record,
E: Executor,
{
type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;

Expand Down
Loading

0 comments on commit 9dcb157

Please sign in to comment.