
Object_store: put_file and get_file methods #5277

Open
troychiu opened this issue Jan 3, 2024 · 11 comments · May be fixed by #6837
Labels
enhancement Any new improvement worthy of a entry in the changelog good first issue Good for newcomers help wanted


@troychiu

troychiu commented Jan 3, 2024

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As a user, I would like to use object_store to upload and retrieve files. Currently, I can use the put and get methods for this purpose, but I am responsible for handling the details myself. For instance, when uploading a file to the store, I must check its size to decide whether a multipart upload is necessary. Similarly, when retrieving a large file, I cannot download it in its entirety before writing it to the local file system, as this would cause excessive memory usage; instead, I may need to use get_range and write specific portions of the file locally. Moreover, downloads of different ranges could potentially run in parallel.

Describe the solution you'd like
Expose get_file(local_path, remote_path) and put_file(local_path, remote_path) to be more user-friendly.

Describe alternatives you've considered
Stay with what we have now, which means users must implement this functionality themselves.

@troychiu troychiu added the enhancement Any new improvement worthy of a entry in the changelog label Jan 3, 2024
@tustvold
Contributor

tustvold commented Jan 3, 2024

I think adding some free functions that perform this logic for an arbitrary &dyn ObjectStore would be a worthwhile addition, and should be relatively straightforward.

I would suggest the method take a std::fs::File as opposed to a path, to make it more flexible and support things like temporary files.

I may need to use get_range and write specific portions of the file locally.

FWIW you should be able to make use of https://docs.rs/object_store/latest/object_store/struct.GetResult.html#method.into_stream to get a stream of bytes, and avoid buffering the entire file in memory
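The pattern tustvold describes — writing each chunk to disk as it arrives instead of buffering the whole object — can be sketched with plain std I/O (in the real code the chunks would come from `GetResult::into_stream`; this generic helper is just illustrative):

```rust
use std::io::{Read, Write};

/// Copy from any reader to any writer in fixed-size chunks, so peak
/// memory stays at `chunk` bytes regardless of the object's total size.
fn copy_chunked<R: Read, W: Write>(mut src: R, dst: &mut W, chunk: usize) -> std::io::Result<u64> {
    let mut buf = vec![0u8; chunk];
    let mut total = 0u64;
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            break; // end of stream
        }
        dst.write_all(&buf[..n])?;
        total += n as u64;
    }
    Ok(total)
}
```

The same bounded-memory property is what `into_stream` gives you for free: each polled chunk can be written and dropped before the next one is requested.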

@troychiu
Author

troychiu commented Jan 4, 2024

Sounds good. I think we can have four methods.

get_file_opts(&self, location: &Path, file: &std::fs::File, options: GetOptions) -> Result<()> // Implement it with get
get_file(&self, location: &Path, file: &std::fs::File) -> Result<()> // (which will call get_file_opts with default option)
put_file_opts(&self, location: &Path, file: &std::fs::File, options: PutOptions) -> Result<()>  // Implement it with put and put_multipart depending on the file size
put_file(&self, location: &Path, file: &std::fs::File) -> Result<()> // (which will call put_file_opts with default option)

What do you think?
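The size-based dispatch proposed for put_file_opts ("put and put_multipart depending on the file size") could be sketched like this; the 10 MiB threshold is an assumed value for illustration, not something defined by the object_store crate:

```rust
/// Which upload path to take for a file of a given size.
enum UploadStrategy {
    /// Small enough for a single `put` request.
    Single,
    /// Large enough to warrant `put_multipart`, split into parts.
    Multipart { part_size: u64 },
}

/// Pick a strategy from the file length. THRESHOLD is an assumption;
/// a real implementation might make it configurable.
fn choose_strategy(file_len: u64) -> UploadStrategy {
    const THRESHOLD: u64 = 10 * 1024 * 1024; // 10 MiB, illustrative
    if file_len <= THRESHOLD {
        UploadStrategy::Single
    } else {
        UploadStrategy::Multipart { part_size: THRESHOLD }
    }
}
```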

@tustvold
Contributor

tustvold commented Jan 4, 2024

I would expect something along the lines of the following free functions added to local.rs

/// Download a remote object to a local [`File`]
pub async fn download(store: &dyn ObjectStore, location: &Path, opts: GetOptions, file: &mut std::fs::File) -> Result<()> {
    ...
}

/// Upload a local [`File`] to a remote object store
pub async fn upload(store: &dyn ObjectStore, location: &Path, opts: PutOptions, file: &mut std::fs::File) -> Result<()> {
    ...
}

I would like to avoid adding these methods to the ObjectStore trait, as it is already getting quite large and these methods are unlikely to benefit from specialization

@troychiu
Author

troychiu commented Jan 4, 2024

Oh, I got what you meant. Sounds good to me. Thank you.

@troychiu
Author

troychiu commented Jan 4, 2024

I can try to implement them!

@midnattsol

take

@midnattsol

midnattsol commented Nov 29, 2024

Hello, I've been reading through the requirements and how this could be implemented.
I have a question, @tustvold: do you think the functions could look like this, for example?

pub async fn download(
    store: &dyn ObjectStore,
    location: &Path,
    opts: GetOptions,
    file: &mut std::fs::File,
    transfer_opts: Option<&DownloadTransferConfig>,
) -> Result<()> 

The idea of transfer_opts (I can change the name to anything else) is to implement something similar to this:

#[derive(Clone, Copy)]
pub struct DownloadTransferConfig {
    /// The maximum number of concurrent chunks to download
    pub max_concurrent_chunks: usize,
    /// The maximum number of chunks to buffer in the channel
    pub chunk_queue_size: usize,
}

impl Default for DownloadTransferConfig {
    fn default() -> Self {
        Self {
            max_concurrent_chunks: 1,
            chunk_queue_size: 2,
        }
    }
}
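The backpressure that chunk_queue_size is meant to provide can be seen in miniature with std's bounded channel (the real proposal uses tokio's async mpsc channel; this blocking version just demonstrates the memory-capping behaviour):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Producer/consumer split with a bounded queue: the producer blocks once
/// `queue_size` chunks are waiting, so in-flight memory is capped at
/// roughly queue_size * chunk_size bytes.
fn pipeline(chunks: Vec<Vec<u8>>, queue_size: usize) -> Vec<u8> {
    let (tx, rx) = sync_channel::<Vec<u8>>(queue_size);
    let producer = thread::spawn(move || {
        for c in chunks {
            if tx.send(c).is_err() {
                break; // consumer hung up
            }
        }
    });
    // The consumer (the file writer, in the download function) drains
    // chunks in order as they become available.
    let mut out = Vec::new();
    for c in rx {
        out.extend_from_slice(&c);
    }
    producer.join().unwrap();
    out
}
```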

So in the download function, it would be possible to use something similar to this:

pub async fn download(
    store: &dyn ObjectStore,
    location: &Path,
    opts: GetOptions,
    file: &mut std::fs::File,
    transfer_opts: Option<&DownloadTransferConfig>,
) -> Result<()> {
    let result = store.get_opts(location, opts).await?;
    let transfer_opts = transfer_opts.copied().unwrap_or_default();
    let (sender, mut receiver) =
        tokio::sync::mpsc::channel::<Bytes>(transfer_opts.chunk_queue_size);

    match result.payload {
        GetResultPayload::Stream(stream) => {
            let sender_task = tokio::spawn(async move {
                let mut buffered_stream = stream
                    .map(|chunk| async move {
                        let chunk = chunk.map_err(crate::Error::from)?;
                        Ok::<Bytes, crate::Error>(chunk)
                    })
                    .buffered(transfer_opts.max_concurrent_chunks);

                while let Some(chunk) = buffered_stream.next().await {
                    let chunk = chunk?;
                    if let Err(e) = sender.send(chunk).await {
                        eprintln!("Error sending the chunk: {:?}", e);
                        break;
                    }
                }
                drop(sender);
                Ok::<(), crate::Error>(())
            });

            while let Some(chunk) = receiver.recv().await {
                file.write_all(&chunk).context(UnableToWriteFileSnafu)?;
            }

            sender_task.await.context(UnableToJoinTaskSnafu)??;
            Ok(())
        }

        GetResultPayload::File(mut source_file, _path) => {
            std::io::copy(&mut source_file, file).context(UnableToWriteFileSnafu)?;
            Ok(())
        }
    }
}

I'm still running tests to find the best way to do it, but I was wondering whether adding the ability to configure the buffering, or the size of the channel, would fit the expectations of this issue.

Regards

@tustvold
Contributor

tustvold commented Nov 29, 2024

Making the behaviour configurable and having it automatically handle concurrency sounds valuable to me. You may be able to take inspiration from the S3 TransferManager.

That being said, I'm not sure your implementation will actually make concurrent requests; rather, it distributes chunks from the same streaming request, which is unlikely to be any faster.

You'd need to make the initial request and then inspect the returned object size in order to determine whether spawning parallel range requests is warranted.
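The split-then-fetch-ranges pattern can be sketched without the crate at all; here `fetch` stands in for a ranged GET such as `ObjectStore::get_range`, and plain scoped threads stand in for spawned tokio tasks:

```rust
use std::thread;

/// Split `len` bytes into ranges of at most `part` bytes, fetch each range
/// concurrently, and reassemble the parts in order.
fn parallel_ranged_download<F>(len: usize, part: usize, fetch: F) -> Vec<u8>
where
    F: Fn(std::ops::Range<usize>) -> Vec<u8> + Sync,
{
    let ranges: Vec<std::ops::Range<usize>> = (0..len)
        .step_by(part.max(1))
        .map(|start| start..(start + part).min(len))
        .collect();
    let fetch = &fetch; // shared reference so every thread can call it
    thread::scope(|s| {
        // One thread per range; a real implementation would cap concurrency
        // (e.g. with max_concurrent_chunks) instead of spawning unboundedly.
        let handles: Vec<_> = ranges
            .into_iter()
            .map(|r| s.spawn(move || fetch(r)))
            .collect();
        // Joining in spawn order concatenates parts back into the object.
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    })
}
```

As noted above, this only pays off once the object size (learned from the initial request) exceeds a part size by enough to justify the extra requests.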

@midnattsol

Thanks for the feedback. I will check the S3 TransferManager and look in the direction you suggested regarding concurrency.

Thank you.

@midnattsol

I've created another issue, #6832, to implement the upload method so it is easier to review.

Tomorrow I will create a draft to discuss the download method.

Regards

@midnattsol

I've created a pull request draft: #6837

I still have to add tests, but if anyone is interested in checking it out and giving feedback, it would be welcome.
