Commit e3f3fb8

fix(storage): use correct object prefix for s3 for opendal object store (#15833)
hzxa21 authored Mar 25, 2024
1 parent 7adb748 commit e3f3fb8
Showing 4 changed files with 46 additions and 19 deletions.
2 changes: 2 additions & 0 deletions src/object_store/src/object/mod.rs
@@ -38,6 +38,8 @@ pub use s3::*;
 pub mod error;
 pub mod object_metrics;
 
+mod prefix;
+
 pub use error::*;
 use object_metrics::ObjectStoreMetrics;
 use thiserror_ext::AsReport;
18 changes: 14 additions & 4 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -23,8 +23,8 @@ use risingwave_common::range::RangeBoundsExt;
 use thiserror_ext::AsReport;
 
 use crate::object::{
-    BoxedStreamingUploader, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter,
-    ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
+    prefix, BoxedStreamingUploader, ObjectDataStream, ObjectError, ObjectMetadata,
+    ObjectMetadataIter, ObjectRangeBounds, ObjectResult, ObjectStore, StreamingUploader,
 };
 
 /// Opendal object storage.
@@ -61,8 +61,18 @@ impl OpendalObjectStore {
 
 #[async_trait::async_trait]
 impl ObjectStore for OpendalObjectStore {
-    fn get_object_prefix(&self, _obj_id: u64) -> String {
-        String::default()
+    fn get_object_prefix(&self, obj_id: u64) -> String {
+        match self.engine_type {
+            EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
+            EngineType::Memory => String::default(),
+            EngineType::Hdfs => String::default(),
+            EngineType::Gcs => String::default(),
+            EngineType::Obs => String::default(),
+            EngineType::Oss => String::default(),
+            EngineType::Webhdfs => String::default(),
+            EngineType::Azblob => String::default(),
+            EngineType::Fs => String::default(),
+        }
     }
 
     async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
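Why only the S3 arm returns a non-empty prefix: S3 scales request throughput per key prefix, so hashing object ids across many prefixes spreads hot traffic, while the other backends gain nothing from the extra path level. A minimal sketch of the new behavior, assuming a `store` built with `EngineType::S3` (the variable and its setup are illustrative, not from this commit):

// Sketch only: `store` is a hypothetical OpendalObjectStore configured for S3.
// For S3 the prefix has the shape "<crc32(obj_id) % 256>/", e.g. "137/";
// every other engine returns "" so its object paths are unchanged.
let prefix = store.get_object_prefix(42);
assert_eq!(prefix, format!("{}/", crc32fast::hash(&42u64.to_be_bytes()) % 256));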
25 changes: 25 additions & 0 deletions src/object_store/src/object/prefix.rs
@@ -0,0 +1,25 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub(crate) mod s3 {
+    /// The number of S3 bucket prefixes
+    pub(crate) const NUM_BUCKET_PREFIXES: u32 = 256;
+
+    pub(crate) fn get_object_prefix(obj_id: u64) -> String {
+        let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES;
+        let mut obj_prefix = prefix.to_string();
+        obj_prefix.push('/');
+        obj_prefix
+    }
+}
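With this new module, the prefix scheme lives in one place for both the native S3 store and the OpenDAL S3 engine. A self-contained sketch (assuming only the `crc32fast` crate as a dependency) of how ids map onto the 256 buckets:

// Standalone illustration, not part of the commit.
fn main() {
    for obj_id in [1u64, 2, 3, 256, 257] {
        let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % 256;
        // CRC32 scatters nearby ids across unrelated prefixes.
        println!("object id {} -> prefix {}/", obj_id, prefix);
    }
}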
20 changes: 5 additions & 15 deletions src/object_store/src/object/s3.rs
@@ -53,8 +53,8 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff};
 
 use super::object_metrics::ObjectStoreMetrics;
 use super::{
-    BoxedStreamingUploader, Bytes, ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult,
-    ObjectStore, StreamingUploader,
+    prefix, BoxedStreamingUploader, Bytes, ObjectError, ObjectMetadata, ObjectRangeBounds,
+    ObjectResult, ObjectStore, StreamingUploader,
 };
 use crate::object::{try_update_failure_metric, ObjectDataStream, ObjectMetadataIter};
 
@@ -69,8 +69,6 @@ const MIN_PART_ID: PartId = 1;
 const S3_PART_SIZE: usize = 16 * 1024 * 1024;
 // TODO: we should do some benchmark to determine the proper part size for MinIO
 const MINIO_PART_SIZE: usize = 16 * 1024 * 1024;
-/// The number of S3/MinIO bucket prefixes
-const NUM_BUCKET_PREFIXES: u32 = 256;
 /// Stop multipart uploads that don't complete within a specified number of days after being
 /// initiated. (Day is the smallest granularity)
 const S3_INCOMPLETE_MULTIPART_UPLOAD_RETENTION_DAYS: i32 = 1;
@@ -318,7 +316,7 @@ pub struct S3ObjectStore {
 impl ObjectStore for S3ObjectStore {
     fn get_object_prefix(&self, obj_id: u64) -> String {
         // Delegate to static method to avoid creating an `S3ObjectStore` in unit test.
-        Self::get_object_prefix(obj_id)
+        prefix::s3::get_object_prefix(obj_id)
     }
 
     async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
@@ -686,13 +684,6 @@ impl S3ObjectStore {
         }
     }
 
-    fn get_object_prefix(obj_id: u64) -> String {
-        let prefix = crc32fast::hash(&obj_id.to_be_bytes()) % NUM_BUCKET_PREFIXES;
-        let mut obj_prefix = prefix.to_string();
-        obj_prefix.push('/');
-        obj_prefix
-    }
-
     /// Generates an HTTP GET request to download the object specified in `path`. If given,
     /// `start_pos` and `end_pos` specify the first and last byte to download, respectively. Both
    /// are inclusive and 0-based. For example, set `start_pos = 0` and `end_pos = 7` to download
@@ -1001,8 +992,7 @@ impl tokio_retry::Condition<RetryError> for RetryCondition {
 #[cfg(test)]
 #[cfg(not(madsim))]
 mod tests {
-    use crate::object::s3::NUM_BUCKET_PREFIXES;
-    use crate::object::S3ObjectStore;
+    use crate::object::prefix::s3::{get_object_prefix, NUM_BUCKET_PREFIXES};
 
     fn get_hash_of_object(obj_id: u64) -> u32 {
         let crc_hash = crc32fast::hash(&obj_id.to_be_bytes());
@@ -1013,7 +1003,7 @@ mod tests {
     async fn test_get_object_prefix() {
         for obj_id in 0..99999 {
             let hash = get_hash_of_object(obj_id);
-            let prefix = S3ObjectStore::get_object_prefix(obj_id);
+            let prefix = get_object_prefix(obj_id);
             assert_eq!(format!("{}/", hash), prefix);
         }
 
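Because only the function's home moved and the hash itself is untouched, the existing unit test passes unchanged in substance. Assuming the crate follows RisingWave's usual naming (risingwave_object_store; check the package name in Cargo.toml), it can be run with:

cargo test -p risingwave_object_store test_get_object_prefix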
