Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(object store): bump OpenDAL to v0.41 #13101

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
hyper = "0.14"
hyper-tls = "0.5.0"
itertools = "0.11"
opendal = "0.39"
opendal = "0.41"
prometheus = { version = "0.13", features = ["process"] }
risingwave_common = { workspace = true }
spin = "0.9"
Expand Down
7 changes: 1 addition & 6 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,7 @@ impl ObjectStoreImpl {
match self {
ObjectStoreImpl::InMem(_) => true,
ObjectStoreImpl::Opendal(store) => {
store
.inner
.op
.info()
.capability()
.write_without_content_length
store.inner.op.info().native_capability().write_can_multi
}
ObjectStoreImpl::S3(_) => true,
}
Expand Down
8 changes: 0 additions & 8 deletions src/object_store/src/object/opendal_engine/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ use opendal::Operator;
use super::{EngineType, OpendalObjectStore};
use crate::object::ObjectResult;

/// The fixed number of bytes that is buffered before they are uploaded as a part, will be used in
/// streaing upload.
///
/// Reference: <https://cloud.google.com/storage/docs/streaming-uploads>
const GCS_PART_SIZE: usize = 16 * 1024 * 1024;

impl OpendalObjectStore {
/// create opendal gcs engine.
pub fn new_gcs_engine(bucket: String, root: String) -> ObjectResult<Self> {
Expand All @@ -35,8 +29,6 @@ impl OpendalObjectStore {

builder.root(&root);

builder.write_fixed_size(GCS_PART_SIZE);

// if credential env is set, use it. Otherwise, ADC will be used.
let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
if let Ok(cred) = cred {
Expand Down
34 changes: 17 additions & 17 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fail::fail_point;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, StreamExt};
use opendal::services::Memory;
use opendal::{Entry, Error, Lister, Metakey, Operator, Writer};
use opendal::{Entry, Error, Lister, Operator, Writer};
use risingwave_common::range::RangeBoundsExt;
use tokio::io::AsyncRead;

Expand Down Expand Up @@ -84,7 +84,10 @@ impl ObjectStore for OpendalObjectStore {
let data = if range.is_full() {
self.op.read(path).await?
} else {
self.op.range_read(path, range.map(|v| *v as u64)).await?
self.op
.read_with(path)
.range(range.map(|v| *v as u64))
.await?
};

if let Some(len) = range.len() && len != data.len() {
Expand Down Expand Up @@ -112,7 +115,12 @@ impl ObjectStore for OpendalObjectStore {
ObjectError::internal("opendal streaming read error")
));
let reader = match start_pos {
Some(start_position) => self.op.range_reader(path, start_position as u64..).await?,
Some(start_position) => {
self.op
.reader_with(path)
.range(start_position as u64..)
.await?
}
None => self.op.reader(path).await?,
};

Expand Down Expand Up @@ -149,7 +157,7 @@ impl ObjectStore for OpendalObjectStore {
}

async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let lister = self.op.scan(prefix).await?;
let lister = self.op.lister_with(prefix).delimiter("").await?;
Ok(Box::pin(OpenDalObjectIter::new(lister, self.op.clone())))
}

Expand All @@ -172,12 +180,12 @@ pub struct OpenDalStreamingUploader {
}
impl OpenDalStreamingUploader {
pub async fn new(op: Operator, path: String) -> ObjectResult<Self> {
let writer = op.writer(&path).await?;
let writer = op.writer_with(&path).buffer(OPENDAL_BUFFER_SIZE).await?;
Ok(Self { writer })
}
}

const OPENDAL_BUFFER_SIZE: u64 = 8 * 1024 * 1024;
const OPENDAL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[async_trait::async_trait]
impl StreamingUploader for OpenDalStreamingUploader {
Expand All @@ -199,7 +207,7 @@ impl StreamingUploader for OpenDalStreamingUploader {
}

fn get_memory_usage(&self) -> u64 {
OPENDAL_BUFFER_SIZE
OPENDAL_BUFFER_SIZE as u64
}
}

Expand Down Expand Up @@ -258,16 +266,8 @@ impl Stream for OpenDalObjectIter {
let key = object.path().to_string();
// FIXME: How does opendal metadata cache work?
// Will below line result in one IO per object?
let om = match op
.metadata(
&object,
Metakey::LastModified | Metakey::ContentLength,
)
.await
{
Ok(om) => om,
Err(e) => return (Err(e), op),
};
let om = object.metadata();
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved

let last_modified = match om.last_modified() {
Some(t) => t.timestamp() as f64,
None => 0_f64,
Expand Down
Loading