diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 71b96f058c79..11d20b80ac06 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -39,7 +39,7 @@ use std::task::Poll; use std::time::SystemTime; use std::{collections::BTreeSet, convert::TryFrom, io}; use std::{collections::VecDeque, path::PathBuf}; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncWrite, AsyncWriteExt}; use url::Url; use walkdir::{DirEntry, WalkDir}; @@ -64,6 +64,11 @@ pub(crate) enum Error { path: String, }, + #[snafu(display("Unable to access metadata for the give file: {}", source))] + FileMetadata { + source: Box, + }, + #[snafu(display("Unable to copy data to file: {}", source))] UnableToCopyDataToFile { source: io::Error, @@ -104,6 +109,16 @@ pub(crate) enum Error { path: PathBuf, }, + #[snafu(display("Unable to read data from the given file: {}", source))] + UnableToReadBytesFromFile { + source: io::Error, + }, + + #[snafu(display("Unable to write data to the given file: {}", source))] + UnableToWriteBytesToFile { + source: io::Error, + }, + #[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))] OutOfRange { path: PathBuf, @@ -1082,6 +1097,58 @@ fn convert_walkdir_result( } } + +/// Download a remote object to a local [`File`] +pub async fn upload(store: &dyn ObjectStore, location: &Path, opts: PutOptions, file: &mut std::fs::File) -> Result<()> { + // Determine file size + let metadata = file.metadata().map_err(|e| Error::FileMetadata { + source: e.into(), + })?; + let file_size = metadata.len(); + + // Set a threshold for when to switch to multipart_put + let multipart_threshold: u64 = 50 * 1024 * 1024; + + if file_size <= multipart_threshold { + let mut buffer = Vec::with_capacity(file_size as usize); + file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{ + source: e + })?; + let bytes = Bytes::from(buffer); + store.put_opts(&location, bytes, opts).await?; + Ok(()) + } else { + let (_id, mut writer) = store.put_multipart(&location).await?; + let mut buffer = vec![0u8; 5 * 1024 * 1024]; + while let Ok(size) = file.read(&mut buffer) { + if size == 0 { + break; + } + writer.write_all(&buffer[..size]).await.unwrap(); + } + + writer.flush().await.unwrap(); + writer.shutdown().await.unwrap(); + Ok(()) + } +} + + +/// Upload a local [`File`] to a remote object store +pub async fn download(store: &dyn ObjectStore, location: &Path, opts: GetOptions, file: &mut File) -> Result<()> { + let get_result = store.get_opts(location, opts).await?; + let mut stream = get_result.into_stream(); + + while let Some(bytes_result) = stream.next().await { + let bytes = bytes_result?; + file.write_all(&bytes).map_err(|e| Error::UnableToWriteBytesToFile{ + source: e.into() + })?; + } + Ok(()) +} + + #[cfg(test)] mod tests { use super::*;