From 353df0cfc26dcce72d810ee9020ca48808a805b8 Mon Sep 17 00:00:00 2001
From: Kould
Date: Fri, 8 Nov 2024 17:06:46 +0800
Subject: [PATCH] chore: detach SnapShot from Transaction

---
Notes (text between `---` and the first `diff --git` is not part of the commit):

* Adds `src/snapshot.rs` with a `Snapshot` type that owns the read timestamp,
  the schema read guard, the version reference and the store manager, and
  exposes `get` / `scan` on top of them.
* `Transaction` now holds a `Snapshot` plus its local write buffer and lock map
  instead of carrying those four fields itself; `Transaction::new` takes the
  snapshot directly.
* `src/lib.rs` gains a public `snapshot()` constructor and builds transactions
  from it.
* NOTE(review): the author e-mail on the `From:` line is absent from this copy;
  restore it before feeding the patch to `git am` (`git apply` does not need it).
* NOTE(review): generic arguments (`Schema<R>`, `VersionRef<R>`,
  `Arc<StoreManager>`, `DbError<R>`, `LockMap<R::Key>`, `Option<ProjectionMask>`,
  ...) were re-derived from how the values are used; confirm against the
  repository before applying.

 src/lib.rs         |  11 ++-
 src/snapshot.rs    | 204 +++++++++++++++++++++++++++++++++++++++++++++
 src/transaction.rs |  67 ++++++---------
 3 files changed, 238 insertions(+), 44 deletions(-)
 create mode 100644 src/snapshot.rs

diff --git a/src/lib.rs b/src/lib.rs
index 09ac0fb7..a45f508f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -121,6 +121,7 @@ pub mod option;
 pub mod record;
 mod scope;
 pub mod serdes;
+pub mod snapshot;
 pub mod stream;
 pub mod timestamp;
 pub mod transaction;
@@ -160,6 +161,7 @@ use crate::{
     executor::Executor,
     fs::{manager::StoreManager, parse_file_id, FileId, FileType},
     serdes::Decode,
+    snapshot::Snapshot,
     stream::{
         mem_projection::MemProjectionStream, merge::MergeStream, package::PackageStream, Entry,
         ScanStream,
@@ -287,10 +289,13 @@ where
 
     /// open an optimistic ACID transaction
     pub async fn transaction(&self) -> Transaction<'_, R> {
-        Transaction::new(
-            self.version_set.current().await,
+        Transaction::new(self.snapshot().await, self.lock_map.clone())
+    }
+
+    pub async fn snapshot(&self) -> Snapshot<'_, R> {
+        Snapshot::new(
             self.schema.read().await,
-            self.lock_map.clone(),
+            self.version_set.current().await,
             self.manager.clone(),
         )
     }
diff --git a/src/snapshot.rs b/src/snapshot.rs
new file mode 100644
index 00000000..ef28823c
--- /dev/null
+++ b/src/snapshot.rs
@@ -0,0 +1,204 @@
+use std::{collections::Bound, sync::Arc};
+
+use async_lock::RwLockReadGuard;
+use parquet::arrow::ProjectionMask;
+
+use crate::{
+    fs::manager::StoreManager,
+    record::Record,
+    stream,
+    stream::ScanStream,
+    timestamp::Timestamp,
+    version::{TransactionTs, VersionRef},
+    DbError, Projection, Scan, Schema,
+};
+
+pub struct Snapshot<'s, R: Record> {
+    ts: Timestamp,
+    share: RwLockReadGuard<'s, Schema<R>>,
+    version: VersionRef<R>,
+    manager: Arc<StoreManager>,
+}
+
+impl<'s, R: Record> Snapshot<'s, R> {
+    pub async fn get<'get>(
+        &'get self,
+        key: &'get R::Key,
+        projection: Projection,
+    ) -> Result<Option<stream::Entry<'get, R>>, DbError<R>> {
+        Ok(self
+            .share
+            .get(&self.version, &self.manager, key, self.ts, projection)
+            .await?
+            .and_then(|entry| {
+                if entry.value().is_none() {
+                    None
+                } else {
+                    Some(entry)
+                }
+            }))
+    }
+
+    pub fn scan<'scan>(
+        &'scan self,
+        range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
+    ) -> Scan<'scan, R> {
+        Scan::new(
+            &self.share,
+            &self.manager,
+            range,
+            self.ts,
+            &self.version,
+            Box::new(move |_: Option<ProjectionMask>| None),
+        )
+    }
+
+    pub(crate) fn new(
+        share: RwLockReadGuard<'s, Schema<R>>,
+        version: VersionRef<R>,
+        manager: Arc<StoreManager>,
+    ) -> Self {
+        Self {
+            ts: version.load_ts(),
+            share,
+            version,
+            manager,
+        }
+    }
+
+    pub(crate) fn ts(&self) -> Timestamp {
+        self.ts
+    }
+
+    pub(crate) fn increase_ts(&self) -> Timestamp {
+        self.version.increase_ts()
+    }
+
+    pub(crate) fn schema(&self) -> &Schema<R> {
+        &self.share
+    }
+
+    pub(crate) fn _scan<'scan>(
+        &'scan self,
+        range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
+        fn_pre_stream: Box<
+            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan,
+        >,
+    ) -> Scan<'scan, R> {
+        Scan::new(
+            &self.share,
+            &self.manager,
+            range,
+            self.ts,
+            &self.version,
+            fn_pre_stream,
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::Bound, sync::Arc};
+
+    use fusio::path::Path;
+    use fusio_dispatch::FsOptions;
+    use futures_util::StreamExt;
+    use tempfile::TempDir;
+
+    use crate::{
+        compaction::tests::build_version,
+        executor::tokio::TokioExecutor,
+        fs::manager::StoreManager,
+        tests::{build_db, build_schema},
+        version::TransactionTs,
+        DbOption,
+    };
+
+    #[tokio::test]
+    async fn snapshot_scan() {
+        let temp_dir = TempDir::new().unwrap();
+        let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap());
+        let option = Arc::new(DbOption::from(
+            Path::from_filesystem_path(temp_dir.path()).unwrap(),
+        ));
+
+        manager
+            .base_fs()
+            .create_dir_all(&option.version_log_dir_path())
+            .await
+            .unwrap();
+        manager
+            .base_fs()
+            .create_dir_all(&option.wal_dir_path())
+            .await
+            .unwrap();
+
+        let (_, version) = build_version(&option, &manager).await;
+        let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs())
+            .await
+            .unwrap();
+        let db = build_db(
+            option,
+            compaction_rx,
+            TokioExecutor::new(),
+            schema,
+            version,
+            manager,
+        )
+        .await
+        .unwrap();
+
+        {
+            // to increase timestamps to 1 because the data ts built in advance is 1
+            db.version_set.increase_ts();
+        }
+        let mut snapshot = db.snapshot().await;
+
+        let mut stream = snapshot
+            .scan((Bound::Unbounded, Bound::Unbounded))
+            .projection(vec![1])
+            .take()
+            .await
+            .unwrap();
+
+        let entry_0 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_0.key().value, "1");
+        assert!(entry_0.value().unwrap().vbool.is_none());
+        let entry_1 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_1.key().value, "2");
+        assert!(entry_1.value().unwrap().vbool.is_none());
+        let entry_2 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_2.key().value, "3");
+        assert!(entry_2.value().unwrap().vbool.is_none());
+        let entry_3 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_3.key().value, "4");
+        assert!(entry_3.value().unwrap().vbool.is_none());
+        let entry_4 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_4.key().value, "5");
+        assert!(entry_4.value().unwrap().vbool.is_none());
+        let entry_5 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_5.key().value, "6");
+        assert!(entry_5.value().unwrap().vbool.is_none());
+        let entry_6 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_6.key().value, "7");
+        assert!(entry_6.value().unwrap().vbool.is_none());
+        let entry_7 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_7.key().value, "8");
+        assert!(entry_7.value().unwrap().vbool.is_none());
+        let entry_8 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_8.key().value, "9");
+        assert!(entry_8.value().unwrap().vbool.is_none());
+        let entry_9 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_9.key().value, "alice");
+        let entry_10 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_10.key().value, "ben");
+        let entry_11 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_11.key().value, "carl");
+        let entry_12 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_12.key().value, "dice");
+        let entry_13 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_13.key().value, "erika");
+        let entry_14 = stream.next().await.unwrap().unwrap();
+        assert_eq!(entry_14.key().value, "funk");
+    }
+}
diff --git a/src/transaction.rs b/src/transaction.rs
index 3a9d38c1..1bada127 100644
--- a/src/transaction.rs
+++ b/src/transaction.rs
@@ -5,10 +5,8 @@ use std::{
     },
     io,
     mem::transmute,
-    sync::Arc,
 };
 
-use async_lock::RwLockReadGuard;
 use flume::SendError;
 use lockable::AsyncLimit;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
@@ -16,12 +14,11 @@ use thiserror::Error;
 
 use crate::{
     compaction::CompactTask,
-    fs::manager::StoreManager,
     record::{Key, KeyRef},
+    snapshot::Snapshot,
     stream,
     stream::mem_projection::MemProjectionStream,
     timestamp::{Timestamp, Timestamped},
-    version::{TransactionTs, VersionRef},
     wal::log::LogType,
     DbError, LockMap, Projection, Record, Scan, Schema,
 };
@@ -49,31 +46,20 @@ pub struct Transaction<'txn, R>
 where
     R: Record,
 {
-    ts: Timestamp,
     local: BTreeMap<R::Key, Option<R>>,
-    share: RwLockReadGuard<'txn, Schema<R>>,
-    version: VersionRef<R>,
+    snapshot: Snapshot<'txn, R>,
     lock_map: LockMap<R::Key>,
-    manager: Arc<StoreManager>,
 }
 
 impl<'txn, R> Transaction<'txn, R>
 where
     R: Record + Send,
 {
-    pub(crate) fn new(
-        version: VersionRef<R>,
-        share: RwLockReadGuard<'txn, Schema<R>>,
-        lock_map: LockMap<R::Key>,
-        manager: Arc<StoreManager>,
-    ) -> Self {
+    pub(crate) fn new(snapshot: Snapshot<'txn, R>, lock_map: LockMap<R::Key>) -> Self {
         Self {
-            ts: version.load_ts(),
             local: BTreeMap::new(),
-            share,
-            version,
+            snapshot,
             lock_map,
-            manager,
         }
     }
 
@@ -87,16 +73,10 @@ where
         Ok(match self.local.get(key).and_then(|v| v.as_ref()) {
             Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
             None => self
-                .share
-                .get(&self.version, &self.manager, key, self.ts, projection)
+                .snapshot
+                .get(key, projection)
                 .await?
-                .and_then(|entry| {
-                    if entry.value().is_none() {
-                        None
-                    } else {
-                        Some(TransactionEntry::Stream(entry))
-                    }
-                }),
+                .map(TransactionEntry::Stream),
         })
     }
 
@@ -105,16 +85,12 @@ where
         &'scan self,
         range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
     ) -> Scan<'scan, R> {
-        Scan::new(
-            &self.share,
-            &self.manager,
+        self.snapshot._scan(
             range,
-            self.ts,
-            &self.version,
             Box::new(move |projection_mask: Option<ProjectionMask>| {
                 let mut transaction_scan = TransactionScan {
                     inner: self.local.range(range),
-                    ts: self.ts,
+                    ts: self.snapshot.ts(),
                 }
                 .into();
                 if let Some(mask) = projection_mask {
@@ -159,7 +135,11 @@ where
             );
         }
         for (key, _) in self.local.iter() {
-            if self.share.check_conflict(key, self.ts) {
+            if self
+                .snapshot
+                .schema()
+                .check_conflict(key, self.snapshot.ts())
+            {
                 return Err(CommitError::WriteConflict(key.clone()));
             }
         }
@@ -168,27 +148,32 @@ where
         let is_excess = match len {
             0 => false,
             1 => {
-                let new_ts = self.version.increase_ts();
+                let new_ts = self.snapshot.increase_ts();
                 let (key, record) = self.local.pop_first().unwrap();
-                Self::append(&self.share, LogType::Full, key, record, new_ts).await?
+                Self::append(self.snapshot.schema(), LogType::Full, key, record, new_ts).await?
             }
             _ => {
-                let new_ts = self.version.increase_ts();
+                let new_ts = self.snapshot.increase_ts();
                 let mut iter = self.local.into_iter();
 
                 let (key, record) = iter.next().unwrap();
-                Self::append(&self.share, LogType::First, key, record, new_ts).await?;
+                Self::append(self.snapshot.schema(), LogType::First, key, record, new_ts).await?;
 
                 for (key, record) in (&mut iter).take(len - 2) {
-                    Self::append(&self.share, LogType::Middle, key, record, new_ts).await?;
+                    Self::append(self.snapshot.schema(), LogType::Middle, key, record, new_ts)
+                        .await?;
                 }
 
                 let (key, record) = iter.next().unwrap();
-                Self::append(&self.share, LogType::Last, key, record, new_ts).await?
+                Self::append(self.snapshot.schema(), LogType::Last, key, record, new_ts).await?
             }
         };
         if is_excess {
-            let _ = self.share.compaction_tx.try_send(CompactTask::Freeze);
+            let _ = self
+                .snapshot
+                .schema()
+                .compaction_tx
+                .try_send(CompactTask::Freeze);
         }
         Ok(())
     }