build(deps): Upgrade OpenDAL to 0.47 (#4224)
* catch up changes

Signed-off-by: tison <[email protected]>

* fmt

Signed-off-by: tison <[email protected]>

* Fix cache for 0471 (#7)

* Fix cache for 0471

Signed-off-by: Xuanwo <[email protected]>

* Make clippy happy

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>

* tidy

Signed-off-by: tison <[email protected]>

* use opendal's exported type

Signed-off-by: tison <[email protected]>

* clippy

Signed-off-by: tison <[email protected]>

* fmt

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
tisonkun and Xuanwo authored Jul 1, 2024
1 parent e531326 commit 2665616
Showing 15 changed files with 200 additions and 158 deletions.
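
The change driving most of the diffs below: in OpenDAL 0.47, converting a `Reader` into a byte stream or a `futures`-compatible reader (`into_bytes_stream`, `into_futures_async_read`) became an async, fallible step, so every call site gains an `.await` and an error branch. A minimal sketch of the new read path, using only public opendal 0.47 APIs (`read_all`, `op`, and `path` are illustrative names, not from this PR):

    use futures::io::AsyncReadExt;
    use opendal::Operator;

    async fn read_all(op: &Operator, path: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let meta = op.stat(path).await?;
        let mut reader = op
            .reader(path)
            .await?
            // 0.46 returned the adapter directly; 0.47 makes this step
            // async and fallible, hence the extra `.await?`.
            .into_futures_async_read(0..meta.content_length())
            .await?;
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await?;
        Ok(buf)
    }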
4 changes: 2 additions & 2 deletions Cargo.lock

Generated file; diff not rendered.

4 changes: 3 additions & 1 deletion src/common/datasource/src/file_format.rs
@@ -149,7 +149,9 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
         .reader(&path)
         .await
         .map_err(|e| DataFusionError::External(Box::new(e)))?
-        .into_bytes_stream(..);
+        .into_bytes_stream(..)
+        .await
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
     let mut upstream = compression_type.convert_stream(reader).fuse();
3 changes: 3 additions & 0 deletions src/common/datasource/src/file_format/csv.rs
@@ -169,11 +169,14 @@ impl FileFormat for CsvFormat {
             .stat(path)
             .await
             .context(error::ReadObjectSnafu { path })?;
+
         let reader = store
             .reader(path)
             .await
             .context(error::ReadObjectSnafu { path })?
             .into_futures_async_read(0..meta.content_length())
+            .await
+            .context(error::ReadObjectSnafu { path })?
             .compat();
 
         let decoded = self.compression_type.convert_async_read(reader);
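
The csv, json, and parquet diffs (here and below) repeat one recipe: stat the object for its length, open a reader over the full range (now async and fallible), then bridge `futures::io::AsyncRead` into tokio with `.compat()`. A hedged sketch of a helper capturing that pattern; `tokio_reader` is hypothetical, and it assumes the real code's `.compat()` resolves to `tokio_util::compat::FuturesAsyncReadCompatExt` and that `FuturesAsyncReader` is the adapter type opendal 0.47 exports:

    use opendal::{FuturesAsyncReader, Operator};
    use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};

    // Hypothetical helper for the stat + read-full-range + compat pattern.
    async fn tokio_reader(
        store: &Operator,
        path: &str,
    ) -> opendal::Result<Compat<FuturesAsyncReader>> {
        let meta = store.stat(path).await?;
        let reader = store
            .reader(path)
            .await?
            .into_futures_async_read(0..meta.content_length())
            .await?;
        // Adapt futures::io::AsyncRead to tokio::io::AsyncRead.
        Ok(reader.compat())
    }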
3 changes: 3 additions & 0 deletions src/common/datasource/src/file_format/json.rs
@@ -87,11 +87,14 @@ impl FileFormat for JsonFormat {
             .stat(path)
             .await
             .context(error::ReadObjectSnafu { path })?;
+
         let reader = store
             .reader(path)
             .await
             .context(error::ReadObjectSnafu { path })?
             .into_futures_async_read(0..meta.content_length())
+            .await
+            .context(error::ReadObjectSnafu { path })?
             .compat();
 
         let decoded = self.compression_type.convert_async_read(reader);
4 changes: 4 additions & 0 deletions src/common/datasource/src/file_format/parquet.rs
@@ -52,11 +52,14 @@ impl FileFormat for ParquetFormat {
             .stat(path)
             .await
             .context(error::ReadObjectSnafu { path })?;
+
         let mut reader = store
             .reader(path)
             .await
             .context(error::ReadObjectSnafu { path })?
             .into_futures_async_read(0..meta.content_length())
+            .await
+            .context(error::ReadObjectSnafu { path })?
             .compat();
 
         let metadata = reader
@@ -129,6 +132,7 @@ impl LazyParquetFileReader {
             .reader(&self.path)
             .await?
             .into_futures_async_read(0..meta.content_length())
+            .await?
             .compat();
         self.reader = Some(reader);
     }
16 changes: 9 additions & 7 deletions src/datanode/src/store.rs
@@ -20,6 +20,7 @@ mod gcs;
 mod oss;
 mod s3;
 
+use std::sync::Arc;
 use std::time::Duration;
 use std::{env, path};
 
@@ -28,7 +29,7 @@ use common_telemetry::info;
 use object_store::layers::{LruCacheLayer, RetryLayer};
 use object_store::services::Fs;
 use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
-use object_store::{HttpClient, ObjectStore};
+use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder};
 use snafu::prelude::*;
 
 use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -106,13 +107,14 @@ async fn create_object_store_with_cache(
     if let Some(path) = cache_path {
         let atomic_temp_dir = join_dir(path, ".tmp/");
         clean_temp_dir(&atomic_temp_dir)?;
-        let mut builder = Fs::default();
-        builder.root(path).atomic_write_dir(&atomic_temp_dir);
-        let cache_store = ObjectStore::new(builder)
-            .context(error::InitBackendSnafu)?
-            .finish();
+        let cache_store = {
+            let mut builder = Fs::default();
+            builder.root(path).atomic_write_dir(&atomic_temp_dir);
+            builder.build().context(error::InitBackendSnafu)?
+        };
 
-        let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize)
+        let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
             .await
             .context(error::InitBackendSnafu)?;
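
The other migration in this file is construction: 0.46 built a full operator first (`ObjectStore::new(builder)?.finish()`), while in 0.47 the cache layer wants the raw backend, so `builder.build()` is called directly and the result shared via `Arc`. A sketch of the new wiring, assuming the repo's `ObjectStore` and `ObjectStoreBuilder` are re-exports of opendal's `Operator` and `Builder` trait (`attach_lru_cache` and its parameters are illustrative):

    use std::sync::Arc;

    use object_store::layers::LruCacheLayer;
    use object_store::services::Fs;
    use object_store::{ObjectStore, ObjectStoreBuilder};

    async fn attach_lru_cache(
        inner: ObjectStore,
        cache_dir: &str,
        capacity_bytes: usize,
    ) -> opendal::Result<ObjectStore> {
        // 0.47: build the raw backend directly; no Operator wrapper needed.
        let mut builder = Fs::default();
        builder.root(cache_dir);
        let cache_backend = builder.build()?;

        // The layer now takes Arc<C> where C: Access.
        let layer = LruCacheLayer::new(Arc::new(cache_backend), capacity_bytes).await?;
        Ok(inner.layer(layer))
    }

Bringing the `Builder` trait into scope is what makes `builder.build()` resolve, which is why the diff adds `ObjectStoreBuilder` to the `use` list.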
4 changes: 3 additions & 1 deletion src/mito2/src/cache/write_cache.rs
@@ -188,7 +188,9 @@ impl WriteCache {
             .reader(&cache_path)
             .await
             .context(error::OpenDalSnafu)?
-            .into_futures_async_read(0..cached_value.content_length());
+            .into_futures_async_read(0..cached_value.content_length())
+            .await
+            .context(error::OpenDalSnafu)?;
 
         let mut writer = remote_store
             .writer_with(upload_path)
18 changes: 13 additions & 5 deletions src/mito2/src/sst/index/applier.rs
@@ -28,8 +28,8 @@ use store_api::storage::RegionId;
 
 use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
 use crate::error::{
-    ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu,
-    Result,
+    ApplyIndexSnafu, OpenDalSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu,
+    PuffinReadMetadataSnafu, Result,
 };
 use crate::metrics::{
     INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
@@ -128,11 +128,19 @@ impl SstIndexApplier {
             return Ok(None);
         };
 
-        Ok(file_cache
+        let Some(reader) = file_cache
             .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin))
             .await
-            .map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64))
-            .map(PuffinFileReader::new))
+        else {
+            return Ok(None);
+        };
+
+        let reader = reader
+            .into_futures_async_read(0..indexed_value.file_size as u64)
+            .await
+            .context(OpenDalSnafu)?;
+
+        Ok(Some(PuffinFileReader::new(reader)))
     }
 
     /// Helper function to create a [`PuffinFileReader`] from the remote index file.
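
The reshuffle above is forced by fallibility: `?` cannot be used inside the closure of `Option::map`, so once `into_futures_async_read` started returning a `Result`, the chain was rewritten with `let ... else` to hoist the `None` case into an early return. The same shape, distilled to standard-library types:

    use std::num::TryFromIntError;

    fn narrow(maybe: Option<i64>) -> Result<Option<String>, TryFromIntError> {
        // Early-return on None, mirroring `let Some(reader) = ... else` above.
        let Some(value) = maybe else {
            return Ok(None);
        };
        // Fallible step at function scope, like `.into_futures_async_read(..).await?`.
        let narrowed = u8::try_from(value)?;
        Ok(Some(narrowed.to_string()))
    }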
4 changes: 3 additions & 1 deletion src/mito2/src/sst/index/store.rs
@@ -67,7 +67,9 @@ impl InstrumentedStore {
             .reader(path)
             .await
             .context(OpenDalSnafu)?
-            .into_futures_async_read(0..meta.content_length());
+            .into_futures_async_read(0..meta.content_length())
+            .await
+            .context(OpenDalSnafu)?;
         Ok(InstrumentedAsyncRead::new(
             reader,
             read_byte_count,
2 changes: 1 addition & 1 deletion src/object-store/Cargo.toml
@@ -17,7 +17,7 @@ futures.workspace = true
 lazy_static.workspace = true
 md5 = "0.7"
 moka = { workspace = true, features = ["future"] }
-opendal = { version = "0.46", features = [
+opendal = { version = "0.47", features = [
     "layers-tracing",
     "services-azblob",
     "services-fs",
30 changes: 16 additions & 14 deletions src/object-store/src/layers/lru_cache.rs
@@ -14,26 +14,26 @@
 
 use std::sync::Arc;
 
-use opendal::raw::oio::ReadDyn;
+use opendal::raw::oio::Reader;
 use opendal::raw::{
     Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
     RpWrite,
 };
-use opendal::{Operator, Result};
+use opendal::Result;
 mod read_cache;
 use common_telemetry::info;
 use read_cache::ReadCache;
 
 /// An opendal layer with local LRU file cache supporting.
 #[derive(Clone)]
-pub struct LruCacheLayer {
+pub struct LruCacheLayer<C: Access> {
     // The read cache
-    read_cache: ReadCache,
+    read_cache: ReadCache<C>,
 }
 
-impl LruCacheLayer {
+impl<C: Access> LruCacheLayer<C> {
     /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
-    pub async fn new(file_cache: Operator, capacity: usize) -> Result<Self> {
+    pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
         let read_cache = ReadCache::new(file_cache, capacity);
         let (entries, bytes) = read_cache.recover_cache().await?;
 
@@ -52,12 +52,12 @@ impl LruCacheLayer {
 
     /// Returns the read cache statistics info `(EntryCount, SizeInBytes)`.
     pub async fn read_cache_stat(&self) -> (u64, u64) {
-        self.read_cache.stat().await
+        self.read_cache.cache_stat().await
     }
 }
 
-impl<I: Access> Layer<I> for LruCacheLayer {
-    type LayeredAccess = LruCacheAccess<I>;
+impl<I: Access, C: Access> Layer<I> for LruCacheLayer<C> {
+    type LayeredAccess = LruCacheAccess<I, C>;
 
     fn layer(&self, inner: I) -> Self::LayeredAccess {
         LruCacheAccess {
@@ -68,14 +68,14 @@ impl<I: Access> Layer<I> for LruCacheLayer {
 }
 
 #[derive(Debug)]
-pub struct LruCacheAccess<I> {
+pub struct LruCacheAccess<I, C> {
     inner: I,
-    read_cache: ReadCache,
+    read_cache: ReadCache<C>,
 }
 
-impl<I: Access> LayeredAccess for LruCacheAccess<I> {
+impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
     type Inner = I;
-    type Reader = Arc<dyn ReadDyn>;
+    type Reader = Reader;
     type BlockingReader = I::BlockingReader;
     type Writer = I::Writer;
     type BlockingWriter = I::BlockingWriter;
@@ -87,7 +87,9 @@ impl<I: Access> LayeredAccess for LruCacheAccess<I> {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        self.read_cache.read(&self.inner, path, args).await
+        self.read_cache
+            .read_from_cache(&self.inner, path, args)
+            .await
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
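
The thread running through this file: the cache backend's concrete type `C` now travels with the layer instead of hiding behind an `Operator`, so the layer, the accessor it produces, and `ReadCache` all pick up a type parameter. A miniature of that shape with the opendal traits stubbed out (these reduced types are illustrative, not the real `Access`/`Layer` bounds):

    use std::sync::Arc;

    // Stand-ins for a layer and its output, generic over a cache backend C.
    struct CacheLayer<C> {
        cache: Arc<C>,
    }

    struct CacheAccess<I, C> {
        inner: I,
        cache: Arc<C>,
    }

    impl<C> CacheLayer<C> {
        // Mirrors Layer::layer: wrapping any inner backend I yields an
        // accessor parameterized over both I and the cache backend C.
        fn layer<I>(&self, inner: I) -> CacheAccess<I, C> {
            CacheAccess {
                inner,
                cache: Arc::clone(&self.cache),
            }
        }
    }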