refactor: use usize range instead of BlockLocation to read obj #12225

Merged (9 commits) on Sep 12, 2023
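For orientation, a minimal sketch of the signature change this PR makes: `read` used to take an `Option<BlockLocation>` (an offset/size pair, or `None` for the whole object) and now takes any `usize` range. `OldRead`/`NewRead` are illustrative stand-ins, not the real `ObjectStore` trait, which is async and returns `ObjectResult<Bytes>`.

```rust
use std::ops::RangeBounds;

use bytes::Bytes;

// The struct this PR removes (shape taken from the diff below).
pub struct BlockLocation {
    pub offset: usize,
    pub size: usize,
}

pub trait OldRead {
    // Before: an explicit block location, or `None` to read the whole object.
    fn read(&self, path: &str, block: Option<BlockLocation>) -> Bytes;
}

pub trait NewRead {
    // After: any usize range; `offset..offset + size` replaces a BlockLocation
    // and `..` replaces `None`.
    fn read(
        &self,
        path: &str,
        range: impl RangeBounds<usize> + Clone + Send + Sync + 'static,
    ) -> Bytes;
}
```

Call sites change accordingly, as the hunks below show: `read(path, Some(BlockLocation { offset: 4, size: 2 }))` becomes `read(path, 4..6)`, and `read(path, None)` becomes `read(path, ..)`.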
32 changes: 12 additions & 20 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
@@ -31,7 +31,7 @@ use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionExt;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{BlockLocation, ObjectMetadata, ObjectStoreImpl};
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
use risingwave_pb::hummock::{Level, SstableInfo};
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::value::HummockValue;
@@ -176,20 +176,15 @@ async fn get_meta_offset_from_object(
obj: &ObjectMetadata,
obj_store: &ObjectStoreImpl,
) -> anyhow::Result<u64> {
let meta_offset_loc = BlockLocation {
offset: obj.total_size
- (
// version, magic
2 * std::mem::size_of::<u32>() +
// footer, checksum
2 * std::mem::size_of::<u64>()
),
size: std::mem::size_of::<u64>(),
};
Ok(obj_store
.read(&obj.key, Some(meta_offset_loc))
.await?
.get_u64_le())
let start = obj.total_size
- (
// version, magic
2 * std::mem::size_of::<u32>() +
// footer, checksum
2 * std::mem::size_of::<u64>()
Review comment (Contributor): typo
);
let end = start + std::mem::size_of::<u64>();
Ok(obj_store.read(&obj.key, start..end).await?.get_u64_le())
}
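As a worked example of the range computed above (the field sizes come from the code comments; the concrete object size is made up): for an object of 4096 bytes, the trailing two `u32`s and two `u64`s occupy 24 bytes, so the meta offset is read from bytes 4072..4080 and decoded as a little-endian `u64`.

```rust
fn main() {
    // Same arithmetic as get_meta_offset_from_object, with a made-up total_size.
    let total_size: usize = 4096;
    let start = total_size
        - (2 * std::mem::size_of::<u32>()      // version, magic
            + 2 * std::mem::size_of::<u64>()); // footer, checksum
    let end = start + std::mem::size_of::<u64>();
    assert_eq!((start, end), (4072, 4080)); // the store reads obj.key at 4072..4080
}
```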

pub async fn sst_dump_via_sstable_store(
@@ -281,11 +276,8 @@ async fn print_block(

// Retrieve encoded block data in bytes
let store = sstable_store.store();
let block_loc = BlockLocation {
offset: block_meta.offset as usize,
size: block_meta.len as usize,
};
let block_data = store.read(&data_path, Some(block_loc)).await?;
let range = block_meta.offset as usize..block_meta.offset as usize + block_meta.len as usize;
let block_data = store.read(&data_path, range).await?;

// Retrieve checksum and compression algorithm used from the encoded block data
let len = block_data.len();
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/checkpoint.rs
@@ -40,7 +40,7 @@ impl HummockManager {
use prost::Message;
let data = match self
.object_store
.read(&self.version_checkpoint_path, None)
.read(&self.version_checkpoint_path, ..)
.await
{
Ok(data) => data,
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
@@ -2843,7 +2843,7 @@ async fn write_exclusive_cluster_id(
const CLUSTER_ID_NAME: &str = "0";
let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
match object_store.read(&cluster_id_full_path, None).await {
match object_store.read(&cluster_id_full_path, ..).await {
Ok(cluster_id) => Err(ObjectError::internal(format!(
"Data directory is already used by another cluster with id {:?}, path {}.",
String::from_utf8(cluster_id.to_vec()).unwrap(),
1 change: 1 addition & 0 deletions src/object_store/src/lib.rs
@@ -18,5 +18,6 @@
#![feature(lint_reasons)]
#![feature(error_generic_member_access)]
#![feature(let_chains)]
#![feature(bound_map)]

pub mod object;
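The `bound_map` gate enables the unstable `Bound::map` method; the call sites that need it are not in this excerpt, but presumably this PR uses it to transform range bounds (for example, shifting them by an offset) when forwarding a range to a backend. A minimal illustration of the API, with made-up numbers (nightly only, like the other feature gates in this crate):

```rust
#![feature(bound_map)]

use std::ops::Bound;

fn main() {
    // Shift a bound by an offset, e.g. when translating an object-level range
    // into a backend-level range. Purely illustrative; not code from this PR.
    let start: Bound<usize> = Bound::Included(16);
    assert_eq!(start.map(|b| b - 8), Bound::Included(8));
}
```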
111 changes: 43 additions & 68 deletions src/object_store/src/object/mem.rs
@@ -14,22 +14,22 @@

use std::collections::{HashMap, VecDeque};
use std::io::Cursor;
use std::ops::RangeBounds;
use std::pin::Pin;
use std::sync::{Arc, LazyLock};
use std::task::{Context, Poll};
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use futures::future::try_join_all;
use futures::Stream;
use itertools::Itertools;
use thiserror::Error;
use tokio::io::AsyncRead;
use tokio::sync::Mutex;

use super::{
BlockLocation, BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectResult, ObjectStore,
BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectResult, ObjectStore,
StreamingUploader,
};
use crate::object::ObjectMetadataIter;
@@ -130,23 +130,15 @@ impl ObjectStore for InMemObjectStore {
}))
}

async fn read(&self, path: &str, block: Option<BlockLocation>) -> ObjectResult<Bytes> {
async fn read(
&self,
path: &str,
range: impl RangeBounds<usize> + Clone + Send + Sync + 'static,
) -> ObjectResult<Bytes> {
fail_point!("mem_read_err", |_| Err(ObjectError::internal(
"mem read error"
)));
if let Some(loc) = block {
self.get_object(path, |obj| find_block(obj, loc)).await?
} else {
self.get_object(path, |obj| Ok(obj.clone())).await?
}
}

async fn readv(&self, path: &str, block_locs: &[BlockLocation]) -> ObjectResult<Vec<Bytes>> {
let futures = block_locs
.iter()
.map(|block_loc| self.read(path, Some(*block_loc)))
.collect_vec();
try_join_all(futures).await
self.get_object(path, range).await
}
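The `impl RangeBounds<usize> + Clone + Send + Sync + 'static` bound above recurs on `get_object` further down, where a reviewer points at `StaticSendSync` in src/storage/src/store.rs. Assuming that is a trait alias for `Send + Sync + 'static` (its definition is not part of this diff), a hedged sketch of bundling the repeated bounds into one alias; `ObjectDataRange` is a hypothetical name, not something this PR introduces, and the alias needs nightly like the feature gates this repo already uses:

```rust
#![feature(trait_alias)]

use std::ops::{Bound, RangeBounds};

// Hypothetical alias for the bounds repeated on `read` and `get_object`.
pub trait ObjectDataRange = RangeBounds<usize> + Clone + Send + Sync + 'static;

// With the alias, a signature like `read` could be written as:
//     async fn read(&self, path: &str, range: impl ObjectDataRange) -> ObjectResult<Bytes>;
fn bounds_of(range: impl ObjectDataRange) -> (Bound<usize>, Bound<usize>) {
    (range.start_bound().cloned(), range.end_bound().cloned())
}

fn main() {
    assert_eq!(bounds_of(4..6), (Bound::Included(4), Bound::Excluded(6)));
    assert_eq!(bounds_of(..), (Bound::Unbounded, Bound::Unbounded));
}
```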

/// Returns a stream reading the object specified in `path`. If given, the stream starts at the
@@ -160,23 +152,10 @@ impl ObjectStore for InMemObjectStore {
fail_point!("mem_streaming_read_err", |_| Err(ObjectError::internal(
"mem streaming read error"
)));

let bytes = if let Some(pos) = start_pos {
self.get_object(path, |obj| {
find_block(
obj,
BlockLocation {
offset: pos,
size: obj.len() - pos,
},
)
})
.await?
} else {
self.get_object(path, |obj| Ok(obj.clone())).await?
};

Ok(Box::new(Cursor::new(bytes?)))
let bytes = self
.get_object(path, start_pos.unwrap_or_default()..)
.await?;
Ok(Box::new(Cursor::new(bytes)))
}

async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
@@ -254,25 +233,34 @@ impl InMemObjectStore {
*SHARED.lock() = InMemObjectStore::new();
}

async fn get_object<R, F>(&self, path: &str, f: F) -> ObjectResult<R>
where
F: Fn(&Bytes) -> R,
{
self.objects
.lock()
.await
async fn get_object(
&self,
path: &str,
range: impl RangeBounds<usize> + Clone + Send + Sync + 'static,
Review comment (Contributor): see `StaticSendSync` in src/storage/src/store.rs.
Reply (Contributor, author): The range here requires a lot more trait bounds; IMO, expanding them out here is also okay.
) -> ObjectResult<Bytes> {
let objects = self.objects.lock().await;

let obj = objects
.get(path)
.map(|(_, obj)| obj)
.ok_or_else(|| Error::not_found(format!("no object at path '{}'", path)).into())
.map(f)
}
}
.ok_or_else(|| Error::not_found(format!("no object at path '{}'", path)))?;

let start = match range.start_bound() {
std::ops::Bound::Included(v) => *v,
std::ops::Bound::Excluded(v) => *v + 1,
std::ops::Bound::Unbounded => 0,
};
let end = match range.end_bound() {
std::ops::Bound::Included(v) => *v + 1,
std::ops::Bound::Excluded(v) => *v,
std::ops::Bound::Unbounded => obj.len(),
};

if end > obj.len() {
return Err(Error::other("bad block offset and size").into());
}

fn find_block(obj: &Bytes, block: BlockLocation) -> ObjectResult<Bytes> {
if block.offset + block.size > obj.len() {
Err(Error::other("bad block offset and size").into())
} else {
Ok(obj.slice(block.offset..(block.offset + block.size)))
Ok(obj.slice(start..end))
Review comment (Collaborator): Why not pass the range directly to obj.slice?
Reply (Contributor, author): The code here needs to check for overflow.
}
}
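On the review exchange above (why not hand the range straight to `obj.slice`): `Bytes::slice` panics when the requested range runs past the end of the buffer, while `read` is expected to turn that into an error, which the "Overflow" test below relies on. A small self-contained illustration; `read_block` is an illustrative helper, not PR code, and a plain `Result<_, String>` stands in for `ObjectResult`:

```rust
use bytes::Bytes;

// Bounds-checked slicing: return an error for an out-of-range request instead of
// letting Bytes::slice panic. Mirrors the intent of the check in get_object above.
fn read_block(obj: &Bytes, start: usize, end: usize) -> Result<Bytes, String> {
    if start > end || end > obj.len() {
        return Err("bad block offset and size".to_string());
    }
    Ok(obj.slice(start..end))
}

fn main() {
    let obj = Bytes::from_static(b"123456789");
    assert_eq!(read_block(&obj, 4, 6).unwrap(), Bytes::from_static(b"56"));
    assert!(read_block(&obj, 4, 13).is_err()); // obj.slice(4..13) would panic here
}
```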

@@ -326,29 +314,19 @@ mod tests {
s3.upload("/abc", block).await.unwrap();

// No such object.
let err = s3
.read("/ab", Some(BlockLocation { offset: 0, size: 3 }))
.await
.unwrap_err();
let err = s3.read("/ab", 0..3).await.unwrap_err();
assert!(err.is_object_not_found_error());

let bytes = s3
.read("/abc", Some(BlockLocation { offset: 4, size: 2 }))
.await
.unwrap();
let bytes = s3.read("/abc", 4..6).await.unwrap();
assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_string());

// Overflow.
s3.read("/abc", Some(BlockLocation { offset: 4, size: 4 }))
.await
.unwrap_err();
s3.read("/abc", 4..8).await.unwrap_err();

s3.delete("/abc").await.unwrap();

// No such object.
s3.read("/abc", Some(BlockLocation { offset: 0, size: 3 }))
.await
.unwrap_err();
s3.read("/abc", 0..3).await.unwrap_err();
}

#[tokio::test]
Expand All @@ -365,14 +343,11 @@ mod tests {
uploader.finish().await.unwrap();

// Read whole object.
let read_obj = store.read("/abc", None).await.unwrap();
let read_obj = store.read("/abc", ..).await.unwrap();
assert!(read_obj.eq(&obj));

// Read part of the object.
let read_obj = store
.read("/abc", Some(BlockLocation { offset: 4, size: 2 }))
.await
.unwrap();
let read_obj = store.read("/abc", 4..6).await.unwrap();
assert_eq!(
String::from_utf8(read_obj.to_vec()).unwrap(),
"56".to_string()