chore: support transaction
ethe committed Jul 18, 2024
1 parent 6cd34f1 commit 808bb08
Showing 8 changed files with 219 additions and 89 deletions.
69 changes: 39 additions & 30 deletions src/lib.rs
@@ -16,13 +16,15 @@ use std::{
     collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
 };

-use async_lock::{RwLock, RwLockReadGuard};
+use async_lock::RwLock;
+use fs::FileProvider;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use inmem::{immutable::Immutable, mutable::Mutable};
-use oracle::Timestamp;
+use oracle::{Oracle, Timestamp};
 use parquet::errors::ParquetError;
 use record::Record;
+use transaction::Transaction;

 use crate::{
     executor::Executor,
@@ -42,15 +42,6 @@ pub struct DbOption {
     pub clean_channel_buffer: usize,
 }

-pub struct DB<R, E>
-where
-    R: Record,
-    E: Executor,
-{
-    schema: Arc<RwLock<Schema<R>>>,
-    _p: PhantomData<E>,
-}
-
 impl DbOption {
     pub fn new(path: impl Into<PathBuf> + Send) -> Self {
         DbOption {
@@ -90,6 +83,16 @@ impl DbOption {
     }
 }

+pub struct DB<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    schema: Arc<RwLock<Schema<R, E>>>,
+    oracle: Oracle<R::Key>,
+    _p: PhantomData<E>,
+}
+
 impl<R, E> Default for DB<R, E>
 where
     R: Record,
@@ -99,6 +102,7 @@ where
         Self {
             schema: Arc::new(RwLock::new(Schema::default())),
             _p: Default::default(),
+            oracle: Oracle::default(),
         }
     }
 }
@@ -109,15 +113,16 @@ where
     E: Executor,
 {
     pub fn empty() -> Self {
-        Self {
-            schema: Arc::new(RwLock::new(Schema::default())),
-            _p: Default::default(),
-        }
+        Self::default()
     }

+    pub async fn transaction(&self) -> Transaction<'_, R, E> {
+        Transaction::new(&self.oracle, self.schema.read().await)
+    }
+
     pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
-        let columns = self.schema.read().await;
-        columns.write(record, ts).await
+        let schema = self.schema.read().await;
+        schema.write(record, ts).await
     }

     pub(crate) async fn write_batch(
@@ -131,66 +136,70 @@ where
         }
         Ok(())
     }
-
-    pub(crate) async fn read(&self) -> RwLockReadGuard<'_, Schema<R>> {
-        self.schema.read().await
-    }
 }

-pub(crate) struct Schema<R>
+pub(crate) struct Schema<R, FP>
 where
     R: Record,
 {
     mutable: Mutable<R>,
     immutables: VecDeque<Immutable<R::Columns>>,
+    _marker: PhantomData<FP>,
 }

-impl<R> Default for Schema<R>
+impl<R, FP> Default for Schema<R, FP>
 where
     R: Record,
 {
     fn default() -> Self {
         Self {
             mutable: Mutable::default(),
             immutables: VecDeque::default(),
+            _marker: Default::default(),
         }
     }
 }

-impl<R> Schema<R>
+impl<R, FP> Schema<R, FP>
 where
     R: Record + Send,
+    FP: FileProvider,
 {
     async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
         self.mutable.insert(record, ts);
         Ok(())
     }

-    async fn get<'get, E>(
+    async fn remove(&self, key: R::Key, ts: Timestamp) -> io::Result<()> {
+        self.mutable.remove(key, ts);
+        Ok(())
+    }
+
+    async fn get<'get>(
         &'get self,
         key: &'get R::Key,
         ts: Timestamp,
     ) -> Result<Option<Entry<'get, R>>, ParquetError>
     where
-        E: Executor,
+        FP: FileProvider,
     {
-        self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
+        self.scan(Bound::Included(key), Bound::Unbounded, ts)
             .await?
             .next()
             .await
             .transpose()
     }

-    async fn scan<'scan, E>(
+    async fn scan<'scan>(
         &'scan self,
         lower: Bound<&'scan R::Key>,
         upper: Bound<&'scan R::Key>,
         ts: Timestamp,
     ) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
     where
-        E: Executor,
+        FP: FileProvider,
     {
-        let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
+        let mut streams = Vec::<ScanStream<R, FP>>::with_capacity(self.immutables.len() + 1);
         streams.push(self.mutable.scan((lower, upper), ts).into());
         for immutable in &self.immutables {
             streams.push(immutable.scan((lower, upper), ts).into());
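Note: the diff above makes DB::transaction the public entry point for transactional access. A minimal usage sketch follows; TokioExecutor, the User record type, and the set/get/commit methods on Transaction are assumptions for illustration (src/transaction.rs is not loaded in this view), not confirmed API:

    // Hypothetical caller of the new API. Only `DB::transaction` appears
    // in this diff; the record type and Transaction methods are assumed.
    async fn demo(db: &DB<User, TokioExecutor>) {
        let mut txn = db.transaction().await;

        // Reads are served from the snapshot at the transaction's read timestamp.
        let _user = txn.get(&"alice".into()).await;

        // Writes are buffered in the transaction's local write set.
        txn.set(User { name: "alice".into(), age: 31 });

        // On commit, the oracle checks this write set against transactions
        // committed in (read_at, write_at); any overlap is a WriteConflict.
        if let Err(conflict) = txn.commit().await {
            eprintln!("retry after conflict: {:?}", conflict);
        }
    }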
18 changes: 9 additions & 9 deletions src/ondisk/scan.rs
@@ -8,37 +8,37 @@ use parquet::arrow::async_reader::ParquetRecordBatchStream
 use pin_project_lite::pin_project;
 use tokio_util::compat::Compat;

+use crate::fs::FileProvider;
 use crate::{
-    executor::Executor,
     record::Record,
     stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
 };

 pin_project! {
     #[derive(Debug)]
-    pub struct SsTableScan<R, E>
+    pub struct SsTableScan<R, FP>
     where
-        E: Executor,
+        FP: FileProvider,
     {
         #[pin]
-        stream: ParquetRecordBatchStream<Compat<E::File>>,
+        stream: ParquetRecordBatchStream<Compat<FP::File>>,
         iter: Option<RecordBatchIterator<R>>,
     }
 }

-impl<R, E> SsTableScan<R, E>
+impl<R, FP> SsTableScan<R, FP>
 where
-    E: Executor,
+    FP: FileProvider,
 {
-    pub fn new(stream: ParquetRecordBatchStream<Compat<E::File>>) -> Self {
+    pub fn new(stream: ParquetRecordBatchStream<Compat<FP::File>>) -> Self {
         SsTableScan { stream, iter: None }
     }
 }

-impl<R, E> Stream for SsTableScan<R, E>
+impl<R, FP> Stream for SsTableScan<R, FP>
 where
     R: Record,
-    E: Executor,
+    FP: FileProvider,
 {
     type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;
13 changes: 7 additions & 6 deletions src/oracle/mod.rs
@@ -61,7 +61,7 @@ impl<K> Oracle<K>
 where
     K: Eq + Hash + Clone,
 {
-    fn start_read(&self) -> Timestamp {
+    pub(crate) fn start_read(&self) -> Timestamp {
         let mut in_read = self.in_read.lock().unwrap();
         let now = self.now.load(Ordering::Relaxed).into();
         match in_read.entry(now) {
@@ -75,7 +75,7 @@
         now
     }

-    fn read_commit(&self, ts: Timestamp) {
+    pub(crate) fn read_commit(&self, ts: Timestamp) {
         match self.in_read.lock().unwrap().entry(ts) {
             Entry::Vacant(_) => panic!("commit non-existing read"),
             Entry::Occupied(mut o) => match o.get_mut() {
@@ -89,27 +89,28 @@
         }
     }

-    fn start_write(&self) -> Timestamp {
+    pub(crate) fn start_write(&self) -> Timestamp {
         (self.now.fetch_add(1, Ordering::Relaxed) + 1).into()
     }

-    fn write_commit(
+    pub(crate) fn write_commit(
         &self,
         read_at: Timestamp,
         write_at: Timestamp,
         in_write: HashSet<K>,
     ) -> Result<(), WriteConflict<K>> {
         let mut committed_txns = self.committed_txns.lock().unwrap();
-        let conflicts: Vec<_> = committed_txns
+        let conflicts = committed_txns
             .range((Bound::Excluded(read_at), Bound::Excluded(write_at)))
             .flat_map(|(_, txn)| txn.intersection(&in_write))
             .cloned()
-            .collect();
+            .collect::<Vec<_>>();

         if !conflicts.is_empty() {
             return Err(WriteConflict { keys: conflicts });
         }

+        // TODO: clean committed transactions
         committed_txns.insert(write_at, in_write);
         Ok(())
     }
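Note: write_commit above is the optimistic conflict check. A transaction that took its snapshot at read_at and commits at write_at must not overlap, key-wise, with any transaction that committed strictly inside that interval. A self-contained sketch of the same rule, with u32 timestamps and String keys standing in for the crate's Timestamp and K:

    use std::collections::{BTreeMap, HashSet};
    use std::ops::Bound;

    // Restatement of the rule in write_commit: scan committed write sets
    // in the open interval (read_at, write_at) and fail on any overlap
    // with this transaction's own write set.
    fn has_conflict(
        committed: &BTreeMap<u32, HashSet<String>>, // commit ts -> write set
        read_at: u32,
        write_at: u32,
        in_write: &HashSet<String>,
    ) -> bool {
        committed
            .range((Bound::Excluded(read_at), Bound::Excluded(write_at)))
            .any(|(_, keys)| !keys.is_disjoint(in_write))
    }

    fn main() {
        let mut committed = BTreeMap::new();
        committed.insert(5, HashSet::from(["a".to_string()]));

        let mine = HashSet::from(["a".to_string()]);
        // Read at 4, commit at 6: the commit at ts 5 falls inside the
        // window and touches "a", so this transaction must abort.
        assert!(has_conflict(&committed, 4, 6, &mine));
        // Read at 5: the ts-5 commit was already visible to the snapshot.
        assert!(!has_conflict(&committed, 5, 6, &mine));
    }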
2 changes: 1 addition & 1 deletion src/oracle/timestamp.rs
@@ -3,7 +3,7 @@ use std::{borrow::Borrow, cmp::Ordering, marker::PhantomData, mem::transmute};
 use crate::oracle::Timestamp;

 #[derive(PartialEq, Eq, Debug, Clone)]
-pub(crate) struct Timestamped<V> {
+pub struct Timestamped<V> {
     pub(crate) ts: Timestamp,
     pub(crate) value: V,
 }
4 changes: 2 additions & 2 deletions src/record/mod.rs
@@ -1,7 +1,7 @@
 pub(crate) mod internal;
 mod str;

-use std::sync::Arc;
+use std::{hash::Hash, sync::Arc};

 use arrow::{
     array::{Datum, RecordBatch},
@@ -14,7 +14,7 @@ use crate::{
     serdes::{Decode, Encode},
 };

-pub trait Key: 'static + Encode + Decode + Ord + Clone + Send {
+pub trait Key: 'static + Encode + Decode + Ord + Clone + Send + Hash + std::fmt::Debug {
     type Ref<'r>: KeyRef<'r, Key = Self> + Copy
     where
         Self: 'r;
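Note: the strengthened Key bound lines up with the oracle, which requires K: Eq + Hash + Clone and keeps write sets in HashSet<K>, so keys must hash (and Debug lets conflicts be reported). A sketch of what a custom key type now needs to derive; the Encode/Decode impls are crate-specific and elided:

    // Hypothetical key type: Hash and Debug are the derives this commit
    // newly requires; Encode/Decode (crate traits) are elided here.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Debug)]
    struct UserKey(String);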
19 changes: 10 additions & 9 deletions src/stream/merge.rs
@@ -10,27 +10,28 @@ use futures_util::stream::StreamExt;
 use pin_project_lite::pin_project;

 use super::{Entry, ScanStream};
-use crate::{executor::Executor, record::Record};
+use crate::fs::FileProvider;
+use crate::record::Record;

 pin_project! {
-    pub(crate) struct MergeStream<'merge, R, E>
+    pub(crate) struct MergeStream<'merge, R, FP>
     where
         R: Record,
-        E: Executor,
+        FP: FileProvider,
     {
-        streams: Vec<ScanStream<'merge, R, E>>,
+        streams: Vec<ScanStream<'merge, R, FP>>,
         peeked: BinaryHeap<CmpEntry<'merge, R>>,
         buf: Option<Entry<'merge, R>>,
     }
 }

-impl<'merge, R, E> MergeStream<'merge, R, E>
+impl<'merge, R, FP> MergeStream<'merge, R, FP>
 where
     R: Record,
-    E: Executor,
+    FP: FileProvider,
 {
     pub(crate) async fn from_vec(
-        mut streams: Vec<ScanStream<'merge, R, E>>,
+        mut streams: Vec<ScanStream<'merge, R, FP>>,
     ) -> Result<Self, parquet::errors::ParquetError> {
         let mut peeked = BinaryHeap::with_capacity(streams.len());
@@ -51,10 +52,10 @@
     }
 }

-impl<'merge, R, E> Stream for MergeStream<'merge, R, E>
+impl<'merge, R, FP> Stream for MergeStream<'merge, R, FP>
 where
     R: Record,
-    E: Executor,
+    FP: FileProvider,
 {
     type Item = Result<Entry<'merge, R>, parquet::errors::ParquetError>;
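Note: MergeStream is a k-way merge. Each source stream contributes its current head entry to a binary heap, and popping the smallest entry refills the heap from the stream it came from. A synchronous sketch of the same idea over plain sorted iterators (the real type drives async ScanStreams and also deduplicates by key and timestamp, which is omitted here):

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    // Merge already-sorted iterators by keeping each iterator's head in a
    // min-heap (via Reverse), tagged with the source index for refilling.
    fn merge<I: Iterator<Item = u64>>(mut sources: Vec<I>) -> Vec<u64> {
        let mut heap = BinaryHeap::new();

        // Seed the heap with the first item of every source.
        for (idx, src) in sources.iter_mut().enumerate() {
            if let Some(item) = src.next() {
                heap.push(Reverse((item, idx)));
            }
        }

        let mut out = Vec::new();
        while let Some(Reverse((item, idx))) = heap.pop() {
            out.push(item);
            // Refill from the source that produced the popped item.
            if let Some(next) = sources[idx].next() {
                heap.push(Reverse((next, idx)));
            }
        }
        out
    }

    fn main() {
        let merged = merge(vec![vec![1, 4, 7].into_iter(), vec![2, 3, 9].into_iter()]);
        assert_eq!(merged, vec![1, 2, 3, 4, 7, 9]);
    }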
