Skip to content

Commit

Permalink
feat: support parquet lru reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Nov 12, 2024
1 parent 25b0389 commit 746d952
Show file tree
Hide file tree
Showing 20 changed files with 574 additions and 145 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
workspace = { members = ["tonbo_macros"] }
workspace = { members = ["parquet-lru", "tonbo_macros"] }

[package]
description = "An embedded persistent KV database in Rust."
Expand Down Expand Up @@ -74,17 +74,18 @@ fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
lockable = "0.0.8"
lockable = "0.1.1"
once_cell = "1"
parquet = { version = "53", features = ["async"] }
parquet-lru = { version = "0.1.0", path = "parquet-lru" }
pin-project-lite = "0.2"
regex = "1"
thiserror = "1"
thiserror = "2.0.3"
tokio = { version = "1", features = ["io-util"], default-features = false }
tokio-util = { version = "0.7" }
tonbo_macros = { version = "0.2.0", path = "tonbo_macros" }
tracing = "0.1"
ulid = "1"
ulid = { version = "1", features = ["serde"] }

# Only used for benchmarks
log = "0.4.22"
Expand Down
1 change: 1 addition & 0 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
"tokio",
] }
futures = { version = "0.3" }
parquet-lru = { version = "0.1.0", path = "../../parquet-lru" }
pyo3 = { version = "0.21.2", features = [
"abi3",
"abi3-py310",
Expand Down
4 changes: 3 additions & 1 deletion bindings/python/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use parquet_lru::NoopCache;
use pyo3::{
prelude::*,
pyclass, pymethods,
Expand All @@ -9,6 +10,7 @@ use pyo3::{
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
use tonbo::{
executor::tokio::TokioExecutor,
fs::FileId,
record::{ColumnDesc, DynRecord},
DB,
};
Expand All @@ -28,7 +30,7 @@ type PyExecutor = TokioExecutor;
pub struct TonboDB {
desc: Arc<Vec<Column>>,
primary_key_index: usize,
db: Arc<DB<DynRecord, PyExecutor>>,
db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
}

#[pymethods]
Expand Down
19 changes: 10 additions & 9 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{mem::transmute, sync::Arc};

use parquet_lru::NoopCache;
use pyo3::{
pyclass, pymethods,
types::{PyAnyMethods, PyMapping, PyMappingMethods, PySequenceMethods, PyTuple},
Bound, IntoPy, Py, PyAny, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};
use tonbo::{fs::FileId, record::DynRecord, transaction, Projection};

use crate::{
column::Column,
Expand All @@ -18,14 +19,14 @@ use crate::{

#[pyclass]
pub struct Transaction {
txn: Option<transaction::Transaction<'static, DynRecord>>,
txn: Option<transaction::Transaction<'static, DynRecord, NoopCache<FileId>>>,
desc: Arc<Vec<Column>>,
primary_key_index: usize,
}

impl Transaction {
pub(crate) fn new<'txn>(
txn: transaction::Transaction<'txn, DynRecord>,
txn: transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
desc: Arc<Vec<Column>>,
) -> Self {
let primary_key_index = desc
Expand All @@ -37,8 +38,8 @@ impl Transaction {
Transaction {
txn: Some(unsafe {
transmute::<
transaction::Transaction<'txn, DynRecord>,
transaction::Transaction<'static, DynRecord>,
transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
transaction::Transaction<'static, DynRecord, NoopCache<FileId>>,
>(txn)
}),
desc,
Expand Down Expand Up @@ -84,8 +85,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord>,
&'static transaction::Transaction<'_, DynRecord>,
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
>(txn)
};

Expand Down Expand Up @@ -169,8 +170,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord>,
&'static transaction::Transaction<'_, DynRecord>,
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
>(txn)
};
let col_desc = self.desc.get(self.primary_key_index).unwrap();
Expand Down
11 changes: 8 additions & 3 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use datafusion::{
use fusio::path::Path;
use futures_core::Stream;
use futures_util::StreamExt;
use parquet_lru::NoopCache;
use tokio::fs;
use tonbo::fs::FileId;
use tonbo::{
executor::tokio::TokioExecutor, inmem::immutable::ArrowArrays, record::Record, DbOption, DB,
};
Expand All @@ -41,12 +43,12 @@ pub struct Music {
}

struct MusicProvider {
db: Arc<DB<Music, TokioExecutor>>,
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
}

struct MusicExec {
cache: PlanProperties,
db: Arc<DB<Music, TokioExecutor>>,
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
Expand Down Expand Up @@ -95,7 +97,10 @@ impl TableProvider for MusicProvider {
}

impl MusicExec {
fn new(db: Arc<DB<Music, TokioExecutor>>, projection: Option<&Vec<usize>>) -> Self {
fn new(
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
projection: Option<&Vec<usize>>,
) -> Self {
let schema = Music::arrow_schema();
let schema = if let Some(projection) = &projection {
Arc::new(schema.project(projection).unwrap())
Expand Down
13 changes: 13 additions & 0 deletions parquet-lru/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
edition = "2021"
name = "parquet-lru"
version = "0.1.0"

[dependencies]
bytes = { version = "1.8.0", features = ["serde"] }
foyer = "0.12.2"
futures-core = "0.3.31"
futures-util = "0.3.31"
parquet = { version = "53.2.0", features = ["async"] }
serde = "1.0.214"
thiserror = "2.0.3"
208 changes: 208 additions & 0 deletions parquet-lru/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use std::{future::Future, hash::Hash, marker::PhantomData, ops::Range, sync::Arc};

use bytes::Bytes;
use futures_core::future::BoxFuture;
use futures_util::future::FutureExt;
use parquet::{
arrow::async_reader::AsyncFileReader,
errors::{ParquetError, Result},
file::metadata::ParquetMetaData,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// Capacity settings consumed when a cache is constructed.
///
/// Both capacities are `0` when the value comes from [`Default`]; chain the
/// builder-style setters below to override them.
#[derive(Default)]
pub struct Options {
    meta_capacity: usize,
    data_capacity: usize,
}

impl Options {
    /// Returns the options with the metadata-cache capacity replaced by
    /// `meta_capacity`; every other setting is carried over unchanged.
    pub fn meta_capacity(self, meta_capacity: usize) -> Self {
        Self {
            meta_capacity,
            ..self
        }
    }

    /// Returns the options with the data-cache capacity replaced by
    /// `data_capacity`; every other setting is carried over unchanged.
    pub fn data_capacity(self, data_capacity: usize) -> Self {
        Self {
            data_capacity,
            ..self
        }
    }
}

/// A cache, keyed by `K`, that can wrap parquet file readers.
///
/// Implementors must be `Clone + Send + Sync + 'static`.
pub trait LruCache<K>: Clone + Send + Sync + 'static {
/// Reader type produced by [`LruCache::get_reader`] for an inner reader `R`.
type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;

/// Asynchronously builds a cache from `options`, yielding [`Error`] on failure.
fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;

/// Asynchronously turns `reader` into this cache's reader type for `key`.
fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
where
R: AsyncFileReader + 'static;
}

/// Cache handle built on the `foyer` crate.
///
/// The state lives behind an `Arc`, so cloning the handle shares the same
/// underlying caches rather than copying them.
#[derive(Clone)]
pub struct FoyerCache<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
{
// Shared metadata and data caches.
inner: Arc<FoyerCacheInner<K>>,
}

/// The caches shared by every clone of a `FoyerCache`.
pub struct FoyerCacheInner<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
{
// Parquet metadata, one entry per key.
meta: foyer::Cache<K, Arc<ParquetMetaData>>,
// Raw byte buffers, one entry per `(key, byte range)` pair.
data: foyer::HybridCache<(K, Range<usize>), Bytes>,
}

impl<K> LruCache<K> for FoyerCache<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
{
type LruReader<R: AsyncFileReader + 'static> = ParquetLru<K, R>;

async fn new(options: Options) -> Result<Self, Error> {
Ok(Self {
inner: Arc::new(FoyerCacheInner {
meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
data: foyer::HybridCacheBuilder::new()
.memory(options.data_capacity)
.storage(foyer::Engine::Large)
.build()
.await
.map_err(|e| Error::Foyer(e.into()))?,
}),
})
}

async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> ParquetLru<K, R> {
ParquetLru::new(self.clone(), key, reader)
}
}

/// A parquet reader paired with a `FoyerCache` handle and the key its cache
/// entries are stored under.
pub struct ParquetLru<K, R>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
{
// Handle to the shared metadata and data caches.
cache: FoyerCache<K>,
// Cache key associated with `reader`.
key: K,
// Underlying reader.
reader: R,
}

impl<K, R> ParquetLru<K, R>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + 'static,
R: AsyncFileReader,
{
fn new(cache: FoyerCache<K>, key: K, reader: R) -> Self {
Self { cache, key, reader }
}
}

impl<K, R> AsyncFileReader for ParquetLru<K, R>
where
    for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
    R: AsyncFileReader,
{
    /// Returns the bytes for `range`, consulting the data cache first.
    ///
    /// On a miss the range is read from the wrapped reader and stored in the
    /// data cache under `(key, range)` before being returned.
    ///
    /// # Errors
    ///
    /// A failed cache lookup is reported as [`ParquetError::External`]; errors
    /// from the wrapped reader are propagated unchanged.
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        async move {
            if let Some(data) = self
                .cache
                .inner
                .data
                .get(&(self.key.clone(), range.clone()))
                .await
                .map_err(|e| ParquetError::External(e.into()))?
            {
                Ok(data.value().clone())
            } else {
                let data = self.reader.get_bytes(range.clone()).await?;
                self.cache
                    .inner
                    .data
                    .insert((self.key.clone(), range), data.clone());
                Ok(data)
            }
        }
        .boxed()
    }

    /// Returns the parquet metadata for this reader's key, consulting the
    /// metadata cache first.
    ///
    /// On a miss the metadata is read from the wrapped reader and stored in the
    /// metadata cache before being returned.
    ///
    /// # Errors
    ///
    /// Errors from the wrapped reader are propagated unchanged.
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        async move {
            if let Some(meta) = self.cache.inner.meta.get(&self.key) {
                Ok(meta.value().clone())
            } else {
                let meta = self.reader.get_metadata().await?;
                self.cache.inner.meta.insert(self.key.clone(), meta.clone());
                Ok(meta)
            }
        }
        .boxed()
    }

    /// Returns one buffer per entry of `ranges`, in the same order as `ranges`.
    ///
    /// Each range is looked up in the data cache; all misses are then fetched
    /// from the wrapped reader in a single `get_byte_ranges` call and stored in
    /// the cache.
    ///
    /// # Errors
    ///
    /// A failed cache lookup is reported as [`ParquetError::External`]; errors
    /// from the wrapped reader are propagated unchanged. If the wrapped reader
    /// returns a different number of buffers than the number of ranges
    /// requested from it, a [`ParquetError::General`] is returned.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            // `id` is always the position of the range in the caller's request.
            let mut missed = Vec::with_capacity(ranges.len());
            let mut results = Vec::with_capacity(ranges.len());
            for (id, range) in ranges.iter().enumerate() {
                if let Some(data) = self
                    .cache
                    .inner
                    .data
                    .get(&(self.key.clone(), range.clone()))
                    .await
                    .map_err(|e| ParquetError::External(e.into()))?
                {
                    results.push((id, data.value().clone()));
                } else {
                    missed.push((id, range));
                }
            }
            if !missed.is_empty() {
                let fetched = self
                    .reader
                    .get_byte_ranges(missed.iter().map(|&(_, r)| r.clone()).collect())
                    .await?;
                if fetched.len() != missed.len() {
                    return Err(ParquetError::General(format!(
                        "reader returned {} buffers for {} requested ranges",
                        fetched.len(),
                        missed.len()
                    )));
                }
                // The fetched buffers are ordered like `missed`, NOT like the
                // original request, so they must be paired positionally with
                // `missed`; indexing them by the original `id` picks the wrong
                // buffer (or runs out of bounds) whenever an earlier range was
                // served from the cache.
                for ((id, range), data) in missed.into_iter().zip(fetched) {
                    self.cache
                        .inner
                        .data
                        .insert((self.key.clone(), range.clone()), data.clone());
                    results.push((id, data));
                }
            }
            // Restore request order; ids are unique, so an unstable sort is enough.
            results.sort_unstable_by_key(|(id, _)| *id);
            Ok(results.into_iter().map(|(_, data)| data).collect())
        }
        .boxed()
    }
}

/// Cache type that stores nothing; it only carries the key type `K`.
#[derive(Clone, Default)]
pub struct NoopCache<K> {
// Zero-sized marker tying the otherwise unused `K` parameter to the type.
_phantom: PhantomData<K>,
}

impl<K> LruCache<K> for NoopCache<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
{
type LruReader<R: AsyncFileReader + 'static> = R;

async fn new(_options: Options) -> Result<Self, Error> {
Ok(Self {
_phantom: PhantomData,
})
}

async fn get_reader<R: AsyncFileReader>(&self, _key: K, reader: R) -> R {
reader
}
}

/// Error type returned when constructing a cache.
#[derive(Debug, Error)]
pub enum Error {
/// An underlying error, boxed as a generic `std::error::Error`; `#[from]`
/// also allows converting such a boxed error into this variant.
#[error("Foyer error: {0}")]
Foyer(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
Loading

0 comments on commit 746d952

Please sign in to comment.