diff --git a/Cargo.lock b/Cargo.lock index b303658d3c57f..4b17bed74eda2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5269,9 +5269,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.39.0" +version = "0.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad95e460e5976ab1b74f398ab856c59f8417b3dd32202329e3491dcbe3a6b84" +checksum = "ddba7299bab261d3ae2f37617fb7f45b19ed872752bb4e22cf93a69d979366c5" dependencies = [ "anyhow", "async-compat", @@ -5290,6 +5290,7 @@ dependencies = [ "parking_lot 0.12.1", "percent-encoding", "pin-project", + "prometheus", "quick-xml 0.29.0", "reqsign", "reqwest", @@ -5302,9 +5303,9 @@ dependencies = [ [[package]] name = "opendal" -version = "0.40.0" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddba7299bab261d3ae2f37617fb7f45b19ed872752bb4e22cf93a69d979366c5" +checksum = "e31b48f0af6de5b3b344c1acc1e06c4581dca3e13cd5ba05269927fc2abf953a" dependencies = [ "anyhow", "async-compat", @@ -5323,8 +5324,7 @@ dependencies = [ "parking_lot 0.12.1", "percent-encoding", "pin-project", - "prometheus", - "quick-xml 0.29.0", + "quick-xml 0.30.0", "reqsign", "reqwest", "serde", @@ -6564,6 +6564,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.33" @@ -7840,7 +7850,7 @@ dependencies = [ "itertools 0.11.0", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal 0.39.0", + "opendal 0.41.0", "prometheus", "risingwave_common", "spin 0.9.8", diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index f117c272a9afc..2459bf83b5af6 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -24,7 +24,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] } hyper = "0.14" hyper-tls = "0.5.0" itertools = "0.11" -opendal = "0.39" +opendal = "0.41" prometheus = { version = "0.13", features = ["process"] } risingwave_common = { workspace = true } spin = "0.9" diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 96e58397dfa82..fe80794756246 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -242,12 +242,7 @@ impl ObjectStoreImpl { match self { ObjectStoreImpl::InMem(_) => true, ObjectStoreImpl::Opendal(store) => { - store - .inner - .op - .info() - .capability() - .write_without_content_length + store.inner.op.info().native_capability().write_can_multi } ObjectStoreImpl::S3(_) => true, } diff --git a/src/object_store/src/object/opendal_engine/gcs.rs b/src/object_store/src/object/opendal_engine/gcs.rs index bb6ef8eee0446..b497f85c263a5 100644 --- a/src/object_store/src/object/opendal_engine/gcs.rs +++ b/src/object_store/src/object/opendal_engine/gcs.rs @@ -19,12 +19,6 @@ use opendal::Operator; use super::{EngineType, OpendalObjectStore}; use crate::object::ObjectResult; -/// The fixed number of bytes that is buffered before they are uploaded as a part, will be used in -/// streaing upload. -/// -/// Reference: -const GCS_PART_SIZE: usize = 16 * 1024 * 1024; - impl OpendalObjectStore { /// create opendal gcs engine. pub fn new_gcs_engine(bucket: String, root: String) -> ObjectResult { @@ -35,8 +29,6 @@ impl OpendalObjectStore { builder.root(&root); - builder.write_fixed_size(GCS_PART_SIZE); - // if credential env is set, use it. Otherwise, ADC will be used. let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); if let Ok(cred) = cred { diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index b829dbd544abf..ff682946b0651 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -12,15 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::pin::Pin; -use std::task::{ready, Context, Poll}; - use bytes::Bytes; use fail::fail_point; -use futures::future::BoxFuture; -use futures::{FutureExt, Stream, StreamExt}; +use futures::{stream, StreamExt}; use opendal::services::Memory; -use opendal::{Entry, Error, Lister, Metakey, Operator, Writer}; +use opendal::{Metakey, Operator, Writer}; use risingwave_common::range::RangeBoundsExt; use tokio::io::AsyncRead; @@ -84,7 +80,10 @@ impl ObjectStore for OpendalObjectStore { let data = if range.is_full() { self.op.read(path).await? } else { - self.op.range_read(path, range.map(|v| *v as u64)).await? + self.op + .read_with(path) + .range(range.map(|v| *v as u64)) + .await? }; if let Some(len) = range.len() && len != data.len() { @@ -112,7 +111,12 @@ impl ObjectStore for OpendalObjectStore { ObjectError::internal("opendal streaming read error") )); let reader = match start_pos { - Some(start_position) => self.op.range_reader(path, start_position as u64..).await?, + Some(start_position) => { + self.op + .reader_with(path) + .range(start_position as u64..) + .await? + } None => self.op.reader(path).await?, }; @@ -149,8 +153,36 @@ impl ObjectStore for OpendalObjectStore { } async fn list(&self, prefix: &str) -> ObjectResult { - let lister = self.op.scan(prefix).await?; - Ok(Box::pin(OpenDalObjectIter::new(lister, self.op.clone()))) + let object_lister = self + .op + .lister_with(prefix) + .delimiter("") + .metakey(Metakey::ContentLength | Metakey::ContentType) + .await?; + + let stream = stream::unfold(object_lister, |mut object_lister| async move { + match object_lister.next().await { + Some(Ok(object)) => { + let key = object.path().to_string(); + let om = object.metadata(); + let last_modified = match om.last_modified() { + Some(t) => t.timestamp() as f64, + None => 0_f64, + }; + let total_size = om.content_length() as usize; + let metadata = ObjectMetadata { + key, + last_modified, + total_size, + }; + Some((Ok(metadata), object_lister)) + } + Some(Err(err)) => Some((Err(err.into()), object_lister)), + None => None, + } + }); + + Ok(stream.boxed()) } fn store_media_type(&self) -> &'static str { @@ -172,12 +204,12 @@ pub struct OpenDalStreamingUploader { } impl OpenDalStreamingUploader { pub async fn new(op: Operator, path: String) -> ObjectResult { - let writer = op.writer(&path).await?; + let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?; Ok(Self { writer }) } } -const OPENDAL_BUFFER_SIZE: u64 = 8 * 1024 * 1024; +const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024; #[async_trait::async_trait] impl StreamingUploader for OpenDalStreamingUploader { @@ -199,99 +231,7 @@ impl StreamingUploader for OpenDalStreamingUploader { } fn get_memory_usage(&self) -> u64 { - OPENDAL_BUFFER_SIZE - } -} - -struct OpenDalObjectIter { - lister: Option, - op: Option, - #[allow(clippy::type_complexity)] - next_future: Option>, Lister)>>, - #[allow(clippy::type_complexity)] - metadata_future: Option, Operator)>>, -} - -impl OpenDalObjectIter { - fn new(lister: Lister, op: Operator) -> Self { - Self { - lister: Some(lister), - op: Some(op), - next_future: None, - metadata_future: None, - } - } -} - -impl Stream for OpenDalObjectIter { - type Item = ObjectResult; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(metadata_future) = self.metadata_future.as_mut() { - let (result, op) = ready!(metadata_future.poll_unpin(cx)); - self.op = Some(op); - return match result { - Ok(m) => { - self.metadata_future = None; - Poll::Ready(Some(Ok(m))) - } - Err(e) => { - self.metadata_future = None; - Poll::Ready(Some(Err(e.into()))) - } - }; - } - if let Some(next_future) = self.next_future.as_mut() { - let (option, lister) = ready!(next_future.poll_unpin(cx)); - self.lister = Some(lister); - return match option { - None => { - self.next_future = None; - Poll::Ready(None) - } - Some(result) => { - self.next_future = None; - match result { - Ok(object) => { - let op = self.op.take().expect("op should not be None"); - let f = async move { - let key = object.path().to_string(); - // FIXME: How does opendal metadata cache work? - // Will below line result in one IO per object? - let om = match op - .metadata( - &object, - Metakey::LastModified | Metakey::ContentLength, - ) - .await - { - Ok(om) => om, - Err(e) => return (Err(e), op), - }; - let last_modified = match om.last_modified() { - Some(t) => t.timestamp() as f64, - None => 0_f64, - }; - let total_size = om.content_length() as usize; - let metadata = ObjectMetadata { - key, - last_modified, - total_size, - }; - (Ok(metadata), op) - }; - self.metadata_future = Some(Box::pin(f)); - self.poll_next(cx) - } - Err(e) => Poll::Ready(Some(Err(e.into()))), - } - } - }; - } - let mut lister = self.lister.take().expect("list should not be None"); - let f = async move { (lister.next().await, lister) }; - self.next_future = Some(Box::pin(f)); - self.poll_next(cx) + OPENDAL_BUFFER_SIZE as u64 } }