-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: use usize range instead of BlockLocation to read obj #12225
Changes from 2 commits
cb24ac6
dcf7b97
8969e5b
a7d9a1f
8f75b26
a73d8cd
59a1617
167e1b0
974f75a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,22 +14,22 @@ | |
|
||
use std::collections::{HashMap, VecDeque}; | ||
use std::io::Cursor; | ||
use std::ops::RangeBounds; | ||
use std::pin::Pin; | ||
use std::sync::{Arc, LazyLock}; | ||
use std::task::{Context, Poll}; | ||
use std::time::{SystemTime, UNIX_EPOCH}; | ||
|
||
use bytes::{BufMut, Bytes, BytesMut}; | ||
use fail::fail_point; | ||
use futures::future::try_join_all; | ||
use futures::Stream; | ||
use itertools::Itertools; | ||
use thiserror::Error; | ||
use tokio::io::AsyncRead; | ||
use tokio::sync::Mutex; | ||
|
||
use super::{ | ||
BlockLocation, BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectResult, ObjectStore, | ||
BoxedStreamingUploader, ObjectError, ObjectMetadata, ObjectResult, ObjectStore, | ||
StreamingUploader, | ||
}; | ||
use crate::object::ObjectMetadataIter; | ||
|
@@ -130,23 +130,15 @@ impl ObjectStore for InMemObjectStore { | |
})) | ||
} | ||
|
||
async fn read(&self, path: &str, block: Option<BlockLocation>) -> ObjectResult<Bytes> { | ||
async fn read( | ||
&self, | ||
path: &str, | ||
range: impl RangeBounds<usize> + Clone + Send + Sync + 'static, | ||
MrCroxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) -> ObjectResult<Bytes> { | ||
fail_point!("mem_read_err", |_| Err(ObjectError::internal( | ||
"mem read error" | ||
))); | ||
if let Some(loc) = block { | ||
self.get_object(path, |obj| find_block(obj, loc)).await? | ||
} else { | ||
self.get_object(path, |obj| Ok(obj.clone())).await? | ||
} | ||
} | ||
|
||
async fn readv(&self, path: &str, block_locs: &[BlockLocation]) -> ObjectResult<Vec<Bytes>> { | ||
let futures = block_locs | ||
.iter() | ||
.map(|block_loc| self.read(path, Some(*block_loc))) | ||
.collect_vec(); | ||
try_join_all(futures).await | ||
self.get_object(path, range).await | ||
} | ||
|
||
/// Returns a stream reading the object specified in `path`. If given, the stream starts at the | ||
|
@@ -160,23 +152,10 @@ impl ObjectStore for InMemObjectStore { | |
fail_point!("mem_streaming_read_err", |_| Err(ObjectError::internal( | ||
"mem streaming read error" | ||
))); | ||
|
||
let bytes = if let Some(pos) = start_pos { | ||
self.get_object(path, |obj| { | ||
find_block( | ||
obj, | ||
BlockLocation { | ||
offset: pos, | ||
size: obj.len() - pos, | ||
}, | ||
) | ||
}) | ||
.await? | ||
} else { | ||
self.get_object(path, |obj| Ok(obj.clone())).await? | ||
}; | ||
|
||
Ok(Box::new(Cursor::new(bytes?))) | ||
let bytes = self | ||
.get_object(path, start_pos.unwrap_or_default()..) | ||
.await?; | ||
Ok(Box::new(Cursor::new(bytes))) | ||
} | ||
|
||
async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> { | ||
|
@@ -254,25 +233,34 @@ impl InMemObjectStore { | |
*SHARED.lock() = InMemObjectStore::new(); | ||
} | ||
|
||
async fn get_object<R, F>(&self, path: &str, f: F) -> ObjectResult<R> | ||
where | ||
F: Fn(&Bytes) -> R, | ||
{ | ||
self.objects | ||
.lock() | ||
.await | ||
async fn get_object( | ||
&self, | ||
path: &str, | ||
range: impl RangeBounds<usize> + Clone + Send + Sync + 'static, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. src/storage/src/store.rs There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
) -> ObjectResult<Bytes> { | ||
let objects = self.objects.lock().await; | ||
|
||
let obj = objects | ||
.get(path) | ||
.map(|(_, obj)| obj) | ||
.ok_or_else(|| Error::not_found(format!("no object at path '{}'", path)).into()) | ||
.map(f) | ||
} | ||
} | ||
.ok_or_else(|| Error::not_found(format!("no object at path '{}'", path)))?; | ||
|
||
let start = match range.start_bound() { | ||
std::ops::Bound::Included(v) => *v, | ||
std::ops::Bound::Excluded(v) => *v - 1, | ||
std::ops::Bound::Unbounded => 0, | ||
}; | ||
let end = match range.end_bound() { | ||
std::ops::Bound::Included(v) => *v + 1, | ||
std::ops::Bound::Excluded(v) => *v, | ||
std::ops::Bound::Unbounded => obj.len(), | ||
}; | ||
|
||
if end > obj.len() { | ||
return Err(Error::other("bad block offset and size").into()); | ||
} | ||
|
||
fn find_block(obj: &Bytes, block: BlockLocation) -> ObjectResult<Bytes> { | ||
if block.offset + block.size > obj.len() { | ||
Err(Error::other("bad block offset and size").into()) | ||
} else { | ||
Ok(obj.slice(block.offset..(block.offset + block.size))) | ||
Ok(obj.slice(start..end)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not passing the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Code here needs to check overflow. |
||
} | ||
} | ||
|
||
|
@@ -326,29 +314,19 @@ mod tests { | |
s3.upload("/abc", block).await.unwrap(); | ||
|
||
// No such object. | ||
let err = s3 | ||
.read("/ab", Some(BlockLocation { offset: 0, size: 3 })) | ||
.await | ||
.unwrap_err(); | ||
let err = s3.read("/ab", 0..3).await.unwrap_err(); | ||
assert!(err.is_object_not_found_error()); | ||
|
||
let bytes = s3 | ||
.read("/abc", Some(BlockLocation { offset: 4, size: 2 })) | ||
.await | ||
.unwrap(); | ||
let bytes = s3.read("/abc", 4..6).await.unwrap(); | ||
assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), "56".to_string()); | ||
|
||
// Overflow. | ||
s3.read("/abc", Some(BlockLocation { offset: 4, size: 4 })) | ||
.await | ||
.unwrap_err(); | ||
s3.read("/abc", 4..8).await.unwrap_err(); | ||
|
||
s3.delete("/abc").await.unwrap(); | ||
|
||
// No such object. | ||
s3.read("/abc", Some(BlockLocation { offset: 0, size: 3 })) | ||
.await | ||
.unwrap_err(); | ||
s3.read("/abc", 0..3).await.unwrap_err(); | ||
} | ||
|
||
#[tokio::test] | ||
|
@@ -365,14 +343,11 @@ mod tests { | |
uploader.finish().await.unwrap(); | ||
|
||
// Read whole object. | ||
let read_obj = store.read("/abc", None).await.unwrap(); | ||
let read_obj = store.read("/abc", ..).await.unwrap(); | ||
assert!(read_obj.eq(&obj)); | ||
|
||
// Read part of the object. | ||
let read_obj = store | ||
.read("/abc", Some(BlockLocation { offset: 4, size: 2 })) | ||
.await | ||
.unwrap(); | ||
let read_obj = store.read("/abc", 4..6).await.unwrap(); | ||
assert_eq!( | ||
String::from_utf8(read_obj.to_vec()).unwrap(), | ||
"56".to_string() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo