From 746d952771ee9ed34da3063c3e50818c22884dc6 Mon Sep 17 00:00:00 2001
From: Gwo Tzu-Hsing
Date: Tue, 12 Nov 2024 17:10:37 +0800
Subject: [PATCH] feat: support parquet lru reader

---
 Cargo.toml                         |   9 +-
 bindings/python/Cargo.toml         |   1 +
 bindings/python/src/db.rs          |   4 +-
 bindings/python/src/transaction.rs |  19 +--
 examples/datafusion.rs             |  11 +-
 parquet-lru/Cargo.toml             |  13 ++
 parquet-lru/src/lib.rs             | 208 +++++++++++++++++++++++++++++
 src/compaction/mod.rs              |  38 ++++--
 src/fs/mod.rs                      |   2 +-
 src/lib.rs                         | 119 ++++++++++++-----
 src/ondisk/scan.rs                 |  12 +-
 src/ondisk/sstable.rs              |  48 +++++--
 src/stream/level.rs                |  43 ++++--
 src/stream/mem_projection.rs       |  22 ++-
 src/stream/merge.rs                |  43 +++---
 src/stream/mod.rs                  |  32 +++--
 src/stream/package.rs              |  19 ++-
 src/transaction.rs                 |  29 +++-
 src/version/mod.rs                 |  44 ++++--
 tests/data_integrity.rs            |   3 +-
 20 files changed, 574 insertions(+), 145 deletions(-)
 create mode 100644 parquet-lru/Cargo.toml
 create mode 100644 parquet-lru/src/lib.rs

diff --git a/Cargo.toml b/Cargo.toml
index c9dfa9dc..a8b70e34 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,4 +1,4 @@
-workspace = { members = ["tonbo_macros"] }
+workspace = { members = ["parquet-lru", "tonbo_macros"] }
 
 [package]
 description = "An embedded persistent KV database in Rust."
@@ -74,17 +74,18 @@ fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
 futures-core = "0.3"
 futures-io = "0.3"
 futures-util = "0.3"
-lockable = "0.0.8"
+lockable = "0.1.1"
 once_cell = "1"
 parquet = { version = "53", features = ["async"] }
+parquet-lru = { version = "0.1.0", path = "parquet-lru" }
 pin-project-lite = "0.2"
 regex = "1"
-thiserror = "1"
+thiserror = "2.0.3"
 tokio = { version = "1", features = ["io-util"], default-features = false }
 tokio-util = { version = "0.7" }
 tonbo_macros = { version = "0.2.0", path = "tonbo_macros" }
 tracing = "0.1"
-ulid = "1"
+ulid = { version = "1", features = ["serde"] }
 
 # Only used for benchmarks
 log = "0.4.22"
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 21b170ca..8fbbb9b0 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -15,6 +15,7 @@ fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
   "tokio",
 ] }
 futures = { version = "0.3" }
+parquet-lru = { version = "0.1.0", path = "../../parquet-lru" }
 pyo3 = { version = "0.21.2", features = [
   "abi3",
   "abi3-py310",
diff --git a/bindings/python/src/db.rs b/bindings/python/src/db.rs
index 840ff728..f788d5c8 100644
--- a/bindings/python/src/db.rs
+++ b/bindings/python/src/db.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use parquet_lru::NoopCache;
 use pyo3::{
     prelude::*,
     pyclass, pymethods,
@@ -9,6 +10,7 @@ use pyo3::{
 use pyo3_asyncio::tokio::{future_into_py, get_runtime};
 use tonbo::{
     executor::tokio::TokioExecutor,
+    fs::FileId,
     record::{ColumnDesc, DynRecord},
     DB,
 };
@@ -28,7 +30,7 @@ type PyExecutor = TokioExecutor;
 pub struct TonboDB {
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
-    db: Arc<DB<DynRecord, PyExecutor>>,
+    db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
 }
 
 #[pymethods]
diff --git a/bindings/python/src/transaction.rs b/bindings/python/src/transaction.rs
index 70d535d7..f40091d4 100644
--- a/bindings/python/src/transaction.rs
+++ b/bindings/python/src/transaction.rs
@@ -1,12 +1,13 @@
 use std::{mem::transmute, sync::Arc};
 
+use parquet_lru::NoopCache;
 use pyo3::{
     pyclass, pymethods,
     types::{PyAnyMethods, PyMapping, PyMappingMethods, PySequenceMethods, PyTuple},
     Bound, IntoPy, Py, PyAny, PyResult, Python,
 };
 use pyo3_asyncio::tokio::future_into_py;
-use tonbo::{record::DynRecord, transaction, Projection};
+use tonbo::{fs::FileId, record::DynRecord, transaction, Projection};
 
 use crate::{
     column::Column,
@@ -18,14 +19,14 @@ use crate::{
 
 #[pyclass]
 pub struct Transaction {
-    txn: Option<transaction::Transaction<'static, DynRecord>>,
+    txn: Option<transaction::Transaction<'static, DynRecord, NoopCache<FileId>>>,
     desc: Arc<Vec<Column>>,
     primary_key_index: usize,
 }
 
 impl Transaction {
     pub(crate) fn new<'txn>(
-        txn: transaction::Transaction<'txn, DynRecord>,
+        txn: transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
         desc: Arc<Vec<Column>>,
     ) -> Self {
         let primary_key_index = desc
@@ -37,8 +38,8 @@ impl Transaction {
         Transaction {
             txn: Some(unsafe {
                 transmute::<
-                    transaction::Transaction<'txn, DynRecord>,
-                    transaction::Transaction<'static, DynRecord>,
+                    transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
+                    transaction::Transaction<'static, DynRecord, NoopCache<FileId>>,
                 >(txn)
             }),
             desc,
@@ -84,8 +85,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
+                &'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
             >(txn)
         };
 
@@ -169,8 +170,8 @@ impl Transaction {
         let txn = self.txn.as_ref().unwrap();
         let txn = unsafe {
             transmute::<
-                &transaction::Transaction<'_, DynRecord>,
-                &'static transaction::Transaction<'_, DynRecord>,
+                &transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
+                &'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
             >(txn)
         };
         let col_desc = self.desc.get(self.primary_key_index).unwrap();
diff --git a/examples/datafusion.rs b/examples/datafusion.rs
index 2b7dd167..fefa4b89 100644
--- a/examples/datafusion.rs
+++ b/examples/datafusion.rs
@@ -26,7 +26,9 @@ use datafusion::{
 use fusio::path::Path;
 use futures_core::Stream;
 use futures_util::StreamExt;
+use parquet_lru::NoopCache;
 use tokio::fs;
+use tonbo::fs::FileId;
 use tonbo::{
     executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
 };
@@ -41,12 +43,12 @@ pub struct Music {
 }
 
 struct MusicProvider {
-    db: Arc<DB<Music, TokioExecutor>>,
+    db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
 }
 
 struct MusicExec {
     cache: PlanProperties,
-    db: Arc<DB<Music, TokioExecutor>>,
+    db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
     projection: Option<Vec<usize>>,
     limit: Option<usize>,
     range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
@@ -95,7 +97,10 @@ impl TableProvider for MusicProvider {
     }
 
 impl MusicExec {
-    fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
+    fn new(
+        db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
+        projection: Option<&Vec<usize>>,
+    ) -> Self {
         let schema = Music::arrow_schema();
         let schema = if let Some(projection) = &projection {
             Arc::new(schema.project(projection).unwrap())
diff --git a/parquet-lru/Cargo.toml b/parquet-lru/Cargo.toml
new file mode 100644
index 00000000..db4da47a
--- /dev/null
+++ b/parquet-lru/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+edition = "2021"
+name = "parquet-lru"
+version = "0.1.0"
+
+[dependencies]
+bytes = { version = "1.8.0", features = ["serde"] }
+foyer = "0.12.2"
+futures-core = "0.3.31"
+futures-util = "0.3.31"
+parquet = { version = "53.2.0", features = ["async"] }
+serde = "1.0.214"
+thiserror = "2.0.3"
diff --git a/parquet-lru/src/lib.rs b/parquet-lru/src/lib.rs
new file mode 100644
index 00000000..19559131
--- /dev/null
+++ b/parquet-lru/src/lib.rs
@@ -0,0 +1,208 @@
+use std::{future::Future, hash::Hash, marker::PhantomData, ops::Range, sync::Arc};
+
+use bytes::Bytes;
+use futures_core::future::BoxFuture;
+use futures_util::future::FutureExt;
+use parquet::{
+    arrow::async_reader::AsyncFileReader,
+    errors::{ParquetError, Result},
+    file::metadata::ParquetMetaData,
+};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+
+#[derive(Default)]
+pub struct Options {
+    meta_capacity: usize,
+    data_capacity: usize,
+}
+
+impl Options {
+    pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
+        self.meta_capacity = meta_capacity;
+        self
+    }
+
+    pub fn data_capacity(mut self, data_capacity: usize) -> Self {
+        self.data_capacity = data_capacity;
+        self
+    }
+}
+
+pub trait LruCache<K>: Clone + Send + Sync + 'static {
+    type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;
+
+    fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
+
+    fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
+    where
+        R: AsyncFileReader + 'static;
+}
+
+#[derive(Clone)]
+pub struct FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    inner: Arc<FoyerCacheInner<K>>,
+}
+
+pub struct FoyerCacheInner<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    meta: foyer::Cache<K, Arc<ParquetMetaData>>,
+    data: foyer::HybridCache<(K, Range<usize>), Bytes>,
+}
+
+impl<K> LruCache<K> for FoyerCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+{
+    type LruReader<R: AsyncFileReader + 'static> = ParquetLru<K, R>;
+
+    async fn new(options: Options) -> Result<Self, Error> {
+        Ok(Self {
+            inner: Arc::new(FoyerCacheInner {
+                meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
+                data: foyer::HybridCacheBuilder::new()
+                    .memory(options.data_capacity)
+                    .storage(foyer::Engine::Large)
+                    .build()
+                    .await
+                    .map_err(|e| Error::Foyer(e.into()))?,
+            }),
+        })
+    }
+
+    async fn get_reader<R: AsyncFileReader + 'static>(&self, key: K, reader: R) -> ParquetLru<K, R> {
+        ParquetLru::new(self.clone(), key, reader)
+    }
+}
+
+pub struct ParquetLru<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+{
+    cache: FoyerCache<K>,
+    key: K,
+    reader: R,
+}
+
+impl<K, R> ParquetLru<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
+    R: AsyncFileReader,
+{
+    fn new(cache: FoyerCache<K>, key: K, reader: R) -> Self {
+        Self { cache, key, reader }
+    }
+}
+
+impl<K, R> AsyncFileReader for ParquetLru<K, R>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+    R: AsyncFileReader,
+{
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move {
+            if let Some(data) = self
+                .cache
+                .inner
+                .data
+                .get(&(self.key.clone(), range.clone()))
+                .await
+                .map_err(|e| ParquetError::External(e.into()))?
+            {
+                Ok(data.value().clone())
+            } else {
+                let data = self.reader.get_bytes(range.clone()).await?;
+                self.cache
+                    .inner
+                    .data
+                    .insert((self.key.clone(), range), data.clone());
+                Ok(data)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
+        async move {
+            if let Some(meta) = self.cache.inner.meta.get(&self.key) {
+                Ok(meta.value().clone())
+            } else {
+                let meta = self.reader.get_metadata().await?;
+                self.cache.inner.meta.insert(self.key.clone(), meta.clone());
+                Ok(meta)
+            }
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+        async move {
+            let mut missed = Vec::with_capacity(ranges.len());
+            let mut results = Vec::with_capacity(ranges.len());
+            for (id, range) in ranges.iter().enumerate() {
+                if let Some(data) = self
+                    .cache
+                    .inner
+                    .data
+                    .get(&(self.key.clone(), range.clone()))
+                    .await
+                    .map_err(|e| ParquetError::External(e.into()))?
+                {
+                    results.push((id, data.value().clone()));
+                } else {
+                    missed.push((id, range));
+                }
+            }
+            if !missed.is_empty() {
+                let data = self
+                    .reader
+                    .get_byte_ranges(missed.iter().map(|&(_, r)| r.clone()).collect())
+                    .await?;
+                // `data` is ordered by position in `missed`, not by the original
+                // range index, so it must be indexed positionally.
+                for (i, (id, range)) in missed.into_iter().enumerate() {
+                    let data = data[i].clone();
+                    self.cache
+                        .inner
+                        .data
+                        .insert((self.key.clone(), range.clone()), data.clone());
+                    results.push((id, data));
+                }
+            }
+            results.sort_by_key(|(id, _)| *id);
+            Ok(results.into_iter().map(|(_, data)| data).collect())
+        }
+        .boxed()
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct NoopCache<K> {
+    _phantom: PhantomData<K>,
+}
+
+impl<K> LruCache<K> for NoopCache<K>
+where
+    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
+{
+    type LruReader<R: AsyncFileReader + 'static> = R;
+
+    async fn new(_options: Options) -> Result<Self, Error> {
+        Ok(Self {
+            _phantom: PhantomData,
+        })
+    }
+
+    async fn get_reader<R: AsyncFileReader + 'static>(&self, _key: K, reader: R) -> R {
+        reader
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("Foyer error: {0}")]
+    Foyer(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
+}
diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs
index eac33d95..948bc30f 100644
--- a/src/compaction/mod.rs
+++ b/src/compaction/mod.rs
@@ -5,6 +5,7 @@ use fusio::DynFs;
 use fusio_parquet::writer::AsyncWriter;
 use futures_util::StreamExt;
 use parquet::arrow::{AsyncArrowWriter, ProjectionMask};
+use parquet_lru::LruCache;
 use thiserror::Error;
 use tokio::sync::oneshot;
 use ulid::Ulid;
@@ -60,7 +61,13 @@ where
         }
     }
 
-    pub(crate) async fn check_then_compaction(&mut self) -> Result<(), CompactionError<R>> {
+    pub(crate) async fn check_then_compaction<C>(
+        &mut self,
+        parquet_lru_cache: C,
+    ) -> Result<(), CompactionError<R>>
+    where
+        C: LruCache<Ulid> + Unpin,
+    {
         let mut guard = self.schema.write().await;
 
         guard.trigger.reset();
@@ -108,6 +115,7 @@ where
                     &mut delete_gens,
                     &guard.record_instance,
                     &self.manager,
+                    &parquet_lru_cache,
                 )
                 .await?;
             }
@@ -186,7 +194,7 @@ where
     }
 
     #[allow(clippy::too_many_arguments)]
-    pub(crate) async fn major_compaction(
+    pub(crate) async fn major_compaction<C>(
         version: &Version<R>,
         option: &DbOption<R>,
         mut min: &R::Key,
@@ -195,7 +203,11 @@ where
         delete_gens: &mut Vec<(FileId, usize)>,
         instance: &RecordInstance,
         manager: &StoreManager,
-    ) -> Result<(), CompactionError<R>> {
+        parquet_cache: &C,
+    ) -> Result<(), CompactionError<R>>
+    where
+        C: LruCache<Ulid> + Unpin,
+    {
         let mut level = 0;
 
         while level < MAX_LEVEL - 2 {
@@ -220,7 +232,7 @@ where
                         .await?;
 
                     streams.push(ScanStream::SsTable {
-                        inner: SsTable::open(file)
+                        inner: SsTable::open(parquet_cache.clone(), scope.gen, file)
                             .await?
                             .scan(
                                 (Bound::Unbounded, Bound::Unbounded),
@@ -243,6 +255,7 @@ where
                     None,
                     ProjectionMask::all(),
                     level_fs.clone(),
+                    parquet_cache.clone(),
                 )
                 .ok_or(CompactionError::EmptyLevel)?;
 
@@ -263,6 +276,7 @@ where
                     None,
                     ProjectionMask::all(),
                     level_fs.clone(),
+                    parquet_cache.clone(),
                 )
                 .ok_or(CompactionError::EmptyLevel)?;
 
@@ -378,15 +392,18 @@ where
         (meet_scopes_l, start_l, end_l - 1)
     }
 
-    async fn build_tables<'scan>(
+    async fn build_tables<'scan, C>(
         option: &DbOption<R>,
         version_edits: &mut Vec<VersionEdit<<R as Record>::Key>>,
         level: usize,
-        streams: Vec<ScanStream<'scan, R>>,
+        streams: Vec<ScanStream<'scan, R, C>>,
         instance: &RecordInstance,
         fs: &Arc<dyn DynFs>,
-    ) -> Result<(), CompactionError<R>> {
-        let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into()).await?;
+    ) -> Result<(), CompactionError<R>>
+    where
+        C: LruCache<Ulid> + Unpin,
+    {
+        let mut stream = MergeStream::<R, C>::from_vec(streams, u32::MAX.into()).await?;
         // Kould: is the capacity parameter necessary?
         let mut builder = R::Columns::builder(&instance.arrow_schema::<R>(), 8192);
@@ -513,6 +530,7 @@ pub(crate) mod tests {
     use fusio_dispatch::FsOptions;
     use fusio_parquet::writer::AsyncWriter;
     use parquet::arrow::AsyncArrowWriter;
+    use parquet_lru::NoopCache;
     use tempfile::TempDir;
 
     use crate::{
@@ -810,6 +828,7 @@ pub(crate) mod tests {
             &mut vec![],
             &RecordInstance::Normal,
             &manager,
+            &NoopCache::default(),
         )
         .await
         .unwrap();
@@ -1201,6 +1220,7 @@ pub(crate) mod tests {
             &mut vec![],
             &RecordInstance::Normal,
             &manager,
+            &NoopCache::default(),
         )
         .await
         .unwrap();
@@ -1221,7 +1241,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(5);
 
-        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for i in 5..9 {
             let item = Test {
diff --git a/src/fs/mod.rs b/src/fs/mod.rs
index 1bffbb50..0f97191b 100644
--- a/src/fs/mod.rs
+++ b/src/fs/mod.rs
@@ -8,7 +8,7 @@ use std::{
 use fusio::{fs::OpenOptions, path::Path};
 use ulid::{DecodeError, Ulid};
 
-pub(crate) type FileId = Ulid;
+pub type FileId = Ulid;
 
 pub enum FileType {
     Wal,
diff --git a/src/lib.rs b/src/lib.rs
index 09ac0fb7..f22bfcbd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -136,6 +136,7 @@ pub use arrow;
 use async_lock::RwLock;
 use async_stream::stream;
 use flume::{bounded, Sender};
+use fs::FileId;
 use futures_core::Stream;
 use futures_util::StreamExt;
 use inmem::{immutable::Immutable, mutable::Mutable};
@@ -146,6 +147,7 @@ use parquet::{
     arrow::{arrow_to_parquet_schema, ProjectionMask},
     errors::ParquetError,
 };
+use parquet_lru::{LruCache, NoopCache};
 use record::{ColumnDesc, DynRecord, Record, RecordInstance};
 use thiserror::Error;
 use timestamp::{Timestamp, TimestampedRef};
@@ -153,12 +155,13 @@ use tokio::sync::oneshot;
 pub use tonbo_macros::{KeyAttributes, Record};
 use tracing::error;
 use transaction::{CommitError, Transaction, TransactionEntry};
+use ulid::Ulid;
 
 pub use crate::option::*;
 use crate::{
     compaction::{CompactTask, CompactionError, Compactor},
     executor::Executor,
-    fs::{manager::StoreManager, parse_file_id, FileId, FileType},
+    fs::{manager::StoreManager, parse_file_id, FileType},
     serdes::Decode,
     stream::{
         mem_projection::MemProjectionStream, merge::MergeStream, package::PackageStream, Entry,
@@ -170,19 +173,21 @@ use crate::{
     wal::{log::LogType, RecoverError, WalFile},
 };
 
-pub struct DB<R, E>
+pub struct DB<R, E, C>
 where
     R: Record,
     E: Executor,
+    C: LruCache<Ulid>,
 {
     schema: Arc<RwLock<Schema<R>>>,
     version_set: VersionSet<R>,
     lock_map: LockMap<R::Key>,
     manager: Arc<StoreManager>,
+    parquet_lru_cache: C,
     _p: PhantomData<E>,
 }
 
-impl<E> DB<DynRecord, E>
+impl<E> DB<DynRecord, E, NoopCache<Ulid>>
 where
     E: Executor + Send + Sync + 'static,
 {
@@ -198,11 +203,11 @@ where
         let instance =
             RecordInstance::Runtime(DynRecord::empty_record(column_descs, primary_index));
 
-        Self::build(option, executor, instance).await
+        Self::build(option, executor, instance, Default::default()).await
     }
 }
 
-impl<R, E> DB<R, E>
+impl<R, E> DB<R, E, NoopCache<Ulid>>
 where
     R: Record + Send + Sync,
     R::Columns: Send + Sync,
@@ -214,13 +219,28 @@ where
     ///
     /// For more configurable options, please refer to [`DbOption`].
     pub async fn new(option: DbOption<R>, executor: E) -> Result<Self, DbError<R>> {
-        Self::build(Arc::new(option), executor, RecordInstance::Normal).await
+        Self::build(
+            Arc::new(option),
+            executor,
+            RecordInstance::Normal,
+            Default::default(),
+        )
+        .await
     }
+}
 
+impl<R, E, C> DB<R, E, C>
+where
+    R: Record + Send + Sync,
+    R::Columns: Send + Sync,
+    E: Executor + Send + Sync + 'static,
+    C: LruCache<Ulid> + Unpin,
+{
     async fn build(
         option: Arc<DbOption<R>>,
         executor: E,
         instance: RecordInstance,
+        lru_cache: C,
     ) -> Result<Self, DbError<R>> {
         let manager = Arc::new(StoreManager::new(
             option.base_fs.clone(),
@@ -258,12 +278,20 @@ where
                 error!("[Cleaner Error]: {}", err)
             }
         });
+
+        let compact_task_cache = lru_cache.clone();
         executor.spawn(async move {
             while let Ok(task) = task_rx.recv_async().await {
                 if let Err(err) = match task {
-                    CompactTask::Freeze => compactor.check_then_compaction().await,
+                    CompactTask::Freeze => {
+                        compactor
+                            .check_then_compaction(compact_task_cache.clone())
+                            .await
+                    }
                     CompactTask::Flush(option_tx) => {
-                        let mut result = compactor.check_then_compaction().await;
+                        let mut result = compactor
+                            .check_then_compaction(compact_task_cache.clone())
+                            .await;
                         if let Some(tx) = option_tx {
                             if result.is_ok() {
                                 result = tx.send(()).map_err(|_| CompactionError::ChannelClose);
@@ -276,22 +304,25 @@ where
                 }
             }
         });
+
         Ok(Self {
             schema,
             version_set,
             lock_map: Arc::new(Default::default()),
             manager,
+            parquet_lru_cache: lru_cache,
             _p: Default::default(),
         })
     }
 
     /// open an optimistic ACID transaction
-    pub async fn transaction(&self) -> Transaction<'_, R> {
+    pub async fn transaction(&self) -> Transaction<'_, R, C> {
         Transaction::new(
             self.version_set.current().await,
             self.schema.read().await,
             self.lock_map.clone(),
             self.manager.clone(),
+            self.parquet_lru_cache.clone(),
         )
     }
 
@@ -348,6 +379,7 @@ where
                 key,
                 self.version_set.load_ts(),
                 Projection::All,
+                self.parquet_lru_cache.clone(),
             )
             .await?
             .and_then(|entry| {
@@ -376,6 +408,7 @@ where
             self.version_set.load_ts(),
             &*current,
             Box::new(|_| None),
+            self.parquet_lru_cache.clone(),
         ).take().await?;
 
         while let Some(record) = scan.next().await {
@@ -555,14 +588,18 @@ where
         self.mutable.append(None, key, ts, value).await
     }
 
-    async fn get<'get>(
+    async fn get<'get, C>(
         &'get self,
         version: &'get Version<R>,
         manager: &StoreManager,
         key: &'get R::Key,
         ts: Timestamp,
         projection: Projection,
-    ) -> Result<Option<Entry<'get, R>>, DbError<R>> {
+        parquet_cache: C,
+    ) -> Result<Option<Entry<'get, R>>, DbError<R>>
+    where
+        C: LruCache<Ulid>,
+    {
         if let Some(entry) = self.mutable.get(key, ts) {
             return Ok(Some(Entry::Mutable(entry)));
         }
@@ -591,7 +628,12 @@ where
         }
 
         Ok(version
-            .query(manager, TimestampedRef::new(key, ts), projection)
+            .query(
+                manager,
+                TimestampedRef::new(key, ts),
+                projection,
+                parquet_cache,
+            )
             .await?
            .map(|entry| Entry::RecordBatch(entry)))
    }
@@ -612,9 +654,10 @@ where
 }
 
 /// scan configuration intermediate structure
-pub struct Scan<'scan, R>
+pub struct Scan<'scan, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     schema: &'scan Schema<R>,
     manager: &'scan StoreManager,
     lower: Bound<&'scan R::Key>,
     upper: Bound<&'scan R::Key>,
     ts: Timestamp,
 
     version: &'scan Version<R>,
     fn_pre_stream:
-        Box<dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan>,
+        Box<dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R, C>> + Send + 'scan>,
 
     limit: Option<usize>,
     projection_indices: Option<Vec<usize>>,
     projection: ProjectionMask,
+
+    parquet_cache: C,
 }
 
-impl<'scan, R> Scan<'scan, R>
+impl<'scan, R, C> Scan<'scan, R, C>
 where
     R: Record + Send,
+    C: LruCache<Ulid> + Unpin,
 {
     fn new(
         schema: &'scan Schema<R>,
         manager: &'scan StoreManager,
         range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
         ts: Timestamp,
         version: &'scan Version<R>,
         fn_pre_stream: Box<
-            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan,
+            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R, C>> + Send + 'scan,
         >,
+        parquet_cache: C,
     ) -> Self {
         let (lower, upper) = range;
 
         Self {
             schema,
             manager,
             lower,
             upper,
             ts,
             version,
             fn_pre_stream,
             limit: None,
             projection_indices: None,
             projection: ProjectionMask::all(),
+            parquet_cache,
         }
     }
@@ -731,6 +779,7 @@ where
                 self.ts,
                 self.limit,
                 self.projection,
+                self.parquet_cache,
             )
             .await?;
 
@@ -783,6 +832,7 @@ where
                 self.ts,
                 self.limit,
                 self.projection,
+                self.parquet_cache,
             )
             .await?;
         let merge_stream = MergeStream::from_vec(streams, self.ts).await?;
@@ -847,8 +897,10 @@ pub(crate) mod tests {
     use futures::StreamExt;
     use once_cell::sync::Lazy;
     use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath};
+    use parquet_lru::NoopCache;
     use tempfile::TempDir;
     use tracing::error;
+    use ulid::Ulid;
 
     use crate::{
         compaction::{CompactTask, CompactionError, Compactor},
@@ -1086,7 +1138,7 @@ pub(crate) mod tests {
         option: DbOption<Test>,
         executor: E,
     ) -> RecordBatch {
-        let db: DB<Test, E> = DB::new(option.clone(), executor).await.unwrap();
+        let db: DB<Test, E, _> = DB::new(option.clone(), executor).await.unwrap();
         let base_fs = db.manager.base_fs();
 
         db.write(
@@ -1238,7 +1290,7 @@ pub(crate) mod tests {
         schema: crate::Schema<R>,
         version: Version<R>,
         manager: Arc<StoreManager>,
-    ) -> Result<DB<R, E>, DbError<R>>
+    ) -> Result<DB<R, E, NoopCache<Ulid>>, DbError<R>>
     where
         R: Record + Send + Sync,
         R::Columns: Send + Sync,
@@ -1271,9 +1323,12 @@ pub(crate) mod tests {
         executor.spawn(async move {
             while let Ok(task) = compaction_rx.recv_async().await {
                 if let Err(err) = match task {
-                    CompactTask::Freeze => compactor.check_then_compaction().await,
+                    CompactTask::Freeze => {
+                        compactor.check_then_compaction(NoopCache::default()).await
+                    }
                     CompactTask::Flush(option_tx) => {
-                        let mut result = compactor.check_then_compaction().await;
+                        let mut result =
+                            compactor.check_then_compaction(NoopCache::default()).await;
                         if let Some(tx) = option_tx {
                             let channel_result =
                                 tx.send(()).map_err(|_| CompactionError::ChannelClose);
@@ -1294,6 +1349,7 @@ pub(crate) mod tests {
             version_set,
             lock_map: Arc::new(Default::default()),
             manager,
+            parquet_lru_cache: Default::default(),
             _p: Default::default(),
         })
     }
@@ -1522,7 +1578,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(/* max_mutable_len */ 5);
 
-        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for (i, item) in test_items().into_iter().enumerate() {
             db.write(item, 0.into()).await.unwrap();
@@ -1558,7 +1614,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(/* max_mutable_len */ 50);
 
-        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for item in &test_items()[0..10] {
             db.write(item.clone(), 0.into()).await.unwrap();
         }
@@ -1607,9 +1663,10 @@ pub(crate) mod tests {
         schema.flush_wal().await.unwrap();
         drop(schema);
 
-        let db: DB<Test, TokioExecutor> = DB::new(option.as_ref().to_owned(), TokioExecutor::new())
-            .await
-            .unwrap();
+        let db: DB<Test, TokioExecutor, _> =
+            DB::new(option.as_ref().to_owned(), TokioExecutor::new())
+                .await
+                .unwrap();
 
         let mut sort_items = BTreeMap::new();
         for item in test_items() {
@@ -1679,7 +1736,7 @@ pub(crate) mod tests {
             "id".to_owned(),
             primary_key_index,
         );
-        let db: DB<DynRecord, TokioExecutor> =
+        let db: DB<DynRecord, TokioExecutor, _> =
             DB::with_schema(option, TokioExecutor::new(), desc, primary_key_index)
                 .await
                 .unwrap();
@@ -1719,7 +1776,7 @@ pub(crate) mod tests {
         option.major_threshold_with_sst_size = 3;
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(5);
-        let db: DB<Test, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Test, TokioExecutor, _> = DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for (idx, item) in test_items().into_iter().enumerate() {
             if idx % 2 == 0 {
@@ -1761,7 +1818,7 @@ pub(crate) mod tests {
         option.major_default_oldest_table_num = 1;
         option.trigger_type = TriggerType::Length(5);
 
-        let db: DB<DynRecord, TokioExecutor> =
+        let db: DB<DynRecord, TokioExecutor, _> =
             DB::with_schema(option, TokioExecutor::new(), cols_desc, primary_key_index)
                 .await
                 .unwrap();
@@ -1991,7 +2048,7 @@ pub(crate) mod tests {
         option3.major_default_oldest_table_num = 1;
         option3.trigger_type = TriggerType::Length(5);
 
-        let db1: DB<DynRecord, TokioExecutor> = DB::with_schema(
+        let db1: DB<DynRecord, TokioExecutor, _> = DB::with_schema(
             option,
             TokioExecutor::new(),
             cols_desc.clone(),
             primary_key_index,
         )
         .await
         .unwrap();
-        let db2: DB<DynRecord, TokioExecutor> = DB::with_schema(
+        let db2: DB<DynRecord, TokioExecutor, _> = DB::with_schema(
             option2,
             TokioExecutor::new(),
             cols_desc.clone(),
             primary_key_index,
         )
         .await
         .unwrap();
-        let db3: DB<DynRecord, TokioExecutor> =
+        let db3: DB<DynRecord, TokioExecutor, _> =
             DB::with_schema(option3, TokioExecutor::new(), cols_desc, primary_key_index)
                 .await
                 .unwrap();
diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs
index 5a8ce66d..4957142e 100644
--- a/src/ondisk/scan.rs
+++ b/src/ondisk/scan.rs
@@ -6,9 +6,11 @@ use std::{
 };
 
 use arrow::datatypes::Schema;
-use fusio_parquet::reader::AsyncReader;
 use futures_core::{ready, Stream};
-use parquet::arrow::{async_reader::ParquetRecordBatchStream, ProjectionMask};
+use parquet::arrow::{
+    async_reader::{AsyncFileReader, ParquetRecordBatchStream},
+    ProjectionMask,
+};
 use pin_project_lite::pin_project;
 
 use crate::{
@@ -18,9 +20,9 @@ use crate::{
 
 pin_project! {
     #[derive(Debug)]
-    pub struct SsTableScan<'scan, R>{
+    pub struct SsTableScan<'scan, R> {
         #[pin]
-        stream: ParquetRecordBatchStream<AsyncReader>,
+        stream: ParquetRecordBatchStream<Box<dyn AsyncFileReader + 'static>>,
         iter: Option<RecordBatchIterator<R>>,
         projection_mask: ProjectionMask,
         full_schema: Arc<Schema>,
@@ -30,7 +32,7 @@ pin_project! {
 
 impl<R> SsTableScan<'_, R> {
     pub fn new(
-        stream: ParquetRecordBatchStream<AsyncReader>,
+        stream: ParquetRecordBatchStream<Box<dyn AsyncFileReader + 'static>>,
         projection_mask: ProjectionMask,
         full_schema: Arc<Schema>,
     ) -> Self {
diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs
index 2304d6d3..2c596f96 100644
--- a/src/ondisk/sstable.rs
+++ b/src/ondisk/sstable.rs
@@ -3,10 +3,16 @@ use std::{marker::PhantomData, ops::Bound};
 use fusio::{dynamic::DynFile, DynRead};
 use fusio_parquet::reader::AsyncReader;
 use futures_util::StreamExt;
-use parquet::arrow::{
-    arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions},
-    ParquetRecordBatchStreamBuilder, ProjectionMask,
+use parquet::{
+    arrow::{
+        arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions},
+        async_reader::{AsyncFileReader, AsyncReader as ParquetAsyncReader},
+        ParquetRecordBatchStreamBuilder, ProjectionMask,
+    },
+    errors::Result as ParquetResult,
 };
+use parquet_lru::LruCache;
+use ulid::Ulid;
 
 use super::{arrows::get_range_filter, scan::SsTableScan};
 use crate::{
@@ -15,23 +21,31 @@ use crate::{
     timestamp::{Timestamp, TimestampedRef},
 };
 
-pub(crate) struct SsTable<R>
+pub(crate) struct SsTable<R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
-    reader: AsyncReader,
+    reader: C::LruReader<AsyncReader>,
     _marker: PhantomData<R>,
 }
 
-impl<R> SsTable<R>
+impl<R, C> SsTable<R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
-    pub(crate) async fn open(file: Box<dyn DynFile>) -> Result<Self, fusio::Error> {
+    pub(crate) async fn open(
+        lru_cache: C,
+        id: Ulid,
+        file: Box<dyn DynFile>,
+    ) -> Result<Self, fusio::Error> {
         let size = file.size().await?;
 
         Ok(SsTable {
-            reader: AsyncReader::new(file, size).await?,
+            reader: lru_cache
+                .get_reader(id, AsyncReader::new(file, size).await?)
+                .await,
             _marker: PhantomData,
         })
     }
@@ -40,11 +54,10 @@ where
         self,
         limit: Option<usize>,
         projection_mask: ProjectionMask,
-    ) -> parquet::errors::Result<
-        ArrowReaderBuilder<parquet::arrow::async_reader::AsyncReader<AsyncReader>>,
-    > {
+    ) -> ParquetResult<ArrowReaderBuilder<ParquetAsyncReader<Box<dyn AsyncFileReader + 'static>>>>
+    {
         let mut builder = ParquetRecordBatchStreamBuilder::new_with_options(
-            self.reader,
+            Box::new(self.reader) as Box<dyn AsyncFileReader + 'static>,
             ArrowReaderOptions::default().with_page_index(true),
         )
         .await?;
@@ -58,7 +71,7 @@ where
         self,
         key: &TimestampedRef<R::Key>,
         projection_mask: ProjectionMask,
-    ) -> parquet::errors::Result<Option<RecordBatchEntry<R>>> {
+    ) -> ParquetResult<Option<RecordBatchEntry<R>>> {
         self.scan(
             (Bound::Included(key.value()), Bound::Included(key.value())),
             key.ts(),
@@ -114,6 +127,8 @@ pub(crate) mod tests {
         basic::{Compression, ZstdLevel},
         file::properties::WriterProperties,
     };
+    use parquet_lru::NoopCache;
+    use ulid::Ulid;
 
     use super::SsTable;
     use crate::{
@@ -152,11 +167,16 @@ pub(crate) mod tests {
         Ok(())
     }
 
-    pub(crate) async fn open_sstable<R>(store: &Arc<dyn DynFs>, path: &Path) -> SsTable<R>
+    pub(crate) async fn open_sstable<R>(
+        store: &Arc<dyn DynFs>,
+        path: &Path,
+    ) -> SsTable<R, NoopCache<Ulid>>
     where
         R: Record,
     {
         SsTable::open(
+            Default::default(),
+            Default::default(),
             store
                 .open_options(path, FileType::Parquet.open_options(true))
                 .await
diff --git a/src/stream/level.rs b/src/stream/level.rs
index 2039d52c..ca8a3200 100644
--- a/src/stream/level.rs
+++ b/src/stream/level.rs
@@ -13,6 +13,8 @@ use fusio::{
 };
 use futures_core::Stream;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
+use parquet_lru::LruCache;
+use ulid::Ulid;
 
 use crate::{
     fs::{FileId, FileType},
@@ -25,22 +27,27 @@ use crate::{
     DbOption,
 };
 
-enum FutureStatus<'level, R>
+enum FutureStatus<'level, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     Init(FileId),
     Ready(SsTableScan<'level, R>),
-    OpenFile(Pin<Box<dyn Future<Output = Result<Box<dyn DynFile>, Error>> + 'level>>),
-    OpenSst(Pin<Box<dyn Future<Output = Result<SsTable<R>, Error>> + Send + 'level>>),
+    OpenFile(
+        Ulid,
+        Pin<Box<dyn Future<Output = Result<Box<dyn DynFile>, Error>> + 'level>>,
+    ),
+    OpenSst(Pin<Box<dyn Future<Output = Result<SsTable<R, C>, Error>> + Send + 'level>>),
     LoadStream(
         Pin<Box<dyn Future<Output = Result<SsTableScan<'level, R>, ParquetError>> + Send + 'level>>,
    ),
 }
 
-pub(crate) struct LevelStream<'level, R>
+pub(crate) struct LevelStream<'level, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     lower: Bound<&'level R::Key>,
     upper: Bound<&'level R::Key>,
     ts: Timestamp,
     level: usize,
 
     gens: VecDeque<FileId>,
     limit: Option<usize>,
     projection_mask: ProjectionMask,
-    status: FutureStatus<'level, R>,
+    status: FutureStatus<'level, R, C>,
     fs: Arc<dyn DynFs>,
     path: Option<Path>,
+    parquet_cache: C,
 }
 
-impl<'level, R> LevelStream<'level, R>
+impl<'level, R, C> LevelStream<'level, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     // Kould: only used by Compaction now, and the start and end of the sstables range are known
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         version: &Version<R>,
         level: usize,
         start: usize,
         end: usize,
         range: (Bound<&'level R::Key>, Bound<&'level R::Key>),
         ts: Timestamp,
         limit: Option<usize>,
         projection_mask: ProjectionMask,
         fs: Arc<dyn DynFs>,
+        parquet_cache: C,
     ) -> Option<Self> {
         let (lower, upper) = range;
         let mut gens: VecDeque<FileId> = version.level_slice[level][start..end + 1]
@@ -92,13 +102,15 @@ where
             status,
             fs,
             path: None,
+            parquet_cache,
         })
     }
 }
 
-impl<'level, R> Stream for LevelStream<'level, R>
+impl<'level, R, C> Stream for LevelStream<'level, R, C>
 where
     R: Record,
+    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<RecordBatchEntry<R>, ParquetError>;
 
@@ -125,7 +137,7 @@ where
                         >,
                     >(reader)
                 };
-                    self.status = FutureStatus::OpenFile(reader);
+                    self.status = FutureStatus::OpenFile(gen, reader);
                     continue;
                 }
                 FutureStatus::Ready(stream) => match Pin::new(stream).poll_next(cx) {
@@ -151,7 +163,7 @@ where
                             >,
                         >(reader)
                     };
-                        self.status = FutureStatus::OpenFile(reader);
+                        self.status = FutureStatus::OpenFile(gen, reader);
                         continue;
                     }
                 },
                     Poll::Pending => Poll::Pending,
                 },
-                FutureStatus::OpenFile(file_future) => match Pin::new(file_future).poll(cx) {
+                FutureStatus::OpenFile(id, file_future) => match Pin::new(file_future).poll(cx) {
                     Poll::Ready(Ok(file)) => {
-                        self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(file)));
+                        let id = *id;
+                        self.status = FutureStatus::OpenSst(Box::pin(SsTable::open(
+                            self.parquet_cache.clone(),
+                            id,
+                            file,
+                        )));
                         continue;
                     }
                     Poll::Ready(Err(err)) => {
@@ -209,6 +226,7 @@ mod tests {
     use fusio_dispatch::FsOptions;
     use futures_util::StreamExt;
     use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
+    use parquet_lru::NoopCache;
     use tempfile::TempDir;
 
     use crate::{
@@ -251,6 +269,7 @@ mod tests {
                 [0, 1, 2, 3],
             ),
             manager.base_fs().clone(),
+            NoopCache::default(),
         )
         .unwrap();
 
@@ -287,6 +306,7 @@ mod tests {
                 [0, 1, 2, 4],
             ),
             manager.base_fs().clone(),
+            NoopCache::default(),
         )
         .unwrap();
 
@@ -323,6 +343,7 @@ mod tests {
                 [0, 1, 2],
             ),
             manager.base_fs().clone(),
+            NoopCache::default(),
         )
         .unwrap();
 
diff --git a/src/stream/mem_projection.rs b/src/stream/mem_projection.rs
index 6671e285..e544fa49 100644
--- a/src/stream/mem_projection.rs
+++ b/src/stream/mem_projection.rs
@@ -6,7 +6,9 @@ use std::{
 };
 
 use futures_core::Stream;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
+use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
+use ulid::Ulid;
 
 use crate::{
     record::Record,
@@ -14,20 +16,25 @@ use crate::{
 };
 
 pin_project! {
-    pub struct MemProjectionStream<'projection, R>
+    pub struct MemProjectionStream<'projection, R, C>
     where
         R: Record,
+        C: LruCache<Ulid>,
     {
-        stream: Box<ScanStream<'projection, R>>,
+        stream: Box<ScanStream<'projection, R, C>>,
         projection_mask: Arc<ProjectionMask>,
     }
 }
 
-impl<'projection, R> MemProjectionStream<'projection, R>
+impl<'projection, R, C> MemProjectionStream<'projection, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
-    pub(crate) fn new(stream: ScanStream<'projection, R>, projection_mask: ProjectionMask) -> Self {
+    pub(crate) fn new(
+        stream: ScanStream<'projection, R, C>,
+        projection_mask: ProjectionMask,
+    ) -> Self {
         Self {
             stream: Box::new(stream),
             projection_mask: Arc::new(projection_mask),
@@ -35,9 +42,10 @@ where
     }
 }
 
-impl<'projection, R> Stream for MemProjectionStream<'projection, R>
+impl<'projection, R, C> Stream for MemProjectionStream<'projection, R, C>
 where
     R: Record,
+    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<Entry<'projection, R>, ParquetError>;
 
@@ -61,6 +69,8 @@ mod tests {
     use fusio::{disk::TokioFs, path::Path, DynFs};
     use futures_util::StreamExt;
     use parquet::arrow::{arrow_to_parquet_schema, ProjectionMask};
+    use parquet_lru::NoopCache;
+    use ulid::Ulid;
 
     use crate::{
         inmem::mutable::Mutable, record::Record, stream::mem_projection::MemProjectionStream,
@@ -121,7 +131,7 @@ mod tests {
             vec![0, 1, 2, 4],
         );
 
-        let mut stream = MemProjectionStream::<Test>::new(
+        let mut stream = MemProjectionStream::<Test, NoopCache<Ulid>>::new(
             mutable
                 .scan((Bound::Unbounded, Bound::Unbounded), 6.into())
                 .into(),
diff --git a/src/stream/merge.rs b/src/stream/merge.rs
index f58583c6..eab7a6f5 100644
--- a/src/stream/merge.rs
+++ b/src/stream/merge.rs
@@ -7,17 +7,20 @@ use std::{
 
 use futures_core::{ready, Stream};
 use futures_util::stream::StreamExt;
+use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
+use ulid::Ulid;
 
 use super::{Entry, ScanStream};
 use crate::{record::Record, timestamp::Timestamp};
 
 pin_project! {
-    pub struct MergeStream<'merge, R>
+    pub struct MergeStream<'merge, R, C>
     where
         R: Record,
+        C: LruCache<Ulid>,
     {
-        streams: Vec<ScanStream<'merge, R>>,
+        streams: Vec<ScanStream<'merge, R, C>>,
        peeked: BinaryHeap<CmpEntry<'merge, R>>,
        buf: Option<Entry<'merge, R>>,
        ts: Timestamp,
    }
 }
 
-impl<'merge, R> MergeStream<'merge, R>
+impl<'merge, R, C> MergeStream<'merge, R, C>
 where
     R: Record,
+    C: LruCache<Ulid> + Unpin,
 {
     pub(crate) async fn from_vec(
-        mut streams: Vec<ScanStream<'merge, R>>,
+        mut streams: Vec<ScanStream<'merge, R, C>>,
         ts: Timestamp,
     ) -> Result<Self, parquet::errors::ParquetError> {
         let mut peeked = BinaryHeap::with_capacity(streams.len());
@@ -62,9 +66,10 @@ where
     }
 }
 
-impl<'merge, R> Stream for MergeStream<'merge, R>
+impl<'merge, R, C> Stream for MergeStream<'merge, R, C>
 where
     R: Record,
+    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<Entry<'merge, R>, parquet::errors::ParquetError>;
 
@@ -160,6 +165,8 @@ mod tests {
 
     use fusio::{disk::TokioFs, path::Path, DynFs};
     use futures_util::StreamExt;
+    use parquet_lru::NoopCache;
+    use ulid::Ulid;
 
     use super::MergeStream;
     use crate::{
@@ -212,7 +219,7 @@ mod tests {
         let lower = "a".to_string();
         let upper = "e".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::<String>::from_vec(
+        let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
             vec![
                 m1.scan(bound, 6.into()).into(),
                 m2.scan(bound, 6.into()).into(),
@@ -291,10 +298,12 @@ mod tests {
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge =
-            MergeStream::<String>::from_vec(vec![m1.scan(bound, 0.into()).into()], 0.into())
-                .await
-                .unwrap();
+        let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
+            vec![m1.scan(bound, 0.into()).into()],
+            0.into(),
+        )
+        .await
+        .unwrap();
 
         if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
             assert_eq!(entry.key().value, "1");
@@ -319,10 +328,12 @@ mod tests {
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge =
-            MergeStream::<String>::from_vec(vec![m1.scan(bound, 1.into()).into()], 1.into())
-                .await
-                .unwrap();
+        let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
+            vec![m1.scan(bound, 1.into()).into()],
+            1.into(),
+        )
+        .await
+        .unwrap();
 
         if let Some(Ok(Entry::Mutable(entry))) = merge.next().await {
             assert_eq!(entry.key().value, "1");
@@ -371,7 +382,7 @@ mod tests {
         let lower = "1".to_string();
         let upper = "3".to_string();
         {
-            let mut merge = MergeStream::<String>::from_vec(
+            let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
                 vec![m1
                     .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into())
                     .into()],
@@ -391,7 +402,7 @@ mod tests {
             assert!(merge.next().await.is_none());
         }
         {
-            let mut merge = MergeStream::<String>::from_vec(
+            let mut merge = MergeStream::<String, NoopCache<Ulid>>::from_vec(
                 vec![m1
                     .scan((Bound::Included(&lower), Bound::Included(&upper)), 0.into())
                     .into()],
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index fa0b5afe..a26dffeb 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -15,8 +15,10 @@ use std::{
 use futures_core::Stream;
 use futures_util::{ready, stream};
 use parquet::arrow::ProjectionMask;
+use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
 use record_batch::RecordBatchEntry;
+use ulid::Ulid;
 
 use crate::{
     inmem::{immutable::ImmutableScan, mutable::MutableScan},
@@ -100,9 +102,10 @@ where
 
 pin_project! {
     #[project = ScanStreamProject]
-    pub enum ScanStream<'scan, R>
+    pub enum ScanStream<'scan, R, C>
     where
         R: Record,
+        C: LruCache<Ulid>,
     {
         Transaction {
             #[pin]
@@ -122,18 +125,19 @@ pin_project! {
         },
         Level {
             #[pin]
-            inner: LevelStream<'scan, R>,
+            inner: LevelStream<'scan, R, C>,
         },
         MemProjection {
             #[pin]
-            inner: MemProjectionStream<'scan, R>,
+            inner: MemProjectionStream<'scan, R, C>,
         }
     }
 }
 
-impl<'scan, R> From<TransactionScan<'scan, R>> for ScanStream<'scan, R>
+impl<'scan, R, C> From<TransactionScan<'scan, R>> for ScanStream<'scan, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     fn from(inner: TransactionScan<'scan, R>) -> Self {
         ScanStream::Transaction {
@@ -142,9 +146,10 @@ where
     }
 }
 
-impl<'scan, R> From<MutableScan<'scan, R>> for ScanStream<'scan, R>
+impl<'scan, R, C> From<MutableScan<'scan, R>> for ScanStream<'scan, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     fn from(inner: MutableScan<'scan, R>) -> Self {
         ScanStream::Mutable {
@@ -153,9 +158,10 @@ where
     }
 }
 
-impl<'scan, R> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R>
+impl<'scan, R, C> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     fn from(inner: ImmutableScan<'scan, R>) -> Self {
         ScanStream::Immutable {
@@ -164,27 +170,30 @@ where
     }
 }
 
-impl<'scan, R> From<SsTableScan<'scan, R>> for ScanStream<'scan, R>
+impl<'scan, R, C> From<SsTableScan<'scan, R>> for ScanStream<'scan, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     fn from(inner: SsTableScan<'scan, R>) -> Self {
         ScanStream::SsTable { inner }
     }
 }
 
-impl<'scan, R> From<MemProjectionStream<'scan, R>> for ScanStream<'scan, R>
+impl<'scan, R, C> From<MemProjectionStream<'scan, R, C>> for ScanStream<'scan, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
-    fn from(inner: MemProjectionStream<'scan, R>) -> Self {
+    fn from(inner: MemProjectionStream<'scan, R, C>) -> Self {
         ScanStream::MemProjection { inner }
     }
 }
 
-impl<R> fmt::Debug for ScanStream<'_, R>
+impl<R, C> fmt::Debug for ScanStream<'_, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match self {
@@ -198,9 +207,10 @@ where
     }
 }
 
-impl<'scan, R> Stream for ScanStream<'scan, R>
+impl<'scan, R, C> Stream for ScanStream<'scan, R, C>
 where
     R: Record,
+    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<Entry<'scan, R>, parquet::errors::ParquetError>;
 
diff --git a/src/stream/package.rs b/src/stream/package.rs
index c8493e8f..590037f6 100644
--- a/src/stream/package.rs
+++ b/src/stream/package.rs
@@ -4,7 +4,9 @@ use std::{
 };
 
 use futures_core::Stream;
+use parquet_lru::LruCache;
 use pin_project_lite::pin_project;
+use ulid::Ulid;
 
 use crate::{
     inmem::immutable::{ArrowArrays, Builder},
@@ -13,25 +15,27 @@ use crate::{
 };
 
 pin_project! {
-    pub struct PackageStream<'package, R>
+    pub struct PackageStream<'package, R, C>
     where
         R: Record,
+        C: LruCache<Ulid>,
     {
         row_count: usize,
         batch_size: usize,
-        inner: MergeStream<'package, R>,
+        inner: MergeStream<'package, R, C>,
         builder: <R::Columns as ArrowArrays>::Builder,
         projection_indices: Option<Vec<usize>>,
     }
 }
 
-impl<'package, R> PackageStream<'package, R>
+impl<'package, R, C> PackageStream<'package, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     pub(crate) fn new(
         batch_size: usize,
-        merge: MergeStream<'package, R>,
+        merge: MergeStream<'package, R, C>,
         projection_indices: Option<Vec<usize>>,
         instance: &RecordInstance,
     ) -> Self {
@@ -45,9 +49,10 @@ where
     }
 }
 
-impl<'package, R> Stream for PackageStream<'package, R>
+impl<'package, R, C> Stream for PackageStream<'package, R, C>
 where
     R: Record,
+    C: LruCache<Ulid> + Unpin,
 {
     type Item = Result<R::Columns, parquet::errors::ParquetError>;
 
@@ -84,7 +89,9 @@ mod tests {
     use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt32Array};
     use fusio::{disk::TokioFs, path::Path, DynFs};
     use futures_util::StreamExt;
+    use parquet_lru::NoopCache;
     use tempfile::TempDir;
+    use ulid::Ulid;
 
     use crate::{
         inmem::{
@@ -177,7 +184,7 @@ mod tests {
             .await
             .unwrap();
 
-        let merge = MergeStream::<Test>::from_vec(
+        let merge = MergeStream::<Test, NoopCache<Ulid>>::from_vec(
             vec![m1
                 .scan((Bound::Unbounded, Bound::Unbounded), 6.into())
                 .into()],
diff --git a/src/transaction.rs b/src/transaction.rs
index 3a9d38c1..289cf2a4 100644
--- a/src/transaction.rs
+++ b/src/transaction.rs
@@ -12,7 +12,9 @@ use async_lock::RwLockReadGuard;
 use flume::SendError;
 use lockable::AsyncLimit;
 use parquet::{arrow::ProjectionMask, errors::ParquetError};
+use parquet_lru::LruCache;
 use thiserror::Error;
+use ulid::Ulid;
 
 use crate::{
     compaction::CompactTask,
@@ -45,9 +47,10 @@ where
 }
 
 /// optimistic ACID transaction, open with
 /// [`DB::transaction`](crate::DB::transaction) method
-pub struct Transaction<'txn, R>
+pub struct Transaction<'txn, R, C>
 where
     R: Record,
+    C: LruCache<Ulid>,
 {
     ts: Timestamp,
     local: BTreeMap<R::Key, Option<R>>,
     share: RwLockReadGuard<'txn, Schema<R>>,
     version: VersionRef<R>,
     lock_map: LockMap<R::Key>,
     manager: Arc<StoreManager>,
+    parquet_cache: C,
 }
 
-impl<'txn, R> Transaction<'txn, R>
+impl<'txn, R, C> Transaction<'txn, R, C>
 where
     R: Record + Send,
+    C: LruCache<Ulid> + Unpin,
 {
     pub(crate) fn new(
         version: VersionRef<R>,
         share: RwLockReadGuard<'txn, Schema<R>>,
         lock_map: LockMap<R::Key>,
         manager: Arc<StoreManager>,
+        parquet_cache: C,
     ) -> Self {
         Self {
             ts: version.load_ts(),
             local: BTreeMap::new(),
             share,
             version,
             lock_map,
             manager,
+            parquet_cache,
         }
     }
 
@@ -88,7 +95,14 @@ where
             Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
             None => self
                 .share
-                .get(&self.version, &self.manager, key, self.ts, projection)
+                .get(
+                    &self.version,
+                    &self.manager,
+                    key,
+                    self.ts,
+                    projection,
+                    self.parquet_cache.clone(),
+                )
                 .await?
                .and_then(|entry| {
                    if entry.value().is_none() {
                        None
                    } else {
                        Some(TransactionEntry::Stream(entry))
                    }
                }),
        })
    }
 
     pub fn scan<'scan>(
         &'scan self,
         range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
-    ) -> Scan<'scan, R> {
+    ) -> Scan<'scan, R, C> {
         Scan::new(
             &self.share,
             &self.manager,
@@ -122,6 +136,7 @@ where
                 }
                 Some(transaction_scan)
             }),
+            self.parquet_cache.clone(),
         )
     }
 
@@ -276,7 +291,7 @@ mod tests {
     async fn transaction_read_write() {
         let temp_dir = TempDir::new().unwrap();
 
-        let db = DB::<String, TokioExecutor>::new(
+        let db = DB::<String, TokioExecutor, _>::new(
             DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap()),
             TokioExecutor::new(),
         )
@@ -404,7 +419,7 @@ mod tests {
         let temp_dir = TempDir::new().unwrap();
         let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
 
-        let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::new())
+        let db = DB::<String, TokioExecutor, _>::new(option, TokioExecutor::new())
             .await
             .unwrap();
 
@@ -437,7 +452,7 @@ mod tests {
         let temp_dir = TempDir::new().unwrap();
         let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
 
-        let db = DB::<String, TokioExecutor>::new(option, TokioExecutor::new())
+        let db = DB::<String, TokioExecutor, _>::new(option, TokioExecutor::new())
             .await
             .unwrap();
 
diff --git a/src/version/mod.rs b/src/version/mod.rs
index 37d15c65..ceac25a5 100644
--- a/src/version/mod.rs
+++ b/src/version/mod.rs
@@ -13,8 +13,10 @@ use std::{
 use flume::{SendError, Sender};
 use fusio::DynFs;
 use parquet::arrow::ProjectionMask;
+use parquet_lru::LruCache;
 use thiserror::Error;
 use tracing::error;
+use ulid::Ulid;
 
 use crate::{
     fs::{manager::StoreManager, FileId, FileType},
@@ -115,12 +117,16 @@ impl<R> Version<R>
 where
     R: Record,
 {
-    pub(crate) async fn query(
+    pub(crate) async fn query<C>(
         &self,
         manager: &StoreManager,
         key: &TimestampedRef<R::Key>,
         projection_mask: ProjectionMask,
-    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
+        parquet_cache: C,
+    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>>
+    where
+        C: LruCache<Ulid>,
+    {
         let level_0_path = self
             .option
             .level_fs_path(0)
@@ -131,7 +137,14 @@ where
                 continue;
             }
             if let Some(entry) = self
-                .table_query(level_0_fs, key, 0, &scope.gen, projection_mask.clone())
+                .table_query(
+                    level_0_fs,
+                    key,
+                    0,
+                    &scope.gen,
+                    projection_mask.clone(),
+                    parquet_cache.clone(),
+                )
                 .await?
             {
                 return Ok(Some(entry));
@@ -158,6 +171,7 @@ where
                     leve,
                     &sort_runs[index].gen,
                     projection_mask.clone(),
+                    parquet_cache.clone(),
                 )
                 .await?
             {
@@ -168,14 +182,18 @@ where
         Ok(None)
     }
 
-    async fn table_query(
+    async fn table_query<C>(
         &self,
         store: &Arc<dyn DynFs>,
         key: &TimestampedRef<<R as Record>::Key>,
         level: usize,
         gen: &FileId,
         projection_mask: ProjectionMask,
-    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
+        parquet_cache: C,
+    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>>
+    where
+        C: LruCache<Ulid>,
+    {
         let file = store
             .open_options(
                 &self.option.table_path(gen, level),
@@ -183,7 +201,7 @@ where
             )
             .await
             .map_err(VersionError::Fusio)?;
-        SsTable::<R>::open(file)
+        SsTable::<R, C>::open(parquet_cache, *gen, file)
             .await?
            .get(key, projection_mask)
            .await
            .map_err(VersionError::Parquet)
    }
@@ -200,15 +218,20 @@ where
         self.level_slice[level].len()
     }
 
-    pub(crate) async fn streams<'streams>(
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) async fn streams<'streams, C>(
         &self,
         manager: &StoreManager,
-        streams: &mut Vec<ScanStream<'streams, R>>,
+        streams: &mut Vec<ScanStream<'streams, R, C>>,
         range: (Bound<&'streams R::Key>, Bound<&'streams R::Key>),
         ts: Timestamp,
         limit: Option<usize>,
         projection_mask: ProjectionMask,
-    ) -> Result<(), VersionError<R>> {
+        parquet_cache: C,
+    ) -> Result<(), VersionError<R>>
+    where
+        C: LruCache<Ulid>,
+    {
         let level_0_path = self
             .option
             .level_fs_path(0)
@@ -225,7 +248,7 @@ where
             )
             .await
             .map_err(VersionError::Fusio)?;
-            let table = SsTable::open(file).await?;
+            let table = SsTable::open(parquet_cache.clone(), scope.gen, file).await?;
 
             streams.push(ScanStream::SsTable {
                 inner: table
@@ -270,6 +293,7 @@ where
                     limit,
                     projection_mask.clone(),
                     level_fs.clone(),
+                    parquet_cache.clone(),
                 )
                 .unwrap(),
             });
diff --git a/tests/data_integrity.rs b/tests/data_integrity.rs
index 239d0d83..ee9f1404 100644
--- a/tests/data_integrity.rs
+++ b/tests/data_integrity.rs
@@ -72,7 +72,8 @@ mod tests {
         let temp_dir = TempDir::new().unwrap();
         let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
 
-        let db: DB<Customer, TokioExecutor> = DB::new(option, TokioExecutor::new()).await.unwrap();
+        let db: DB<Customer, TokioExecutor, _> =
+            DB::new(option, TokioExecutor::new()).await.unwrap();
 
         for _ in 0..WRITE_TIMES {
             let customer = gen_record(&mut rng, &mut primary_key_count);
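
Editor's usage sketch (not part of the patch). The pieces added here compose in a straight line: a `LruCache` implementation hands back a (possibly caching) `AsyncFileReader`, which plugs directly into parquet's record-batch stream builder; this is exactly what `SsTable::open` now does internally with each table's `FileId`. In the sketch below, the `data.parquet` path and the freshly generated `Ulid` key are placeholder assumptions, and it presumes the `ulid` crate's `serde` feature (which this patch enables for tonbo) plus tokio's fs/macros features. `NoopCache` passes the reader through unchanged; a `FoyerCache` built from `Options::default().meta_capacity(..).data_capacity(..)` could be swapped in to actually cache footer metadata and byte ranges.

use futures_util::StreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use parquet_lru::{LruCache, NoopCache, Options};
use ulid::Ulid;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a cache via the trait's constructor. For `NoopCache` the options
    // are ignored; for `FoyerCache` they size the metadata and data caches.
    let cache = NoopCache::<Ulid>::new(Options::default()).await?;

    // Any `AsyncFileReader` can be wrapped; `tokio::fs::File` gets that impl
    // from the parquet crate. "data.parquet" is a hypothetical input file.
    let file = tokio::fs::File::open("data.parquet").await?;

    // The key scopes cached footer metadata and byte ranges to one file,
    // mirroring how tonbo keys readers by SSTable gen (`FileId`).
    let reader = cache.get_reader(Ulid::new(), file).await;

    // The wrapped reader drops straight into parquet's async stream builder.
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .build()?;
    while let Some(batch) = stream.next().await {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}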