Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: migrate OpenDal to 0.39 #2383

Merged
merged 3 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 8 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions src/common/datasource/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Lister {
Source::Dir => {
let streamer = self
.object_store
.list(&self.path)
.lister_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?;

Expand All @@ -76,7 +76,16 @@ impl Lister {
path: &file_full_path,
},
)?;
Ok(vec![Entry::new(&file_full_path)])

Ok(self
.object_store
.list_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?
.into_iter()
.find(|f| f.name() == filename)
.map(|f| vec![f])
.unwrap_or_default())
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ mod tests {
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::{FutureExt, TryStreamExt};
use futures_util::FutureExt;
use object_store::ObjectStore;

use super::*;
Expand Down Expand Up @@ -492,11 +492,7 @@ mod tests {
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
.try_collect()
.await
.unwrap();
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}
Expand Down
18 changes: 5 additions & 13 deletions src/common/procedure/src/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use futures::{Stream, StreamExt};
use object_store::{EntryMode, Metakey, ObjectStore};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;

use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
Expand Down Expand Up @@ -86,7 +86,8 @@ impl StateStore for ObjectStateStore {
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let mut lister = self
.store
.scan(path)
.lister_with(path)
.delimiter("")
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
Expand All @@ -110,17 +111,8 @@ impl StateStore for ObjectStateStore {
})
.context(ListStateSnafu { path: &path_string })?;
let key = entry.path();
let metadata = store
.metadata(&entry, Metakey::Mode)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: key })?;
if let EntryMode::FILE = metadata.mode() {

if let EntryMode::FILE = entry.metadata().mode() {
let value = store
.read(key)
.await
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, Metakey, ObjectStore};
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -256,11 +256,7 @@ impl StatementExecutor {
let table_schema = table.schema().arrow_schema().clone();

for entry in entries.iter() {
let metadata = object_store
.metadata(entry, Metakey::Mode)
.await
.context(error::ReadObjectSnafu { path: entry.path() })?;
if metadata.mode() != EntryMode::FILE {
if entry.metadata().mode() != EntryMode::FILE {
continue;
}
let path = entry.path();
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ManifestObjectStore {
where
F: Fn(Entry) -> Option<R>,
{
let streamer = match self.object_store.list(&self.path).await {
let streamer = match self.object_store.lister_with(&self.path).await {
Ok(streamer) => streamer,
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exists: {}", self.path);
Expand Down
19 changes: 9 additions & 10 deletions src/mito2/src/worker/handle_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use std::time::Duration;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use futures::StreamExt;
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, Metakey, ObjectStore};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::error::{self, OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::region::RegionMapRef;
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};

Expand Down Expand Up @@ -115,17 +115,16 @@ pub(crate) async fn remove_region_dir_once(
let mut has_parquet_file = false;
// record all paths that neither ends with .parquet nor the marker file
let mut files_to_remove_first = vec![];
let mut files = object_store.scan(region_path).await.context(OpenDalSnafu)?;
while let Some(file) = files.next().await {
let file = file.context(OpenDalSnafu)?;
let mut files = object_store
.lister_with(region_path)
.await
.context(OpenDalSnafu)?;
while let Some(file) = files.try_next().await.context(error::OpenDalSnafu)? {
if file.path().ends_with(".parquet") {
has_parquet_file = true;
break;
} else if !file.path().ends_with(DROPPING_MARKER_FILE) {
let meta = object_store
.metadata(&file, Metakey::Mode)
.await
.context(OpenDalSnafu)?;
let meta = file.metadata();
if meta.mode() == EntryMode::FILE {
files_to_remove_first.push(file.path().to_string());
}
Expand Down
5 changes: 4 additions & 1 deletion src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ futures = { version = "0.3" }
lru = "0.9"
md5 = "0.7"
metrics.workspace = true
opendal = { version = "0.36", features = ["layers-tracing", "layers-metrics"] }
opendal = { git = "https://github.com/apache/incubator-opendal.git", rev = "7d5524f35f29f7eda8131e8b0873590b7cbe34ab", features = [
"layers-tracing",
"layers-metrics",
] }
pin-project = "1.0"
tokio.workspace = true
uuid.workspace = true
Expand Down
13 changes: 4 additions & 9 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use lru::LruCache;
use metrics::increment_counter;
use opendal::raw::oio::{Page, Read, ReadExt, Reader, Write};
use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{
Accessor, Layer, LayeredAccessor, OpAppend, OpDelete, OpList, OpRead, OpWrite, RpAppend,
RpDelete, RpList, RpRead, RpWrite,
Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
};
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -114,7 +114,6 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
type BlockingWriter = I::BlockingWriter;
type Pager = I::Pager;
type BlockingPager = I::BlockingPager;
type Appender = I::Appender;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -146,7 +145,7 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?;

while let Some(bytes) = reader.next().await {
writer.write(bytes?).await?;
writer.write(&bytes?).await?;
}

writer.close().await?;
Expand Down Expand Up @@ -178,10 +177,6 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
self.inner.write(path, args).await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let cache_path = md5::compute(path);
let lru_cache = &self.lru_cache;
Expand Down
22 changes: 10 additions & 12 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_test_util::temp_dir::create_temp_dir;
use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, Operator, OperatorBuilder};
Expand All @@ -37,7 +37,7 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {
assert_eq!("Hello, World!", String::from_utf8(bs)?);

// Read range from object;
let bs = store.range_read(file_name, 1..=11).await?;
let bs = store.read_with(file_name).range(1..=11).await?;
assert_eq!("ello, World", String::from_utf8(bs)?);

// Get object's Metadata
Expand All @@ -62,24 +62,23 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
store.write(p3, "Hello, object3!").await?;

// List objects
let lister = store.list("/").await?;
let entries = util::collect(lister).await?;
let entries = store.list("/").await?;
assert_eq!(3, entries.len());

store.delete(p1).await?;
store.delete(p3).await?;

// List objects again
// Only o2 is exists
let entries = util::collect(store.list("/").await?).await?;
let entries = store.list("/").await?;
assert_eq!(1, entries.len());
assert_eq!(p2, entries.get(0).unwrap().path());

let content = store.read(p2).await?;
assert_eq!("Hello, object2!", String::from_utf8(content)?);

store.delete(p2).await?;
let entries = util::collect(store.list("/").await?).await?;
let entries = store.list("/").await?;
assert!(entries.is_empty());
Ok(())
}
Expand Down Expand Up @@ -225,8 +224,7 @@ async fn assert_cache_files(
file_names: &[&str],
file_contents: &[&str],
) -> Result<()> {
let obs = store.list("/").await?;
let objects = util::collect(obs).await?;
let objects = store.list("/").await?;

// compare the cache file with the expected cache file; ignore orders
for o in objects {
Expand Down Expand Up @@ -284,10 +282,10 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.write(p2, "Hello, object2!").await.unwrap();

// create cache by read object
let _ = store.range_read(p1, 0..).await?;
let _ = store.read_with(p1).range(0..).await?;
let _ = store.read(p1).await?;
let _ = store.range_read(p2, 0..).await?;
let _ = store.range_read(p2, 7..).await?;
let _ = store.read_with(p2).range(0..).await?;
let _ = store.read_with(p2).range(7..).await?;
let _ = store.read(p2).await?;

assert_cache_files(
Expand Down Expand Up @@ -327,7 +325,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.write(p3, "Hello, object3!").await.unwrap();

let _ = store.read(p3).await.unwrap();
let _ = store.range_read(p3, 0..5).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();

assert_cache_files(
&cache_store,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/manifest/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl ManifestObjectStore {
{
let streamer = self
.object_store
.list(&self.path)
.lister_with(&self.path)
.await
.context(ListObjectsSnafu { path: &self.path })?;
streamer
Expand Down