Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(object store): bump OpenDAL to v0.41 #13101

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
hyper-tls = "0.5.0"
itertools = "0.11"
opendal = "0.39"
opendal = "0.41"
prometheus = { version = "0.13", features = ["process"] }
risingwave_common = { workspace = true }
spin = "0.9"
Expand Down
7 changes: 1 addition & 6 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,7 @@ impl ObjectStoreImpl {
match self {
ObjectStoreImpl::InMem(_) => true,
ObjectStoreImpl::Opendal(store) => {
store
.inner
.op
.info()
.capability()
.write_without_content_length
store.inner.op.info().native_capability().write_can_multi
}
ObjectStoreImpl::S3(_) => true,
}
Expand Down
8 changes: 0 additions & 8 deletions src/object_store/src/object/opendal_engine/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ use opendal::Operator;
use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

/// The fixed number of bytes that is buffered before they are uploaded as a part, will be used in
/// streaing upload.
///
/// Reference: <https://cloud.google.com/storage/docs/streaming-uploads>
const GCS_PART_SIZE: usize = 16 * 1024 * 1024;

impl OpendalObjectStore {
/// create opendal gcs engine.
pub fn new_gcs_engine(bucket: String, root: String) -> ObjectResult<Self> {
Expand All @@ -35,8 +29,6 @@ impl OpendalObjectStore {

builder.root(&root);

builder.write_fixed_size(GCS_PART_SIZE);

// if credential env is set, use it. Otherwise, ADC will be used.
let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
if let Ok(cred) = cred {
Expand Down
150 changes: 45 additions & 105 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::Bytes;
use fail::fail_point;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, StreamExt};
use futures::{stream, StreamExt};
use opendal::services::Memory;
use opendal::{Entry, Error, Lister, Metakey, Operator, Writer};
use opendal::{Metakey, Operator, Writer};
use risingwave_common::range::RangeBoundsExt;
use tokio::io::AsyncRead;

Expand Down Expand Up @@ -84,7 +80,10 @@ impl ObjectStore for OpendalObjectStore {
let data = if range.is_full() {
self.op.read(path).await?
} else {
self.op.range_read(path, range.map(|v| *v as u64)).await?
self.op
.read_with(path)
.range(range.map(|v| *v as u64))
.await?
};

if let Some(len) = range.len() && len != data.len() {
Expand Down Expand Up @@ -112,7 +111,12 @@ impl ObjectStore for OpendalObjectStore {
ObjectError::internal("opendal streaming read error")
));
let reader = match start_pos {
Some(start_position) => self.op.range_reader(path, start_position as u64..).await?,
Some(start_position) => {
self.op
.reader_with(path)
.range(start_position as u64..)
.await?
}
None => self.op.reader(path).await?,
};

Expand Down Expand Up @@ -149,8 +153,36 @@ impl ObjectStore for OpendalObjectStore {
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let lister = self.op.scan(prefix).await?;
Ok(Box::pin(OpenDalObjectIter::new(lister, self.op.clone())))
let object_lister = self
.op
.lister_with(prefix)
.delimiter("")
.metakey(Metakey::ContentLength | Metakey::ContentType)
.await?;

let stream = stream::unfold(object_lister, |mut object_lister| async move {
match object_lister.next().await {
Some(Ok(object)) => {
let key = object.path().to_string();
let om = object.metadata();
let last_modified = match om.last_modified() {
Some(t) => t.timestamp() as f64,
None => 0_f64,
};
let total_size = om.content_length() as usize;
let metadata = ObjectMetadata {
key,
last_modified,
total_size,
};
Some((Ok(metadata), object_lister))
}
Some(Err(err)) => Some((Err(err.into()), object_lister)),
None => None,
}
});

Ok(stream.boxed())
}

fn store_media_type(&self) -> &'static str {
Expand All @@ -172,12 +204,12 @@ pub struct OpenDalStreamingUploader {
}
impl OpenDalStreamingUploader {
pub async fn new(op: Operator, path: String) -> ObjectResult<Self> {
let writer = op.writer(&path).await?;
let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?;
Ok(Self { writer })
}
}

const OPENDAL_BUFFER_SIZE: u64 = 8 * 1024 * 1024;
const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[async_trait::async_trait]
impl StreamingUploader for OpenDalStreamingUploader {
Expand All @@ -199,99 +231,7 @@ impl StreamingUploader for OpenDalStreamingUploader {
}

fn get_memory_usage(&self) -> u64 {
OPENDAL_BUFFER_SIZE
}
}

struct OpenDalObjectIter {
lister: Option<Lister>,
op: Option<Operator>,
#[allow(clippy::type_complexity)]
next_future: Option<BoxFuture<'static, (Option<Result<Entry, Error>>, Lister)>>,
#[allow(clippy::type_complexity)]
metadata_future: Option<BoxFuture<'static, (Result<ObjectMetadata, Error>, Operator)>>,
}

impl OpenDalObjectIter {
fn new(lister: Lister, op: Operator) -> Self {
Self {
lister: Some(lister),
op: Some(op),
next_future: None,
metadata_future: None,
}
}
}

impl Stream for OpenDalObjectIter {
type Item = ObjectResult<ObjectMetadata>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(metadata_future) = self.metadata_future.as_mut() {
let (result, op) = ready!(metadata_future.poll_unpin(cx));
self.op = Some(op);
return match result {
Ok(m) => {
self.metadata_future = None;
Poll::Ready(Some(Ok(m)))
}
Err(e) => {
self.metadata_future = None;
Poll::Ready(Some(Err(e.into())))
}
};
}
if let Some(next_future) = self.next_future.as_mut() {
let (option, lister) = ready!(next_future.poll_unpin(cx));
self.lister = Some(lister);
return match option {
None => {
self.next_future = None;
Poll::Ready(None)
}
Some(result) => {
self.next_future = None;
match result {
Ok(object) => {
let op = self.op.take().expect("op should not be None");
let f = async move {
let key = object.path().to_string();
// FIXME: How does opendal metadata cache work?
// Will below line result in one IO per object?
let om = match op
.metadata(
&object,
Metakey::LastModified | Metakey::ContentLength,
)
.await
{
Ok(om) => om,
Err(e) => return (Err(e), op),
};
let last_modified = match om.last_modified() {
Some(t) => t.timestamp() as f64,
None => 0_f64,
};
let total_size = om.content_length() as usize;
let metadata = ObjectMetadata {
key,
last_modified,
total_size,
};
(Ok(metadata), op)
};
self.metadata_future = Some(Box::pin(f));
self.poll_next(cx)
}
Err(e) => Poll::Ready(Some(Err(e.into()))),
}
}
};
}
let mut lister = self.lister.take().expect("list should not be None");
let f = async move { (lister.next().await, lister) };
self.next_future = Some(Box::pin(f));
self.poll_next(cx)
OPENDAL_BUFFER_SIZE as u64
}
}

Expand Down
Loading