Skip to content

Commit

Permalink
chore: support transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Jul 22, 2024
1 parent 96947e1 commit e87133f
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 68 deletions.
4 changes: 2 additions & 2 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
FP: FileProvider,
{
pub(crate) option: Arc<DbOption>,
pub(crate) schema: Arc<RwLock<Schema<R>>>,
pub(crate) schema: Arc<RwLock<Schema<R, FP>>>,
pub(crate) version_set: VersionSet<R, FP>,
}

Expand All @@ -40,7 +40,7 @@ where
FP: FileProvider,
{
pub(crate) fn new(
schema: Arc<RwLock<Schema<R>>>,
schema: Arc<RwLock<Schema<R, FP>>>,
option: Arc<DbOption>,
version_set: VersionSet<R, FP>,
) -> Self {
Expand Down
68 changes: 38 additions & 30 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use std::{
collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
};

use async_lock::{RwLock, RwLockReadGuard};
use async_lock::RwLock;
use fs::FileProvider;
use futures_core::Stream;
use futures_util::StreamExt;
use inmem::{immutable::Immutable, mutable::Mutable};
use oracle::Timestamp;
use oracle::{Oracle, Timestamp};
use parquet::{errors::ParquetError, file::properties::WriterProperties};
use record::Record;
use transaction::Transaction;

use crate::{
executor::Executor,
Expand All @@ -45,15 +46,6 @@ pub struct DbOption {
pub write_parquet_option: Option<WriterProperties>,
}

pub struct DB<R, E>
where
R: Record,
E: Executor,
{
schema: Arc<RwLock<Schema<R>>>,
_p: PhantomData<E>,
}

impl DbOption {
pub fn new(path: impl Into<PathBuf> + Send) -> Self {
DbOption {
Expand Down Expand Up @@ -94,6 +86,16 @@ impl DbOption {
}
}

/// Database handle, generic over the record type `R` and the executor type `E`.
///
/// Holds the shared schema state together with the oracle that
/// [`DB::transaction`] hands to each new `Transaction`.
pub struct DB<R, E>
where
R: Record,
E: Executor,
{
/// Schema state shared behind an `Arc` and guarded by an async `RwLock`;
/// `E` is passed as the schema's second type parameter.
schema: Arc<RwLock<Schema<R, E>>>,
/// Oracle parameterised by the record's key type.
// NOTE(review): presumably issues read/write timestamps and detects write
// conflicts between transactions — confirm against `src/oracle/mod.rs`.
oracle: Oracle<R::Key>,
/// Zero-sized marker for the executor type `E`; stores no data.
_p: PhantomData<E>,
}

impl<R, E> Default for DB<R, E>
where
R: Record,
Expand All @@ -103,6 +105,7 @@ where
Self {
schema: Arc::new(RwLock::new(Schema::default())),
_p: Default::default(),
oracle: Oracle::default(),
}
}
}
Expand All @@ -113,15 +116,16 @@ where
E: Executor,
{
pub fn empty() -> Self {
Self {
schema: Arc::new(RwLock::new(Schema::default())),
_p: Default::default(),
}
Self::default()
}

/// Creates a new [`Transaction`] over this database.
///
/// Awaits a read guard on the schema lock and passes it, together with a
/// reference to the oracle, to `Transaction::new`. The returned transaction
/// borrows from `self` (the `'_` lifetime).
// NOTE(review): the read guard is moved into the transaction, so it is
// presumably held for the transaction's whole lifetime — confirm in
// `Transaction`, since that would block schema writers for that long.
pub async fn transaction(&self) -> Transaction<'_, R, E> {
Transaction::new(&self.oracle, self.schema.read().await)
}

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
let columns = self.schema.read().await;
columns.write(record, ts).await
let schema = self.schema.read().await;
schema.write(record, ts).await
}

pub(crate) async fn write_batch(
Expand All @@ -135,66 +139,70 @@ where
}
Ok(())
}

pub(crate) async fn read(&self) -> RwLockReadGuard<'_, Schema<R>> {
self.schema.read().await
}
}

pub(crate) struct Schema<R>
pub(crate) struct Schema<R, FP>
where
R: Record,
{
mutable: Mutable<R>,
immutables: VecDeque<Immutable<R::Columns>>,
_marker: PhantomData<FP>,
}

impl<R> Default for Schema<R>
impl<R, FP> Default for Schema<R, FP>
where
R: Record,
{
fn default() -> Self {
Self {
mutable: Mutable::default(),
immutables: VecDeque::default(),
_marker: Default::default(),
}
}
}

impl<R> Schema<R>
impl<R, FP> Schema<R, FP>
where
R: Record + Send,
FP: FileProvider,
{
async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
self.mutable.insert(record, ts);
Ok(())
}

async fn get<'get, E>(
/// Forwards a removal of `key` at timestamp `ts` to the mutable table.
///
/// Nothing is awaited inside the body, and whatever `Mutable::remove`
/// returns is discarded, so this currently always yields `Ok(())`.
// NOTE(review): presumably this records a tombstone for `key` rather than
// erasing earlier versions — confirm against `Mutable::remove`.
async fn remove(&self, key: R::Key, ts: Timestamp) -> io::Result<()> {
self.mutable.remove(key, ts);
Ok(())
}

async fn get<'get>(
&'get self,
key: &'get R::Key,
ts: Timestamp,
) -> Result<Option<Entry<'get, R>>, ParquetError>
where
E: Executor + 'get,
FP: FileProvider,
{
self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
self.scan(Bound::Included(key), Bound::Unbounded, ts)
.await?
.next()
.await
.transpose()
}

async fn scan<'scan, E>(
async fn scan<'scan>(
&'scan self,
lower: Bound<&'scan R::Key>,
uppwer: Bound<&'scan R::Key>,
ts: Timestamp,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
where
E: Executor + 'scan,
FP: FileProvider,
{
let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
let mut streams = Vec::<ScanStream<R, FP>>::with_capacity(self.immutables.len() + 1);
streams.push(self.mutable.scan((lower, uppwer), ts).into());
for immutable in &self.immutables {
streams.push(immutable.scan((lower, uppwer), ts).into());
Expand Down
13 changes: 7 additions & 6 deletions src/oracle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<K> Oracle<K>
where
K: Eq + Hash + Clone,
{
fn start_read(&self) -> Timestamp {
pub(crate) fn start_read(&self) -> Timestamp {
let mut in_read = self.in_read.lock().unwrap();
let now = self.now.load(Ordering::Relaxed).into();
match in_read.entry(now) {
Expand All @@ -75,7 +75,7 @@ where
now
}

fn read_commit(&self, ts: Timestamp) {
pub(crate) fn read_commit(&self, ts: Timestamp) {
match self.in_read.lock().unwrap().entry(ts) {
Entry::Vacant(_) => panic!("commit non-existing read"),
Entry::Occupied(mut o) => match o.get_mut() {
Expand All @@ -89,27 +89,28 @@ where
}
}

fn start_write(&self) -> Timestamp {
pub(crate) fn start_write(&self) -> Timestamp {
(self.now.fetch_add(1, Ordering::Relaxed) + 1).into()
}

fn write_commit(
pub(crate) fn write_commit(
&self,
read_at: Timestamp,
write_at: Timestamp,
in_write: HashSet<K>,
) -> Result<(), WriteConflict<K>> {
let mut committed_txns = self.committed_txns.lock().unwrap();
let conflicts: Vec<_> = committed_txns
let conflicts = committed_txns
.range((Bound::Excluded(read_at), Bound::Excluded(write_at)))
.flat_map(|(_, txn)| txn.intersection(&in_write))
.cloned()
.collect();
.collect::<Vec<_>>();

if !conflicts.is_empty() {
return Err(WriteConflict { keys: conflicts });
}

// TODO: clean committed transactions
committed_txns.insert(write_at, in_write);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/oracle/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use crate::{oracle::Timestamp, serdes::Encode};

#[derive(PartialEq, Eq, Debug, Clone)]
pub(crate) struct Timestamped<V> {
pub struct Timestamped<V> {
pub(crate) ts: Timestamp,
pub(crate) value: V,
}
Expand Down
4 changes: 2 additions & 2 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub(crate) mod internal;
mod str;

use std::{fmt::Debug, sync::Arc};
use std::{hash::Hash, sync::Arc};

use arrow::{
array::{Datum, RecordBatch},
Expand All @@ -14,7 +14,7 @@ use crate::{
serdes::{Decode, Encode},
};

pub trait Key: 'static + Debug + Encode + Decode + Ord + Clone + Send {
pub trait Key: 'static + Encode + Decode + Ord + Clone + Send + Hash + std::fmt::Debug {
type Ref<'r>: KeyRef<'r, Key = Self> + Copy
where
Self: 'r;
Expand Down
10 changes: 0 additions & 10 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,6 @@ where
}
}

impl<'scan, R, FP> From<LevelStream<'scan, R, FP>> for ScanStream<'scan, R, FP>
where
R: Record,
FP: FileProvider,
{
fn from(inner: LevelStream<'scan, R, FP>) -> Self {
ScanStream::Level { inner }
}
}

impl<R, FP> fmt::Debug for ScanStream<'_, R, FP>
where
R: Record,
Expand Down
Loading

0 comments on commit e87133f

Please sign in to comment.