diff --git a/src/common/datasource/src/lister.rs b/src/common/datasource/src/lister.rs index 80b251fd85eb..06d2161c54b0 100644 --- a/src/common/datasource/src/lister.rs +++ b/src/common/datasource/src/lister.rs @@ -51,7 +51,7 @@ impl Lister { Source::Dir => { let streamer = self .object_store - .list(&self.path) + .lister_with(&self.path) .await .context(error::ListObjectsSnafu { path: &self.path })?; @@ -76,7 +76,16 @@ impl Lister { path: &file_full_path, }, )?; - Ok(vec![Entry::new(&file_full_path)]) + // Safety: file must exists + let file = self + .object_store + .list_with(&self.path) + .await + .context(error::ListObjectsSnafu { path: &self.path })? + .into_iter() + .find(|f| f.name() == filename) + .unwrap(); + Ok(vec![file]) } } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 884e6288fbae..8dc0683fd2f4 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -458,7 +458,7 @@ mod tests { use common_error::status_code::StatusCode; use common_test_util::temp_dir::create_temp_dir; use futures_util::future::BoxFuture; - use futures_util::{FutureExt, TryStreamExt}; + use futures_util::FutureExt; use object_store::ObjectStore; use super::*; @@ -492,11 +492,7 @@ mod tests { ) { let dir = proc_path!(procedure_store, "{procedure_id}/"); let lister = object_store.list(&dir).await.unwrap(); - let mut files_in_dir: Vec<_> = lister - .map_ok(|de| de.name().to_string()) - .try_collect() - .await - .unwrap(); + let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect(); files_in_dir.sort_unstable(); assert_eq!(files, files_in_dir); } diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index 35062057df15..46f566098d60 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -20,7 +20,7 @@ use async_trait::async_trait; use common_error::ext::{BoxedError, PlainError}; use common_error::status_code::StatusCode; use futures::{Stream, StreamExt}; -use object_store::{EntryMode, Metakey, ObjectStore}; +use object_store::{EntryMode, ObjectStore}; use snafu::ResultExt; use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result}; @@ -86,7 +86,8 @@ impl StateStore for ObjectStateStore { async fn walk_top_down(&self, path: &str) -> Result { let mut lister = self .store - .scan(path) + .lister_with(path) + .delimiter("") .await .map_err(|e| { BoxedError::new(PlainError::new( @@ -110,17 +111,8 @@ impl StateStore for ObjectStateStore { }) .context(ListStateSnafu { path: &path_string })?; let key = entry.path(); - let metadata = store - .metadata(&entry, Metakey::Mode) - .await - .map_err(|e| { - BoxedError::new(PlainError::new( - e.to_string(), - StatusCode::StorageUnavailable, - )) - }) - .context(ListStateSnafu { path: key })?; - if let EntryMode::FILE = metadata.mode() { + + if let EntryMode::FILE = entry.metadata().mode() { let value = store .read(key) .await diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index cb36d41823d3..8dd67a7daa5d 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -38,7 +38,7 @@ use datatypes::arrow::compute::can_cast_types; use datatypes::arrow::datatypes::{Schema, SchemaRef}; use datatypes::vectors::Helper; use futures_util::StreamExt; -use object_store::{Entry, EntryMode, Metakey, ObjectStore}; +use object_store::{Entry, EntryMode, ObjectStore}; use regex::Regex; use session::context::QueryContextRef; use snafu::ResultExt; @@ -256,11 +256,7 @@ impl StatementExecutor { let table_schema = table.schema().arrow_schema().clone(); for entry in entries.iter() { - let metadata = object_store - .metadata(entry, Metakey::Mode) - .await - .context(error::ReadObjectSnafu { path: entry.path() })?; - if metadata.mode() != EntryMode::FILE { + if entry.metadata().mode() != EntryMode::FILE { continue; } let path = entry.path(); diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 63da2a693bfd..a0f7dbf9714e 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -168,7 +168,7 @@ impl ManifestObjectStore { where F: Fn(Entry) -> Option, { - let streamer = match self.object_store.list(&self.path).await { + let streamer = match self.object_store.lister_with(&self.path).await { Ok(streamer) => streamer, Err(e) if e.kind() == ErrorKind::NotFound => { debug!("Manifest directory does not exists: {}", self.path); diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index f1f30760458f..50f05bfd7216 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -19,14 +19,14 @@ use std::time::Duration; use common_query::Output; use common_telemetry::info; use common_telemetry::tracing::warn; -use futures::StreamExt; +use futures::TryStreamExt; use object_store::util::join_path; -use object_store::{EntryMode, Metakey, ObjectStore}; +use object_store::{EntryMode, ObjectStore}; use snafu::ResultExt; use store_api::storage::RegionId; use tokio::time::sleep; -use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{self, OpenDalSnafu, RegionNotFoundSnafu, Result}; use crate::region::RegionMapRef; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; @@ -115,17 +115,16 @@ pub(crate) async fn remove_region_dir_once( let mut has_parquet_file = false; // record all paths that neither ends with .parquet nor the marker file let mut files_to_remove_first = vec![]; - let mut files = object_store.scan(region_path).await.context(OpenDalSnafu)?; - while let Some(file) = files.next().await { - let file = file.context(OpenDalSnafu)?; + let mut files = object_store + .lister_with(region_path) + .await + .context(OpenDalSnafu)?; + while let Some(file) = files.try_next().await.context(error::OpenDalSnafu)? { if file.path().ends_with(".parquet") { has_parquet_file = true; break; } else if !file.path().ends_with(DROPPING_MARKER_FILE) { - let meta = object_store - .metadata(&file, Metakey::Mode) - .await - .context(OpenDalSnafu)?; + let meta = file.metadata(); if meta.mode() == EntryMode::FILE { files_to_remove_first.push(file.path().to_string()); } diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index b22805890655..8987a210aa0a 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use async_trait::async_trait; use lru::LruCache; use metrics::increment_counter; -use opendal::raw::oio::{Page, Read, ReadExt, Reader, Write}; +use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt}; use opendal::raw::{ - Accessor, Layer, LayeredAccessor, OpAppend, OpDelete, OpList, OpRead, OpWrite, RpAppend, - RpDelete, RpList, RpRead, RpWrite, + Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, + RpWrite, }; use opendal::{ErrorKind, Result}; use tokio::sync::Mutex; @@ -114,7 +114,6 @@ impl LayeredAccessor for LruCacheAccessor { type BlockingWriter = I::BlockingWriter; type Pager = I::Pager; type BlockingPager = I::BlockingPager; - type Appender = I::Appender; fn inner(&self) -> &Self::Inner { &self.inner @@ -146,7 +145,7 @@ impl LayeredAccessor for LruCacheAccessor { let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?; while let Some(bytes) = reader.next().await { - writer.write(bytes?).await?; + writer.write(&bytes?).await?; } writer.close().await?; @@ -178,10 +177,6 @@ impl LayeredAccessor for LruCacheAccessor { self.inner.write(path, args).await } - async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await - } - async fn delete(&self, path: &str, args: OpDelete) -> Result { let cache_path = md5::compute(path); let lru_cache = &self.lru_cache; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index fd3c18b18592..38cb2b775dd7 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -21,7 +21,7 @@ use common_test_util::temp_dir::create_temp_dir; use object_store::layers::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; -use object_store::{util, ObjectStore, ObjectStoreBuilder}; +use object_store::{ObjectStore, ObjectStoreBuilder}; use opendal::raw::Accessor; use opendal::services::{Azblob, Gcs, Oss}; use opendal::{EntryMode, Operator, OperatorBuilder}; @@ -37,7 +37,7 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> { assert_eq!("Hello, World!", String::from_utf8(bs)?); // Read range from object; - let bs = store.range_read(file_name, 1..=11).await?; + let bs = store.read_with(file_name).range(1..=11).await?; assert_eq!("ello, World", String::from_utf8(bs)?); // Get object's Metadata @@ -62,8 +62,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { store.write(p3, "Hello, object3!").await?; // List objects - let lister = store.list("/").await?; - let entries = util::collect(lister).await?; + let entries = store.list("/").await?; assert_eq!(3, entries.len()); store.delete(p1).await?; @@ -71,7 +70,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { // List objects again // Only o2 is exists - let entries = util::collect(store.list("/").await?).await?; + let entries = store.list("/").await?; assert_eq!(1, entries.len()); assert_eq!(p2, entries.get(0).unwrap().path()); @@ -79,7 +78,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { assert_eq!("Hello, object2!", String::from_utf8(content)?); store.delete(p2).await?; - let entries = util::collect(store.list("/").await?).await?; + let entries = store.list("/").await?; assert!(entries.is_empty()); Ok(()) } @@ -225,8 +224,7 @@ async fn assert_cache_files( file_names: &[&str], file_contents: &[&str], ) -> Result<()> { - let obs = store.list("/").await?; - let objects = util::collect(obs).await?; + let objects = store.list("/").await?; // compare the cache file with the expected cache file; ignore orders for o in objects { @@ -284,10 +282,10 @@ async fn test_object_store_cache_policy() -> Result<()> { store.write(p2, "Hello, object2!").await.unwrap(); // create cache by read object - let _ = store.range_read(p1, 0..).await?; + let _ = store.read_with(p1).range(0..).await?; let _ = store.read(p1).await?; - let _ = store.range_read(p2, 0..).await?; - let _ = store.range_read(p2, 7..).await?; + let _ = store.read_with(p2).range(0..).await?; + let _ = store.read_with(p2).range(7..).await?; let _ = store.read(p2).await?; assert_cache_files( @@ -327,7 +325,7 @@ async fn test_object_store_cache_policy() -> Result<()> { store.write(p3, "Hello, object3!").await.unwrap(); let _ = store.read(p3).await.unwrap(); - let _ = store.range_read(p3, 0..5).await.unwrap(); + let _ = store.read_with(p3).range(0..5).await.unwrap(); assert_cache_files( &cache_store, diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs index baf5a9efb8e9..806c00bceea8 100644 --- a/src/storage/src/manifest/storage.rs +++ b/src/storage/src/manifest/storage.rs @@ -177,7 +177,7 @@ impl ManifestObjectStore { { let streamer = self .object_store - .list(&self.path) + .lister_with(&self.path) .await .context(ListObjectsSnafu { path: &self.path })?; streamer