From 99c83e4ab17031062d50dfebc7665081698e85af Mon Sep 17 00:00:00 2001
From: William Wen
Date: Thu, 16 May 2024 12:32:56 +0800
Subject: [PATCH 1/4] refactor(object-store): refactor object store macro

---
 src/connector/src/sink/snowflake.rs | 6 +-
 src/object_store/src/object/mem.rs | 14 +-
 src/object_store/src/object/mod.rs | 237 +++++++++++++-----
 .../opendal_engine/opendal_object_store.rs | 15 +-
 src/object_store/src/object/s3.rs | 15 +-
 src/object_store/src/object/sim/mod.rs | 11 +-
 src/storage/src/hummock/sstable_store.rs | 2 +-
 7 files changed, 205 insertions(+), 95 deletions(-)

diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index 9e0c031c3ef7d..075f98427f36e 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -22,7 +22,7 @@ use bytes::{Bytes, BytesMut};
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
-use risingwave_object_store::object::{ObjectStore, StreamingUploader};
+use risingwave_object_store::object::{ObjectStore, OpendalStreamingUploader, StreamingUploader};
 use serde::Deserialize;
 use serde_json::Value;
 use serde_with::serde_as;
@@ -184,7 +184,7 @@ pub struct SnowflakeSinkWriter {
     /// note: the option here *implicitly* indicates whether we have called
     /// `streaming_upload` at least once during this epoch,
     /// which is mainly used to prevent uploading empty data.
-    streaming_uploader: Option<(Box<dyn StreamingUploader>, String)>,
+    streaming_uploader: Option<(OpendalStreamingUploader, String)>,
 }
 
 impl SnowflakeSinkWriter {
@@ -242,7 +242,7 @@ impl SnowflakeSinkWriter {
     /// and `streaming_upload` being called the first time.
     /// i.e., lazy initialization of the internal `streaming_uploader`.
     /// plus, this function is *pure*; the `&mut self` here is to make rustc (and tokio) happy.
-    async fn new_streaming_uploader(&mut self) -> Result<(Box<dyn StreamingUploader>, String)> {
+    async fn new_streaming_uploader(&mut self) -> Result<(OpendalStreamingUploader, String)> {
         let file_suffix = self.file_suffix();
         let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
         let uploader = self
diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs
index 3a1a7ed655e81..eff14c98abae5 100644
--- a/src/object_store/src/object/mem.rs
+++ b/src/object_store/src/object/mem.rs
@@ -28,8 +28,7 @@ use thiserror::Error;
 use tokio::sync::Mutex;
 
 use super::{
-    BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult,
-    ObjectStore, StreamingUploader,
+    ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
 };
 use crate::object::{ObjectDataStream, ObjectMetadataIter};
 
@@ -64,7 +63,6 @@ pub struct InMemStreamingUploader {
     objects: Arc<Mutex<HashMap<String, (ObjectMetadata, Bytes)>>>,
 }
 
-#[async_trait::async_trait]
 impl StreamingUploader for InMemStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         fail_point!("mem_write_bytes_err", |_| Err(ObjectError::internal(
@@ -74,7 +72,7 @@ impl StreamingUploader for InMemStreamingUploader {
         Ok(())
     }
 
-    async fn finish(self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(self) -> ObjectResult<()> {
         fail_point!("mem_finish_streaming_upload_err", |_| Err(
             ObjectError::internal("mem finish streaming upload error")
         ));
@@ -101,6 +99,8 @@ pub struct InMemObjectStore {
 
 #[async_trait::async_trait]
 impl ObjectStore for InMemObjectStore {
+    type StreamingUploader = InMemStreamingUploader;
+
     fn get_object_prefix(&self, _obj_id: u64) -> String {
         String::default()
     }
@@ -121,12 +121,12 @@ impl ObjectStore for InMemObjectStore {
         }
     }
 
-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
-        Ok(Box::new(InMemStreamingUploader {
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
+        Ok(InMemStreamingUploader {
             path: path.to_string(),
             buf: BytesMut::new(),
             objects: self.objects.clone(),
-        }))
+        })
     }
 
     async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index 125ae9b9d33bb..c80ae5c35d958 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -47,9 +47,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
 use self::sim::SimObjectStore;
 
 pub type ObjectStoreRef = Arc<ObjectStoreImpl>;
-pub type ObjectStreamingUploader = MonitoredStreamingUploader;
-
-type BoxedStreamingUploader = Box<dyn StreamingUploader>;
+pub type ObjectStreamingUploader = impl StreamingUploader;
 
 pub trait ObjectRangeBounds = RangeBounds<usize> + Clone + Send + Sync + std::fmt::Debug + 'static;
@@ -76,11 +74,10 @@ pub struct ObjectMetadata {
     pub total_size: usize,
 }
 
-#[async_trait::async_trait]
 pub trait StreamingUploader: Send {
-    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()>;
+    fn write_bytes(&mut self, data: Bytes) -> impl Future<Output = ObjectResult<()>> + Send + '_;
 
-    async fn finish(self: Box<Self>) -> ObjectResult<()>;
+    fn finish(self) -> impl Future<Output = ObjectResult<()>> + Send;
 
     fn get_memory_usage(&self) -> u64;
 }
@@ -88,13 +85,14 @@ pub trait StreamingUploader: Send {
 /// The implementation must be thread-safe.
 #[async_trait::async_trait]
 pub trait ObjectStore: Send + Sync {
+    type StreamingUploader: StreamingUploader;
     /// Get the key prefix for object
     fn get_object_prefix(&self, obj_id: u64) -> String;
 
     /// Uploads the object to `ObjectStore`.
     async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;
 
-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader>;
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader>;
 
     /// If objects are PUT using a multipart upload, it's a good practice to GET them in the same
     /// part sizes (or at least aligned to part boundaries) for best performance.
@@ -136,12 +134,149 @@ pub trait ObjectStore: Send + Sync {
     fn store_media_type(&self) -> &'static str;
 }
 
-pub enum ObjectStoreImpl {
-    InMem(MonitoredObjectStore<InMemObjectStore>),
-    Opendal(MonitoredObjectStore<OpendalObjectStore>),
-    S3(MonitoredObjectStore<S3ObjectStore>),
-    #[cfg(madsim)]
-    Sim(MonitoredObjectStore<SimObjectStore>),
+#[cfg(not(madsim))]
+macro_rules! for_all_object_store {
+    ($macro:ident $($args:tt)*) => {
+        $macro! {
+            {
+                { InMem, InMemObjectStore },
+                { Opendal, OpendalObjectStore },
+                { S3, S3ObjectStore }
+            }
+            $($args)*
+        }
+    }
+}
+
+#[cfg(madsim)]
+macro_rules! for_all_object_store {
+    ($macro:ident $($args:tt)*) => {
+        $macro! {
+            {
+                { InMem, InMemObjectStore },
+                { Opendal, OpendalObjectStore },
+                { S3, S3ObjectStore },
+                { Sim, SimObjectStore }
+            }
+            $($args)*
+        }
+    }
+}
+
+macro_rules! define_object_store_impl {
+    () => {
+        for_all_object_store! {
+            define_object_store_impl
+        }
+    };
+    (
+        {$(
+            {$variant:ident, $type_name:ty}
+        ),*}
+    ) => {
+        pub enum ObjectStoreImpl {
+            $(
+                $variant(MonitoredObjectStore<$type_name>),
+            )*
+        }
+    };
+}
+
+define_object_store_impl!();
+
+macro_rules! define_streaming_uploader_impl {
+    () => {
+        for_all_object_store! {
+            define_streaming_uploader_impl
+        }
+    };
+    (
+        {$(
+            {$variant:ident, $_type_name:ty}
+        ),*}
+    ) => {
+        enum StreamingUploaderImpl<
+            $($variant),*
+        > {
+            $(
+                $variant($variant),
+            )*
+        }
+
+        impl<
+            $($variant: StreamingUploader),*
+        > StreamingUploader for StreamingUploaderImpl<
+            $($variant),*
+        > {
+            async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
+                match self {
+                    $(
+                        StreamingUploaderImpl::$variant(uploader) => {
+                            uploader.write_bytes(data).await
+                        },
+                    )*
+                }
+            }
+
+            async fn finish(self) -> ObjectResult<()> {
+                match self {
+                    $(
+                        StreamingUploaderImpl::$variant(uploader) => {
+                            uploader.finish().await
+                        },
+                    )*
+                }
+            }
+
+            fn get_memory_usage(&self) -> u64 {
+                match self {
+                    $(
+                        StreamingUploaderImpl::$variant(uploader) => {
+                            uploader.get_memory_usage()
+                        },
+                    )*
+                }
+            }
+        }
+
+        impl ObjectStoreImpl {
+            async fn streaming_upload_inner(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
+                Ok(match self {
+                    $(
+                        ObjectStoreImpl::$variant(object_store) => StreamingUploaderImpl::$variant(object_store.streaming_upload(path).await?),
+                    )*
+                })
+            }
+        }
+    };
+}
+
+define_streaming_uploader_impl!();
+
+macro_rules! dispatch_object_store {
+    (
+        {
+            $(
+                {$variant:ident, $_type_name:ty}
+            ),*
+        },
+        $object_store:expr,
+        $var_name:ident,
+        $func:expr
+    ) => {
+        match $object_store {
+            $(
+                ObjectStoreImpl::$variant($var_name) => {
+                    $func
+                },
+            )*
+        }
+    };
+    ($object_store:expr, |$var_name:ident| $func:expr) => {
+        for_all_object_store! {
+            dispatch_object_store, $object_store, $var_name, $func
+        }
+    };
 }
 
 macro_rules! dispatch_async {
     ($object_store:expr, $method_name:ident $(, $args:expr)*) => {
         $object_store.$method_name($($args, )*).await
     }
 }
 
 macro_rules! object_store_impl_method_body {
     ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $path:expr $(, $args:expr)*) => {
         {
             let path = $path;
-            match $object_store {
-                ObjectStoreImpl::InMem(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, path $(, $args)*)
-                },
-                ObjectStoreImpl::Opendal(opendal) => {
-                    $dispatch_macro!(opendal, $method_name, path $(, $args)*)
-                },
-                ObjectStoreImpl::S3(s3) => {
-                    $dispatch_macro!(s3, $method_name, path $(, $args)*)
-                },
-                #[cfg(madsim)]
-                ObjectStoreImpl::Sim(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, path $(, $args)*)
-                },
-            }
-
+            dispatch_object_store! {$object_store, |os| {
+                $dispatch_macro!(os, $method_name, path $(, $args)*)
+            }}
         }
     };
 }
 
@@ -187,21 +309,10 @@
 /// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
 /// enum type and the `paths`. It is a modification of the macro above to work with a slice of
 /// strings instead of just a single one.
 ///
 /// Except for `InMem`, the operation should be performed on remote object store.
 macro_rules! object_store_impl_method_body_slice {
     ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => {
         {
             let paths_rem = partition_object_store_paths($paths);
-
-            match $object_store {
-                ObjectStoreImpl::InMem(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*)
-                },
-                ObjectStoreImpl::Opendal(opendal) => {
-                    $dispatch_macro!(opendal, $method_name, &paths_rem $(, $args)*)
-                },
-                ObjectStoreImpl::S3(s3) => {
-                    $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*)
-                },
-                #[cfg(madsim)]
-                ObjectStoreImpl::Sim(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*)
-                },
+            dispatch_object_store! {
+                $object_store, |os| {
+                    $dispatch_macro!(os, $method_name, &paths_rem $(, $args)*)
+                }
             }
         }
     };
@@ -212,8 +323,8 @@ impl ObjectStoreImpl {
         object_store_impl_method_body!(self, upload, dispatch_async, path, obj)
     }
 
-    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<MonitoredStreamingUploader> {
-        object_store_impl_method_body!(self, streaming_upload, dispatch_async, path)
+    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
+        self.streaming_upload_inner(path).await
     }
 
     pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
@@ -253,16 +364,9 @@ impl ObjectStoreImpl {
     }
 
     pub fn get_object_prefix(&self, obj_id: u64) -> String {
-        // FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore
-        // trait. Macro object_store_impl_method_body routes to local or remote only depending on
-        // the path
-        match self {
-            ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id),
-            ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id),
-            ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id),
-            #[cfg(madsim)]
-            ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id),
-        }
+        dispatch_object_store!(self, |store| {
+            store.inner.get_object_prefix(obj_id)
+        })
     }
 
     pub fn support_streaming_upload(&self) -> bool {
@@ -303,18 +407,18 @@ fn try_update_failure_metric(
 /// - `streaming_upload_finish`: The time spent calling `finish`.
 /// - `failure_count`: `streaming_upload_start`, `streaming_upload_write_bytes`,
 ///   `streaming_upload_finish`
-pub struct MonitoredStreamingUploader {
-    inner: BoxedStreamingUploader,
+pub struct MonitoredStreamingUploader<U> {
+    inner: U,
     object_store_metrics: Arc<ObjectStoreMetrics>,
     /// Length of data uploaded with this uploader.
     operation_size: usize,
     media_type: &'static str,
 }
 
-impl MonitoredStreamingUploader {
+impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
     pub fn new(
         media_type: &'static str,
-        handle: BoxedStreamingUploader,
+        handle: U,
         object_store_metrics: Arc<ObjectStoreMetrics>,
     ) -> Self {
         Self {
@@ -326,8 +430,8 @@ impl MonitoredStreamingUploader {
     }
 }
 
-impl MonitoredStreamingUploader {
-    pub async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
+impl<U: StreamingUploader> StreamingUploader for MonitoredStreamingUploader<U> {
+    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         let operation_type = OperationType::StreamingUpload;
         let operation_type_str = operation_type.as_str();
         let data_len = data.len();
@@ -358,7 +462,7 @@ impl MonitoredStreamingUploader {
         res
     }
 
-    pub async fn finish(self) -> ObjectResult<()> {
+    async fn finish(self) -> ObjectResult<()> {
         let operation_type = OperationType::StreamingUploadFinish;
         let operation_type_str = operation_type.as_str();
         let _timer = self
@@ -381,7 +485,7 @@ impl MonitoredStreamingUploader {
         res
     }
 
-    pub fn get_memory_usage(&self) -> u64 {
+    fn get_memory_usage(&self) -> u64 {
         self.inner.get_memory_usage()
     }
 }
@@ -544,7 +648,10 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
         res
     }
 
-    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<MonitoredStreamingUploader> {
+    pub async fn streaming_upload(
+        &self,
+        path: &str,
+    ) -> ObjectResult<MonitoredStreamingUploader<OS::StreamingUploader>> {
         let operation_type = OperationType::StreamingUploadInit;
         let operation_type_str = operation_type.as_str();
         let media_type = self.media_type();
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index c073df7c1e102..6f3a2c881eb46 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -27,8 +27,8 @@ use risingwave_common::range::RangeBoundsExt;
 use thiserror_ext::AsReport;
 
 use crate::object::{
-    prefix, BoxedStreamingUploader, ObjectDataStream, ObjectError, ObjectMetadata,
-    ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
+    prefix, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
+    ObjectResult, ObjectStore, StreamingUploader,
 };
 
 /// Opendal object storage.
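The monitored wrapper above is now generic over the uploader it wraps, and each backend names its concrete uploader through the new `type StreamingUploader` associated type, so no `Box<dyn StreamingUploader>` is needed on the upload path. A minimal sketch of the pattern, assuming Rust 1.75+ for `async fn` in traits; `ToyStore` and `ToyUploader` are hypothetical stand-ins, not the RisingWave types, and errors are simplified to `String`:

// Sketch only: illustrates the associated-type uploader pattern.
#[allow(async_fn_in_trait)]
pub trait Uploader: Send {
    async fn write_bytes(&mut self, data: Vec<u8>) -> Result<(), String>;
    // `finish` consumes `self` by value; the boxed version had to spell
    // this as `self: Box<Self>`.
    async fn finish(self) -> Result<(), String>;
}

pub struct ToyUploader(Vec<u8>);

impl Uploader for ToyUploader {
    async fn write_bytes(&mut self, data: Vec<u8>) -> Result<(), String> {
        self.0.extend_from_slice(&data);
        Ok(())
    }

    async fn finish(self) -> Result<(), String> {
        // A real uploader would flush `self.0` to storage here.
        Ok(())
    }
}

#[allow(async_fn_in_trait)]
pub trait Store {
    // Each backend names its concrete uploader type, so wrappers like
    // `MonitoredStreamingUploader<U>` stay generic instead of boxing.
    type Uploader: Uploader;
    async fn streaming_upload(&self, path: &str) -> Result<Self::Uploader, String>;
}

pub struct ToyStore;

impl Store for ToyStore {
    type Uploader = ToyUploader;
    async fn streaming_upload(&self, _path: &str) -> Result<ToyUploader, String> {
        Ok(ToyUploader(Vec::new()))
    }
}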
@@ -70,6 +70,8 @@ impl OpendalObjectStore {
 
 #[async_trait::async_trait]
 impl ObjectStore for OpendalObjectStore {
+    type StreamingUploader = OpendalStreamingUploader;
+
     fn get_object_prefix(&self, obj_id: u64) -> String {
         match self.engine_type {
             EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
@@ -94,11 +96,11 @@ impl ObjectStore for OpendalObjectStore {
         }
     }
 
-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
-        Ok(Box::new(
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
+        Ok(
             OpendalStreamingUploader::new(self.op.clone(), path.to_string(), self.config.clone())
                 .await?,
-        ))
+        )
     }
 
     async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
@@ -280,7 +282,6 @@ impl OpendalStreamingUploader {
 
 const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024;
 
-#[async_trait::async_trait]
 impl StreamingUploader for OpendalStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         self.writer.write(data).await?;
@@ -288,7 +289,7 @@ impl StreamingUploader for OpendalStreamingUploader {
         Ok(())
     }
 
-    async fn finish(mut self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(mut self) -> ObjectResult<()> {
         match self.writer.close().await {
             Ok(_) => (),
             Err(err) => {
diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs
index d7c151e867465..9cc559f22d043 100644
--- a/src/object_store/src/object/s3.rs
+++ b/src/object_store/src/object/s3.rs
@@ -59,8 +59,8 @@ use tokio::task::JoinHandle;
 use super::object_metrics::ObjectStoreMetrics;
 use super::{
-    prefix, retry_request, BoxedStreamingUploader, Bytes, ObjectError, ObjectErrorInner,
-    ObjectMetadata, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
+    prefix, retry_request, Bytes, ObjectError, ObjectErrorInner, ObjectMetadata, ObjectRangeBounds,
+    ObjectResult, ObjectStore, StreamingUploader,
 };
 use crate::object::{
     try_update_failure_metric, ObjectDataStream, ObjectMetadataIter, OperationType,
@@ -301,7 +301,6 @@ impl S3StreamingUploader {
     }
 }
 
-#[async_trait::async_trait]
 impl StreamingUploader for S3StreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         fail_point!("s3_write_bytes_err", |_| Err(ObjectError::internal(
@@ -321,7 +320,7 @@ impl StreamingUploader for S3StreamingUploader {
     /// If the multipart upload has not been initiated, we can use `PutObject` instead to save the
     /// `CreateMultipartUpload` and `CompleteMultipartUpload` requests. Otherwise flush the
     /// remaining data of the buffer to S3 as a new part.
-    async fn finish(mut self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(mut self) -> ObjectResult<()> {
         fail_point!("s3_finish_streaming_upload_err", |_| Err(
             ObjectError::internal("s3 finish streaming upload error")
         ));
@@ -381,6 +380,8 @@ pub struct S3ObjectStore {
 
 #[async_trait::async_trait]
 impl ObjectStore for S3ObjectStore {
+    type StreamingUploader = S3StreamingUploader;
+
     fn get_object_prefix(&self, obj_id: u64) -> String {
         // Delegate to static method to avoid creating an `S3ObjectStore` in unit test.
         prefix::s3::get_object_prefix(obj_id)
     }
 
@@ -407,18 +408,18 @@ impl ObjectStore for S3ObjectStore {
         }
     }
 
-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
         fail_point!("s3_streaming_upload_err", |_| Err(ObjectError::internal(
             "s3 streaming upload error"
         )));
-        Ok(Box::new(S3StreamingUploader::new(
+        Ok(S3StreamingUploader::new(
             self.client.clone(),
             self.bucket.clone(),
             self.part_size,
             path.to_string(),
             self.metrics.clone(),
             self.config.clone(),
-        )))
+        ))
     }
 
     /// Amazon S3 doesn't support retrieving multiple ranges of data per GET request.
diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs
index 31f34cbcd3267..494ef39173b98 100644
--- a/src/object_store/src/object/sim/mod.rs
+++ b/src/object_store/src/object/sim/mod.rs
@@ -53,14 +53,13 @@ impl SimStreamingUploader {
     }
 }
 
-#[async_trait::async_trait]
 impl StreamingUploader for SimStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         self.buf.put(data);
         Ok(())
     }
 
-    async fn finish(mut self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(mut self) -> ObjectResult<()> {
         if self.buf.is_empty() {
             Err(ObjectError::internal("upload empty object"))
         } else {
@@ -115,6 +114,8 @@ pub struct SimObjectStore {
 
 #[async_trait::async_trait]
 impl ObjectStore for SimObjectStore {
+    type StreamingUploader = SimStreamingUploader;
+
     fn get_object_prefix(&self, _obj_id: u64) -> String {
         String::default()
     }
@@ -136,11 +137,11 @@ impl ObjectStore for SimObjectStore {
         }
     }
 
-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
-        Ok(Box::new(SimStreamingUploader::new(
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
+        Ok(SimStreamingUploader::new(
             self.client.clone(),
             path.to_string(),
-        )))
+        ))
     }
 
     async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs
index 6dfbe5468426a..cabe66ddff972 100644
--- a/src/storage/src/hummock/sstable_store.rs
+++ b/src/storage/src/hummock/sstable_store.rs
@@ -27,7 +27,7 @@ use risingwave_common::config::StorageMemoryConfig;
 use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX};
 use risingwave_hummock_trace::TracedCachePolicy;
 use risingwave_object_store::object::{
-    ObjectError, ObjectMetadataIter, ObjectStoreRef, ObjectStreamingUploader,
+    ObjectError, ObjectMetadataIter, ObjectStoreRef, ObjectStreamingUploader, StreamingUploader,
 };
 use risingwave_pb::hummock::SstableInfo;
 use serde::{Deserialize, Serialize};

From 7155dae0cdf6564078e4c92c32fa6117d595e6ad Mon Sep 17 00:00:00 2001
From: William Wen
Date: Thu, 16 May 2024 16:32:22 +0800
Subject: [PATCH 2/4] refine macro

---
 src/object_store/src/object/mod.rs | 242 +++++++-----------
 .../opendal_engine/opendal_object_store.rs | 4 +
 src/object_store/src/object/sim/mod.rs | 2 +
 src/storage/src/hummock/sstable_store.rs | 2 +-
 4 files changed, 105 insertions(+), 145 deletions(-)

diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index c80ae5c35d958..6ff682ebd04a7 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
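The hunks of this second patch rewrite the `StreamingUploader` methods from explicit return-position `impl Future` to plain `async fn` with `#[expect(async_fn_in_trait)]`. The two spellings desugar to roughly the same thing; the difference is that only the explicit form can state a `Send` bound on the returned future, which is what the lint warns about. A sketch of the equivalence, assuming Rust 1.75+; `UploadV1`, `UploadV2`, and `Demo` are illustrative names, not RisingWave types:

use std::future::Future;

// Patch 1 spelling: the `+ Send` bound on the returned future can be
// written out, which matters for multi-threaded executors.
pub trait UploadV1: Send {
    fn write_bytes(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), String>> + Send + '_;
}

// Patch 2 spelling: plain `async fn` sugar, which cannot state `Send`;
// that is what `async_fn_in_trait` warns about (`allow` here keeps the
// sketch lint-clean where the real code uses `expect`).
#[allow(async_fn_in_trait)]
pub trait UploadV2: Send {
    async fn write_bytes(&mut self, data: Vec<u8>) -> Result<(), String>;
}

pub struct Demo;

// The two forms are interchangeable: an implementor may satisfy the
// RPITIT signature with `async fn`, provided the future is `Send`.
impl UploadV1 for Demo {
    async fn write_bytes(&mut self, _data: Vec<u8>) -> Result<(), String> {
        Ok(())
    }
}

impl UploadV2 for Demo {
    async fn write_bytes(&mut self, _data: Vec<u8>) -> Result<(), String> {
        Ok(())
    }
}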
-#[cfg(madsim)]
 pub mod sim;
 use std::ops::{Range, RangeBounds};
 use std::sync::Arc;
@@ -47,24 +46,10 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
 use self::sim::SimObjectStore;
 
 pub type ObjectStoreRef = Arc<ObjectStoreImpl>;
-pub type ObjectStreamingUploader = impl StreamingUploader;
+pub type ObjectStreamingUploader = StreamingUploaderImpl;
 
 pub trait ObjectRangeBounds = RangeBounds<usize> + Clone + Send + Sync + std::fmt::Debug + 'static;
 
-/// Partitions a set of given paths into two vectors. The first vector contains all local paths, and
-/// the second contains all remote paths.
-fn partition_object_store_paths(paths: &[String]) -> Vec<String> {
-    // ToDo: Currently the result is a copy of the input. Would it be worth it to use an in-place
-    // partition instead?
-    let mut vec_rem = vec![];
-
-    for path in paths {
-        vec_rem.push(path.to_string());
-    }
-
-    vec_rem
-}
-
 #[derive(Debug, Clone, PartialEq)]
 pub struct ObjectMetadata {
     // Full path
@@ -76,9 +60,11 @@ pub struct ObjectMetadata {
     pub total_size: usize,
 }
 
 pub trait StreamingUploader: Send {
-    fn write_bytes(&mut self, data: Bytes) -> impl Future<Output = ObjectResult<()>> + Send + '_;
+    #[expect(async_fn_in_trait)]
+    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()>;
 
-    fn finish(self) -> impl Future<Output = ObjectResult<()>> + Send;
+    #[expect(async_fn_in_trait)]
+    async fn finish(self) -> ObjectResult<()>;
 
     fn get_memory_usage(&self) -> u64;
 }
@@ -132,6 +119,10 @@ pub trait ObjectStore: Send + Sync {
     async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;
 
     fn store_media_type(&self) -> &'static str;
+
+    fn support_streaming_upload(&self) -> bool {
+        true
+    }
 }
 
 #[cfg(not(madsim))]
@@ -163,97 +154,35 @@ macro_rules! for_all_object_store {
     }
 }
 
-macro_rules! define_object_store_impl {
-    () => {
-        for_all_object_store! {
-            define_object_store_impl
-        }
-    };
+macro_rules! enum_map {
     (
-        {$(
-            {$variant:ident, $type_name:ty}
-        ),*}
+        {
+            $(
+                {$variant:ident, $_type_name:ty}
+            ),*
+        },
+        $source_enum_type:ident,
+        $target_enum_type:ident,
+        $object_store:expr,
+        $var_name:ident,
+        $func:expr
     ) => {
-        pub enum ObjectStoreImpl {
+        match $object_store {
             $(
-                $variant(MonitoredObjectStore<$type_name>),
+                $source_enum_type::$variant($var_name) => $target_enum_type::$variant({
+                    $func
+                }),
             )*
         }
     };
-}
-
-define_object_store_impl!();
-
-macro_rules! define_streaming_uploader_impl {
-    () => {
+    ($source_enum_type:ident, $target_enum_type:ident, $object_store:expr, |$var_name:ident| $func:expr) => {
         for_all_object_store! {
-            define_streaming_uploader_impl
+            enum_map, $source_enum_type, $target_enum_type, $object_store, $var_name, $func
         }
     };
-    (
-        {$(
-            {$variant:ident, $_type_name:ty}
-        ),*}
-    ) => {
-        enum StreamingUploaderImpl<
-            $($variant),*
-        > {
-            $(
-                $variant($variant),
-            )*
-        }
-
-        impl<
-            $($variant: StreamingUploader),*
-        > StreamingUploader for StreamingUploaderImpl<
-            $($variant),*
-        > {
-            async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
-                match self {
-                    $(
-                        StreamingUploaderImpl::$variant(uploader) => {
-                            uploader.write_bytes(data).await
-                        },
-                    )*
-                }
-            }
-
-            async fn finish(self) -> ObjectResult<()> {
-                match self {
-                    $(
-                        StreamingUploaderImpl::$variant(uploader) => {
-                            uploader.finish().await
-                        },
-                    )*
-                }
-            }
-
-            fn get_memory_usage(&self) -> u64 {
-                match self {
-                    $(
-                        StreamingUploaderImpl::$variant(uploader) => {
-                            uploader.get_memory_usage()
-                        },
-                    )*
-                }
-            }
-        }
-
-        impl ObjectStoreImpl {
-            async fn streaming_upload_inner(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
-                Ok(match self {
-                    $(
-                        ObjectStoreImpl::$variant(object_store) => StreamingUploaderImpl::$variant(object_store.streaming_upload(path).await?),
-                    )*
-                })
-            }
-        }
-    };
-}
-
-define_streaming_uploader_impl!();
-
-macro_rules! dispatch_object_store {
+macro_rules! dispatch_object_store_enum {
     (
         {
             $(
                 {$variant:ident, $_type_name:ty}
             ),*
         },
         $object_store:expr,
         $var_name:ident,
         $func:expr
     ) => {
         match $object_store {
             $(
-                ObjectStoreImpl::$variant($var_name) => {
+                ObjectStoreEnum::$variant($var_name) => {
                     $func
                 },
             )*
         }
     };
     ($object_store:expr, |$var_name:ident| $func:expr) => {
         for_all_object_store! {
-            dispatch_object_store, $object_store, $var_name, $func
+            dispatch_object_store_enum, $object_store, $var_name, $func
         }
     };
 }
 
-macro_rules! dispatch_async {
-    ($object_store:expr, $method_name:ident $(, $args:expr)*) => {
-        $object_store.$method_name($($args, )*).await
-    }
+macro_rules! define_object_store_impl {
+    () => {
+        for_all_object_store! {
+            define_object_store_impl
+        }
+    };
+    (
+        {$(
+            {$variant:ident, $type_name:ty}
+        ),*}
+    ) => {
+        pub enum ObjectStoreEnum<
+            $($variant),*
+        > {
+            $(
+                $variant($variant),
+            )*
+        }
+
+        pub type ObjectStoreImpl = ObjectStoreEnum<
+            $(
+                MonitoredObjectStore<$type_name>,
+            )*
+        >;
+
+        pub type StreamingUploaderImpl = ObjectStoreEnum<
+            $(
+                MonitoredStreamingUploader<<$type_name as ObjectStore>::StreamingUploader>
+            ),*
+        >;
+    };
 }
 
+define_object_store_impl!();
+
 /// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
 /// enum type and the `path`.
 ///
 /// Except for `InMem`, the operation should be performed on remote object store.
 macro_rules! object_store_impl_method_body {
-    ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $path:expr $(, $args:expr)*) => {
+    // with await
+    ($object_store:expr, $method_name:ident ($($args:expr),*).await) => {
         {
-            let path = $path;
-            dispatch_object_store! {$object_store, |os| {
-                $dispatch_macro!(os, $method_name, path $(, $args)*)
+            dispatch_object_store_enum! {$object_store, |os| {
+                os.$method_name($($args),*).await
             }}
         }
     };
-}
-
-/// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
-/// enum type and the `paths`. It is a modification of the macro above to work with a slice of
-/// strings instead of just a single one.
-///
-/// Except for `InMem`, the operation should be performed on remote object store.
-macro_rules! object_store_impl_method_body_slice {
-    ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => {
+    // no await
+    ($object_store:expr, $method_name:ident ($(, $args:expr)*)) => {
         {
-            let paths_rem = partition_object_store_paths($paths);
-            dispatch_object_store! {
-                $object_store, |os| {
-                    $dispatch_macro!(os, $method_name, &paths_rem $(, $args)*)
-                }
-            }
+            dispatch_object_store_enum! {$object_store, |os| {
+                os.$method_name($($args),*)
+            }}
         }
     };
 }
 
+impl StreamingUploaderImpl {
+    pub async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
+        object_store_impl_method_body!(self, write_bytes(data).await)
+    }
+
+    pub async fn finish(self) -> ObjectResult<()> {
+        object_store_impl_method_body!(self, finish().await)
+    }
+
+    pub fn get_memory_usage(&self) -> u64 {
+        object_store_impl_method_body!(self, get_memory_usage())
+    }
+}
+
 impl ObjectStoreImpl {
     pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
-        object_store_impl_method_body!(self, upload, dispatch_async, path, obj)
+        object_store_impl_method_body!(self, upload(path, obj).await)
     }
 
     pub async fn streaming_upload(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
-        self.streaming_upload_inner(path).await
+        Ok(enum_map!(Self, StreamingUploaderImpl, self, |store| {
+            store.streaming_upload(path).await?
+        }))
     }
 
     pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
-        object_store_impl_method_body!(self, read, dispatch_async, path, range)
+        object_store_impl_method_body!(self, read(path, range).await)
     }
 
     pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
-        object_store_impl_method_body!(self, metadata, dispatch_async, path)
+        object_store_impl_method_body!(self, metadata(path).await)
     }
 
     /// Returns a stream reading the object specified in `path`. If given, the stream starts at the
@@ -343,11 +307,11 @@ impl ObjectStoreImpl {
         path: &str,
         start_loc: Range<usize>,
     ) -> ObjectResult<MonitoredStreamingReader> {
-        object_store_impl_method_body!(self, streaming_read, dispatch_async, path, start_loc)
+        object_store_impl_method_body!(self, streaming_read(path, start_loc).await)
     }
 
     pub async fn delete(&self, path: &str) -> ObjectResult<()> {
-        object_store_impl_method_body!(self, delete, dispatch_async, path)
+        object_store_impl_method_body!(self, delete(path).await)
     }
 
     /// Deletes the objects with the given paths permanently from the storage. If an object
     /// is not found, it will be considered as successfully deleted.
     ///
     /// If a hybrid storage is used, the method will first attempt to delete objects in local
     /// storage. Only if that is successful, it will remove objects from remote storage.
     pub async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
-        object_store_impl_method_body_slice!(self, delete_objects, dispatch_async, paths)
+        object_store_impl_method_body!(self, delete_objects(paths).await)
     }
 
     pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
-        object_store_impl_method_body!(self, list, dispatch_async, prefix)
+        object_store_impl_method_body!(self, list(prefix).await)
     }
 
     pub fn get_object_prefix(&self, obj_id: u64) -> String {
-        dispatch_object_store!(self, |store| {
-            store.inner.get_object_prefix(obj_id)
-        })
+        dispatch_object_store_enum!(self, |store| store.inner.get_object_prefix(obj_id))
     }
 
     pub fn support_streaming_upload(&self) -> bool {
-        match self {
-            ObjectStoreImpl::InMem(_) => true,
-            ObjectStoreImpl::Opendal(store) => {
-                store.inner.op.info().native_capability().write_can_multi
-            }
-            ObjectStoreImpl::S3(_) => true,
-            #[cfg(madsim)]
-            ObjectStoreImpl::Sim(_) => true,
-        }
+        dispatch_object_store_enum!(self, |store| store.inner.support_streaming_upload())
     }
 }
 
@@ -430,7 +384,7 @@ impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
     }
 }
 
-impl<U: StreamingUploader> StreamingUploader for MonitoredStreamingUploader<U> {
+impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         let operation_type = OperationType::StreamingUpload;
         let operation_type_str = operation_type.as_str();
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 6f3a2c881eb46..30c63b15ec5e5 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -246,6 +246,10 @@ impl ObjectStore for OpendalObjectStore {
             EngineType::Fs => "Fs",
         }
     }
+
+    fn support_streaming_upload(&self) -> bool {
+        self.op.info().native_capability().write_can_multi
+    }
 }
 
 /// Store multiple parts in a map, and concatenate them on finish.
diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs
index 494ef39173b98..d65aae04c016c 100644
--- a/src/object_store/src/object/sim/mod.rs
+++ b/src/object_store/src/object/sim/mod.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
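The sim hunk below complements the removal of `#[cfg(madsim)]` from the `pub mod sim;` declaration earlier in this patch: the gate moves into `sim/mod.rs` itself as an inner `#![cfg(madsim)]` attribute, which compiles the whole module away when `madsim` is off while keeping the gating self-contained in the file. A small sketch of the mechanism, using the always-false `cfg(any())` in place of a disabled `madsim` predicate:

// Minimal sketch, not RisingWave code; `cfg(any())` with no arguments is
// always false, standing in for a disabled cfg.
mod sim {
    // Inner attribute: applies to the enclosing module, so when the
    // predicate is false the whole module (item included) is compiled out,
    // without repeating `#[cfg(...)]` at the declaration site.
    #![cfg(any())]

    pub fn only_in_sim() {}
}

fn main() {
    // With the cfg off, `sim::only_in_sim()` would not resolve here; any
    // caller must itself be gated with `#[cfg(...)]`.
}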
+#![cfg(madsim)]
+
 mod client;
 mod error;
 use bytes::{BufMut, BytesMut};
diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs
index cabe66ddff972..6dfbe5468426a 100644
--- a/src/storage/src/hummock/sstable_store.rs
+++ b/src/storage/src/hummock/sstable_store.rs
@@ -27,7 +27,7 @@ use risingwave_common::config::StorageMemoryConfig;
 use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX};
 use risingwave_hummock_trace::TracedCachePolicy;
 use risingwave_object_store::object::{
-    ObjectError, ObjectMetadataIter, ObjectStoreRef, ObjectStreamingUploader, StreamingUploader,
+    ObjectError, ObjectMetadataIter, ObjectStoreRef, ObjectStreamingUploader,
 };
 use risingwave_pb::hummock::SstableInfo;
 use serde::{Deserialize, Serialize};

From 39b2a96d4169a9a0c16a6384abd60f506b0fa3f6 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Thu, 16 May 2024 17:26:14 +0800
Subject: [PATCH 3/4] further refine

---
 src/object_store/src/object/mod.rs | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index 6ff682ebd04a7..a8d1f50365958 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -161,23 +161,21 @@ macro_rules! enum_map {
                 {$variant:ident, $_type_name:ty}
             ),*
         },
-        $source_enum_type:ident,
-        $target_enum_type:ident,
         $object_store:expr,
         $var_name:ident,
         $func:expr
     ) => {
         match $object_store {
             $(
-                $source_enum_type::$variant($var_name) => $target_enum_type::$variant({
+                ObjectStoreEnum::$variant($var_name) => ObjectStoreEnum::$variant({
                     $func
                 }),
             )*
         }
     };
-    ($source_enum_type:ident, $target_enum_type:ident, $object_store:expr, |$var_name:ident| $func:expr) => {
+    ($object_store:expr, |$var_name:ident| $func:expr) => {
         for_all_object_store! {
-            enum_map, $source_enum_type, $target_enum_type, $object_store, $var_name, $func
+            enum_map, $object_store, $var_name, $func
         }
     };
 }
@@ -286,7 +284,7 @@ impl ObjectStoreImpl {
     }
 
     pub async fn streaming_upload(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
-        Ok(enum_map!(Self, StreamingUploaderImpl, self, |store| {
+        Ok(enum_map!(self, |store| {
             store.streaming_upload(path).await?
         }))
     }

From f2bac50d22cc588da15dd34c2ab486a52fd391d6 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Fri, 17 May 2024 14:02:03 +0800
Subject: [PATCH 4/4] fix madsim compile

---
 src/object_store/src/object/sim/mod.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs
index d65aae04c016c..4c91531020ea3 100644
--- a/src/object_store/src/object/sim/mod.rs
+++ b/src/object_store/src/object/sim/mod.rs
@@ -35,8 +35,8 @@ use risingwave_common::range::RangeBoundsExt;
 use self::client::Client;
 use self::service::Response;
 use super::{
-    BoxedStreamingUploader, Bytes, ObjectDataStream, ObjectError, ObjectMetadata,
-    ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
+    Bytes, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
+    ObjectResult, ObjectStore, StreamingUploader,
 };
 
 pub struct SimStreamingUploader {
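Taken together, the series replaces `Box<dyn StreamingUploader>` with an enum whose variants hold each backend's concrete uploader, so every `write_bytes`/`finish` call is statically dispatched. A hand-expanded approximation of what the `for_all_object_store!`/`define_object_store_impl!` machinery generates for two variants; `MemUploader` and `S3Uploader` are hypothetical stand-ins for the per-backend uploader types, with errors simplified to `String`:

// Sketch only: the real enum is generated by macros and wraps each
// uploader in `MonitoredStreamingUploader`.
struct MemUploader;
struct S3Uploader;

impl MemUploader {
    async fn write_bytes(&mut self, _data: &[u8]) -> Result<(), String> { Ok(()) }
    async fn finish(self) -> Result<(), String> { Ok(()) }
}

impl S3Uploader {
    async fn write_bytes(&mut self, _data: &[u8]) -> Result<(), String> { Ok(()) }
    async fn finish(self) -> Result<(), String> { Ok(()) }
}

enum StreamingUploaderImpl {
    InMem(MemUploader),
    S3(S3Uploader),
}

impl StreamingUploaderImpl {
    async fn write_bytes(&mut self, data: &[u8]) -> Result<(), String> {
        match self {
            // Each arm calls the concrete method directly: one future type
            // per variant, no vtable and no Box.
            StreamingUploaderImpl::InMem(u) => u.write_bytes(data).await,
            StreamingUploaderImpl::S3(u) => u.write_bytes(data).await,
        }
    }

    async fn finish(self) -> Result<(), String> {
        match self {
            StreamingUploaderImpl::InMem(u) => u.finish().await,
            StreamingUploaderImpl::S3(u) => u.finish().await,
        }
    }
}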