refactor: use usize range instead of BlockLocation to read obj (#12225)
Signed-off-by: MrCroxx <[email protected]>
MrCroxx authored Sep 12, 2023
1 parent 62901e1 commit 1650a3b
Showing 12 changed files with 214 additions and 309 deletions.
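
In short, ObjectStore::read call sites switch from an optional BlockLocation { offset, size } to a plain Rust range of byte offsets. A rough before/after sketch of a call site, based on the diffs below (illustrative pseudocode only; store, path, offset and size are placeholder names, not identifiers from the commit):

// Before: a block location wrapped in Option; None meant "read the whole object".
let bytes = store.read(path, Some(BlockLocation { offset, size })).await?;
let whole = store.read(path, None).await?;

// After: a standard range of byte offsets; .. reads the whole object.
let bytes = store.read(path, offset..offset + size).await?;
let whole = store.read(path, ..).await?;
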
3 changes: 3 additions & 0 deletions src/common/src/lib.rs
@@ -44,6 +44,7 @@
#![feature(result_option_inspect)]
#![feature(negative_impls)]
#![feature(async_fn_in_trait)]
#![feature(bound_map)]

#[macro_use]
pub mod jemalloc;
@@ -76,6 +77,8 @@ pub mod test_utils;
pub mod types;
pub mod vnode_mapping;

pub mod range;

pub mod test_prelude {
pub use super::array::{DataChunkTestExt, StreamChunkTestExt};
pub use super::catalog::test_utils::ColumnDescTestExt;
103 changes: 103 additions & 0 deletions src/common/src/range.rs
@@ -0,0 +1,103 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Add, Bound, RangeBounds, Sub};

mod private {

pub trait ZeroOne {
fn zero() -> Self;
fn one() -> Self;
}

macro_rules! impl_one {
($($t:ty),*) => {
$(
impl ZeroOne for $t {
fn zero() -> Self {
0 as $t
}

fn one() -> Self {
1 as $t
}
}
)*
};
}

macro_rules! for_all_num_type {
($macro:ident) => {
$macro! { u8, u16, u32, u64, usize, i8, i16, i32, i64, isize, f32, f64 }
};
}

for_all_num_type! { impl_one }
}

use private::ZeroOne;

pub trait Idx = PartialOrd<Self>
+ Add<Output = Self>
+ Sub<Output = Self>
+ Clone
+ Copy
+ Send
+ Sync
+ 'static
+ ZeroOne;

pub trait RangeBoundsExt<T: Idx>: RangeBounds<T> {
fn start(&self) -> Option<T> {
match self.start_bound() {
Bound::Included(v) => Some(*v),
Bound::Excluded(v) => Some(*v + ZeroOne::one()),
Bound::Unbounded => None,
}
}

fn end(&self) -> Option<T> {
match self.end_bound() {
Bound::Included(v) => Some(*v + ZeroOne::one()),
Bound::Excluded(v) => Some(*v),
Bound::Unbounded => None,
}
}

fn len(&self) -> Option<T> {
let start = self.start()?;
let end = self.end()?;
Some(end - start)
}

fn is_empty(&self) -> bool {
match self.len() {
Some(len) => len == ZeroOne::zero(),
None => false,
}
}

fn is_full(&self) -> bool {
self.start_bound() == Bound::Unbounded && self.end_bound() == Bound::Unbounded
}

fn map<F, R>(&self, f: F) -> (Bound<R>, Bound<R>)
where
F: Fn(&T) -> R,
{
(self.start_bound().map(&f), self.end_bound().map(&f))
}
}

impl<T: Idx, RB: RangeBounds<T>> RangeBoundsExt<T> for RB {}
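
Not part of the commit, but as a usage sketch of the extension trait above (assuming risingwave_common is a dependency built with the nightly features the crate already enables; fully qualified calls are used where std iterator methods would otherwise take precedence):

use std::ops::Bound;

use risingwave_common::range::RangeBoundsExt;

fn main() {
    // A bounded byte range: both ends and the length are known.
    let r = 4usize..6;
    assert_eq!(r.start(), Some(4));
    assert_eq!(r.end(), Some(6));
    // Fully qualified, since Range<usize> also provides ExactSizeIterator::len.
    assert_eq!(RangeBoundsExt::len(&r), Some(2));

    // A half-open range reports None for the missing side.
    let from = 4usize..;
    assert_eq!(from.end(), None);
    assert_eq!(from.len(), None);
    assert!(!from.is_full());

    // The full range .. replaces the old read(path, None) call sites.
    assert!(RangeBoundsExt::<usize>::is_full(&(..)));

    // map converts the bound type, e.g. usize -> u64; Bound::map is what
    // #![feature(bound_map)] in lib.rs enables.
    let mapped = RangeBoundsExt::map(&r, |v| *v as u64);
    assert_eq!(mapped, (Bound::Included(4u64), Bound::Excluded(6u64)));
}
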
32 changes: 12 additions & 20 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -31,7 +31,7 @@ use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{BlockLocation, ObjectMetadata, ObjectStoreImpl};
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
use risingwave_pb::hummock::{Level, SstableInfo};
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::value::HummockValue;
@@ -176,20 +176,15 @@ async fn get_meta_offset_from_object(
obj: &ObjectMetadata,
obj_store: &ObjectStoreImpl,
) -> anyhow::Result<u64> {
let meta_offset_loc = BlockLocation {
offset: obj.total_size
- (
// version, magic
2 * std::mem::size_of::<u32>() +
// footer, checksum
2 * std::mem::size_of::<u64>()
),
size: std::mem::size_of::<u64>(),
};
Ok(obj_store
.read(&obj.key, Some(meta_offset_loc))
.await?
.get_u64_le())
let start = obj.total_size
- (
// version, magic
2 * std::mem::size_of::<u32>() +
// footer, checksum
2 * std::mem::size_of::<u64>()
);
let end = start + std::mem::size_of::<u64>();
Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
}

pub async fn sst_dump_via_sstable_store(
@@ -281,11 +276,8 @@ async fn print_block(

// Retrieve encoded block data in bytes
let store = sstable_store.store();
let block_loc = BlockLocation {
offset: block_meta.offset as usize,
size: block_meta.len as usize,
};
let block_data = store.read(&data_path, Some(block_loc)).await?;
let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
let block_data = store.read(&data_path, range).await?;

// Retrieve checksum and compression algorithm used from the encoded block data
let len = block_data.len();
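
For concreteness, the range computed by the new get_meta_offset_from_object above, worked through with a hypothetical object size (the 1_000-byte total is illustrative, not taken from the commit):

fn main() {
    let total_size = 1_000usize;
    // Skip back over the 24-byte trailer: version + magic (2 x u32), footer + checksum (2 x u64).
    let start = total_size - (2 * std::mem::size_of::<u32>() + 2 * std::mem::size_of::<u64>()); // 1_000 - 24 = 976
    // Read the 8-byte little-endian meta offset from the start of that trailer.
    let end = start + std::mem::size_of::<u64>(); // 976 + 8 = 984
    assert_eq!(start..end, 976..984);
}
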
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/checkpoint.rs
@@ -40,7 +40,7 @@ impl HummockManager {
use prost::Message;
let data = match self
.object_store
.read(&self.version_checkpoint_path, None)
.read(&self.version_checkpoint_path, ..)
.await
{
Ok(data) => data,
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
@@ -2843,7 +2843,7 @@ async fn write_exclusive_cluster_id(
const CLUSTER_ID_NAME: &str = "0";
let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
match object_store.read(&cluster_id_full_path, None).await {
match object_store.read(&cluster_id_full_path, ..).await {
Ok(cluster_id) => Err(ObjectError::internal(format!(
"Data directory is already used by another cluster with id {:?}, path {}.",
String::from_utf8(cluster_id.to_vec()).unwrap(),
94 changes: 25 additions & 69 deletions src/object_store/src/object/mem.rs
@@ -21,16 +21,16 @@ use std::time::{SystemTime, UNIX_EPOCH};

use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use futures::future::try_join_all;
use futures::Stream;
use itertools::Itertools;
use risingwave_common::range::RangeBoundsExt;
use thiserror::Error;
use tokio::io::AsyncRead;
use tokio::sync::Mutex;

use super::{
BlockLocation, BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectResult, ObjectStore,
StreamingUploader,
BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectRangeBounds, ObjectResult,
ObjectStore, StreamingUploader,
};
use crate::object::ObjectMetadataIter;

@@ -130,23 +130,11 @@ impl ObjectStore for InMemObjectStore {
}))
}

async fn read(&self, path: &str, block: Option<BlockLocation>) -> ObjectResult<Bytes> {
async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
fail_point!("mem_read_err", |_| Err(ObjectError::internal(
"mem read error"
)));
if let Some(loc) = block {
self.get_object(path, |obj| find_block(obj, loc)).await?
} else {
self.get_object(path, |obj| Ok(obj.clone())).await?
}
}

async fn readv(&self, path: &str, block_locs: &[BlockLocation]) -> ObjectResult<Vec<Bytes>> {
let futures = block_locs
.iter()
.map(|block_loc| self.read(path, Some(*block_loc)))
.collect_vec();
try_join_all(futures).await
self.get_object(path, range).await
}

/// Returns a stream reading the object specified in `path`. If given, the stream starts at the
@@ -160,23 +148,10 @@ impl ObjectStore for InMemObjectStore {
fail_point!("mem_streaming_read_err", |_| Err(ObjectError::internal(
"mem streaming read error"
)));

let bytes = if let Some(pos) = start_pos {
self.get_object(path, |obj| {
find_block(
obj,
BlockLocation {
offset: pos,
size: obj.len() - pos,
},
)
})
.await?
} else {
self.get_object(path, |obj| Ok(obj.clone())).await?
};

Ok(Box::new(Cursor::new(bytes?)))
let bytes = self
.get_object(path, start_pos.unwrap_or_default()..)
.await?;
Ok(Box::new(Cursor::new(bytes)))
}

async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
@@ -254,25 +229,19 @@ impl InMemObjectStore {
*SHARED.lock() = InMemObjectStore::new();
}

async fn get_object<R, F>(&self, path: &str, f: F) -> ObjectResult<R>
where
F: Fn(&Bytes) -> R,
{
self.objects
.lock()
.await
async fn get_object(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
let objects = self.objects.lock().await;

let obj = objects
.get(path)
.map(|(_, obj)| obj)
.ok_or_else(|| Error::not_found(format!("no object at path '{}'", path)).into())
.map(f)
}
}
.ok_or_else(|| Error::not_found(format!("no object at path '{}'", path)))?;

if let Some(end) = range.end() && end > obj.len() {
return Err(Error::other("bad block offset and size").into());
}

fn find_block(obj: &Bytes, block: BlockLocation) -> ObjectResult<Bytes> {
if block.offset + block.size > obj.len() {
Err(Error::other("bad block offset and size").into())
} else {
Ok(obj.slice(block.offset..(block.offset + block.size)))
Ok(obj.slice(range))
}
}

@@ -326,29 +295,19 @@ mod tests {
s3.upload("/abc", block).await.unwrap();

// No such object.
let err = s3
.read("/ab", Some(BlockLocation { offset: 0, size: 3 }))
.await
.unwrap_err();
let err = s3.read("/ab", 0..3).await.unwrap_err();
assert!(err.is_object_not_found_error());

let bytes = s3
.read("/abc", Some(BlockLocation { offset: 4, size: 2 }))
.await
.unwrap();
let bytes = s3.read("/abc", 4..6).await.unwrap();
assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_string());

// Overflow.
s3.read("/abc", Some(BlockLocation { offset: 4, size: 4 }))
.await
.unwrap_err();
s3.read("/abc", 4..8).await.unwrap_err();

s3.delete("/abc").await.unwrap();

// No such object.
s3.read("/abc", Some(BlockLocation { offset: 0, size: 3 }))
.await
.unwrap_err();
s3.read("/abc", 0..3).await.unwrap_err();
}

#[tokio::test]
@@ -365,14 +324,11 @@ mod tests {
uploader.finish().await.unwrap();

// Read whole object.
let read_obj = store.read("/abc", None).await.unwrap();
let read_obj = store.read("/abc", ..).await.unwrap();
assert!(read_obj.eq(&obj));

// Read part of the object.
let read_obj = store
.read("/abc", Some(BlockLocation { offset: 4, size: 2 }))
.await
.unwrap();
let read_obj = store.read("/abc", 4..6).await.unwrap();
assert_eq!(
String::from_utf8(read_obj.to_vec()).unwrap(),
"56".to_string()
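
A small sketch of how the simplified get_object treats ranges, assuming a hypothetical 7-byte payload (the actual test payload is not shown in this excerpt):

use bytes::Bytes;
use risingwave_common::range::RangeBoundsExt;

fn main() {
    let obj = Bytes::from_static(b"1234567");

    // In bounds: Bytes::slice accepts any RangeBounds<usize>, so the range is passed straight through.
    assert_eq!(obj.slice(4usize..6), Bytes::from_static(b"56"));

    // Out of bounds: an explicit end past the object length is rejected up front
    // ("bad block offset and size"), matching the old BlockLocation overflow check.
    let oob = 4usize..8;
    assert!(matches!(oob.end(), Some(end) if end > obj.len()));

    // Unbounded ends (e.g. ..) have no explicit end, so they skip the check and read to the end of the object.
    assert_eq!(RangeBoundsExt::<usize>::end(&(..)), None);
}
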
(Diffs for the remaining 6 changed files did not load and are not shown.)
