Object_store: get_file and put_file #5281
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -39,7 +39,7 @@ use std::task::Poll; | |||||||||
use std::time::SystemTime; | ||||||||||
use std::{collections::BTreeSet, convert::TryFrom, io}; | ||||||||||
use std::{collections::VecDeque, path::PathBuf}; | ||||||||||
use tokio::io::AsyncWrite; | ||||||||||
use tokio::io::{AsyncWrite, AsyncWriteExt}; | ||||||||||
use url::Url; | ||||||||||
use walkdir::{DirEntry, WalkDir}; | ||||||||||
|
||||||||||
|
@@ -64,6 +64,11 @@ pub(crate) enum Error { | |||||||||
path: String, | ||||||||||
}, | ||||||||||
|
||||||||||
#[snafu(display("Unable to access metadata for the given file: {}", source))] | ||||||||||
FileMetadata { | ||||||||||
source: Box<dyn std::error::Error + Send + Sync + 'static>, | ||||||||||
}, | ||||||||||
|
||||||||||
#[snafu(display("Unable to copy data to file: {}", source))] | ||||||||||
UnableToCopyDataToFile { | ||||||||||
source: io::Error, | ||||||||||
|
@@ -104,6 +109,16 @@ pub(crate) enum Error { | |||||||||
path: PathBuf, | ||||||||||
}, | ||||||||||
|
||||||||||
#[snafu(display("Unable to read data from the given file: {}", source))] | ||||||||||
UnableToReadBytesFromFile { | ||||||||||
source: io::Error, | ||||||||||
}, | ||||||||||
|
||||||||||
#[snafu(display("Unable to write data to the given file: {}", source))] | ||||||||||
UnableToWriteBytesToFile { | ||||||||||
source: io::Error, | ||||||||||
}, | ||||||||||
|
||||||||||
#[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))] | ||||||||||
OutOfRange { | ||||||||||
path: PathBuf, | ||||||||||
|
@@ -1082,6 +1097,58 @@ fn convert_walkdir_result( | |||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
|
||||||||||
/// Download a remote object to a local [`File`] | ||||||||||
pub async fn upload(store: &dyn ObjectStore, location: &Path, opts: PutOptions, file: &mut std::fs::File) -> Result<()> { | ||||||||||
Could the bounds on […] Could also return the number of bytes written rather than an empty tuple, just in case it's useful to anyone (more useful if the source is an arbitrary readable).

I suppose an AsyncRead (and AsyncWrite for below) might be better, so that other functions can run while waiting for that IO.

So the difficulty with AsyncRead / AsyncWrite is there isn't actually an efficient way to implement them, tokio::fs::File calls […]

Yes, because most file systems don't actually have an async API. I guess it's a balance of that overhead vs having this function block in the middle. Another possibility would be for these upload and download functions to be replaced with something like

pub async fn copy_between_stores(
    src_store: &dyn ObjectStore, src_location: &Path, get_opts: GetOptions,
    tgt_store: &dyn ObjectStore, tgt_location: &Path, put_opts: PutOptions,
) -> Result<usize> {...}

and if either store is a local file system, that's fine. Then we re-use any optimisations we have elsewhere in the crate and it's more flexible, if a bit more verbose.

I've filed #5284 for this. I think I would rather keep the scope of this specific PR down; supporting that properly is very subtle and not easy.
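For illustration, the suggestion to accept an arbitrary readable and return the number of bytes written might look roughly like the std-only sketch below. `copy_counted` and its `chunk_size` parameter are hypothetical names chosen for this example; they are not part of the PR or of the object_store API, and the sketch is synchronous, so it sidesteps the AsyncRead / AsyncWrite question raised above.

```rust
use std::io::{self, Read, Write};

/// Copy everything from `src` to `dst` in chunks of `chunk_size` bytes and
/// return the total number of bytes written.
/// Hypothetical helper for illustration; not part of object_store.
pub fn copy_counted<R, W>(src: &mut R, dst: &mut W, chunk_size: usize) -> io::Result<u64>
where
    R: Read + ?Sized,
    W: Write + ?Sized,
{
    // Guard against a zero-sized buffer, which would look like end of input.
    let mut buffer = vec![0u8; chunk_size.max(1)];
    let mut total: u64 = 0;
    loop {
        let n = match src.read(&mut buffer) {
            Ok(0) => break, // end of input
            Ok(n) => n,
            // Retry reads that were interrupted instead of failing.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        dst.write_all(&buffer[..n])?;
        total += n as u64;
    }
    dst.flush()?;
    Ok(total)
}
```

Because the bounds are `Read` and `Write` rather than `std::fs::File`, the same function would accept a file, a `Cursor`, or a `Vec<u8>` sink, and the returned count lets callers check how much was transferred.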
||||||||||
// Determine file size | ||||||||||
let metadata = file.metadata().map_err(|e| Error::FileMetadata { | ||||||||||
source: e.into(), | ||||||||||
})?; | ||||||||||
let file_size = metadata.len(); | ||||||||||
|
||||||||||
// Set a threshold for when to switch to multipart_put | ||||||||||
let multipart_threshold: u64 = 50 * 1024 * 1024; | ||||||||||
|
||||||||||
if file_size <= multipart_threshold { | ||||||||||
let mut buffer = Vec::with_capacity(file_size as usize); | ||||||||||
file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{ | ||||||||||
source: e | ||||||||||
})?; | ||||||||||
Comment on lines +1114 to +1116
Suggested change
|
||||||||||
let bytes = Bytes::from(buffer); | ||||||||||
store.put_opts(&location, bytes, opts).await?; | ||||||||||
Ok(()) | ||||||||||
} else { | ||||||||||
let (_id, mut writer) = store.put_multipart(&location).await?; | ||||||||||
let mut buffer = vec![0u8; 5 * 1024 * 1024]; | ||||||||||
while let Ok(size) = file.read(&mut buffer) { | ||||||||||
if size == 0 { | ||||||||||
break; | ||||||||||
} | ||||||||||
writer.write_all(&buffer[..size]).await.unwrap(); | ||||||||||
} | ||||||||||
|
||||||||||
writer.flush().await.unwrap(); | ||||||||||
writer.shutdown().await.unwrap(); | ||||||||||
Ok(()) | ||||||||||
} | ||||||||||
} | ||||||||||
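One detail of the multipart branch above that may be worth a closer look: `while let Ok(size) = file.read(&mut buffer)` leaves the loop on a read error exactly as it does at end of file, so a failed read could go unreported. The std-only sketch below contrasts that loop shape with one that propagates the error. `FlakyReader`, `drain_ignoring_errors`, and `drain_propagating` are hypothetical names made up for this illustration; nothing here touches the object_store API.

```rust
use std::io::{self, Read};

/// Hypothetical reader that yields `good_reads` full buffers and then fails.
pub struct FlakyReader {
    pub good_reads: usize,
}

impl Read for FlakyReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.good_reads == 0 {
            return Err(io::Error::new(io::ErrorKind::Other, "simulated disk error"));
        }
        self.good_reads -= 1;
        buf.fill(1);
        Ok(buf.len())
    }
}

/// Same shape as the loop in the diff: a read error ends the loop silently,
/// so the caller only sees the bytes read before the failure.
pub fn drain_ignoring_errors<R: Read>(src: &mut R, buf: &mut [u8]) -> u64 {
    let mut total = 0u64;
    while let Ok(n) = src.read(buf) {
        if n == 0 {
            break;
        }
        total += n as u64;
    }
    total
}

/// Alternative shape: the read error is returned to the caller.
pub fn drain_propagating<R: Read>(src: &mut R, buf: &mut [u8]) -> io::Result<u64> {
    let mut total = 0u64;
    loop {
        let n = src.read(buf)?;
        if n == 0 {
            break;
        }
        total += n as u64;
    }
    Ok(total)
}
```

With a reader that fails after two chunks, the first function returns a partial byte count as if the input had simply ended, while the second returns an `Err` that the caller can map to something like `UnableToReadBytesFromFile`.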
|
||||||||||
|
||||||||||
/// Upload a local [`File`] to a remote object store | ||||||||||
pub async fn download(store: &dyn ObjectStore, location: &Path, opts: GetOptions, file: &mut File) -> Result<()> { | ||||||||||
Same as above, could […]

I think the intention of using […]
||||||||||
let get_result = store.get_opts(location, opts).await?; | ||||||||||
let mut stream = get_result.into_stream(); | ||||||||||
|
||||||||||
while let Some(bytes_result) = stream.next().await { | ||||||||||
let bytes = bytes_result?; | ||||||||||
file.write_all(&bytes).map_err(|e| Error::UnableToWriteBytesToFile{ | ||||||||||
source: e.into() | ||||||||||
})?; | ||||||||||
} | ||||||||||
Comment on lines +1140 to +1147

As written this will currently perform blocking IO on the current thread. You probably want to spawn_blocking the IO task and separate it from the producer with an mpsc channel or something. I can try to get an example of this up later today.

That would be great if you can provide an example. Thank you!

Sorry, I ran out of time today. I'll try to get you something on Monday.

Sorry, had a crazy few days. Something like this should work (not at all tested)
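The pattern described in this thread (blocking writes moved off the producing thread and fed through a channel) can be sketched with the standard library alone. This is not the snippet from the review: it swaps `std::thread::spawn` in for `spawn_blocking` and `std::sync::mpsc::sync_channel` in for an async channel so that it stays dependency-free, and `write_on_worker` with its `capacity` parameter is a hypothetical name used only here.

```rust
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;

/// Feed `chunks` to a dedicated writer thread through a bounded channel and
/// return the sink together with the number of bytes written.
/// Hypothetical helper for illustration; not part of object_store.
pub fn write_on_worker<W, I>(mut sink: W, chunks: I, capacity: usize) -> io::Result<(W, u64)>
where
    W: Write + Send + 'static,
    I: IntoIterator<Item = Vec<u8>>,
{
    // Bounded channel: the producer blocks once `capacity` chunks are queued,
    // which keeps memory use in check if the writer is slower than the source.
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(capacity);

    // All blocking writes happen on this thread.
    let worker = thread::spawn(move || -> io::Result<(W, u64)> {
        let mut total = 0u64;
        for chunk in rx {
            sink.write_all(&chunk)?;
            total += chunk.len() as u64;
        }
        sink.flush()?;
        Ok((sink, total))
    });

    for chunk in chunks {
        // A send error means the worker exited early; its error surfaces below.
        if tx.send(chunk).is_err() {
            break;
        }
    }
    // Close the channel so the worker's receive loop ends.
    drop(tx);

    worker
        .join()
        .map_err(|_| io::Error::new(io::ErrorKind::Other, "writer thread panicked"))?
}
```

In the async setting of this PR, the producer loop would presumably be the `stream.next().await` loop from `download`, and the worker would run under `tokio::task::spawn_blocking`; those substitutions are assumptions and are untested here.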
|
||||||||||
Ok(()) | ||||||||||
} | ||||||||||
|
||||||||||
|
||||||||||
#[cfg(test)] | ||||||||||
mod tests { | ||||||||||
use super::*; | ||||||||||
|
I think these docstrings are the wrong way round.

Thank you for the reminder.