Skip to content

Commit

Permalink
s3s-fs: fix consistency of multipart uploads via a temp file (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
amunra authored Dec 6, 2023
1 parent 7b69f21 commit 9824baf
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 30 deletions.
32 changes: 18 additions & 14 deletions crates/s3s-fs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::io::{AsyncReadExt, BufWriter};

use md5::{Digest, Md5};
use path_absolutize::Absolutize;
use s3s::dto::PartNumber;
use uuid::Uuid;

#[derive(Debug)]
Expand All @@ -35,7 +36,7 @@ fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
let entry = entry?;
let file_name = entry.file_name();
let Some(file_name) = file_name.to_str() else { continue };
// See `FileSystem::write_file`
// See `FileSystem::prepare_file_write`
if file_name.starts_with(".tmp.") && file_name.ends_with(".internal.part") {
std::fs::remove_file(entry.path())?;
}
Expand All @@ -55,6 +56,10 @@ impl FileSystem {
Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
}

pub(crate) fn resolve_upload_part_path(&self, upload_id: Uuid, part_number: PartNumber) -> Result<PathBuf> {
self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))
}

/// resolve object path under the virtual root
pub(crate) fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
let dir = Path::new(&bucket);
Expand Down Expand Up @@ -171,53 +176,52 @@ impl FileSystem {

/// Write to the filesystem atomically.
/// This is done by first writing to a temporary location and then moving the file.
pub(crate) async fn prepare_file_write(&self, bucket: &str, key: &str) -> Result<FileWriter> {
let final_path = Some(self.get_object_path(bucket, key)?);
pub(crate) async fn prepare_file_write<'a>(&self, path: &'a Path) -> Result<FileWriter<'a>> {
let tmp_name = format!(".tmp.{}.internal.part", self.tmp_file_counter.fetch_add(1, Ordering::SeqCst));
let tmp_path = self.resolve_abs_path(tmp_name)?;
let file = File::create(&tmp_path).await?;
let writer = BufWriter::new(file);
Ok(FileWriter {
tmp_path,
final_path,
dest_path: path,
writer,
clean_tmp: true,
})
}
}

pub(crate) struct FileWriter {
pub(crate) struct FileWriter<'a> {
tmp_path: PathBuf,
final_path: Option<PathBuf>,
dest_path: &'a Path,
writer: BufWriter<File>,
clean_tmp: bool,
}

impl FileWriter {
impl<'a> FileWriter<'a> {
pub(crate) fn tmp_path(&self) -> &Path {
&self.tmp_path
}

pub(crate) fn final_path(&self) -> &Path {
self.final_path.as_ref().unwrap()
pub(crate) fn dest_path(&self) -> &'a Path {
self.dest_path
}

pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
&mut self.writer
}

pub(crate) async fn done(mut self) -> Result<PathBuf> {
if let Some(final_dir_path) = self.final_path().parent() {
pub(crate) async fn done(mut self) -> Result<()> {
if let Some(final_dir_path) = self.dest_path().parent() {
fs::create_dir_all(&final_dir_path).await?;
}

fs::rename(&self.tmp_path, &self.final_path()).await?;
fs::rename(&self.tmp_path, self.dest_path()).await?;
self.clean_tmp = false;
Ok(self.final_path.take().unwrap())
Ok(())
}
}

impl Drop for FileWriter {
impl<'a> Drop for FileWriter<'a> {
fn drop(&mut self) {
if self.clean_tmp {
let _ = std::fs::remove_file(&self.tmp_path);
Expand Down
33 changes: 17 additions & 16 deletions crates/s3s-fs/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::path::{Path, PathBuf};

use tokio::fs;
use tokio::io::AsyncSeekExt;
use tokio::io::BufWriter;
use tokio_util::io::ReaderStream;

use futures::TryStreamExt;
Expand Down Expand Up @@ -466,7 +465,8 @@ impl S3 for FileSystem {
return Ok(S3Response::new(output));
}

let mut file_writer = self.prepare_file_write(&bucket, &key).await?;
let object_path = self.get_object_path(&bucket, &key)?;
let mut file_writer = self.prepare_file_write(&object_path).await?;

let mut md5_hash = Md5::new();
let stream = body.inspect_ok(|bytes| {
Expand All @@ -475,7 +475,7 @@ impl S3 for FileSystem {
});

let size = copy_bytes(stream, file_writer.writer()).await?;
let object_path = file_writer.done().await?;
file_writer.done().await?;

let md5_sum = hex(md5_hash.finalize());

Expand Down Expand Up @@ -550,15 +550,15 @@ impl S3 for FileSystem {
return Err(s3_error!(AccessDenied));
}

let file_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?;
let file_path = self.resolve_upload_part_path(upload_id, part_number)?;

let mut md5_hash = Md5::new();
let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

let file = try_!(fs::File::create(&file_path).await);
let mut writer = BufWriter::new(file);
let mut file_writer = self.prepare_file_write(&file_path).await?;
let size = copy_bytes(stream, file_writer.writer()).await?;
file_writer.done().await?;

let size = copy_bytes(stream, &mut writer).await?;
let md5_sum = hex(md5_hash.finalize());

debug!(path = %file_path.display(), ?size, %md5_sum, "write file");
Expand All @@ -585,7 +585,7 @@ impl S3 for FileSystem {
CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
};
let src_path = self.get_object_path(src_bucket, src_key)?;
let dst_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?;
let dst_path = self.resolve_upload_part_path(upload_id, part_number)?;

let mut src_file = fs::File::open(&src_path).await.map_err(|e| s3_error!(e, NoSuchKey))?;
let file_len = try_!(src_file.metadata().await).len();
Expand Down Expand Up @@ -616,13 +616,13 @@ impl S3 for FileSystem {
let _ = try_!(src_file.seek(io::SeekFrom::Start(start)).await);
let body = StreamingBlob::wrap(bytes_stream(ReaderStream::with_capacity(src_file, 4096), content_length_usize));

let dst_file = try_!(fs::File::create(&dst_path).await);
let mut writer = BufWriter::new(dst_file);

let mut md5_hash = Md5::new();
let stream = body.inspect_ok(|bytes| md5_hash.update(bytes.as_ref()));

let size = copy_bytes(stream, &mut writer).await?;
let mut file_writer = self.prepare_file_write(&dst_path).await?;
let size = copy_bytes(stream, file_writer.writer()).await?;
file_writer.done().await?;

let md5_sum = hex(md5_hash.finalize());

debug!(path = %dst_path.display(), ?size, %md5_sum, "write file");
Expand Down Expand Up @@ -707,7 +707,8 @@ impl S3 for FileSystem {

self.delete_upload_id(&upload_id).await?;

let mut file_writer = self.prepare_file_write(&bucket, &key).await?;
let object_path = self.get_object_path(&bucket, &key)?;
let mut file_writer = self.prepare_file_write(&object_path).await?;

let mut cnt: i32 = 0;
for part in multipart_upload.parts.into_iter().flatten() {
Expand All @@ -717,15 +718,15 @@ impl S3 for FileSystem {
return Err(s3_error!(InvalidRequest, "invalid part order"));
}

let part_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?;
let part_path = self.resolve_upload_part_path(upload_id, part_number)?;

let mut reader = try_!(fs::File::open(&part_path).await);
let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);

debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.final_path().display(), ?size, "write file");
debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.dest_path().display(), ?size, "write file");
try_!(fs::remove_file(&part_path).await);
}
let object_path = file_writer.done().await?;
file_writer.done().await?;

let file_size = try_!(fs::metadata(&object_path).await).len();
let md5_sum = self.get_md5_sum(&bucket, &key).await?;
Expand Down

0 comments on commit 9824baf

Please sign in to comment.