-
Notifications
You must be signed in to change notification settings - Fork 824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Object_store: get_file and put_file #5281
Conversation
Hi @tustvold,
Thank you for your time. |
@@ -1082,6 +1097,58 @@ fn convert_walkdir_result( | |||
} | |||
} | |||
|
|||
|
|||
/// Download a remote object to a local [`File`] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these docstrings are the wrong way round.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the reminder.
@@ -1082,6 +1097,58 @@ fn convert_walkdir_result( | |||
} | |||
} | |||
|
|||
|
|||
/// Download a remote object to a local [`File`] | |||
pub async fn upload(store: &dyn ObjectStore, location: &Path, opts: PutOptions, file: &mut std::fs::File) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could the bounds on file
be any looser? It might be nice to have any impl Read
, although you'd then need to supply the length or additionally use + Seek
(which would be less efficient for finding length than a File
would be). You could make a container struct which just contains a reader and its total length, then give easy ways of constructing that from a known length, or impl Read + Seek
, or a File
(although I think you couldn't use TryFrom
without specialization as a File
is also Read + Seek
).
Could also return the number of bytes written rather than an empty tuple, just in case it's useful to anyone (more useful if the source is an arbitrary readable).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose an AsyncRead (and AsyncWrite for below) might be better, so that other functions can run while waiting for that IO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the difficulty with AsyncRead / AsyncWrite is there isn't actually an efficient way to implement them, tokio::fs::File calls spawn_blocking
for every call, which is hopelessly inefficient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because most file systems don't actually have an async API. I guess it's a balance of that overhead vs having this function block in the middle.
Another possibility would be for these upload and download functions to be replaced with something like
pub async fn copy_between_stores(
src_store: &dyn ObjectStore, src_location: &Path, get_opts: GetOptions,
tgt_store: &dyn ObjectStore, tgt_location: &Path, put_opts: PutOptions,
) -> Result<usize> {...}
and if either store is a local file system, that's fine. Then we re-use any optimisations we have elsewhere in the crate and it's more flexible, if a bit more verbose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've filled #5284 for this, I think I would rather keep the scope of this specific PR down, supporting that properly is very subtle and not easy.
|
||
|
||
/// Upload a local [`File`] to a remote object store | ||
pub async fn download(store: &dyn ObjectStore, location: &Path, opts: GetOptions, file: &mut File) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, could File
be impl (Async)Write
? And it could return the number of bytes written.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the intention of using File
directly is to handle the specifics that arise from filesystems - e.g. the need for buffering and the lack of async APIs (unless you count platform specific APIs like io_uring)
This could be in the file writing step; I believe python does some buffering under the hood where rust blocks until the bytes are actually on the file system. Wrap the It's also worth noting that the |
let mut stream = get_result.into_stream(); | ||
|
||
while let Some(bytes_result) = stream.next().await { | ||
let bytes = bytes_result?; | ||
file.write_all(&bytes).map_err(|e| Error::UnableToWriteBytesToFile{ | ||
source: e.into() | ||
})?; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As written this will currently perform blocking IO on the current thread, you probably want to spawn_blocking the IO task, and separate it from the producer with a mpsc channel or something. I can try to get an example of this up later today
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That will be great if you can provide an example. Thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I ran out of time today, I'll try to get you something on Monday
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry had a crazy few days, something like this should work (not at all tested)
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index e985ff070c..0969731f32 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -155,6 +155,11 @@ pub(crate) enum Error {
InvalidPath {
path: String,
},
+
+ #[snafu(display("Unable to download data to file: {}", source))]
+ Download {
+ source: io::Error,
+ },
}
impl From<Error> for super::Error {
@@ -1093,6 +1098,37 @@ fn convert_walkdir_result(
}
}
+pub async fn download(
+ store: &dyn ObjectStore,
+ location: &Path,
+ opts: GetOptions,
+ file: &mut File,
+) -> Result<()> {
+ let (mut sender, mut receiver) = tokio::sync::mpsc::channel(2);
+ let mut download = store.get_opts(location, opts).await?.into_stream();
+ let forwarder = async move {
+ while let Some(n) = download.next().await.transpose()? {
+ if sender.send(n).await.is_err() {
+ break;
+ }
+ }
+ Ok::<_, crate::Error>(())
+ };
+
+ let mut captured = file.try_clone().context(DownloadSnafu)?;
+ let writer = maybe_spawn_blocking(move || {
+ Ok(async move {
+ while let Some(b) = receiver.blocking_recv() {
+ captured.write_all(&b).context(DownloadSnafu)?;
+ }
+ Ok::<_, crate::Error>(())
+ })
+ });
+
+ let _ = futures::future::try_join(forwarder, writer).await?;
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use super::*;
file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{ | ||
source: e | ||
})?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{ | |
source: e | |
})?; | |
file.read_to_end(&mut buffer).context(UnableToReadBytesFromFileSnafu)?; |
Which issue does this PR close?
Closes #5277.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?