Object_store: get_file and put_file #5281

Closed · wants to merge 1 commit
69 changes: 68 additions & 1 deletion object_store/src/local.rs
@@ -39,7 +39,7 @@ use std::task::Poll;
use std::time::SystemTime;
use std::{collections::BTreeSet, convert::TryFrom, io};
use std::{collections::VecDeque, path::PathBuf};
-use tokio::io::AsyncWrite;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
use url::Url;
use walkdir::{DirEntry, WalkDir};

@@ -64,6 +64,11 @@ pub(crate) enum Error {
path: String,
},

#[snafu(display("Unable to access metadata for the give file: {}", source))]
FileMetadata {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

#[snafu(display("Unable to copy data to file: {}", source))]
UnableToCopyDataToFile {
source: io::Error,
@@ -104,6 +109,16 @@ pub(crate) enum Error {
path: PathBuf,
},

#[snafu(display("Unable to read data from the given file: {}", source))]
UnableToReadBytesFromFile {
source: io::Error,
},

#[snafu(display("Unable to write data to the given file: {}", source))]
UnableToWriteBytesToFile {
source: io::Error,
},

#[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
OutOfRange {
path: PathBuf,
@@ -1082,6 +1097,58 @@ fn convert_walkdir_result(
}
}


/// Download a remote object to a local [`File`]
Contributor:

I think these docstrings are the wrong way round.

Author:

Thank you for the reminder.

pub async fn upload(store: &dyn ObjectStore, location: &Path, opts: PutOptions, file: &mut std::fs::File) -> Result<()> {
Contributor:

Could the bounds on file be any looser? It might be nice to have any impl Read, although you'd then need to supply the length or additionally use + Seek (which would be less efficient for finding length than a File would be). You could make a container struct which just contains a reader and its total length, then give easy ways of constructing that from a known length, or impl Read + Seek, or a File (although I think you couldn't use TryFrom without specialization as a File is also Read + Seek).

Could also return the number of bytes written rather than an empty tuple, just in case it's useful to anyone (more useful if the source is an arbitrary readable).
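
A minimal sketch of that container idea, assuming a plain std::io::Read source; the KnownSize name and its constructors below are hypothetical and not part of this PR:

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

/// Hypothetical wrapper pairing a reader with its total length, so `upload`
/// could accept any `Read` without having to discover the size itself.
pub struct KnownSize<R> {
    reader: R,
    len: u64,
}

impl<R: Read> KnownSize<R> {
    /// Caller already knows the length (e.g. from a protocol header).
    pub fn new(reader: R, len: u64) -> Self {
        Self { reader, len }
    }
}

impl<R: Read + Seek> KnownSize<R> {
    /// Derive the length by seeking to the end and back (works, but slower
    /// than asking a `File` for its metadata).
    pub fn from_seek(mut reader: R) -> std::io::Result<Self> {
        let len = reader.seek(SeekFrom::End(0))?;
        reader.seek(SeekFrom::Start(0))?;
        Ok(Self { reader, len })
    }
}

impl KnownSize<File> {
    /// Files can report their length cheaply via metadata.
    pub fn from_file(file: File) -> std::io::Result<Self> {
        let len = file.metadata()?.len();
        Ok(Self { reader: file, len })
    }
}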

Contributor @clbarnes (Jan 5, 2024):

I suppose an AsyncRead (and AsyncWrite for below) might be better, so that other functions can run while waiting for that IO.
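
To sketch what that could look like (untested; the function name is hypothetical, and the 5 MiB chunk size plus the put_multipart tuple return simply mirror the loop in this PR):

use object_store::{path::Path, ObjectStore};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};

/// Hypothetical AsyncRead-based variant of `upload`; returns the bytes written.
pub async fn upload_from_async_read<R>(
    store: &dyn ObjectStore,
    location: &Path,
    mut reader: R,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>>
where
    R: AsyncRead + Unpin,
{
    // Mirror the multipart path of this PR: stream 5 MiB chunks into the writer.
    let (_id, mut writer) = store.put_multipart(location).await?;
    let mut buffer = vec![0u8; 5 * 1024 * 1024];
    let mut written = 0u64;
    loop {
        let n = reader.read(&mut buffer).await?;
        if n == 0 {
            break;
        }
        writer.write_all(&buffer[..n]).await?;
        written += n as u64;
    }
    writer.shutdown().await?;
    Ok(written)
}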

Contributor:

So the difficulty with AsyncRead / AsyncWrite is that there isn't actually an efficient way to implement them: tokio::fs::File calls spawn_blocking for every call, which is hopelessly inefficient.

Contributor @clbarnes (Jan 5, 2024):

Yes, because most file systems don't actually have an async API. I guess it's a balance of that overhead vs having this function block in the middle.

Another possibility would be for these upload and download functions to be replaced with something like

pub async fn copy_between_stores(
    src_store: &dyn ObjectStore, src_location: &Path, get_opts: GetOptions,
    tgt_store: &dyn ObjectStore, tgt_location: &Path, put_opts: PutOptions,
) -> Result<usize> {...}

and if either store is a local file system, that's fine. Then we re-use any optimisations we have elsewhere in the crate and it's more flexible, if a bit more verbose.
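
For illustration only, a naive version of that helper, written as if it sat alongside the functions in this PR (it buffers the whole object in memory; a real implementation would presumably stream through put_multipart above some size threshold):

/// Hypothetical cross-store copy matching the signature sketched above.
pub async fn copy_between_stores(
    src_store: &dyn ObjectStore, src_location: &Path, get_opts: GetOptions,
    tgt_store: &dyn ObjectStore, tgt_location: &Path, put_opts: PutOptions,
) -> Result<usize> {
    // Naive: pull the whole object into memory, then re-upload it with the
    // caller's put options. Fine for small objects, not for multi-GiB ones.
    let data = src_store.get_opts(src_location, get_opts).await?.bytes().await?;
    let len = data.len();
    tgt_store.put_opts(tgt_location, data, put_opts).await?;
    Ok(len)
}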

Contributor:

I've filed #5284 for this; I think I would rather keep the scope of this specific PR down, as supporting that properly is very subtle and not easy.

// Determine file size
let metadata = file.metadata().map_err(|e| Error::FileMetadata {
source: e.into(),
})?;
let file_size = metadata.len();

// Set a threshold for when to switch to multipart_put
let multipart_threshold: u64 = 50 * 1024 * 1024;

if file_size <= multipart_threshold {
let mut buffer = Vec::with_capacity(file_size as usize);
file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{
source: e
})?;
Comment on lines +1114 to +1116
Contributor:

Suggested change:
-file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{
-    source: e
-})?;
+file.read_to_end(&mut buffer).context(UnableToReadBytesFromFileSnafu)?;
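
(For context on the suggestion: snafu derives a context selector type named after each error variant, here UnableToReadBytesFromFileSnafu for the UnableToReadBytesFromFile variant added above, so ResultExt::context can wrap the io::Error without a manual map_err closure. This assumes the surrounding file keeps its existing snafu::ResultExt import, which the rest of local.rs already relies on for its other .context(...) calls.)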

let bytes = Bytes::from(buffer);
store.put_opts(&location, bytes, opts).await?;
Ok(())
} else {
let (_id, mut writer) = store.put_multipart(&location).await?;
let mut buffer = vec![0u8; 5 * 1024 * 1024];
while let Ok(size) = file.read(&mut buffer) {
if size == 0 {
break;
}
writer.write_all(&buffer[..size]).await.unwrap();
}

writer.flush().await.unwrap();
writer.shutdown().await.unwrap();
Ok(())
}
}
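
As a usage sketch only, assuming the helper landed as object_store::local::upload (the in-memory store, the temp-file handling, and the runtime setup below are all illustrative scaffolding, not part of this PR):

use std::io::Write;

use object_store::{memory::InMemory, path::Path, ObjectStore, PutOptions};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Stage some bytes in a local file (well under the 50 MiB threshold,
    // so `upload` takes the single put_opts path rather than multipart).
    let tmp = std::env::temp_dir().join("upload_example.bin");
    std::fs::File::create(&tmp)?.write_all(b"hello object store")?;

    // Hypothetical path for the helper added in this PR.
    let store = InMemory::new();
    let mut file = std::fs::File::open(&tmp)?;
    object_store::local::upload(
        &store,
        &Path::from("data/example.bin"),
        PutOptions::default(),
        &mut file,
    )
    .await?;

    // Confirm the object landed with the expected size.
    let meta = store.head(&Path::from("data/example.bin")).await?;
    assert_eq!(meta.size, 18);
    Ok(())
}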


/// Upload a local [`File`] to a remote object store
pub async fn download(store: &dyn ObjectStore, location: &Path, opts: GetOptions, file: &mut File) -> Result<()> {
Contributor @clbarnes (Jan 5, 2024):

Same as above, could File be impl (Async)Write? And it could return the number of bytes written.

Contributor:

I think the intention of using File directly is to handle the specifics that arise from filesystems - e.g. the need for buffering and the lack of async APIs (unless you count platform specific APIs like io_uring)

let get_result = store.get_opts(location, opts).await?;
let mut stream = get_result.into_stream();

while let Some(bytes_result) = stream.next().await {
let bytes = bytes_result?;
file.write_all(&bytes).map_err(|e| Error::UnableToWriteBytesToFile{
source: e.into()
})?;
}
Comment on lines +1140 to +1147
Contributor:

As written this will currently perform blocking IO on the current thread; you probably want to spawn_blocking the IO task and separate it from the producer with an mpsc channel or something. I can try to get an example of this up later today.

Author:

That will be great if you can provide an example. Thank you!

Contributor:

Sorry, I ran out of time today, I'll try to get you something on Monday

Contributor:

Sorry had a crazy few days, something like this should work (not at all tested)

diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index e985ff070c..0969731f32 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -155,6 +155,11 @@ pub(crate) enum Error {
     InvalidPath {
         path: String,
     },
+
+    #[snafu(display("Unable to download data to file: {}", source))]
+    Download {
+        source: io::Error,
+    },
 }
 
 impl From<Error> for super::Error {
@@ -1093,6 +1098,37 @@ fn convert_walkdir_result(
     }
 }
 
+pub async fn download(
+    store: &dyn ObjectStore,
+    location: &Path,
+    opts: GetOptions,
+    file: &mut File,
+) -> Result<()> {
+    let (mut sender, mut receiver) = tokio::sync::mpsc::channel(2);
+    let mut download = store.get_opts(location, opts).await?.into_stream();
+    let forwarder = async move {
+        while let Some(n) = download.next().await.transpose()? {
+            if sender.send(n).await.is_err() {
+                break;
+            }
+        }
+        Ok::<_, crate::Error>(())
+    };
+
+    let mut captured = file.try_clone().context(DownloadSnafu)?;
+    let writer = maybe_spawn_blocking(move || {
+        Ok(async move {
+            while let Some(b) = receiver.blocking_recv() {
+                captured.write_all(&b).context(DownloadSnafu)?;
+            }
+            Ok::<_, crate::Error>(())
+        })
+    });
+
+    let _ = futures::future::try_join(forwarder, writer).await?;
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Ok(())
}
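
Similarly, a hedged usage sketch for the download direction, again assuming this helper were exposed as object_store::local::download; the in-memory store and temp path are illustrative only:

use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, GetOptions, ObjectStore};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Seed the store with an object to fetch.
    let store = InMemory::new();
    let location = Path::from("data/example.bin");
    store.put(&location, Bytes::from_static(b"hello again")).await?;

    // Stream it into a local file using the helper from this PR.
    let tmp = std::env::temp_dir().join("download_example.bin");
    let mut file = std::fs::File::create(&tmp)?;
    object_store::local::download(&store, &location, GetOptions::default(), &mut file).await?;

    // The bytes on disk should match what was stored.
    assert_eq!(std::fs::read(&tmp)?, b"hello again".to_vec());
    Ok(())
}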


#[cfg(test)]
mod tests {
use super::*;