diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs
index 9e0c031c3ef7d..075f98427f36e 100644
--- a/src/connector/src/sink/snowflake.rs
+++ b/src/connector/src/sink/snowflake.rs
@@ -22,7 +22,7 @@ use bytes::{Bytes, BytesMut};
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
-use risingwave_object_store::object::{ObjectStore, StreamingUploader};
+use risingwave_object_store::object::{ObjectStore, OpendalStreamingUploader, StreamingUploader};
 use serde::Deserialize;
 use serde_json::Value;
 use serde_with::serde_as;
@@ -184,7 +184,7 @@ pub struct SnowflakeSinkWriter {
     /// note: the option here *implicitly* indicates whether we have at
     /// least call `streaming_upload` once during this epoch,
     /// which is mainly used to prevent uploading empty data.
-    streaming_uploader: Option<(Box<dyn StreamingUploader>, String)>,
+    streaming_uploader: Option<(OpendalStreamingUploader, String)>,
 }

 impl SnowflakeSinkWriter {
@@ -242,7 +242,7 @@ impl SnowflakeSinkWriter {
     /// and `streaming_upload` being called the first time.
     /// i.e., lazily initialization of the internal `streaming_uploader`.
     /// plus, this function is *pure*, the `&mut self` here is to make rustc (and tokio) happy.
-    async fn new_streaming_uploader(&mut self) -> Result<(Box<dyn StreamingUploader>, String)> {
+    async fn new_streaming_uploader(&mut self) -> Result<(OpendalStreamingUploader, String)> {
         let file_suffix = self.file_suffix();
         let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
         let uploader = self
diff --git a/src/object_store/src/object/mem.rs b/src/object_store/src/object/mem.rs
index 3a1a7ed655e81..eff14c98abae5 100644
--- a/src/object_store/src/object/mem.rs
+++ b/src/object_store/src/object/mem.rs
@@ -28,8 +27,7 @@ use thiserror::Error;
 use tokio::sync::Mutex;

 use super::{
-    BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult,
-    ObjectStore, StreamingUploader,
+    ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
 };
 use crate::object::{ObjectDataStream, ObjectMetadataIter};

@@ -64,7 +63,6 @@ pub struct InMemStreamingUploader {
     objects: Arc<Mutex<HashMap<String, (ObjectMetadata, Bytes)>>>,
 }

-#[async_trait::async_trait]
 impl StreamingUploader for InMemStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         fail_point!("mem_write_bytes_err", |_| Err(ObjectError::internal(
@@ -74,7 +72,7 @@ impl StreamingUploader for InMemStreamingUploader {
         Ok(())
     }

-    async fn finish(self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(self) -> ObjectResult<()> {
         fail_point!("mem_finish_streaming_upload_err", |_| Err(
             ObjectError::internal("mem finish streaming upload error")
         ));
@@ -101,6 +99,8 @@ pub struct InMemObjectStore {

 #[async_trait::async_trait]
 impl ObjectStore for InMemObjectStore {
+    type StreamingUploader = InMemStreamingUploader;
+
     fn get_object_prefix(&self, _obj_id: u64) -> String {
         String::default()
     }
@@ -121,12 +121,12 @@ impl ObjectStore for InMemObjectStore {
         }
     }

-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
-        Ok(Box::new(InMemStreamingUploader {
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
+        Ok(InMemStreamingUploader {
             path: path.to_string(),
             buf: BytesMut::new(),
             objects: self.objects.clone(),
-        }))
+        })
     }

     async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
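// --- editor's sketch ---------------------------------------------------------
// Every file in this diff applies the same change: `#[async_trait]` plus
// `Box<dyn StreamingUploader>` gives way to native `async fn` in trait and an
// associated type on `ObjectStore`. A minimal standalone model of the new
// shape (illustrative names and `String` errors, not the crate's real API):

trait Uploader: Send {
    async fn write_bytes(&mut self, data: Vec<u8>) -> Result<(), String>;
    // By-value receiver: a concrete `Self` can be consumed on finish, which a
    // trait object could only express through the `self: Box<Self>` idiom.
    async fn finish(self) -> Result<(), String>;
}

trait Store: Send + Sync {
    // Each store names its concrete uploader, so `streaming_upload` returns it
    // without boxing and every uploader call dispatches statically.
    type Uploader: Uploader;
    async fn streaming_upload(&self, path: &str) -> Result<Self::Uploader, String>;
}

struct MemUploader(Vec<u8>);

impl Uploader for MemUploader {
    async fn write_bytes(&mut self, data: Vec<u8>) -> Result<(), String> {
        self.0.extend(data); // buffer in memory, like `InMemStreamingUploader`
        Ok(())
    }

    async fn finish(self) -> Result<(), String> {
        Ok(()) // dropping `self` completes the upload; no `Box` involved
    }
}
// -----------------------------------------------------------------------------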
diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index 125ae9b9d33bb..a8d1f50365958 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-#[cfg(madsim)]
 pub mod sim;
 use std::ops::{Range, RangeBounds};
 use std::sync::Arc;
@@ -47,26 +46,10 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
 use self::sim::SimObjectStore;

 pub type ObjectStoreRef = Arc<ObjectStoreImpl>;
-pub type ObjectStreamingUploader = MonitoredStreamingUploader;
-
-type BoxedStreamingUploader = Box<dyn StreamingUploader>;
+pub type ObjectStreamingUploader = StreamingUploaderImpl;

 pub trait ObjectRangeBounds = RangeBounds<usize> + Clone + Send + Sync + std::fmt::Debug + 'static;

-/// Partitions a set of given paths into two vectors. The first vector contains all local paths, and
-/// the second contains all remote paths.
-fn partition_object_store_paths(paths: &[String]) -> Vec<String> {
-    // ToDo: Currently the result is a copy of the input. Would it be worth it to use an in-place
-    // partition instead?
-    let mut vec_rem = vec![];
-
-    for path in paths {
-        vec_rem.push(path.to_string());
-    }
-
-    vec_rem
-}
-
 #[derive(Debug, Clone, PartialEq)]
 pub struct ObjectMetadata {
     // Full path
@@ -76,11 +59,12 @@ pub struct ObjectMetadata {
     pub total_size: usize,
 }

-#[async_trait::async_trait]
 pub trait StreamingUploader: Send {
+    #[expect(async_fn_in_trait)]
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()>;

-    async fn finish(self: Box<Self>) -> ObjectResult<()>;
+    #[expect(async_fn_in_trait)]
+    async fn finish(self) -> ObjectResult<()>;

     fn get_memory_usage(&self) -> u64;
 }
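// --- editor's note -----------------------------------------------------------
// `#[expect(async_fn_in_trait)]` acknowledges rustc's lint: an `async fn` in a
// public trait returns an unnamed future, so downstream callers cannot require
// it to be `Send` (e.g. for `tokio::spawn`). That is acceptable here because
// the uploaders are always used as concrete types. Roughly, the trait above
// desugars to the following (sketch; `()` stands in for the crate's error
// type):

use std::future::Future;

trait StreamingUploaderDesugared: Send {
    // Note: no `+ Send` on these futures -- that is precisely what the lint
    // warns about.
    fn write_bytes(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), ()>>;
    fn finish(self) -> impl Future<Output = Result<(), ()>>;
    fn get_memory_usage(&self) -> u64;
}
// -----------------------------------------------------------------------------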
@@ -88,13 +72,14 @@ pub trait StreamingUploader: Send {
 /// The implementation must be thread-safe.
 #[async_trait::async_trait]
 pub trait ObjectStore: Send + Sync {
+    type StreamingUploader: StreamingUploader;
     /// Get the key prefix for object
     fn get_object_prefix(&self, obj_id: u64) -> String;

     /// Uploads the object to `ObjectStore`.
     async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;

-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader>;
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader>;

     /// If objects are PUT using a multipart upload, it's a good practice to GET them in the same
     /// part sizes (or at least aligned to part boundaries) for best performance.
@@ -134,94 +119,182 @@ pub trait ObjectStore: Send + Sync {
     async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;

     fn store_media_type(&self) -> &'static str;
+
+    fn support_streaming_upload(&self) -> bool {
+        true
+    }
 }

-pub enum ObjectStoreImpl {
-    InMem(MonitoredObjectStore<InMemObjectStore>),
-    Opendal(MonitoredObjectStore<OpendalObjectStore>),
-    S3(MonitoredObjectStore<S3ObjectStore>),
-    #[cfg(madsim)]
-    Sim(MonitoredObjectStore<SimObjectStore>),
+#[cfg(not(madsim))]
+macro_rules! for_all_object_store {
+    ($macro:ident $($args:tt)*) => {
+        $macro! {
+            {
+                { InMem, InMemObjectStore },
+                { Opendal, OpendalObjectStore },
+                { S3, S3ObjectStore }
+            }
+            $($args)*
+        }
+    }
 }

-macro_rules! dispatch_async {
-    ($object_store:expr, $method_name:ident $(, $args:expr)*) => {
-        $object_store.$method_name($($args, )*).await
+#[cfg(madsim)]
+macro_rules! for_all_object_store {
+    ($macro:ident $($args:tt)*) => {
+        $macro! {
+            {
+                { InMem, InMemObjectStore },
+                { Opendal, OpendalObjectStore },
+                { S3, S3ObjectStore },
+                { Sim, SimObjectStore }
+            }
+            $($args)*
+        }
     }
 }

-/// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
-/// enum type and the `path`.
-///
-/// Except for `InMem`,the operation should be performed on remote object store.
-macro_rules! object_store_impl_method_body {
-    ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $path:expr $(, $args:expr)*) => {
+macro_rules! enum_map {
+    (
         {
-            let path = $path;
-            match $object_store {
-                ObjectStoreImpl::InMem(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, path $(, $args)*)
-                },
-                ObjectStoreImpl::Opendal(opendal) => {
-                    $dispatch_macro!(opendal, $method_name, path $(, $args)*)
-                },
-                ObjectStoreImpl::S3(s3) => {
-                    $dispatch_macro!(s3, $method_name, path $(, $args)*)
-                },
-                #[cfg(madsim)]
-                ObjectStoreImpl::Sim(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, path $(, $args)*)
+            $(
+                {$variant:ident, $_type_name:ty}
+            ),*
+        },
+        $object_store:expr,
+        $var_name:ident,
+        $func:expr
+    ) => {
+        match $object_store {
+            $(
+                ObjectStoreEnum::$variant($var_name) => ObjectStoreEnum::$variant({
+                    $func
+                }),
+            )*
+        }
+    };
+    ($object_store:expr, |$var_name:ident| $func:expr) => {
+        for_all_object_store! {
+            enum_map, $object_store, $var_name, $func
+        }
+    };
+}
+
+macro_rules! dispatch_object_store_enum {
+    (
+        {
+            $(
+                {$variant:ident, $_type_name:ty}
+            ),*
+        },
+        $object_store:expr,
+        $var_name:ident,
+        $func:expr
+    ) => {
+        match $object_store {
+            $(
+                ObjectStoreEnum::$variant($var_name) => {
+                    $func
                 },
-            }
+            )*
         }
     };
+    ($object_store:expr, |$var_name:ident| $func:expr) => {
+        for_all_object_store! {
+            dispatch_object_store_enum, $object_store, $var_name, $func
+        }
+    };
+}
+
+macro_rules! define_object_store_impl {
+    () => {
+        for_all_object_store! {
+            define_object_store_impl
+        }
+    };
+    (
+        {$(
+            {$variant:ident, $type_name:ty}
+        ),*}
+    ) => {
+        pub enum ObjectStoreEnum<
+            $($variant),*
+        > {
+            $(
+                $variant($variant),
+            )*
+        }
+
+        pub type ObjectStoreImpl = ObjectStoreEnum<
+            $(
+                MonitoredObjectStore<$type_name>,
+            )*
+        >;
+
+        pub type StreamingUploaderImpl = ObjectStoreEnum<
+            $(
+                MonitoredStreamingUploader<<$type_name as ObjectStore>::StreamingUploader>
+            ),*
+        >;
+    };
 }

+define_object_store_impl!();
+
 /// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
-/// enum type and the `paths`. It is a modification of the macro above to work with a slice of
-/// strings instead of just a single one.
+/// enum type and the `path`.
 ///
-/// Except for `InMem`, the operation should be performed on remote object store.
-macro_rules! object_store_impl_method_body_slice {
-    ($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => {
+/// Except for `InMem`,the operation should be performed on remote object store.
+macro_rules! object_store_impl_method_body {
+    // with await
+    ($object_store:expr, $method_name:ident ($($args:expr),*).await) => {
         {
-            let paths_rem = partition_object_store_paths($paths);
-
-            match $object_store {
-                ObjectStoreImpl::InMem(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*)
-                },
-                ObjectStoreImpl::Opendal(opendal) => {
-                    $dispatch_macro!(opendal, $method_name, &paths_rem $(, $args)*)
-                },
-                ObjectStoreImpl::S3(s3) => {
-                    $dispatch_macro!(s3, $method_name, &paths_rem $(, $args)*)
-                },
-                #[cfg(madsim)]
-                ObjectStoreImpl::Sim(in_mem) => {
-                    $dispatch_macro!(in_mem, $method_name, &paths_rem $(, $args)*)
-                },
-            }
+            dispatch_object_store_enum! {$object_store, |os| {
+                os.$method_name($($args),*).await
+            }}
+        }
+    };
+    // no await
+    ($object_store:expr, $method_name:ident ($(, $args:expr)*)) => {
+        {
+            dispatch_object_store_enum! {$object_store, |os| {
+                os.$method_name($($args),*)
+            }}
         }
     };
 }

+impl StreamingUploaderImpl {
+    pub async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
+        object_store_impl_method_body!(self, write_bytes(data).await)
+    }
+
+    pub async fn finish(self) -> ObjectResult<()> {
+        object_store_impl_method_body!(self, finish().await)
+    }
+
+    pub fn get_memory_usage(&self) -> u64 {
+        object_store_impl_method_body!(self, get_memory_usage())
+    }
+}
+
 impl ObjectStoreImpl {
     pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
-        object_store_impl_method_body!(self, upload, dispatch_async, path, obj)
+        object_store_impl_method_body!(self, upload(path, obj).await)
     }

-    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<MonitoredStreamingUploader> {
-        object_store_impl_method_body!(self, streaming_upload, dispatch_async, path)
+    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<StreamingUploaderImpl> {
+        Ok(enum_map!(self, |store| {
+            store.streaming_upload(path).await?
+        }))
     }

     pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
-        object_store_impl_method_body!(self, read, dispatch_async, path, range)
+        object_store_impl_method_body!(self, read(path, range).await)
     }

     pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
-        object_store_impl_method_body!(self, metadata, dispatch_async, path)
+        object_store_impl_method_body!(self, metadata(path).await)
     }
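// --- editor's sketch ---------------------------------------------------------
// For readability, this is what `define_object_store_impl!()` expands to under
// `cfg(not(madsim))`, hand-expanded from the macros above (the madsim build
// adds a fourth `Sim` variant):

pub enum ObjectStoreEnum<InMem, Opendal, S3> {
    InMem(InMem),
    Opendal(Opendal),
    S3(S3),
}

pub type ObjectStoreImpl = ObjectStoreEnum<
    MonitoredObjectStore<InMemObjectStore>,
    MonitoredObjectStore<OpendalObjectStore>,
    MonitoredObjectStore<S3ObjectStore>,
>;

pub type StreamingUploaderImpl = ObjectStoreEnum<
    MonitoredStreamingUploader<<InMemObjectStore as ObjectStore>::StreamingUploader>,
    MonitoredStreamingUploader<<OpendalObjectStore as ObjectStore>::StreamingUploader>,
    MonitoredStreamingUploader<<S3ObjectStore as ObjectStore>::StreamingUploader>,
>;
// -----------------------------------------------------------------------------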
     /// Returns a stream reading the object specified in `path`. If given, the stream starts at the
@@ -232,11 +305,11 @@ impl ObjectStoreImpl {
         path: &str,
         start_loc: Range<usize>,
     ) -> ObjectResult<MonitoredStreamingReader> {
-        object_store_impl_method_body!(self, streaming_read, dispatch_async, path, start_loc)
+        object_store_impl_method_body!(self, streaming_read(path, start_loc).await)
     }

     pub async fn delete(&self, path: &str) -> ObjectResult<()> {
-        object_store_impl_method_body!(self, delete, dispatch_async, path)
+        object_store_impl_method_body!(self, delete(path).await)
     }

     /// Deletes the objects with the given paths permanently from the storage. If an object
@@ -245,36 +318,19 @@ impl ObjectStoreImpl {
     /// If a hybrid storage is used, the method will first attempt to delete objects in local
     /// storage. Only if that is successful, it will remove objects from remote storage.
     pub async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
-        object_store_impl_method_body_slice!(self, delete_objects, dispatch_async, paths)
+        object_store_impl_method_body!(self, delete_objects(paths).await)
     }

     pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
-        object_store_impl_method_body!(self, list, dispatch_async, prefix)
+        object_store_impl_method_body!(self, list(prefix).await)
     }

     pub fn get_object_prefix(&self, obj_id: u64) -> String {
-        // FIXME: ObjectStoreImpl lacks flexibility for adding new interface to ObjectStore
-        // trait. Macro object_store_impl_method_body routes to local or remote only depending on
-        // the path
-        match self {
-            ObjectStoreImpl::InMem(store) => store.inner.get_object_prefix(obj_id),
-            ObjectStoreImpl::Opendal(store) => store.inner.get_object_prefix(obj_id),
-            ObjectStoreImpl::S3(store) => store.inner.get_object_prefix(obj_id),
-            #[cfg(madsim)]
-            ObjectStoreImpl::Sim(store) => store.inner.get_object_prefix(obj_id),
-        }
+        dispatch_object_store_enum!(self, |store| store.inner.get_object_prefix(obj_id))
     }

     pub fn support_streaming_upload(&self) -> bool {
-        match self {
-            ObjectStoreImpl::InMem(_) => true,
-            ObjectStoreImpl::Opendal(store) => {
-                store.inner.op.info().native_capability().write_can_multi
-            }
-            ObjectStoreImpl::S3(_) => true,
-            #[cfg(madsim)]
-            ObjectStoreImpl::Sim(_) => true,
-        }
+        dispatch_object_store_enum!(self, |store| store.inner.support_streaming_upload())
     }
 }
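// --- editor's sketch ---------------------------------------------------------
// `enum_map!` pairs each `ObjectStoreImpl` variant with the same-named
// `StreamingUploaderImpl` variant. `ObjectStoreImpl::streaming_upload` above
// therefore expands to roughly this method body (non-madsim variants shown,
// as it would appear inside `impl ObjectStoreImpl`):

pub async fn streaming_upload(&self, path: &str) -> ObjectResult<StreamingUploaderImpl> {
    Ok(match self {
        ObjectStoreEnum::InMem(store) => {
            ObjectStoreEnum::InMem(store.streaming_upload(path).await?)
        }
        ObjectStoreEnum::Opendal(store) => {
            ObjectStoreEnum::Opendal(store.streaming_upload(path).await?)
        }
        ObjectStoreEnum::S3(store) => {
            ObjectStoreEnum::S3(store.streaming_upload(path).await?)
        }
    })
}
// -----------------------------------------------------------------------------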
@@ -303,18 +359,18 @@ fn try_update_failure_metric(
 /// - `streaming_upload_finish`: The time spent calling `finish`.
 /// - `failure_count`: `streaming_upload_start`, `streaming_upload_write_bytes`,
 ///   `streaming_upload_finish`
-pub struct MonitoredStreamingUploader {
-    inner: BoxedStreamingUploader,
+pub struct MonitoredStreamingUploader<U: StreamingUploader> {
+    inner: U,
     object_store_metrics: Arc<ObjectStoreMetrics>,
     /// Length of data uploaded with this uploader.
     operation_size: usize,
     media_type: &'static str,
 }

-impl MonitoredStreamingUploader {
+impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
     pub fn new(
         media_type: &'static str,
-        handle: BoxedStreamingUploader,
+        handle: U,
         object_store_metrics: Arc<ObjectStoreMetrics>,
     ) -> Self {
         Self {
@@ -326,8 +382,8 @@ impl MonitoredStreamingUploader {
     }
 }

-impl MonitoredStreamingUploader {
-    pub async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
+impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
+    async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         let operation_type = OperationType::StreamingUpload;
         let operation_type_str = operation_type.as_str();
         let data_len = data.len();
@@ -358,7 +414,7 @@ impl MonitoredStreamingUploader {
         res
     }

-    pub async fn finish(self) -> ObjectResult<()> {
+    async fn finish(self) -> ObjectResult<()> {
         let operation_type = OperationType::StreamingUploadFinish;
         let operation_type_str = operation_type.as_str();
         let _timer = self
@@ -381,7 +437,7 @@ impl MonitoredStreamingUploader {
         res
     }

-    pub fn get_memory_usage(&self) -> u64 {
+    fn get_memory_usage(&self) -> u64 {
         self.inner.get_memory_usage()
     }
 }
@@ -544,7 +600,10 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
         res
     }

-    pub async fn streaming_upload(&self, path: &str) -> ObjectResult<MonitoredStreamingUploader> {
+    pub async fn streaming_upload(
+        &self,
+        path: &str,
+    ) -> ObjectResult<MonitoredStreamingUploader<OS::StreamingUploader>> {
         let operation_type = OperationType::StreamingUploadInit;
         let operation_type_str = operation_type.as_str();
         let media_type = self.media_type();
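// --- editor's sketch ---------------------------------------------------------
// The monitoring layer is now generic over the uploader it wraps instead of
// holding a `BoxedStreamingUploader`. The forwarding pattern, stripped of the
// real metrics plumbing and reusing the illustrative `Uploader` trait from the
// first sketch:

struct Monitored<U> {
    inner: U,
    bytes_written: u64, // stands in for the real `ObjectStoreMetrics`
}

impl<U: Uploader> Monitored<U> {
    async fn write_bytes(&mut self, data: Vec<u8>) -> Result<(), String> {
        self.bytes_written += data.len() as u64; // record, then forward
        self.inner.write_bytes(data).await
    }

    async fn finish(self) -> Result<(), String> {
        self.inner.finish().await // wrapper and inner are consumed together
    }
}
// -----------------------------------------------------------------------------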
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index c073df7c1e102..30c63b15ec5e5 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -27,8 +27,8 @@ use risingwave_common::range::RangeBoundsExt;
 use thiserror_ext::AsReport;

 use crate::object::{
-    prefix, BoxedStreamingUploader, ObjectDataStream, ObjectError, ObjectMetadata,
-    ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
+    prefix, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
+    ObjectResult, ObjectStore, StreamingUploader,
 };

 /// Opendal object storage.
@@ -70,6 +70,8 @@ impl OpendalObjectStore {

 #[async_trait::async_trait]
 impl ObjectStore for OpendalObjectStore {
+    type StreamingUploader = OpendalStreamingUploader;
+
     fn get_object_prefix(&self, obj_id: u64) -> String {
         match self.engine_type {
             EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
@@ -94,11 +96,11 @@ impl ObjectStore for OpendalObjectStore {
         }
     }

-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
-        Ok(Box::new(
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
+        Ok(
             OpendalStreamingUploader::new(self.op.clone(), path.to_string(), self.config.clone())
                 .await?,
-        ))
+        )
     }

     async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
@@ -244,6 +246,10 @@ impl ObjectStore for OpendalObjectStore {
             EngineType::Fs => "Fs",
         }
     }
+
+    fn support_streaming_upload(&self) -> bool {
+        self.op.info().native_capability().write_can_multi
+    }
 }

 /// Store multiple parts in a map, and concatenate them on finish.
@@ -280,7 +286,6 @@ impl OpendalStreamingUploader {

 const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

-#[async_trait::async_trait]
 impl StreamingUploader for OpendalStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         self.writer.write(data).await?;
@@ -288,7 +293,7 @@ impl StreamingUploader for OpendalStreamingUploader {
         Ok(())
     }

-    async fn finish(mut self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(mut self) -> ObjectResult<()> {
         match self.writer.close().await {
             Ok(_) => (),
             Err(err) => {
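// --- editor's note -----------------------------------------------------------
// `support_streaming_upload` now reflects the backend's actual capability:
// opendal's `native_capability()` reports what the service itself supports,
// without opendal's compatibility shims. A usage sketch, assuming an
// already-built `opendal::Operator` (e.g. the `Fs` engine typically cannot
// multi-write, while S3 can):

fn can_stream(op: &opendal::Operator) -> bool {
    op.info().native_capability().write_can_multi
}
// -----------------------------------------------------------------------------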
diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs
index d7c151e867465..9cc559f22d043 100644
--- a/src/object_store/src/object/s3.rs
+++ b/src/object_store/src/object/s3.rs
@@ -59,8 +59,8 @@ use tokio::task::JoinHandle;

 use super::object_metrics::ObjectStoreMetrics;
 use super::{
-    prefix, retry_request, BoxedStreamingUploader, Bytes, ObjectError, ObjectErrorInner,
-    ObjectMetadata, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
+    prefix, retry_request, Bytes, ObjectError, ObjectErrorInner, ObjectMetadata, ObjectRangeBounds,
+    ObjectResult, ObjectStore, StreamingUploader,
 };
 use crate::object::{
     try_update_failure_metric, ObjectDataStream, ObjectMetadataIter, OperationType,
@@ -301,7 +301,6 @@ impl S3StreamingUploader {
     }
 }

-#[async_trait::async_trait]
 impl StreamingUploader for S3StreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         fail_point!("s3_write_bytes_err", |_| Err(ObjectError::internal(
@@ -321,7 +320,7 @@ impl StreamingUploader for S3StreamingUploader {
     /// If the multipart upload has not been initiated, we can use `PutObject` instead to save the
     /// `CreateMultipartUpload` and `CompleteMultipartUpload` requests. Otherwise flush the
     /// remaining data of the buffer to S3 as a new part.
-    async fn finish(mut self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(mut self) -> ObjectResult<()> {
         fail_point!("s3_finish_streaming_upload_err", |_| Err(
             ObjectError::internal("s3 finish streaming upload error")
         ));
@@ -381,6 +380,8 @@ pub struct S3ObjectStore {

 #[async_trait::async_trait]
 impl ObjectStore for S3ObjectStore {
+    type StreamingUploader = S3StreamingUploader;
+
     fn get_object_prefix(&self, obj_id: u64) -> String {
         // Delegate to static method to avoid creating an `S3ObjectStore` in unit test.
         prefix::s3::get_object_prefix(obj_id)
@@ -407,18 +408,18 @@ impl ObjectStore for S3ObjectStore {
         }
     }

-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
         fail_point!("s3_streaming_upload_err", |_| Err(ObjectError::internal(
             "s3 streaming upload error"
         )));
-        Ok(Box::new(S3StreamingUploader::new(
+        Ok(S3StreamingUploader::new(
             self.client.clone(),
             self.bucket.clone(),
             self.part_size,
             path.to_string(),
             self.metrics.clone(),
             self.config.clone(),
-        )))
+        ))
     }

     /// Amazon S3 doesn't support retrieving multiple ranges of data per GET request.
diff --git a/src/object_store/src/object/sim/mod.rs b/src/object_store/src/object/sim/mod.rs
index 31f34cbcd3267..4c91531020ea3 100644
--- a/src/object_store/src/object/sim/mod.rs
+++ b/src/object_store/src/object/sim/mod.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+#![cfg(madsim)]
+
 mod client;
 mod error;
 use bytes::{BufMut, BytesMut};
@@ -33,8 +35,8 @@ use risingwave_common::range::RangeBoundsExt;
 use self::client::Client;
 use self::service::Response;
 use super::{
-    BoxedStreamingUploader, Bytes, ObjectDataStream, ObjectError, ObjectMetadata,
-    ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
+    Bytes, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, ObjectRangeBounds,
+    ObjectResult, ObjectStore, StreamingUploader,
 };

 pub struct SimStreamingUploader {
@@ -53,14 +55,13 @@ impl SimStreamingUploader {
     }
 }

-#[async_trait::async_trait]
 impl StreamingUploader for SimStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         self.buf.put(data);
         Ok(())
     }

-    async fn finish(mut self: Box<Self>) -> ObjectResult<()> {
+    async fn finish(mut self) -> ObjectResult<()> {
         if self.buf.is_empty() {
             Err(ObjectError::internal("upload empty object"))
         } else {
@@ -115,6 +116,8 @@ pub struct SimObjectStore {

 #[async_trait::async_trait]
 impl ObjectStore for SimObjectStore {
+    type StreamingUploader = SimStreamingUploader;
+
     fn get_object_prefix(&self, _obj_id: u64) -> String {
         String::default()
     }
@@ -136,11 +139,11 @@ impl ObjectStore for SimObjectStore {
         }
     }

-    async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
-        Ok(Box::new(SimStreamingUploader::new(
+    async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
+        Ok(SimStreamingUploader::new(
             self.client.clone(),
             path.to_string(),
-        )))
+        ))
     }

     async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
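// --- editor's sketch ---------------------------------------------------------
// Net effect for callers: call sites keep their shape, but every layer is now
// statically typed. A hypothetical end-to-end use (path and setup invented for
// illustration):

async fn upload_two_chunks(store: &ObjectStoreImpl) -> ObjectResult<()> {
    // `uploader` is the `StreamingUploaderImpl` enum, not a `Box<dyn ...>`.
    let mut uploader = store.streaming_upload("some/object").await?;
    uploader.write_bytes(Bytes::from_static(b"hello ")).await?;
    uploader.write_bytes(Bytes::from_static(b"world")).await?;
    // `finish` takes `self` by value and consumes the uploader.
    uploader.finish().await
}
// -----------------------------------------------------------------------------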