Skip to content

Commit

Permalink
feat(storage): make upload return the number of bytes uploaded
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jun 14, 2023
1 parent b4d97d8 commit 2d28abc
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public void configure(final Map<String, ?> configs) {
}

@Override
public void upload(final InputStream inputStream, final String key) throws StorageBackendException {
public long upload(final InputStream inputStream, final String key) throws StorageBackendException {
return 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
public interface ObjectUploader {
/**
* @param inputStream content to upload. Not closed as part of the upload.
* @param key path to an object within a storage backend.
* @param key path to an object within a storage backend.
* @return number of bytes uploaded
*/
void upload(InputStream inputStream, String key) throws StorageBackendException;
long upload(InputStream inputStream, String key) throws StorageBackendException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public abstract class BaseStorageTest {
void testUploadFetchDelete() throws IOException, StorageBackendException {
final byte[] data = "some file".getBytes();
final InputStream file = new ByteArrayInputStream(data);
storage().upload(file, TOPIC_PARTITION_SEGMENT_KEY);
final long size = storage().upload(file, TOPIC_PARTITION_SEGMENT_KEY);
assertThat(size).isEqualTo(data.length);

try (final InputStream fetch = storage().fetch(TOPIC_PARTITION_SEGMENT_KEY)) {
final String r = new String(fetch.readAllBytes());
Expand All @@ -59,7 +60,8 @@ void testUploadFetchDelete() throws IOException, StorageBackendException {
void testUploadANewFile() throws StorageBackendException, IOException {
final String content = "content";
final ByteArrayInputStream in = new ByteArrayInputStream(content.getBytes());
storage().upload(in, TOPIC_PARTITION_SEGMENT_KEY);
final long size = storage().upload(in, TOPIC_PARTITION_SEGMENT_KEY);
assertThat(size).isEqualTo(content.length());

assertThat(in).isEmpty();
in.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

import io.aiven.kafka.tieredstorage.storage.BytesRange;
Expand All @@ -47,13 +47,12 @@ public void configure(final Map<String, ?> configs) {
}

@Override
public long upload(final InputStream inputStream, final String key) throws StorageBackendException {
    try {
        final Path path = fsRoot.resolve(key);
        Files.createDirectories(path.getParent());
        // Files.copy already returns the number of bytes transferred, so a
        // separate Files.size(path) stat call after the copy is unnecessary.
        return Files.copy(inputStream, path, StandardCopyOption.REPLACE_EXISTING);
    } catch (final IOException e) {
        throw new StorageBackendException("Failed to upload " + key, e);
    }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class S3MultiPartOutputStream extends OutputStream {
private final List<PartETag> partETags = new ArrayList<>();

private boolean closed;
private long uploadedBytes = 0L;

public S3MultiPartOutputStream(final String bucketName,
final String key,
Expand Down Expand Up @@ -128,6 +129,7 @@ private void flushBuffer(final int offset,
final ByteArrayInputStream in = new ByteArrayInputStream(partBuffer.array(), offset, actualPartSize);
uploadPart(in, actualPartSize);
partBuffer.clear();
uploadedBytes += actualPartSize;
} catch (final Exception e) {
log.error("Failed to upload part in multipart upload {}, aborting transaction", uploadId, e);
client.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId));
Expand All @@ -149,4 +151,11 @@ private void uploadPart(final InputStream in, final int actualPartSize) {
final UploadPartResult uploadResult = client.uploadPart(uploadPartRequest);
partETags.add(uploadResult.getPartETag());
}

/**
 * Returns the total number of bytes that were uploaded through this stream.
 * May only be called once the stream has been closed, i.e. once all parts
 * of the multipart upload have been flushed.
 */
public long uploadedBytes() {
    if (closed) {
        return uploadedBytes;
    }
    throw new IllegalStateException("Stream must be closed before checking size");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,23 @@ public void configure(final Map<String, ?> configs) {
}

@Override
public long upload(final InputStream in, final String key) throws StorageBackendException {
    try {
        final var out = s3OutputStream(key);
        transferContent(key, in, out);
        // uploadedBytes() requires the stream to be closed; transferContent
        // closes it before returning.
        return out.uploadedBytes();
    } catch (final AmazonS3Exception e) {
        throw new StorageBackendException("Failed to initiate uploading " + key, e);
    }
}

/**
 * Copies the stream content into the multipart output stream and closes it,
 * completing the multipart upload. Package-private for testing.
 *
 * @param key key being uploaded, used only for error reporting.
 * @param in  content to upload. Not closed as part of the transfer.
 * @param out destination multipart stream; closed by this method.
 * @throws StorageBackendException if the transfer or completion fails
 */
void transferContent(final String key,
                     final InputStream in,
                     final S3MultiPartOutputStream out)
    throws StorageBackendException {
    try {
        in.transferTo(out);
        out.close();
    } catch (final AmazonS3Exception | IOException e) {
        throw new StorageBackendException("Failed to upload " + key, e);
    }
Expand Down

0 comments on commit 2d28abc

Please sign in to comment.