Use u64 range instead of usize, for better wasm32 support (apache#6961)

* u64 ranges

* more u64

* make clippy happy

* even more u64

* Update object_store/src/lib.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update object_store/src/lib.rs

Co-authored-by: Andrew Lamb <[email protected]>

* address comments

---------

Co-authored-by: Andrew Lamb <[email protected]>
2 people authored and totoroyyb committed Jan 20, 2025
1 parent 288df81 commit 4b8ef3b
Showing 14 changed files with 147 additions and 113 deletions.
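
In short: `ObjectMeta::size`, `GetResult::range`, and the `range` arguments of `ObjectStore::get_range` / `get_ranges` now use `u64` instead of `usize`, so object sizes and byte offsets stay 64-bit even on 32-bit targets such as wasm32. Below is a minimal caller-side sketch of the new signatures, assuming the `object_store` and `tokio` crates with this change applied; the path and payload are illustrative only.

```rust
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();
    let path = Path::from("demo/data.bin");
    store.put(&path, PutPayload::from_static(b"hello world")).await?;

    // ObjectMeta::size is now u64, so lengths are compared as u64
    let meta = store.head(&path).await?;
    assert_eq!(meta.size, b"hello world".len() as u64);

    // get_range now takes Range<u64>; offsets no longer depend on pointer width
    let bytes = store.get_range(&path, 0..5).await?;
    assert_eq!(bytes.as_ref(), b"hello");
    Ok(())
}
```
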
2 changes: 1 addition & 1 deletion object_store/src/azure/client.rs
@@ -1058,7 +1058,7 @@ impl TryFrom<Blob> for ObjectMeta {
         Ok(Self {
             location: Path::parse(value.name)?,
             last_modified: value.properties.last_modified,
-            size: value.properties.content_length as usize,
+            size: value.properties.content_length,
             e_tag: value.properties.e_tag,
             version: None, // For consistency with S3 and GCP which don't include this
         })
8 changes: 4 additions & 4 deletions object_store/src/chunked.rs
@@ -44,7 +44,7 @@ use crate::{PutPayload, Result};
 #[derive(Debug)]
 pub struct ChunkedStore {
     inner: Arc<dyn ObjectStore>,
-    chunk_size: usize,
+    chunk_size: usize, // chunks are in memory, so we use usize not u64
 }

 impl ChunkedStore {
@@ -138,7 +138,7 @@ impl ObjectStore for ChunkedStore {
         })
     }

-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
         self.inner.get_range(location, range).await
     }

@@ -203,8 +203,8 @@ mod tests {

         let mut remaining = 1001;
         while let Some(next) = s.next().await {
-            let size = next.unwrap().len();
-            let expected = remaining.min(chunk_size);
+            let size = next.unwrap().len() as u64;
+            let expected = remaining.min(chunk_size as u64);
             assert_eq!(size, expected);
             remaining -= expected;
         }
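
The new comment on `chunk_size` captures the split this change settles on: buffer sizes that live in memory stay `usize`, while object lengths and offsets become `u64`. A standalone sketch of that arithmetic, mirroring the updated test above (the helper name is illustrative only):

```rust
/// Split a u64 byte length into usize-sized in-memory chunks.
fn chunk_lengths(total: u64, chunk_size: usize) -> Vec<usize> {
    let mut remaining = total;
    let mut out = Vec::new();
    while remaining > 0 {
        // Each chunk is at most `chunk_size`, so it always fits in a usize.
        let next = remaining.min(chunk_size as u64) as usize;
        out.push(next);
        remaining -= next as u64;
    }
    out
}

fn main() {
    // 1001 bytes in 256-byte chunks: three full chunks plus a 233-byte tail.
    assert_eq!(chunk_lengths(1001, 256), vec![256, 256, 256, 233]);
}
```
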
10 changes: 5 additions & 5 deletions object_store/src/client/get.rs
@@ -67,9 +67,9 @@ impl<T: GetClient> GetClientExt for T {

 struct ContentRange {
     /// The range of the object returned
-    range: Range<usize>,
+    range: Range<u64>,
     /// The total size of the object being requested
-    size: usize,
+    size: u64,
 }

 impl ContentRange {
@@ -84,7 +84,7 @@ impl ContentRange {
         let (start_s, end_s) = range.split_once('-')?;

         let start = start_s.parse().ok()?;
-        let end: usize = end_s.parse().ok()?;
+        let end: u64 = end_s.parse().ok()?;

         Some(Self {
             size,
@@ -140,8 +140,8 @@ enum GetResultError {

     #[error("Requested {expected:?}, got {actual:?}")]
     UnexpectedRange {
-        expected: Range<usize>,
-        actual: Range<usize>,
+        expected: Range<u64>,
+        actual: Range<u64>,
     },
 }

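
`ContentRange` now carries `u64` values, so a `Content-Range` response header for an object larger than 4 GiB still parses on 32-bit targets. A rough standalone sketch of that kind of parsing; this is not the crate's exact implementation and the function name is illustrative:

```rust
/// Parse "bytes 0-8/100" into (range 0..9, total size 100), all as u64.
fn parse_content_range(s: &str) -> Option<(std::ops::Range<u64>, u64)> {
    let rest = s.strip_prefix("bytes ")?;
    let (range, size) = rest.split_once('/')?;
    let size: u64 = size.parse().ok()?;
    let (start, end) = range.split_once('-')?;
    let start: u64 = start.parse().ok()?;
    let end: u64 = end.parse().ok()?;
    // HTTP ranges are inclusive of `end`; Rust's Range is half-open.
    Some((start..end + 1, size))
}

fn main() {
    let (range, size) = parse_content_range("bytes 0-8/100").unwrap();
    assert_eq!(range, 0..9);
    assert_eq!(size, 100);
}
```
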
2 changes: 1 addition & 1 deletion object_store/src/client/s3.rs
@@ -66,7 +66,7 @@ pub struct ListPrefix {
 #[serde(rename_all = "PascalCase")]
 pub struct ListContents {
     pub key: String,
-    pub size: usize,
+    pub size: u64,
     pub last_modified: DateTime<Utc>,
     #[serde(rename = "ETag")]
     pub e_tag: Option<String>,
4 changes: 2 additions & 2 deletions object_store/src/http/client.rs
@@ -420,7 +420,7 @@ impl MultiStatusResponse {
         })?)
     }

-    fn size(&self) -> Result<usize> {
+    fn size(&self) -> Result<u64> {
         let size = self
             .prop_stat
             .prop
@@ -462,7 +462,7 @@ pub(crate) struct Prop {
     last_modified: DateTime<Utc>,

     #[serde(rename = "getcontentlength")]
-    content_length: Option<usize>,
+    content_length: Option<u64>,

     #[serde(rename = "resourcetype")]
     resource_type: ResourceType,
10 changes: 5 additions & 5 deletions object_store/src/integration.rs
@@ -112,7 +112,7 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
     let range_result = storage.get_range(&location, range.clone()).await;

     let bytes = range_result.unwrap();
-    assert_eq!(bytes, data.slice(range.clone()));
+    assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));

     let opts = GetOptions {
         range: Some(GetRange::Bounded(2..5)),
@@ -190,11 +190,11 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
     let ranges = vec![0..1, 2..3, 0..5];
     let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
     for (range, bytes) in ranges.iter().zip(bytes) {
-        assert_eq!(bytes, data.slice(range.clone()))
+        assert_eq!(bytes, data.slice(range.start as usize..range.end as usize));
     }

     let head = storage.head(&location).await.unwrap();
-    assert_eq!(head.size, data.len());
+    assert_eq!(head.size, data.len() as u64);

     storage.delete(&location).await.unwrap();

@@ -934,7 +934,7 @@ pub async fn list_with_delimiter(storage: &DynObjectStore) {
     let object = &result.objects[0];

     assert_eq!(object.location, expected_location);
-    assert_eq!(object.size, data.len());
+    assert_eq!(object.size, data.len() as u64);

     // ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
     let prefix = Path::from("mydb/wb/000/000/001");
@@ -1085,7 +1085,7 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
         .unwrap();

     let meta = storage.head(&path).await.unwrap();
-    assert_eq!(meta.size, chunk_size * 2);
+    assert_eq!(meta.size, chunk_size as u64 * 2);

     // Empty case
     let path = Path::from("test_empty_multipart");
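
The test changes above follow from `bytes::Bytes::slice` still taking `usize` bounds: the `u64` ranges used with the store API have to be narrowed wherever an in-memory buffer is sliced. A small standalone illustration, assuming the `bytes` crate:

```rust
use bytes::Bytes;

fn main() {
    let data = Bytes::from_static(b"0123456789");
    let range: std::ops::Range<u64> = 2..5;
    // Bytes::slice wants usize bounds, so narrow the u64 range at the call site.
    let sliced = data.slice(range.start as usize..range.end as usize);
    assert_eq!(sliced.as_ref(), b"234");
}
```
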
26 changes: 17 additions & 9 deletions object_store/src/lib.rs
@@ -234,7 +234,7 @@
 //!
 //! // Buffer the entire object in memory
 //! let object: Bytes = result.bytes().await.unwrap();
-//! assert_eq!(object.len(), meta.size);
+//! assert_eq!(object.len() as u64, meta.size);
 //!
 //! // Alternatively stream the bytes from object storage
 //! let stream = object_store.get(&path).await.unwrap().into_stream();
@@ -630,7 +630,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// in the given byte range.
     ///
     /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
         let options = GetOptions {
             range: Some(range.into()),
             ..Default::default()
@@ -640,7 +640,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {

     /// Return the bytes that are stored at the specified location
     /// in the given byte ranges
-    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
+    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
         coalesce_ranges(
             ranges,
             |range| self.get_range(location, range),
@@ -820,14 +820,14 @@ macro_rules! as_ref_impl {
             self.as_ref().get_opts(location, options).await
         }

-        async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
             self.as_ref().get_range(location, range).await
         }

         async fn get_ranges(
             &self,
             location: &Path,
-            ranges: &[Range<usize>],
+            ranges: &[Range<u64>],
         ) -> Result<Vec<Bytes>> {
             self.as_ref().get_ranges(location, ranges).await
         }
@@ -903,8 +903,10 @@ pub struct ObjectMeta {
     pub location: Path,
     /// The last modified time
     pub last_modified: DateTime<Utc>,
-    /// The size in bytes of the object
-    pub size: usize,
+    /// The size in bytes of the object.
+    ///
+    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
+    pub size: u64,
     /// The unique identifier for the object
     ///
     /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
@@ -1019,7 +1021,9 @@ pub struct GetResult {
     /// The [`ObjectMeta`] for this object
     pub meta: ObjectMeta,
     /// The range of bytes returned by this request
-    pub range: Range<usize>,
+    ///
+    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
+    pub range: Range<u64>,
     /// Additional object attributes
     pub attributes: Attributes,
 }
@@ -1060,7 +1064,11 @@ impl GetResult {
                     path: path.clone(),
                 })?;

-                let mut buffer = Vec::with_capacity(len);
+                let mut buffer = if let Ok(len) = len.try_into() {
+                    Vec::with_capacity(len)
+                } else {
+                    Vec::new()
+                };
                 file.take(len as _)
                     .read_to_end(&mut buffer)
                     .map_err(|source| local::Error::UnableToReadBytes { source, path })?;
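
The `GetResult::bytes` change above guards the pre-allocation: a `u64` length may not fit in `usize` on a 32-bit target, in which case the buffer simply starts empty and grows as the file is read. The same pattern in isolation (the helper name is illustrative only):

```rust
/// Pre-allocate from a u64 length without panicking on 32-bit targets.
fn buffer_for(len: u64) -> Vec<u8> {
    match usize::try_from(len) {
        // The common case: the length fits in usize, reserve it up front.
        Ok(cap) => Vec::with_capacity(cap),
        // Only reachable on 32-bit targets with objects > usize::MAX bytes.
        Err(_) => Vec::new(),
    }
}

fn main() {
    assert!(buffer_for(1024).capacity() >= 1024);
}
```
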
4 changes: 2 additions & 2 deletions object_store/src/limit.rs
@@ -117,12 +117,12 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         Ok(permit_get_result(r, permit))
     }

-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.get_range(location, range).await
     }

-    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
+    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.get_ranges(location, ranges).await
     }
72 changes: 32 additions & 40 deletions object_store/src/local.rs
@@ -21,7 +21,7 @@ use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
 use std::ops::Range;
 use std::sync::Arc;
 use std::time::SystemTime;
-use std::{collections::BTreeSet, convert::TryFrom, io};
+use std::{collections::BTreeSet, io};
 use std::{collections::VecDeque, path::PathBuf};

 use async_trait::async_trait;
@@ -44,12 +44,6 @@ use crate::{
 /// A specialized `Error` for filesystem object store-related errors
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum Error {
-    #[error("File size for {} did not fit in a usize: {}", path, source)]
-    FileSizeOverflowedUsize {
-        source: std::num::TryFromIntError,
-        path: String,
-    },
-
     #[error("Unable to walk dir: {}", source)]
     UnableToWalkDir { source: walkdir::Error },

@@ -83,8 +77,8 @@ pub(crate) enum Error {
     #[error("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual)]
     OutOfRange {
         path: PathBuf,
-        expected: usize,
-        actual: usize,
+        expected: u64,
+        actual: u64,
     },

     #[error("Requested range was invalid")]
@@ -410,7 +404,7 @@ impl ObjectStore for LocalFileSystem {
         let path = self.path_to_filesystem(&location)?;
         maybe_spawn_blocking(move || {
             let (file, metadata) = open_file(&path)?;
-            let meta = convert_metadata(metadata, location)?;
+            let meta = convert_metadata(metadata, location);
             options.check_preconditions(&meta)?;

             let range = match options.range {
@@ -430,7 +424,7 @@ impl ObjectStore for LocalFileSystem {
         .await
     }

-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
         let path = self.path_to_filesystem(location)?;
         maybe_spawn_blocking(move || {
             let (mut file, _) = open_file(&path)?;
@@ -439,7 +433,7 @@ impl ObjectStore for LocalFileSystem {
         .await
     }

-    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
+    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
         let path = self.path_to_filesystem(location)?;
         let ranges = ranges.to_vec();
         maybe_spawn_blocking(move || {
@@ -825,7 +819,7 @@ impl Drop for LocalUpload {
 pub(crate) fn chunked_stream(
     mut file: File,
     path: PathBuf,
-    range: Range<usize>,
+    range: Range<u64>,
     chunk_size: usize,
 ) -> BoxStream<'static, Result<Bytes, super::Error>> {
     futures::stream::once(async move {
@@ -847,17 +841,23 @@ pub(crate) fn chunked_stream(
                     return Ok(None);
                 }

-                let to_read = remaining.min(chunk_size);
-                let mut buffer = Vec::with_capacity(to_read);
+                let to_read = remaining.min(chunk_size as u64);
+                let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
+                    source: InvalidGetRange::TooLarge {
+                        requested: to_read,
+                        max: usize::MAX as u64,
+                    },
+                })?;
+                let mut buffer = Vec::with_capacity(cap);
                 let read = (&mut file)
-                    .take(to_read as u64)
+                    .take(to_read)
                     .read_to_end(&mut buffer)
                     .map_err(|e| Error::UnableToReadBytes {
                         source: e,
                         path: path.clone(),
                     })?;

-                Ok(Some((buffer.into(), (file, path, remaining - read))))
+                Ok(Some((buffer.into(), (file, path, remaining - read as u64))))
             })
         },
     );
@@ -867,22 +867,18 @@ pub(crate) fn chunked_stream(
     .boxed()
 }

-pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<usize>) -> Result<Bytes> {
+pub(crate) fn read_range(file: &mut File, path: &PathBuf, range: Range<u64>) -> Result<Bytes> {
     let to_read = range.end - range.start;
-    file.seek(SeekFrom::Start(range.start as u64))
-        .map_err(|source| {
-            let path = path.into();
-            Error::Seek { source, path }
-        })?;
+    file.seek(SeekFrom::Start(range.start)).map_err(|source| {
+        let path = path.into();
+        Error::Seek { source, path }
+    })?;

-    let mut buf = Vec::with_capacity(to_read);
-    let read = file
-        .take(to_read as u64)
-        .read_to_end(&mut buf)
-        .map_err(|source| {
-            let path = path.into();
-            Error::UnableToReadBytes { source, path }
-        })?;
+    let mut buf = Vec::with_capacity(to_read as usize);
+    let read = file.take(to_read).read_to_end(&mut buf).map_err(|source| {
+        let path = path.into();
+        Error::UnableToReadBytes { source, path }
+    })? as u64;

     if read != to_read {
         let error = Error::OutOfRange {
@@ -922,7 +918,7 @@ fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {

 fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
     match entry.metadata() {
-        Ok(metadata) => convert_metadata(metadata, location).map(Some),
+        Ok(metadata) => Ok(Some(convert_metadata(metadata, location))),
         Err(e) => {
             if let Some(io_err) = e.io_error() {
                 if io_err.kind() == ErrorKind::NotFound {
@@ -960,20 +956,16 @@ fn get_etag(metadata: &Metadata) -> String {
     format!("{inode:x}-{mtime:x}-{size:x}")
 }

-fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
+fn convert_metadata(metadata: Metadata, location: Path) -> ObjectMeta {
     let last_modified = last_modified(&metadata);
-    let size = usize::try_from(metadata.len()).map_err(|source| {
-        let path = location.as_ref().into();
-        Error::FileSizeOverflowedUsize { source, path }
-    })?;

-    Ok(ObjectMeta {
+    ObjectMeta {
         location,
         last_modified,
-        size,
+        size: metadata.len(),
         e_tag: Some(get_etag(&metadata)),
         version: None,
-    })
+    }
 }

 #[cfg(unix)]
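
With `ObjectMeta::size` as `u64`, `convert_metadata` becomes infallible: `std::fs::Metadata::len()` already returns `u64`, so the old `FileSizeOverflowedUsize` error path disappears. A tiny standalone illustration (the file path here is just an example):

```rust
use std::fs;

fn main() -> std::io::Result<()> {
    // Metadata::len() is already a u64, so no fallible usize conversion is
    // needed to populate a u64 size field, even on 32-bit targets.
    let metadata = fs::metadata("Cargo.toml")?;
    let size: u64 = metadata.len();
    println!("Cargo.toml is {size} bytes");
    Ok(())
}
```
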
