Skip to content

Commit

Permalink
fix(core): Gcs's RangeWrite doesn't support concurrent write
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Jun 25, 2024
1 parent 9c601c0 commit 5bd3806
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
6 changes: 3 additions & 3 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl Access for GcsBackend {
// It's recommended that you use at least 8 MiB for the chunk size.
//
// Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
write_multi_align_size: Some(256 * 1024 * 1024),
write_multi_align_size: Some(8 * 1024 * 1024),

delete: true,
copy: true,
Expand Down Expand Up @@ -433,10 +433,10 @@ impl Access for GcsBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let concurrent = args.concurrent();
let executor = args.executor().cloned();
let w = GcsWriter::new(self.core.clone(), path, args);
let w = oio::RangeWriter::new(w, executor, concurrent);
// Gcs can't support concurrent write, always use concurrent=1 for now.
let w = oio::RangeWriter::new(w, executor, 1);

Ok((RpWrite::default(), w))
}
Expand Down
26 changes: 18 additions & 8 deletions core/tests/behavior/async_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,30 +300,40 @@ pub async fn test_writer_write_with_concurrent(op: Operator) -> Result<()> {
}

let path = TEST_FIXTURE.new_file_path();
let size = 5 * 1024 * 1024; // write file with 5 MiB
let content_a = gen_fixed_bytes(size);
let content_b = gen_fixed_bytes(size);
// We need at least 3 part to make sure concurrent happened.
let (content_a, size_a) = gen_bytes_with_range(5 * 1024 * 1024..6 * 1024 * 1024);
let (content_b, size_b) = gen_bytes_with_range(5 * 1024 * 1024..6 * 1024 * 1024);
let (content_c, size_c) = gen_bytes_with_range(5 * 1024 * 1024..6 * 1024 * 1024);

let mut w = op.writer_with(&path).concurrent(2).await?;
let mut w = op.writer_with(&path).concurrent(3).await?;
w.write(content_a.clone()).await?;
w.write(content_b.clone()).await?;
w.write(content_c.clone()).await?;
w.close().await?;

let meta = op.stat(&path).await.expect("stat must succeed");
assert_eq!(meta.content_length(), (size * 2) as u64);
assert_eq!(meta.content_length(), (size_a + size_b + size_c) as u64);

let bs = op.read(&path).await?.to_bytes();
assert_eq!(bs.len(), size * 2, "read size");
assert_eq!(bs.len(), size_a + size_b + size_c, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[..size])),
format!("{:x}", Sha256::digest(&bs[..size_a])),
format!("{:x}", Sha256::digest(content_a)),
"read content a"
);
assert_eq!(
format!("{:x}", Sha256::digest(&bs[size..])),
format!("{:x}", Sha256::digest(&bs[size_a..size_a + size_b])),
format!("{:x}", Sha256::digest(content_b)),
"read content b"
);
assert_eq!(
format!(
"{:x}",
Sha256::digest(&bs[size_a + size_b..size_a + size_b + size_c])
),
format!("{:x}", Sha256::digest(content_c)),
"read content b"
);

Ok(())
}
Expand Down

0 comments on commit 5bd3806

Please sign in to comment.