From e00775b0ebcb025c028650e7f7dee21759029ef1 Mon Sep 17 00:00:00 2001 From: Kould Date: Mon, 11 Nov 2024 18:52:20 +0800 Subject: [PATCH] chore: optimization with review comments --- Cargo.toml | 8 +++---- bindings/python/src/db.rs | 4 ++-- bindings/python/src/transaction.rs | 18 +++++++-------- rust-toolchain.toml | 2 +- src/inmem/mutable.rs | 6 ++--- tonbo_ext_reader/Cargo.toml | 8 +++---- tonbo_ext_reader/src/foyer_reader.rs | 17 +++++++++----- tonbo_ext_reader/src/lib.rs | 34 ++++++++++++---------------- tonbo_ext_reader/src/lru_reader.rs | 17 +++++++++----- 9 files changed, 59 insertions(+), 55 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 95ea8aff..c7e47b3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ resolver = "2" version = "0.2.0" [package.metadata] -msrv = "1.81.0" +msrv = "1.82.0" [features] bench = ["redb", "rocksdb", "sled"] @@ -58,7 +58,7 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [ +fusio = { package = "fusio", version = "0.3.3", features = [ "aws", "dyn", "fs", @@ -66,11 +66,11 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a9 "tokio", "tokio-http", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [ +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ "aws", "tokio", ] } -fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" } +fusio-parquet = { package = "fusio-parquet", version = "0.2.2" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" diff --git a/bindings/python/src/db.rs b/bindings/python/src/db.rs index 0af488fb..e640301b 100644 --- a/bindings/python/src/db.rs +++ b/bindings/python/src/db.rs @@ -11,7 +11,7 @@ use tonbo::{ record::{ColumnDesc, DynRecord}, DB, }; -use tonbo_ext_reader::foyer_reader::FoyerReader; +use tonbo_ext_reader::lru_reader::LruReader; use crate::{ column::Column, error::{CommitError, DbError}, @@ -27,7 +27,7 @@ type PyExecutor = TokioExecutor; pub struct TonboDB { desc: Arc>, primary_key_index: usize, - db: Arc>, + db: Arc>, } #[pymethods] diff --git a/bindings/python/src/transaction.rs b/bindings/python/src/transaction.rs index 2c41c024..15e70a97 100644 --- a/bindings/python/src/transaction.rs +++ b/bindings/python/src/transaction.rs @@ -7,7 +7,7 @@ use pyo3::{ }; use pyo3_asyncio::tokio::future_into_py; use tonbo::{record::DynRecord, transaction, Projection}; -use tonbo_ext_reader::foyer_reader::FoyerReader; +use tonbo_ext_reader::lru_reader::LruReader; use crate::{ column::Column, error::{repeated_commit_err, CommitError, DbError}, @@ -18,14 +18,14 @@ use crate::{ #[pyclass] pub struct Transaction { - txn: Option>, + txn: Option>, desc: Arc>, primary_key_index: usize, } impl Transaction { pub(crate) fn new<'txn>( - txn: transaction::Transaction<'txn, DynRecord, FoyerReader>, + txn: transaction::Transaction<'txn, DynRecord, LruReader>, desc: Arc>, ) -> Self { let primary_key_index = desc @@ -37,8 +37,8 @@ impl Transaction { Transaction { txn: Some(unsafe { transmute::< - transaction::Transaction<'txn, DynRecord, FoyerReader>, - transaction::Transaction<'static, DynRecord, FoyerReader>, + transaction::Transaction<'txn, DynRecord, LruReader>, + transaction::Transaction<'static, DynRecord, LruReader>, >(txn) }), desc, @@ -84,8 +84,8 @@ impl Transaction { let txn = self.txn.as_ref().unwrap(); let txn = unsafe { transmute::< - &transaction::Transaction<'_, DynRecord, FoyerReader>, - &'static transaction::Transaction<'_, DynRecord, FoyerReader>, + &transaction::Transaction<'_, DynRecord, LruReader>, + &'static transaction::Transaction<'_, DynRecord, LruReader>, >(txn) }; @@ -169,8 +169,8 @@ impl Transaction { let txn = self.txn.as_ref().unwrap(); let txn = unsafe { transmute::< - &transaction::Transaction<'_, DynRecord, FoyerReader>, - &'static transaction::Transaction<'_, DynRecord, FoyerReader>, + &transaction::Transaction<'_, DynRecord, LruReader>, + &'static transaction::Transaction<'_, DynRecord, LruReader>, >(txn) }; let col_desc = self.desc.get(self.primary_key_index).unwrap(); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index db16da3a..704483a8 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.81.0" +channel = "1.82.0" components = ["clippy", "rust-analyzer", "rustfmt"] diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index e5a1ea3c..230baffa 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -5,7 +5,7 @@ use crossbeam_skiplist::{ map::{Entry, Range}, SkipMap, }; -use fusio::{buffered::BufWriter, dynamic::DynFile, DynFs}; +use fusio::{buffered::BufWriter, DynFs, DynWrite}; use ulid::Ulid; use crate::{ @@ -37,7 +37,7 @@ where R: Record, { pub(crate) data: SkipMap, Option>, - wal: Option, R>>>, + wal: Option, R>>>, pub(crate) trigger: Arc + Send + Sync>>, } @@ -61,7 +61,7 @@ where ) .await?, option.wal_buffer_size, - )) as Box; + )) as Box; wal = Some(Mutex::new(WalFile::new(file, file_id))); }; diff --git a/tonbo_ext_reader/Cargo.toml b/tonbo_ext_reader/Cargo.toml index 1ebe3345..8e85b093 100644 --- a/tonbo_ext_reader/Cargo.toml +++ b/tonbo_ext_reader/Cargo.toml @@ -11,7 +11,7 @@ bytes = { version = "1.7", features = ["serde"] } foyer = { version = "0.12" } futures-core = "0.3" futures-util = "0.3" -fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" } +fusio-parquet = { package = "fusio-parquet", version = "0.2.2" } lru = "0.12" parking_lot = "0.12" parquet = { version = "53", features = ["async"] } @@ -19,7 +19,7 @@ thiserror = "1" ulid = { version = "1", features = ["serde"] } [dev-dependencies] -fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [ +fusio = { package = "fusio", version = "0.3.3", features = [ "aws", "dyn", "fs", @@ -27,8 +27,8 @@ fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a9 "tokio", "tokio-http", ] } -fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [ +fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [ "tokio", ] } tempfile = "3" -tokio = { version = "1", features = ["full"] } \ No newline at end of file +tokio = { version = "1", features = ["full"] } diff --git a/tonbo_ext_reader/src/foyer_reader.rs b/tonbo_ext_reader/src/foyer_reader.rs index 2411acc5..70090bad 100644 --- a/tonbo_ext_reader/src/foyer_reader.rs +++ b/tonbo_ext_reader/src/foyer_reader.rs @@ -10,7 +10,7 @@ use futures_util::FutureExt; use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; use ulid::Ulid; -use crate::{CacheError, CacheReader, MetaCache, RangeCache}; +use crate::{CacheError, CacheReader, TonboCache}; #[derive(Debug, Clone)] pub struct FoyerMetaCache(Cache>); @@ -24,9 +24,9 @@ pub struct FoyerReader { meta_cache: FoyerMetaCache, } -impl MetaCache for FoyerMetaCache { - fn get(&self, gen: &Ulid) -> Option> { - self.0.get(gen).map(|entry| entry.value().clone()) +impl TonboCache> for FoyerMetaCache { + async fn get(&self, gen: &Ulid) -> Result>, CacheError> { + Ok(self.0.get(gen).map(|entry| entry.value().clone())) } fn insert(&self, gen: Ulid, data: Arc) -> Arc { @@ -34,7 +34,7 @@ impl MetaCache for FoyerMetaCache { } } -impl RangeCache for FoyerRangeCache { +impl TonboCache<(Ulid, Range), Bytes> for FoyerRangeCache { async fn get(&self, key: &(Ulid, Range)) -> Result, CacheError> { Ok(self.0.get(key).await?.map(|entry| entry.value().clone())) } @@ -112,7 +112,12 @@ impl AsyncFileReader for FoyerReader { fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { async move { - if let Some(meta) = self.meta_cache.get(&self.gen) { + if let Some(meta) = self + .meta_cache + .get(&self.gen) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { return Ok(meta); } diff --git a/tonbo_ext_reader/src/lib.rs b/tonbo_ext_reader/src/lib.rs index 1b738d35..f3d7f8df 100644 --- a/tonbo_ext_reader/src/lib.rs +++ b/tonbo_ext_reader/src/lib.rs @@ -9,24 +9,18 @@ use ulid::Ulid; pub mod foyer_reader; pub mod lru_reader; -pub trait MetaCache: Sync + Send + Clone + Debug { - fn get(&self, gen: &Ulid) -> Option>; - - fn insert(&self, gen: Ulid, data: Arc) -> Arc; -} - -pub trait RangeCache: Sync + Send + Clone + Debug { +pub trait TonboCache: Sync + Send + Clone + Debug { fn get( &self, - key: &(Ulid, Range), - ) -> impl std::future::Future, CacheError>> + Send; + key: &K, + ) -> impl std::future::Future, CacheError>> + Send; - fn insert(&self, key: (Ulid, Range), bytes: Bytes) -> Bytes; + fn insert(&self, key: K, value: V) -> V; } pub trait CacheReader: AsyncFileReader + Unpin { - type MetaCache: MetaCache; - type RangeCache: RangeCache; + type MetaCache: TonboCache>; + type RangeCache: TonboCache<(Ulid, Range), Bytes>; fn new( meta_cache: Self::MetaCache, @@ -37,14 +31,14 @@ pub trait CacheReader: AsyncFileReader + Unpin { #[allow(clippy::too_many_arguments)] fn build_caches( - cache_path: impl AsRef + Send, - cache_meta_capacity: usize, - cache_meta_shards: usize, - cache_meta_ratio: f64, - cache_range_memory: usize, - cache_range_disk: usize, - cache_range_capacity: usize, - cache_range_shards: usize, + path: impl AsRef + Send, + meta_capacity: usize, + meta_shards: usize, + meta_ratio: f64, + range_memory: usize, + range_disk: usize, + range_capacity: usize, + range_shards: usize, ) -> impl std::future::Future> + Send; } diff --git a/tonbo_ext_reader/src/lru_reader.rs b/tonbo_ext_reader/src/lru_reader.rs index e3f4a5d3..4a2cbc77 100644 --- a/tonbo_ext_reader/src/lru_reader.rs +++ b/tonbo_ext_reader/src/lru_reader.rs @@ -15,7 +15,7 @@ use parking_lot::Mutex; use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; use ulid::Ulid; -use crate::{CacheError, CacheReader, MetaCache, RangeCache}; +use crate::{CacheError, CacheReader, TonboCache}; pub(crate) trait SharedKey: Hash + PartialEq + Eq { fn shared(&self, hash_builder: &S, shared: usize) -> usize; @@ -96,9 +96,9 @@ pub struct LruReader { meta_cache: LruMetaCache, } -impl MetaCache for LruMetaCache { - fn get(&self, gen: &Ulid) -> Option> { - self.0.get(gen, |v| v.map(Arc::clone)) +impl TonboCache> for LruMetaCache { + async fn get(&self, gen: &Ulid) -> Result>, CacheError> { + Ok(self.0.get(gen, |v| v.map(Arc::clone))) } fn insert(&self, gen: Ulid, data: Arc) -> Arc { @@ -107,7 +107,7 @@ impl MetaCache for LruMetaCache { } } -impl RangeCache for LruRangeCache { +impl TonboCache<(Ulid, Range), Bytes> for LruRangeCache { async fn get(&self, key: &(Ulid, Range)) -> Result, CacheError> { Ok(self.0.get(key, |v| v.cloned())) } @@ -139,7 +139,12 @@ impl AsyncFileReader for LruReader { fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { async move { - if let Some(meta) = self.meta_cache.get(&self.gen) { + if let Some(meta) = self + .meta_cache + .get(&self.gen) + .await + .map_err(|e| parquet::errors::ParquetError::External(From::from(e)))? + { return Ok(meta); }