Skip to content

Commit

Permalink
refactor(object-store): remove async_trait macro on StreamingUploader and refine object store macro (#16800)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored May 20, 2024
1 parent 1cc36e1 commit 8b5a7bc
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 141 deletions.
6 changes: 3 additions & 3 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use bytes::{Bytes, BytesMut};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_object_store::object::{ObjectStore, StreamingUploader};
use risingwave_object_store::object::{ObjectStore, OpendalStreamingUploader, StreamingUploader};
use serde::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
Expand Down Expand Up @@ -184,7 +184,7 @@ pub struct SnowflakeSinkWriter {
/// note: the option here *implicitly* indicates whether we have at
/// least call `streaming_upload` once during this epoch,
/// which is mainly used to prevent uploading empty data.
streaming_uploader: Option<(Box<dyn StreamingUploader>, String)>,
streaming_uploader: Option<(OpendalStreamingUploader, String)>,
}

impl SnowflakeSinkWriter {
Expand Down Expand Up @@ -242,7 +242,7 @@ impl SnowflakeSinkWriter {
/// and `streaming_upload` being called the first time.
/// i.e., lazily initialization of the internal `streaming_uploader`.
/// plus, this function is *pure*, the `&mut self` here is to make rustc (and tokio) happy.
async fn new_streaming_uploader(&mut self) -> Result<(Box<dyn StreamingUploader>, String)> {
async fn new_streaming_uploader(&mut self) -> Result<(OpendalStreamingUploader, String)> {
let file_suffix = self.file_suffix();
let path = generate_s3_file_name(self.s3_client.s3_path(), &file_suffix);
let uploader = self
Expand Down
14 changes: 7 additions & 7 deletions src/object_store/src/object/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use thiserror::Error;
use tokio::sync::Mutex;

use super::{
BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult,
ObjectStore, StreamingUploader,
ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
};
use crate::object::{ObjectDataStream, ObjectMetadataIter};

Expand Down Expand Up @@ -64,7 +63,6 @@ pub struct InMemStreamingUploader {
objects: Arc<Mutex<HashMap<String, (ObjectMetadata, Bytes)>>>,
}

#[async_trait::async_trait]
impl StreamingUploader for InMemStreamingUploader {
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
fail_point!("mem_write_bytes_err", |_| Err(ObjectError::internal(
Expand All @@ -74,7 +72,7 @@ impl StreamingUploader for InMemStreamingUploader {
Ok(())
}

async fn finish(self: Box<Self>) -> ObjectResult<()> {
async fn finish(self) -> ObjectResult<()> {
fail_point!("mem_finish_streaming_upload_err", |_| Err(
ObjectError::internal("mem finish streaming upload error")
));
Expand All @@ -101,6 +99,8 @@ pub struct InMemObjectStore {

#[async_trait::async_trait]
impl ObjectStore for InMemObjectStore {
type StreamingUploader = InMemStreamingUploader;

fn get_object_prefix(&self, _obj_id: u64) -> String {
String::default()
}
Expand All @@ -121,12 +121,12 @@ impl ObjectStore for InMemObjectStore {
}
}

async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
Ok(Box::new(InMemStreamingUploader {
async fn streaming_upload(&self, path: &str) -> ObjectResult<Self::StreamingUploader> {
Ok(InMemStreamingUploader {
path: path.to_string(),
buf: BytesMut::new(),
objects: self.objects.clone(),
}))
})
}

async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
Expand Down
Loading

0 comments on commit 8b5a7bc

Please sign in to comment.