From 39ab2136687b2fbd8b22472d749701da64de039a Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Thu, 1 Aug 2024 15:24:33 +0800 Subject: [PATCH] feat: add get insert remove insert_batch and scan to DB --- examples/declare.rs | 2 +- src/compaction/mod.rs | 6 +-- src/inmem/mutable.rs | 8 ++-- src/lib.rs | 100 +++++++++++++++++++++++++++++++++--------- src/transaction.rs | 26 +++++------ src/wal/mod.rs | 2 +- 6 files changed, 102 insertions(+), 42 deletions(-) diff --git a/examples/declare.rs b/examples/declare.rs index 690b7a0b..91f750bc 100644 --- a/examples/declare.rs +++ b/examples/declare.rs @@ -24,7 +24,7 @@ async fn main() { let mut txn = db.transaction().await; // set with owned value - txn.set(User { + txn.insert(User { name: "Alice".into(), age: 22, email: Some("alice@gmail.com".into()), diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index 1d85113b..5a5063ad 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -448,13 +448,13 @@ pub(crate) mod tests { timestamp::Timestamp, version::{edit::VersionEdit, Version}, wal::log::LogType, - DataBaseError, DbOption, + DbError, DbOption, }; async fn build_immutable( option: &DbOption, records: Vec<(LogType, R, Timestamp)>, - ) -> Result, DataBaseError> + ) -> Result, DbError> where R: Record + Send, FP: FileProvider, @@ -471,7 +471,7 @@ pub(crate) mod tests { option: &DbOption, gen: FileId, records: Vec<(LogType, R, Timestamp)>, - ) -> Result<(), DataBaseError> + ) -> Result<(), DbError> where R: Record + Send, FP: Executor, diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index cd910041..3935d747 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -17,7 +17,7 @@ use crate::{ Timestamp, EPOCH, }, wal::{log::LogType, WalFile}, - DataBaseError, DbOption, + DbError, DbOption, }; pub(crate) type MutableScan<'scan, R> = Range< @@ -67,7 +67,7 @@ where log_ty: LogType, record: R, ts: Timestamp, - ) -> Result> { + ) -> Result> { self.append(Some(log_ty), record.key().to_key(), ts, Some(record)) .await } @@ -77,7 +77,7 @@ where log_ty: LogType, key: R::Key, ts: Timestamp, - ) -> Result> { + ) -> Result> { self.append(Some(log_ty), key, ts, None).await } @@ -87,7 +87,7 @@ where key: R::Key, ts: Timestamp, value: Option, - ) -> Result> { + ) -> Result> { let timestamped_key = Timestamped::new(key, ts); if let Some(log_ty) = log_ty { diff --git a/src/lib.rs b/src/lib.rs index 4f6997d1..05aafacb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,7 @@ use std::{ }; use async_lock::{RwLock, RwLockReadGuard}; +use async_stream::stream; use flume::{bounded, Sender}; use fs::FileProvider; use futures_core::Stream; @@ -38,7 +39,7 @@ use record::Record; use thiserror::Error; use timestamp::Timestamp; use tracing::error; -use transaction::Transaction; +use transaction::{CommitError, Transaction, TransactionEntry}; pub use crate::option::*; use crate::{ @@ -69,7 +70,7 @@ where R::Columns: Send + Sync, E: Executor + Send + Sync + 'static, { - pub async fn new(option: DbOption, executor: E) -> Result> { + pub async fn new(option: DbOption, executor: E) -> Result> { let option = Arc::new(option); E::create_dir_all(&option.path).await?; E::create_dir_all(&option.wal_dir_path()).await?; @@ -120,7 +121,72 @@ where ) } - pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), DataBaseError> { + pub async fn insert(&self, record: R) -> Result<(), CommitError> { + Ok(self + .write(record, self.version_set.transaction_ts()) + .await?) + } + + pub async fn insert_batch( + &self, + records: impl ExactSizeIterator, + ) -> Result<(), CommitError> { + Ok(self + .write_batch(records, self.version_set.transaction_ts()) + .await?) + } + + pub async fn remove(&self, key: R::Key) -> Result> { + Ok(self + .schema + .read() + .await + .remove(LogType::Full, key, self.version_set.transaction_ts()) + .await?) + } + + pub async fn get( + &self, + key: &R::Key, + mut f: impl FnMut(TransactionEntry<'_, R>) -> T, + ) -> Result, CommitError> { + Ok(self + .schema + .read() + .await + .get( + &*self.version_set.current().await, + key, + self.version_set.transaction_ts(), + Projection::All, + ) + .await? + .map(|e| f(TransactionEntry::Stream(e)))) + } + + pub async fn scan<'scan, T: 'scan>( + &'scan self, + range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>), + mut f: impl FnMut(TransactionEntry<'_, R>) -> T + 'scan, + ) -> impl Stream>> + 'scan { + stream! { + let schema = self.schema.read().await; + let current = self.version_set.current().await; + let mut scan = Scan::new( + &schema, + range, + self.version_set.transaction_ts(), + &*current, + Vec::new(), + ).take().await?; + + while let Some(record) = scan.next().await { + yield Ok(f(TransactionEntry::Stream(record?))) + } + } + } + + pub(crate) async fn write(&self, record: R, ts: Timestamp) -> Result<(), DbError> { let schema = self.schema.read().await; if schema.write(LogType::Full, record, ts).await? { @@ -134,7 +200,7 @@ where &self, mut records: impl ExactSizeIterator, ts: Timestamp, - ) -> Result<(), DataBaseError> { + ) -> Result<(), DbError> { let schema = self.schema.read().await; if let Some(first) = records.next() { @@ -186,7 +252,7 @@ where option: Arc, compaction_tx: Sender, version_set: &VersionSet, - ) -> Result> { + ) -> Result> { let mut schema = Schema { mutable: Mutable::new(&option).await?, immutables: Default::default(), @@ -247,12 +313,7 @@ where Ok(schema) } - async fn write( - &self, - log_ty: LogType, - record: R, - ts: Timestamp, - ) -> Result> { + async fn write(&self, log_ty: LogType, record: R, ts: Timestamp) -> Result> { Ok(self.mutable.insert(log_ty, record, ts).await? > self.option.max_mem_table_size) } @@ -261,7 +322,7 @@ where log_ty: LogType, key: R::Key, ts: Timestamp, - ) -> Result> { + ) -> Result> { Ok(self.mutable.remove(log_ty, key, ts).await? > self.option.max_mem_table_size) } @@ -270,7 +331,7 @@ where key: R::Key, ts: Timestamp, value: Option, - ) -> Result> { + ) -> Result> { Ok(self.mutable.append(None, key, ts, value).await? > self.option.max_mem_table_size) } @@ -280,7 +341,7 @@ where key: &'get R::Key, ts: Timestamp, projection: Projection, - ) -> Result>, DataBaseError> + ) -> Result>, DbError> where FP: FileProvider, { @@ -374,7 +435,7 @@ where pub async fn take( mut self, - ) -> Result, ParquetError>>, DataBaseError> { + ) -> Result, ParquetError>>, DbError> { self.streams.push( self.schema .mutable @@ -404,8 +465,7 @@ where pub async fn package( mut self, batch_size: usize, - ) -> Result> + 'scan, DataBaseError> - { + ) -> Result> + 'scan, DbError> { self.streams.push( self.schema .mutable @@ -439,7 +499,7 @@ where } #[derive(Debug, Error)] -pub enum DataBaseError +pub enum DbError where R: Record, { @@ -491,7 +551,7 @@ pub(crate) mod tests { serdes::{Decode, Encode}, version::{cleaner::Cleaner, set::tests::build_version_set, Version}, wal::log::LogType, - DataBaseError, DbOption, Immutable, Projection, Record, DB, + DbError, DbOption, Immutable, Projection, Record, DB, }; #[derive(Debug, PartialEq, Eq)] @@ -821,7 +881,7 @@ pub(crate) mod tests { executor: E, schema: crate::Schema, version: Version, - ) -> Result, DataBaseError> + ) -> Result, DbError> where R: Record + Send + Sync, R::Columns: Send + Sync, diff --git a/src/transaction.rs b/src/transaction.rs index 4b848f3b..c2c227ea 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -20,7 +20,7 @@ use crate::{ timestamp::{Timestamp, Timestamped}, version::VersionRef, wal::log::LogType, - DataBaseError, LockMap, Projection, Record, Scan, Schema, + DbError, LockMap, Projection, Record, Scan, Schema, }; pub(crate) struct TransactionScan<'scan, R: Record> { @@ -76,7 +76,7 @@ where &'get self, key: &'get R::Key, projection: Projection, - ) -> Result>, DataBaseError> { + ) -> Result>, DbError> { Ok(match self.local.get(key).and_then(|v| v.as_ref()) { Some(v) => Some(TransactionEntry::Local(v.as_record_ref())), None => self @@ -105,7 +105,7 @@ where Scan::new(&self.share, range, self.ts, &self.version, streams) } - pub fn set(&mut self, value: R) { + pub fn insert(&mut self, value: R) { self.entry(value.key().to_key(), Some(value)) } @@ -215,7 +215,7 @@ where #[error("transaction parquet error {:?}", .0)] Parquet(#[from] ParquetError), #[error("transaction database error {:?}", .0)] - DataBase(#[from] DataBaseError), + Database(#[from] DbError), #[error("transaction write conflict: {:?}", .0)] WriteConflict(R::Key), } @@ -245,7 +245,7 @@ mod tests { .unwrap(); { let mut txn1 = db.transaction().await; - txn1.set("foo".to_string()); + txn1.insert("foo".to_string()); let txn2 = db.transaction().await; dbg!(txn2 @@ -279,18 +279,18 @@ mod tests { .unwrap(); let mut txn = db.transaction().await; - txn.set(0.to_string()); - txn.set(1.to_string()); + txn.insert(0.to_string()); + txn.insert(1.to_string()); txn.commit().await.unwrap(); let mut txn_0 = db.transaction().await; let mut txn_1 = db.transaction().await; let mut txn_2 = db.transaction().await; - txn_0.set(1.to_string()); - txn_1.set(1.to_string()); - txn_1.set(2.to_string()); - txn_2.set(2.to_string()); + txn_0.insert(1.to_string()); + txn_1.insert(1.to_string()); + txn_1.insert(2.to_string()); + txn_2.insert(2.to_string()); txn_0.commit().await.unwrap(); @@ -312,7 +312,7 @@ mod tests { .unwrap(); let mut txn1 = db.transaction().await; - txn1.set(Test { + txn1.insert(Test { vstring: 0.to_string(), vu32: 0, vbool: Some(true), @@ -341,7 +341,7 @@ mod tests { .unwrap(); let mut txn = db.transaction().await; - txn.set(Test { + txn.insert(Test { vstring: "king".to_string(), vu32: 8, vbool: Some(true), diff --git a/src/wal/mod.rs b/src/wal/mod.rs index c99d3e5e..7e416ba5 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -108,7 +108,7 @@ where } #[derive(Debug, Error)] -pub(crate) enum RecoverError { +pub enum RecoverError { #[error("wal recover decode error: {0}")] Decode(E), #[error("wal recover checksum error")]