Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(object-store): remove async_trait macro on StreamingUploader and refine object store macro #16800

Merged
merged 4 commits into from
May 20, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refine macro
  • Loading branch information
wenym1 committed May 16, 2024
commit 7155dae0cdf6564078e4c92c32fa6117d595e6ad
242 changes: 98 additions & 144 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(madsim)]
pub mod sim;
use std::ops::{Range, RangeBounds};
use std::sync::Arc;
@@ -47,24 +46,10 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
use self::sim::SimObjectStore;

pub type ObjectStoreRef = Arc<ObjectStoreImpl>;
pub type ObjectStreamingUploader = impl StreamingUploader;
pub type ObjectStreamingUploader = StreamingUploaderImpl;

/// Trait alias bundling `RangeBounds<usize>` with `Clone + Send + Sync + Debug + 'static`.
pub trait ObjectRangeBounds = RangeBounds<usize> + Clone + Send + Sync + std::fmt::Debug + 'static;

/// Returns an owned copy of the given object store paths.
///
/// Despite its name, this function performs no partitioning: it returns a single vector that
/// holds a clone of every entry of `paths`, in the same order as the input. An empty slice
/// yields an empty vector.
fn partition_object_store_paths(paths: &[String]) -> Vec<String> {
    // ToDo: Currently the result is a copy of the input. Would it be worth it to use an in-place
    // partition instead?
    paths.to_vec()
}

#[derive(Debug, Clone, PartialEq)]
pub struct ObjectMetadata {
// Full path
@@ -75,9 +60,11 @@ pub struct ObjectMetadata {
}

pub trait StreamingUploader: Send {
fn write_bytes(&mut self, data: Bytes) -> impl Future<Output = ObjectResult<()>> + Send + '_;
#[expect(async_fn_in_trait)]
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()>;

fn finish(self) -> impl Future<Output = ObjectResult<()>> + Send;
#[expect(async_fn_in_trait)]
async fn finish(self) -> ObjectResult<()>;

fn get_memory_usage(&self) -> u64;
}
@@ -132,6 +119,10 @@ pub trait ObjectStore: Send + Sync {
async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter>;

fn store_media_type(&self) -> &'static str;

/// Reports whether streaming upload is supported, going by the method name.
///
/// This default implementation always returns `true`.
fn support_streaming_upload(&self) -> bool {
true
}
}

#[cfg(not(madsim))]
@@ -163,97 +154,35 @@ macro_rules! for_all_object_store {
}
}

macro_rules! define_object_store_impl {
() => {
for_all_object_store! {
define_object_store_impl
}
};
macro_rules! enum_map {
(
{$(
{$variant:ident, $type_name:ty}
),*}
{
$(
{$variant:ident, $_type_name:ty}
),*
},
$source_enum_type:ident,
$target_enum_type:ident,
$object_store:expr,
$var_name:ident,
$func:expr
) => {
pub enum ObjectStoreImpl {
match $object_store {
$(
$variant(MonitoredObjectStore<$type_name>),
$source_enum_type::$variant($var_name) => $target_enum_type::$variant({
$func
}),
)*
}
};
}

define_object_store_impl!();

macro_rules! define_streaming_uploader_impl {
() => {
($source_enum_type:ident, $target_enum_type:ident, $object_store:expr, |$var_name:ident| $func:expr) => {
for_all_object_store! {
define_streaming_uploader_impl
}
};
(
{$(
{$variant:ident, $_type_name:ty}
),*}
) => {
enum StreamingUploaderImpl<
$($variant),*
> {
$(
$variant($variant),
)*
}

impl<
$($variant: StreamingUploader),*
> StreamingUploader for StreamingUploaderImpl<
$($variant),*
> {
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
match self {
$(
StreamingUploaderImpl::$variant(uploader) => {
uploader.write_bytes(data).await
},
)*
}
}

async fn finish(self) -> ObjectResult<()> {
match self {
$(
StreamingUploaderImpl::$variant(uploader) => {
uploader.finish().await
},
)*
}
}

fn get_memory_usage(&self) -> u64 {
match self {
$(
StreamingUploaderImpl::$variant(uploader) => {
uploader.get_memory_usage()
},
)*
}
}
}

impl ObjectStoreImpl {
async fn streaming_upload_inner(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
Ok(match self {
$(
ObjectStoreImpl::$variant(object_store) => StreamingUploaderImpl::$variant(object_store.streaming_upload(path).await?),
)*
})
}
enum_map, $source_enum_type, $target_enum_type, $object_store, $var_name, $func
}
};
}

define_streaming_uploader_impl!();

macro_rules! dispatch_object_store {
macro_rules! dispatch_object_store_enum {
(
{
$(
@@ -266,73 +195,108 @@ macro_rules! dispatch_object_store {
) => {
match $object_store {
$(
ObjectStoreImpl::$variant($var_name) => {
ObjectStoreEnum::$variant($var_name) => {
$func
},
)*
}
};
($object_store:expr, |$var_name:ident| $func:expr) => {
for_all_object_store! {
dispatch_object_store, $object_store, $var_name, $func
dispatch_object_store_enum, $object_store, $var_name, $func
}
};
}

macro_rules! dispatch_async {
($object_store:expr, $method_name:ident $(, $args:expr)*) => {
$object_store.$method_name($($args, )*).await
}
macro_rules! define_object_store_impl {
// Entry arm: takes no arguments and hands this macro's name to `for_all_object_store!`.
() => {
for_all_object_store! {
define_object_store_impl
}
};
// Main arm: accepts a braced, comma-separated list of `{variant, type}` pairs.
(
{$(
{$variant:ident, $type_name:ty}
),*}
) => {
// Generic enum with one type parameter per variant; each type parameter reuses the
// variant's own identifier, so variant `V` wraps a value of type parameter `V`.
pub enum ObjectStoreEnum<
$($variant),*
> {
$(
$variant($variant),
)*
}

// `ObjectStoreEnum` with every type parameter set to the monitored wrapper of that store type.
pub type ObjectStoreImpl = ObjectStoreEnum<
$(
MonitoredObjectStore<$type_name>,
)*
>;

// `ObjectStoreEnum` with every type parameter set to the monitored wrapper around that
// store type's associated `ObjectStore::StreamingUploader`.
pub type StreamingUploaderImpl = ObjectStoreEnum<
$(
MonitoredStreamingUploader<<$type_name as ObjectStore>::StreamingUploader>
),*
>;
};
}

define_object_store_impl!();

/// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
/// enum type and the `path`.
///
/// Except for `InMem`, the operation should be performed on remote object store.
macro_rules! object_store_impl_method_body {
($object_store:expr, $method_name:ident, $dispatch_macro:ident, $path:expr $(, $args:expr)*) => {
// with await
($object_store:expr, $method_name:ident ($($args:expr),*).await) => {
{
let path = $path;
dispatch_object_store! {$object_store, |os| {
$dispatch_macro!(os, $method_name, path $(, $args)*)
dispatch_object_store_enum! {$object_store, |os| {
os.$method_name($($args),*).await
}}
}
};
}

/// This macro routes the object store operation to the real implementation by the `ObjectStoreImpl`
/// enum type and the `paths`. It is a modification of the macro above to work with a slice of
/// strings instead of just a single one.
///
/// Except for `InMem`, the operation should be performed on remote object store.
macro_rules! object_store_impl_method_body_slice {
($object_store:expr, $method_name:ident, $dispatch_macro:ident, $paths:expr $(, $args:expr)*) => {
// no await
($object_store:expr, $method_name:ident ($(, $args:expr)*)) => {
{
let paths_rem = partition_object_store_paths($paths);
dispatch_object_store! {
$object_store, |os| {
$dispatch_macro!(os, $method_name, &paths_rem $(, $args)*)
}
}
dispatch_object_store_enum! {$object_store, |os| {
os.$method_name($($args),*)
}}
}
};
}

/// Inherent methods of `StreamingUploaderImpl`. Each one passes `self` and a same-named method
/// call to `object_store_impl_method_body!`.
impl StreamingUploaderImpl {
/// Passes `data` to `write_bytes` through `object_store_impl_method_body!` and awaits the
/// result.
pub async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
object_store_impl_method_body!(self, write_bytes(data).await)
}

/// Consumes `self`, invokes `finish` through `object_store_impl_method_body!` and awaits the
/// result.
pub async fn finish(self) -> ObjectResult<()> {
object_store_impl_method_body!(self, finish().await)
}

/// Invokes `get_memory_usage` through `object_store_impl_method_body!` (the non-awaiting form)
/// and returns its `u64` result.
pub fn get_memory_usage(&self) -> u64 {
object_store_impl_method_body!(self, get_memory_usage())
}
}

impl ObjectStoreImpl {
pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
object_store_impl_method_body!(self, upload, dispatch_async, path, obj)
object_store_impl_method_body!(self, upload(path, obj).await)
}

pub async fn streaming_upload(&self, path: &str) -> ObjectResult<ObjectStreamingUploader> {
self.streaming_upload_inner(path).await
Ok(enum_map!(Self, StreamingUploaderImpl, self, |store| {
store.streaming_upload(path).await?
}))
}

pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
object_store_impl_method_body!(self, read, dispatch_async, path, range)
object_store_impl_method_body!(self, read(path, range).await)
}

pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
object_store_impl_method_body!(self, metadata, dispatch_async, path)
object_store_impl_method_body!(self, metadata(path).await)
}

/// Returns a stream reading the object specified in `path`. If given, the stream starts at the
@@ -343,11 +307,11 @@ impl ObjectStoreImpl {
path: &str,
start_loc: Range<usize>,
) -> ObjectResult<MonitoredStreamingReader> {
object_store_impl_method_body!(self, streaming_read, dispatch_async, path, start_loc)
object_store_impl_method_body!(self, streaming_read(path, start_loc).await)
}

pub async fn delete(&self, path: &str) -> ObjectResult<()> {
object_store_impl_method_body!(self, delete, dispatch_async, path)
object_store_impl_method_body!(self, delete(path).await)
}

/// Deletes the objects with the given paths permanently from the storage. If an object
@@ -356,29 +320,19 @@ impl ObjectStoreImpl {
/// If a hybrid storage is used, the method will first attempt to delete objects in local
/// storage. Only if that is successful, it will remove objects from remote storage.
pub async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
object_store_impl_method_body_slice!(self, delete_objects, dispatch_async, paths)
object_store_impl_method_body!(self, delete_objects(paths).await)
}

pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
object_store_impl_method_body!(self, list, dispatch_async, prefix)
object_store_impl_method_body!(self, list(prefix).await)
}

pub fn get_object_prefix(&self, obj_id: u64) -> String {
dispatch_object_store!(self, |store| {
store.inner.get_object_prefix(obj_id)
})
dispatch_object_store_enum!(self, |store| store.inner.get_object_prefix(obj_id))
}

pub fn support_streaming_upload(&self) -> bool {
match self {
ObjectStoreImpl::InMem(_) => true,
ObjectStoreImpl::Opendal(store) => {
store.inner.op.info().native_capability().write_can_multi
}
ObjectStoreImpl::S3(_) => true,
#[cfg(madsim)]
ObjectStoreImpl::Sim(_) => true,
}
dispatch_object_store_enum!(self, |store| store.inner.support_streaming_upload())
}
}

@@ -430,7 +384,7 @@ impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
}
}

impl<U: StreamingUploader> StreamingUploader for MonitoredStreamingUploader<U> {
impl<U: StreamingUploader> MonitoredStreamingUploader<U> {
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
let operation_type = OperationType::StreamingUpload;
let operation_type_str = operation_type.as_str();
Original file line number Diff line number Diff line change
@@ -246,6 +246,10 @@ impl ObjectStore for OpendalObjectStore {
EngineType::Fs => "Fs",
}
}

/// Returns the `write_can_multi` flag taken from the native capability reported by
/// `self.op.info()`.
fn support_streaming_upload(&self) -> bool {
self.op.info().native_capability().write_can_multi
}
}

/// Store multiple parts in a map, and concatenate them on finish.
2 changes: 2 additions & 0 deletions src/object_store/src/object/sim/mod.rs
Original file line number Diff line number Diff line change
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg(madsim)]

mod client;
mod error;
use bytes::{BufMut, BytesMut};
2 changes: 1 addition & 1 deletion src/storage/src/hummock/sstable_store.rs
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ use risingwave_common::config::StorageMemoryConfig;
use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX};
use risingwave_hummock_trace::TracedCachePolicy;
use risingwave_object_store::object::{
ObjectError, ObjectMetadataIter, ObjectStoreRef, ObjectStreamingUploader, StreamingUploader,
ObjectError, ObjectMetadataIter, ObjectStoreRef, ObjectStreamingUploader,
};
use risingwave_pb::hummock::SstableInfo;
use serde::{Deserialize, Serialize};