diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs index 0fd6dfd1..509a69a6 100644 --- a/src/inmem/immutable.rs +++ b/src/inmem/immutable.rs @@ -5,7 +5,6 @@ use std::{ }; use arrow::array::RecordBatch; -use futures_util::stream::{self, Iter}; use super::mutable::Mutable; use crate::{ diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 116091be..28b66eed 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -57,26 +57,29 @@ where R: Record + Send + Sync, R::Key: Send, { - pub(crate) fn insert(&self, record: Timestamped) { - let (record, ts) = record.into_parts(); + pub(crate) fn insert(&self, record: R, ts: Timestamp) { self.data // TODO: remove key cloning .insert(Timestamped::new(record.key().to_key(), ts), Some(record)); } - pub(crate) fn remove(&self, key: Timestamped) { - self.data.insert(key, None); + pub(crate) fn remove(&self, key: R::Key, ts: Timestamp) { + self.data.insert(Timestamped::new(key, ts), None); } fn get( &self, - key: &TimestampedRef, + key: &R::Key, + ts: Timestamp, ) -> Option, Option>> { self.data - .range::, _>((Bound::Included(key), Bound::Unbounded)) + .range::, _>(( + Bound::Included(TimestampedRef::new(key, ts)), + Bound::Unbounded, + )) .next() .and_then(|entry| { - if &entry.key().value == key.value() { + if &entry.key().value == key { Some(entry) } else { None @@ -111,9 +114,11 @@ where #[cfg(test)] mod tests { + use std::ops::Bound; + use super::Mutable; use crate::{ - oracle::timestamp::{Timestamped, TimestampedRef}, + oracle::timestamp::Timestamped, record::Record, tests::{Test, TestRef}, }; @@ -125,26 +130,24 @@ mod tests { let mem_table = Mutable::default(); - mem_table.insert(Timestamped::new( + mem_table.insert( Test { vstring: key_1.clone(), vu32: 1, vobool: Some(true), }, 0_u32.into(), - )); - mem_table.insert(Timestamped::new( + ); + mem_table.insert( Test { vstring: key_2.clone(), vu32: 2, vobool: None, }, 1_u32.into(), - )); + ); - let entry = mem_table - .get(TimestampedRef::new(&key_1, 0_u32.into())) - .unwrap(); + let entry = mem_table.get(&key_1, 0_u32.into()).unwrap(); assert_eq!( entry.value().as_ref().unwrap().as_record_ref(), TestRef { @@ -154,4 +157,46 @@ mod tests { } ) } + + #[test] + fn range() { + let mutable = Mutable::::new(); + + mutable.insert("1".into(), 0_u32.into()); + mutable.insert("2".into(), 0_u32.into()); + mutable.insert("2".into(), 1_u32.into()); + mutable.insert("3".into(), 1_u32.into()); + mutable.insert("4".into(), 0_u32.into()); + + let mut scan = mutable.scan((Bound::Unbounded, Bound::Unbounded), 0_u32.into()); + + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("1".into(), 0_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 0_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("4".into(), 0_u32.into()) + ); + + let lower = "1".to_string(); + let upper = "4".to_string(); + let mut scan = mutable.scan( + (Bound::Included(&lower), Bound::Included(&upper)), + 1_u32.into(), + ); + + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("2".into(), 1_u32.into()) + ); + assert_eq!( + scan.next().unwrap().key(), + &Timestamped::new("3".into(), 1_u32.into()) + ); + } } diff --git a/src/lib.rs b/src/lib.rs index b93db048..d1f22fe6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,113 +9,59 @@ mod record; mod stream; mod transaction; -use std::{ - any::TypeId, - collections::{hash_map::Entry, HashMap, VecDeque}, - io, mem, - ops::Bound, - sync::Arc, -}; - -use async_lock::RwLock; +use std::{collections::VecDeque, io, mem, ops::Bound, sync::Arc}; + +use async_lock::{RwLock, RwLockReadGuard}; +use futures_core::Stream; +use futures_util::StreamExt; use inmem::{immutable::Immutable, mutable::Mutable}; -use oracle::{timestamp::Timestamped, Timestamp}; +use oracle::Timestamp; +use parquet::errors::ParquetError; use record::Record; +use stream::{merge::MergeStream, Entry, ScanStream}; -#[derive(Debug)] -pub struct DB { - schemas: std::sync::RwLock>, +pub struct DB +where + R: Record, +{ + schema: Arc>>, } -impl DB { - pub fn empty() -> Self { +impl Default for DB +where + R: Record, +{ + fn default() -> Self { Self { - schemas: std::sync::RwLock::new(HashMap::new()), + schema: Arc::new(RwLock::new(Schema::default())), } } +} - pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> - where - R: Record + Send + Sync, - R::Key: Send, - { - let columns = self.get_schema::(); - let columns = columns.read().await; +impl DB +where + R: Record + Send + Sync, + R::Key: Send, +{ + pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> { + let columns = self.schema.read().await; columns.write(record, ts).await } - pub(crate) async fn write_batch( + pub(crate) async fn write_batch( &self, records: impl Iterator, ts: Timestamp, - ) -> io::Result<()> - where - R: Record + Send + Sync, - R::Key: Send, - { - let columns = self.get_schema::(); - let columns = columns.read().await; + ) -> io::Result<()> { + let columns = self.schema.read().await; for record in records { columns.write(record, ts).await?; } Ok(()) } - pub(crate) async fn get(&self, key: Timestamped) -> io::Result> { - let columns = self.get_schema::(); - let columns = columns.read().await; - // columns.get(key, ts).await - todo!() - } - - pub async fn range_scan(&self, start: Bound<&T::Key>, end: Bound<&T::Key>) {} - - fn get_schema(&self) -> Arc>> - where - R: Record, - { - let schemas = self.schemas.read().unwrap(); - match schemas.get(&TypeId::of::()) { - Some(schema) => { - let inner = unsafe { Arc::from_raw(*schema as *const RwLock>) }; - let schema = inner.clone(); - std::mem::forget(inner); - schema - } - None => { - drop(schemas); - let mut schemas = self.schemas.write().unwrap(); - match schemas.entry(TypeId::of::()) { - Entry::Occupied(o) => unsafe { - let inner = Arc::from_raw(*o.get() as *const RwLock>); - let schema = inner.clone(); - std::mem::forget(inner); - schema - }, - Entry::Vacant(v) => { - let schema = Schema { - mutable: Mutable::new(), - immutables: VecDeque::new(), - }; - let columns = Arc::new(RwLock::new(schema)); - v.insert(Arc::into_raw(columns.clone()) as *const ()); - columns - } - } - } - } - } -} - -impl Drop for DB { - fn drop(&mut self) { - self.schemas - .write() - .unwrap() - .values() - .for_each(|schema| unsafe { - Arc::from_raw(*schema as *const RwLock<()>); - }); + pub(crate) async fn read(&self) -> RwLockReadGuard<'_, Schema> { + self.schema.read().await } } @@ -127,16 +73,56 @@ where immutables: VecDeque>, } +impl Default for Schema +where + R: Record, +{ + fn default() -> Self { + Self { + mutable: Mutable::default(), + immutables: VecDeque::default(), + } + } +} + impl Schema where R: Record + Send + Sync, R::Key: Send + Sync, { async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> { - self.mutable.insert(Timestamped::new(record, ts)); + self.mutable.insert(record, ts); Ok(()) } + async fn get<'get>( + &'get self, + key: &'get R::Key, + ts: Timestamp, + ) -> Result>, ParquetError> { + self.scan(Bound::Included(key), Bound::Unbounded, ts) + .await? + .next() + .await + .transpose() + } + + async fn scan<'scan>( + &'scan self, + lower: Bound<&'scan R::Key>, + uppwer: Bound<&'scan R::Key>, + ts: Timestamp, + ) -> Result, ParquetError>>, ParquetError> { + let mut streams = Vec::>::with_capacity(self.immutables.len() + 1); + streams.push(self.mutable.scan((lower, uppwer), ts).into()); + for immutable in &self.immutables { + streams.push(immutable.scan((lower, uppwer), ts).into()); + } + // TODO: sstable scan + + MergeStream::from_vec(streams).await + } + fn freeze(&mut self) { let mutable = mem::replace(&mut self.mutable, Mutable::new()); let immutable = Immutable::from(mutable); @@ -242,7 +228,7 @@ pub(crate) mod tests { } pub(crate) async fn get_test_record_batch() -> RecordBatch { - let db = DB::empty(); + let db = DB::default(); db.write( Test { @@ -265,9 +251,7 @@ pub(crate) mod tests { .await .unwrap(); - let schema = db.get_schema::(); - - let mut schema = schema.write().await; + let mut schema = db.schema.write().await; schema.freeze(); diff --git a/src/stream/merge.rs b/src/stream/merge.rs index e942a1b5..38f823cb 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -27,10 +27,9 @@ impl<'merge, R> MergeStream<'merge, R> where R: Record, { - async fn from_iter>>( - iter: T, + pub(crate) async fn from_vec( + mut streams: Vec>, ) -> Result { - let mut streams = iter.into_iter().collect::>(); let mut peeked = BinaryHeap::with_capacity(streams.len()); for stream in &mut streams { @@ -136,27 +135,27 @@ mod tests { use futures_util::StreamExt; use super::MergeStream; - use crate::{inmem::mutable::Mutable, oracle::timestamp::Timestamped}; + use crate::inmem::mutable::Mutable; #[tokio::test] async fn merge_mutable() { let m1 = Mutable::::new(); - m1.remove(Timestamped::new("b".into(), 3.into())); - m1.insert(Timestamped::new("c".into(), 4.into())); - m1.insert(Timestamped::new("d".into(), 5.into())); + m1.remove("b".into(), 3.into()); + m1.insert("c".into(), 4.into()); + m1.insert("d".into(), 5.into()); let m2 = Mutable::::new(); - m2.insert(Timestamped::new("a".into(), 1.into())); - m2.insert(Timestamped::new("b".into(), 2.into())); - m2.insert(Timestamped::new("c".into(), 3.into())); + m2.insert("a".into(), 1.into()); + m2.insert("b".into(), 2.into()); + m2.insert("c".into(), 3.into()); let m3 = Mutable::::new(); - m3.insert(Timestamped::new("e".into(), 4.into())); + m3.insert("e".into(), 4.into()); let lower = "a".to_string(); let upper = "e".to_string(); let bound = (Bound::Included(&lower), Bound::Included(&upper)); - let mut merge = MergeStream::from_iter(vec![ + let mut merge = MergeStream::from_vec(vec![ m1.scan(bound, 6.into()).into(), m2.scan(bound, 6.into()).into(), m3.scan(bound, 6.into()).into(), diff --git a/src/transaction.rs b/src/transaction.rs index cddeaabe..f7054b83 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -2,12 +2,11 @@ use std::{collections::BTreeMap, io, sync::Arc}; use crate::{oracle::Timestamp, Record, DB}; -#[derive(Debug)] pub struct Transaction where R: Record, { - db: Arc, + db: Arc>, read_at: Timestamp, local: BTreeMap>, } @@ -16,7 +15,7 @@ impl Transaction where R: Record, { - pub(crate) fn new(db: Arc, read_at: Timestamp) -> Self { + pub(crate) fn new(db: Arc>, read_at: Timestamp) -> Self { Self { db, read_at,