Skip to content

Commit

Permalink
refactor: migrate OpenDal to 0.39
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 13, 2023
1 parent dd12ab0 commit 8d6dcbe
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 60 deletions.
13 changes: 11 additions & 2 deletions src/common/datasource/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Lister {
Source::Dir => {
let streamer = self
.object_store
.list(&self.path)
.lister_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?;

Expand All @@ -76,7 +76,16 @@ impl Lister {
path: &file_full_path,
},
)?;
Ok(vec![Entry::new(&file_full_path)])
// Safety: file must exists
let file = self
.object_store
.list_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?
.into_iter()
.find(|f| f.name() == filename)
.unwrap();
Ok(vec![file])
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ mod tests {
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::{FutureExt, TryStreamExt};
use futures_util::FutureExt;
use object_store::ObjectStore;

use super::*;
Expand Down Expand Up @@ -492,11 +492,7 @@ mod tests {
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
.try_collect()
.await
.unwrap();
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}
Expand Down
18 changes: 5 additions & 13 deletions src/common/procedure/src/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use futures::{Stream, StreamExt};
use object_store::{EntryMode, Metakey, ObjectStore};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;

use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
Expand Down Expand Up @@ -86,7 +86,8 @@ impl StateStore for ObjectStateStore {
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let mut lister = self
.store
.scan(path)
.lister_with(path)
.delimiter("")
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
Expand All @@ -110,17 +111,8 @@ impl StateStore for ObjectStateStore {
})
.context(ListStateSnafu { path: &path_string })?;
let key = entry.path();
let metadata = store
.metadata(&entry, Metakey::Mode)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: key })?;
if let EntryMode::FILE = metadata.mode() {

if let EntryMode::FILE = entry.metadata().mode() {
let value = store
.read(key)
.await
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, Metakey, ObjectStore};
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -256,11 +256,7 @@ impl StatementExecutor {
let table_schema = table.schema().arrow_schema().clone();

for entry in entries.iter() {
let metadata = object_store
.metadata(entry, Metakey::Mode)
.await
.context(error::ReadObjectSnafu { path: entry.path() })?;
if metadata.mode() != EntryMode::FILE {
if entry.metadata().mode() != EntryMode::FILE {
continue;
}
let path = entry.path();
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ManifestObjectStore {
where
F: Fn(Entry) -> Option<R>,
{
let streamer = match self.object_store.list(&self.path).await {
let streamer = match self.object_store.lister_with(&self.path).await {
Ok(streamer) => streamer,
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exists: {}", self.path);
Expand Down
19 changes: 9 additions & 10 deletions src/mito2/src/worker/handle_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use std::time::Duration;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use futures::StreamExt;
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, Metakey, ObjectStore};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::error::{self, OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::region::RegionMapRef;
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};

Expand Down Expand Up @@ -115,17 +115,16 @@ pub(crate) async fn remove_region_dir_once(
let mut has_parquet_file = false;
// record all paths that neither ends with .parquet nor the marker file
let mut files_to_remove_first = vec![];
let mut files = object_store.scan(region_path).await.context(OpenDalSnafu)?;
while let Some(file) = files.next().await {
let file = file.context(OpenDalSnafu)?;
let mut files = object_store
.lister_with(region_path)
.await
.context(OpenDalSnafu)?;
while let Some(file) = files.try_next().await.context(error::OpenDalSnafu)? {
if file.path().ends_with(".parquet") {
has_parquet_file = true;
break;
} else if !file.path().ends_with(DROPPING_MARKER_FILE) {
let meta = object_store
.metadata(&file, Metakey::Mode)
.await
.context(OpenDalSnafu)?;
let meta = file.metadata();
if meta.mode() == EntryMode::FILE {
files_to_remove_first.push(file.path().to_string());
}
Expand Down
13 changes: 4 additions & 9 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use lru::LruCache;
use metrics::increment_counter;
use opendal::raw::oio::{Page, Read, ReadExt, Reader, Write};
use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{
Accessor, Layer, LayeredAccessor, OpAppend, OpDelete, OpList, OpRead, OpWrite, RpAppend,
RpDelete, RpList, RpRead, RpWrite,
Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
};
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -114,7 +114,6 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
type BlockingWriter = I::BlockingWriter;
type Pager = I::Pager;
type BlockingPager = I::BlockingPager;
type Appender = I::Appender;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -146,7 +145,7 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?;

while let Some(bytes) = reader.next().await {
writer.write(bytes?).await?;
writer.write(&bytes?).await?;
}

writer.close().await?;
Expand Down Expand Up @@ -178,10 +177,6 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
self.inner.write(path, args).await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let cache_path = md5::compute(path);
let lru_cache = &self.lru_cache;
Expand Down
22 changes: 10 additions & 12 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_test_util::temp_dir::create_temp_dir;
use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, Operator, OperatorBuilder};
Expand All @@ -37,7 +37,7 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {
assert_eq!("Hello, World!", String::from_utf8(bs)?);

// Read range from object;
let bs = store.range_read(file_name, 1..=11).await?;
let bs = store.read_with(file_name).range(1..=11).await?;
assert_eq!("ello, World", String::from_utf8(bs)?);

// Get object's Metadata
Expand All @@ -62,24 +62,23 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
store.write(p3, "Hello, object3!").await?;

// List objects
let lister = store.list("/").await?;
let entries = util::collect(lister).await?;
let entries = store.list("/").await?;
assert_eq!(3, entries.len());

store.delete(p1).await?;
store.delete(p3).await?;

// List objects again
// Only o2 is exists
let entries = util::collect(store.list("/").await?).await?;
let entries = store.list("/").await?;
assert_eq!(1, entries.len());
assert_eq!(p2, entries.get(0).unwrap().path());

let content = store.read(p2).await?;
assert_eq!("Hello, object2!", String::from_utf8(content)?);

store.delete(p2).await?;
let entries = util::collect(store.list("/").await?).await?;
let entries = store.list("/").await?;
assert!(entries.is_empty());
Ok(())
}
Expand Down Expand Up @@ -225,8 +224,7 @@ async fn assert_cache_files(
file_names: &[&str],
file_contents: &[&str],
) -> Result<()> {
let obs = store.list("/").await?;
let objects = util::collect(obs).await?;
let objects = store.list("/").await?;

// compare the cache file with the expected cache file; ignore orders
for o in objects {
Expand Down Expand Up @@ -284,10 +282,10 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.write(p2, "Hello, object2!").await.unwrap();

// create cache by read object
let _ = store.range_read(p1, 0..).await?;
let _ = store.read_with(p1).range(0..).await?;
let _ = store.read(p1).await?;
let _ = store.range_read(p2, 0..).await?;
let _ = store.range_read(p2, 7..).await?;
let _ = store.read_with(p2).range(0..).await?;
let _ = store.read_with(p2).range(7..).await?;
let _ = store.read(p2).await?;

assert_cache_files(
Expand Down Expand Up @@ -327,7 +325,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.write(p3, "Hello, object3!").await.unwrap();

let _ = store.read(p3).await.unwrap();
let _ = store.range_read(p3, 0..5).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();

assert_cache_files(
&cache_store,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl ManifestObjectStore {
{
let streamer = self
.object_store
.list(&self.path)
.lister_with(&self.path)
.await
.context(ListObjectsSnafu { path: &self.path })?;
streamer
Expand Down

0 comments on commit 8d6dcbe

Please sign in to comment.