Add configurable download method to object_store for enhanced usability #6837

Draft: wants to merge 7 commits into main
object_store/src/lib.rs (40 additions, 0 deletions)

@@ -1099,6 +1099,46 @@ impl GetResult {
    }
}

/// Configuration for controlling transfer behavior.
#[derive(Debug, Clone, Copy)]
pub struct TransferOptions {
    /// Maximum number of concurrent chunks to transfer.
    pub concurrent_tasks: usize,
    /// Maximum number of chunks to buffer in memory during the transfer.
    /// Defaults to `concurrent_tasks` if `None`.
    pub buffer_capacity: Option<usize>,
    /// Maximum number of retries for a chunk transfer.
    pub max_retries: Option<usize>,
}

impl TransferOptions {
    /// Creates a new `TransferOptions` with the specified parameters.
    pub fn new(
        concurrent_tasks: usize,
        buffer_capacity: Option<usize>,
        max_retries: Option<usize>,
    ) -> Self {
        let buffer_capacity = buffer_capacity.or(Some(concurrent_tasks));
        let max_retries = max_retries.or(Some(3));
        Self {
            concurrent_tasks,
            buffer_capacity,
            max_retries,
        }
    }
}

/// Default implementation for `TransferOptions`.
impl Default for TransferOptions {
    fn default() -> Self {
        Self {
            concurrent_tasks: 1,
            buffer_capacity: Some(1),
            max_retries: None,
        }
    }
}

/// Configure preconditions for the put operation
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PutMode {
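As a quick usage illustration (not part of the diff), callers could construct the options explicitly or rely on the defaults; the task count and retry values below are arbitrary:

```rust
use object_store::TransferOptions;

// Explicit construction: 8 concurrent range requests; the channel capacity
// falls back to the task count, and retries are capped at 5 per chunk.
let opts = TransferOptions::new(8, None, Some(5));
assert_eq!(opts.buffer_capacity, Some(8));
assert_eq!(opts.max_retries, Some(5));

// Defaults: a single task, a one-chunk buffer, and no retry limit set.
let defaults = TransferOptions::default();
assert_eq!(defaults.concurrent_tasks, 1);
assert_eq!(defaults.max_retries, None);
```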
object_store/src/local.rs (252 additions, 2 deletions)

@@ -38,8 +38,9 @@ use crate::{
    maybe_spawn_blocking,
    path::{absolute_path_to_url, Path},
    util::InvalidGetRange,
-    Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
-    ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
+    Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
+    ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
+    TransferOptions, UploadPart,
};

/// A specialized `Error` for filesystem object store-related errors
@@ -155,6 +156,30 @@ pub(crate) enum Error {

#[snafu(display("Upload aborted"))]
Aborted,

#[snafu(display("Failed to seek to position in file: {}", source))]
SeekFile {
source: io::Error,
},

#[snafu(display("Failed to write to file: {}", source))]
WriteFile {
source: io::Error,
},

#[snafu(display("Failed to send to channel: {}", source))]
UnableSendToChannel {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Failed to download file from '{}': {}", path, source))]
DownloadFile {
source: io::Error,
path: String,
},

#[snafu(display("Download was aborted by user or system"))]
DownloadAborted,
}

impl From<Error> for super::Error {
@@ -995,6 +1020,231 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
    })
}

/// Downloads a single chunk from the object store.
///
/// # Arguments
/// - `store`: A reference-counted `ObjectStore` instance used for downloading.
/// - `location`: The path in the object store to download from.
/// - `opts`: Options for the download request, such as range and metadata preferences.
/// - `sender`: A sender for transmitting downloaded data chunks to the processing pipeline.
/// - `cancellation_alert`: A watch receiver to monitor for cancellation signals, allowing
/// the download to abort gracefully if needed.
/// - `max_retries`: The maximum number of retry attempts for a failed download.
///
/// # Returns
/// A `Result` indicating success or failure. On success, it returns `()`; on failure, it returns
/// an `Error`.
///
/// # Details
/// The function listens for cancellation signals on `cancellation_alert` and exits early
/// if cancellation is requested. Failed reads from the stream are retried up to `max_retries`
/// times to tolerate transient errors.
async fn download_chunk(
    store: Arc<dyn ObjectStore>,
    location: Path,
    opts: GetOptions,
    sender: tokio::sync::mpsc::Sender<(usize, Bytes)>,
    cancellation_alert: tokio::sync::watch::Receiver<bool>,
    max_retries: usize,
) -> Result<(), Error> {
    let mut attempt = 0;
    let request = store
        .get_opts(&location, opts)
        .await
        .map_err(|e| Error::Metadata {
            source: e.into(),
            path: location.to_string(),
        })?;

    if let GetResultPayload::Stream(mut stream) = request.payload {
        let mut offset = request.range.start;
        'download_chunk: while !*cancellation_alert.borrow() {
            let buffer = match stream.try_next().await {
                Ok(Some(buffer)) => buffer,
                Ok(None) => break 'download_chunk,
                Err(_) if attempt < max_retries => {
                    // Transient stream error: count the attempt and poll the stream again.
                    attempt += 1;
                    continue;
                }
                Err(e) => {
                    return Err(Error::DownloadFile {
                        source: e.into(),
                        path: location.to_string(),
                    });
                }
            };

            let bytes_read = buffer.len();
            sender
                .send((offset, buffer))
                .await
                .map_err(|e| Error::UnableSendToChannel { source: e.into() })?;
            offset += bytes_read;
        }

        if *cancellation_alert.borrow() {
            return Err(Error::DownloadAborted);
        }
    }
    Ok(())
}

/// Writes multiple chunks of downloaded data into a local file.
///
/// # Arguments
/// - `file`: A mutable reference to the target local file for writing.
/// - `receiver`: A mutable receiver for fetching data chunks from the download pipeline.
/// - `cancellation_alert`: A watch receiver to monitor for cancellation signals, allowing
/// the writing process to abort gracefully if needed.
///
/// # Returns
/// A `Result` containing the total number of bytes written or an error if the write operation fails.
///
/// # Details
/// This function listens for cancellation signals via `cancellation_alert` and stops processing
/// further chunks if cancellation is detected. It ensures that no unnecessary writes are performed
/// once a cancellation request is received.
async fn write_multi_chunks(
    file: &mut File,
    receiver: &mut tokio::sync::mpsc::Receiver<(usize, Bytes)>,
    cancellation_alert: tokio::sync::watch::Receiver<bool>,
) -> Result<u64, Error> {
    let mut bytes_written = 0;

    while let Some((offset, buffer)) = receiver.recv().await {
        if *cancellation_alert.borrow() {
            return Err(Error::DownloadAborted);
        }

        file.seek(SeekFrom::Start(offset as u64))
            .map_err(|e| Error::SeekFile { source: e })?;
        file.write_all(&buffer)
            .map_err(|e| Error::WriteFile { source: e })?;

        bytes_written += buffer.len() as u64;
    }

    Ok(bytes_written)
}
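Both `download_chunk` and `write_multi_chunks` poll the same `tokio::sync::watch` flag to stop early. A minimal standalone sketch of that cancellation mechanism, independent of this diff and with an arbitrary loop bound:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (notify_cancellation, cancellation_alert) = watch::channel(false);

    let worker = tokio::spawn(async move {
        let mut iterations = 0u32;
        // Stand-in for the chunk loop: check the flag before each unit of work.
        while !*cancellation_alert.borrow() && iterations < 1_000_000 {
            iterations += 1;
            tokio::task::yield_now().await;
        }
        iterations
    });

    // Flip the flag; every receiver observes `true` on its next `borrow()`.
    let _ = notify_cancellation.send(true);
    let iterations = worker.await.unwrap();
    println!("worker stopped after {iterations} iterations");
}
```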

/// Downloads a file from the object store to a local file.
///
/// # Arguments
/// - `store`: A reference-counted `ObjectStore` instance used for the download.
/// - `location`: The path in the object store to download from.
/// - `opts`: Options for the download request.
/// - `file`: A mutable reference to the local file where the downloaded data will be written.
/// - `transfer_opts`: Optional transfer configuration for managing concurrent downloads and memory usage.
///
/// # Details
/// The function determines the number of concurrent tasks (`concurrent_tasks`) and the size of the
/// channel buffer (`channel_size`). If `buffer_capacity` is not provided, it defaults to the value of
/// `concurrent_tasks`. The total size of the file is divided into chunks, and each chunk is
/// processed by a concurrent task.
///
/// The download process listens for cancellation signals via a shared `cancellation_alert`. If a
/// cancellation is requested, the process terminates early, and no further data is downloaded or written.
///
/// # Returns
/// A `Result` containing the total number of bytes written to the local file or an error if the download fails.
///
/// # Notes
/// This function spawns concurrent tasks for downloading and writing data chunks. The `write_multi_chunks`
/// task ensures that cancellation signals are respected, stopping all operations promptly.
pub async fn download(
    store: Arc<dyn ObjectStore>,
    location: &Path,
    opts: GetOptions,
    mut file: std::fs::File,
    transfer_opts: Option<&TransferOptions>,
) -> Result<u64> {
    let req = store.get_opts(location, opts.clone()).await?;
    let transfer_opts = *transfer_opts.unwrap_or(&TransferOptions::default());
    let concurrent_tasks = transfer_opts.concurrent_tasks;
    let channel_size = transfer_opts.buffer_capacity.unwrap_or(concurrent_tasks);
    let mut written_bytes: u64 = 0;
    match req.payload {
        GetResultPayload::Stream(_) => {
            let obj_size = req.meta.size;
            let chunk_size = (obj_size as f64 / concurrent_tasks as f64).ceil() as usize;
            let (sender, mut receiver) = tokio::sync::mpsc::channel(channel_size);
            let (notify_cancellation, cancellation_alert) = tokio::sync::watch::channel(false);

            #[derive(Debug)]
            enum TaskResult {
                Download(Result<(), Error>),
                Write(Result<u64, Error>),
            }

            let mut tasks = tokio::task::JoinSet::new();
            for i in 0..transfer_opts.concurrent_tasks {
                let chunk_start = i * chunk_size;
                let chunk_end = std::cmp::min((i + 1) * chunk_size - 1, obj_size - 1);
                let ranged_opts = GetOptions {
                    range: Some(GetRange::Bounded(chunk_start..(chunk_end + 1)).into()),
                    ..opts.clone()
                };
                let location_clone = location.clone();
                let sender_clone = sender.clone();
                let store_clone = Arc::clone(&store);
                let max_retries = transfer_opts.max_retries.unwrap_or(0);
                let cancellation_alert_clone = cancellation_alert.clone();
                tasks.spawn(async move {
                    TaskResult::Download(
                        download_chunk(
                            store_clone,
                            location_clone,
                            ranged_opts,
                            sender_clone,
                            cancellation_alert_clone,
                            max_retries,
                        )
                        .await,
                    )
                });
            }
            drop(sender);
            tasks.spawn(async move {
                TaskResult::Write(
                    write_multi_chunks(&mut file, &mut receiver, cancellation_alert).await,
                )
            });

            while let Some(result) = tasks.join_next().await {
                match result {
                    Ok(TaskResult::Download(Ok(()))) => {}
                    Ok(TaskResult::Download(Err(e))) => {
                        eprintln!("Download error: {:?}", e);
                        let _ = notify_cancellation.send(true);
                        return Err(e.into());
                    }
                    Ok(TaskResult::Write(Ok(bytes))) => {
                        written_bytes = bytes;
                    }
                    Ok(TaskResult::Write(Err(e))) => {
                        eprintln!("Write error: {:?}", e);
                        let _ = notify_cancellation.send(true);
                        return Err(e.into());
                    }
                    Err(e) => {
                        eprintln!("Critical error while executing a task: {:?}", e);
                        let _ = notify_cancellation.send(true);
                        return Err(e.into());
                    }
                }
            }
        }
        GetResultPayload::File(mut source_file, _path) => {
            written_bytes = std::io::copy(&mut source_file, &mut file)
                .map_err(|e| Error::WriteFile { source: e })?;
        }
    }
    Ok(written_bytes)
}

#[cfg(unix)]
/// We include the inode when available to yield an ETag more resistant to collisions
/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
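To make the chunk partitioning in `download` concrete, here is the same boundary arithmetic run on a hypothetical 10-byte object split across 4 tasks (an illustration only, not code from this diff):

```rust
// Mirrors the range computation in `download` for obj_size = 10, concurrent_tasks = 4.
let obj_size: usize = 10;
let concurrent_tasks: usize = 4;
let chunk_size = (obj_size as f64 / concurrent_tasks as f64).ceil() as usize; // 3

let ranges: Vec<_> = (0..concurrent_tasks)
    .map(|i| {
        let start = i * chunk_size;
        let end = std::cmp::min((i + 1) * chunk_size - 1, obj_size - 1);
        start..(end + 1)
    })
    .collect();

// Each byte is requested exactly once: [0..3, 3..6, 6..9, 9..10].
assert_eq!(ranges, vec![0..3, 3..6, 6..9, 9..10]);
```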
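Finally, a rough end-to-end sketch of how the new entry point might be exercised, assuming `download` remains public in `object_store::local` and that a Tokio runtime is available; the in-memory store, payload size, and output path are hypothetical and chosen only for illustration:

```rust
use std::{fs::File, sync::Arc};

use object_store::{
    local::download, memory::InMemory, path::Path, GetOptions, ObjectStore, TransferOptions,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stage 1 MiB of data in an in-memory store as a stand-in for any backend.
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("data/example.bin");
    store.put(&location, vec![0u8; 1024 * 1024].into()).await?;

    // Local destination file.
    let file = File::create("example.bin")?;

    // Four concurrent range requests; channel capacity defaults to the task count.
    let opts = TransferOptions::new(4, None, Some(3));
    let written = download(store, &location, GetOptions::default(), file, Some(&opts)).await?;
    println!("downloaded {written} bytes");
    Ok(())
}
```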