diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java b/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java index 5c35abd18..46a372e6a 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/NoopStorageBackend.java @@ -37,7 +37,8 @@ public void configure(final Map configs) { } @Override - public void upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final String key) throws StorageBackendException { + return 0; } @Override diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java index 0b3e8f76f..a31366afe 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectUploader.java @@ -21,7 +21,8 @@ public interface ObjectUploader { /** * @param inputStream content to upload. Not closed as part of the upload. - * @param key path to an object within a storage backend. + * @param key path to an object within a storage backend. + * @return number of bytes uploaded */ - void upload(InputStream inputStream, String key) throws StorageBackendException; + long upload(InputStream inputStream, String key) throws StorageBackendException; } diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java index 57dd906b3..45c5dd38a 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java @@ -35,7 +35,8 @@ public abstract class BaseStorageTest { void testUploadFetchDelete() throws IOException, StorageBackendException { final byte[] data = "some file".getBytes(); final InputStream file = new ByteArrayInputStream(data); - storage().upload(file, TOPIC_PARTITION_SEGMENT_KEY); + final long size = storage().upload(file, TOPIC_PARTITION_SEGMENT_KEY); + assertThat(size).isEqualTo(data.length); try (final InputStream fetch = storage().fetch(TOPIC_PARTITION_SEGMENT_KEY)) { final String r = new String(fetch.readAllBytes()); @@ -59,7 +60,8 @@ void testUploadFetchDelete() throws IOException, StorageBackendException { void testUploadANewFile() throws StorageBackendException, IOException { final String content = "content"; final ByteArrayInputStream in = new ByteArrayInputStream(content.getBytes()); - storage().upload(in, TOPIC_PARTITION_SEGMENT_KEY); + final long size = storage().upload(in, TOPIC_PARTITION_SEGMENT_KEY); + assertThat(size).isEqualTo(content.length()); assertThat(in).isEmpty(); in.close(); diff --git a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java index b54f6285b..120dbb0b2 100644 --- a/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java +++ b/storage/filesystem/src/main/java/io/aiven/kafka/tieredstorage/storage/filesystem/FileSystemStorage.java @@ -18,10 +18,10 @@ import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.Map; import io.aiven.kafka.tieredstorage.storage.BytesRange; @@ -47,13 +47,12 @@ public void configure(final Map configs) { } @Override - public void upload(final InputStream inputStream, final String key) throws StorageBackendException { + public long upload(final InputStream inputStream, final String key) throws StorageBackendException { try { final Path path = fsRoot.resolve(key); Files.createDirectories(path.getParent()); - try (final OutputStream outputStream = Files.newOutputStream(path)) { - inputStream.transferTo(outputStream); - } + Files.copy(inputStream, path, StandardCopyOption.REPLACE_EXISTING); + return Files.size(path); } catch (final IOException e) { throw new StorageBackendException("Failed to upload " + key, e); } diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java index d7d951469..9515c0ab0 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java @@ -57,6 +57,7 @@ public class S3MultiPartOutputStream extends OutputStream { private final List partETags = new ArrayList<>(); private boolean closed; + private long uploadedBytes = 0L; public S3MultiPartOutputStream(final String bucketName, final String key, @@ -128,6 +129,7 @@ private void flushBuffer(final int offset, final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize); uploadPart(in, actualPartSize); partBuffer.clear(); + uploadedBytes += actualPartSize; } catch (final Exception e) { log.error("Failed to upload part in multipart upload {}, aborting transaction", uploadId, e); client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId)); @@ -149,4 +151,11 @@ private void uploadPart(final InputStream in, final int actualPartSize) { final UploadPartResult uploadResult = client.uploadPart(uploadPartRequest); partETags.add(uploadResult.getPartETag()); } + + public long uploadedBytes() { + if (!closed) { + throw new IllegalStateException("Stream must be closed before checking size"); + } + return uploadedBytes; + } } diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java index bc8b9e100..511fb1adf 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java @@ -46,9 +46,23 @@ public void configure(final Map configs) { } @Override - public void upload(final InputStream inputStream, final String key) throws StorageBackendException { - try (final var out = s3OutputStream(key)) { - inputStream.transferTo(out); + public long upload(final InputStream in, final String key) throws StorageBackendException { + try { + final var out = s3OutputStream(key); + transferContent(key, in, out); + return out.uploadedBytes(); + } catch (final AmazonS3Exception e) { + throw new StorageBackendException("Failed to initiate uploading " + key, e); + } + } + + void transferContent(final String key, + final InputStream in, + final S3MultiPartOutputStream out) + throws StorageBackendException { + try { + in.transferTo(out); + out.close(); } catch (final AmazonS3Exception | IOException e) { throw new StorageBackendException("Failed to upload " + key, e); }