Skip to content

Commit

Permalink
Support for multipart upload
Browse files Browse the repository at this point in the history
  • Loading branch information
t3t5u committed Dec 18, 2024
1 parent 6a70969 commit 2077c09
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 1 deletion.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
- **https**: use https or not (boolean, default true)
- **user**: proxy user (string, optional)
- **password**: proxy password (string, optional)
- **multipart_upload**: configuration for using multipart upload. (optional)
- **part_size**: size of each part to upload. (string, default: '5g')
- **max_threads**: maximum number of threads used to upload parts concurrently. (int, default: 4)
- **retry_limit**: maximum number of retries for uploading each part. (int, default: 3)

- **auth_method**: name of mechanism to authenticate requests (basic, env, instance, profile, properties, anonymous, or session. default: basic)

Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ dependencies {
}

implementation "org.embulk:embulk-util-file:0.1.3"
implementation "org.embulk:embulk-util-retryhelper:0.9.0"

// Jackson libraries are once excluded from transitive dependencies of other dependencies, and re-added explicitly here.
// The versions 2.6.7 are exactly the same with embulk-core's dependency before v0.10.32.
Expand Down
1 change: 1 addition & 0 deletions gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ org.embulk:embulk-spi:0.10.37=compileClasspath
org.embulk:embulk-util-aws-credentials:0.4.1=compileClasspath,runtimeClasspath
org.embulk:embulk-util-config:0.3.2=compileClasspath,runtimeClasspath
org.embulk:embulk-util-file:0.1.3=compileClasspath,runtimeClasspath
org.embulk:embulk-util-retryhelper:0.9.0=compileClasspath,runtimeClasspath
org.msgpack:msgpack-core:0.8.11=compileClasspath
org.slf4j:jcl-over-slf4j:1.7.12=compileClasspath,runtimeClasspath
org.slf4j:slf4j-api:1.7.30=compileClasspath
Expand Down
255 changes: 254 additions & 1 deletion src/main/java/org/embulk/output/s3/S3FileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@

package org.embulk.output.s3;

import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.IllegalFormatException;
import java.util.List;
import java.util.Locale;
Expand All @@ -31,6 +37,15 @@
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.util.Md5Utils;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
Expand All @@ -47,6 +62,9 @@
import org.embulk.spi.FileOutputPlugin;
import org.embulk.spi.TransactionalFileOutput;
import org.embulk.util.config.TaskMapper;
import org.embulk.util.retryhelper.RetryExecutor;
import org.embulk.util.retryhelper.RetryGiveupException;
import org.embulk.util.retryhelper.Retryable;
import org.slf4j.Logger;

import com.amazonaws.ClientConfiguration;
Expand All @@ -57,6 +75,11 @@
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class S3FileOutputPlugin
implements FileOutputPlugin
Expand Down Expand Up @@ -116,6 +139,10 @@ public interface PluginTask
@Config("region")
@ConfigDefault("null")
Optional<String> getRegion();

@Config("multipart_upload")
@ConfigDefault("null")
Optional<MultipartUpload> getMultipartUpload();
}

public static class S3FileOutput
Expand All @@ -128,13 +155,15 @@ public static class S3FileOutput
private final String fileNameExtension;
private final String tempPathPrefix;
private final Optional<CannedAccessControlList> cannedAccessControlListOptional;
private final MultipartUpload multipartUpload;

private int taskIndex;
private int fileIndex;
private AmazonS3 client;
private OutputStream current;
private Path tempFilePath;
private String tempPath = null;
private String multipartUploadId = null;

private AmazonS3 newS3Client(final PluginTask task)
{
Expand Down Expand Up @@ -264,6 +293,7 @@ public S3FileOutput(PluginTask task, int taskIndex)
this.tempPath = task.getTempPath().get();
}
this.cannedAccessControlListOptional = task.getCannedAccessControlList();
this.multipartUpload = task.getMultipartUpload().orElse(null);
}

private static Path newTempFile(String tmpDir, String prefix)
Expand Down Expand Up @@ -299,6 +329,148 @@ private String buildCurrentKey()
return pathPrefix + sequence + fileNameExtension;
}

/**
 * Sends the finished temp file to S3, choosing multipart upload when it is
 * configured and a plain single-request put otherwise. A null path is a no-op.
 */
private void multipartUploadOrPutFile(Path from, String key)
{
    if (from != null) {
        if (multipartUpload == null) {
            putFile(from, key);
        }
        else {
            multipartUploadFile(from, key);
        }
    }
}

/**
 * Uploads the file via S3 multipart upload on a dedicated thread pool sized by
 * the multipart_upload.max_threads setting.
 *
 * Fix: the pool is now shut down on both success and failure. Previously it
 * was only terminated on the abort path (abortMultipartUploadIfNecessary
 * returns early when the upload completed successfully), so every successful
 * multipart upload leaked max_threads non-daemon worker threads.
 */
private void multipartUploadFile(Path from, String key)
{
    ExecutorService executor = Executors.newFixedThreadPool(multipartUpload.maxThreads);
    try {
        executeMultipartUpload(from, key, executor);
    }
    finally {
        try {
            // Aborts the upload on S3 (and force-stops the workers) if it did not complete.
            abortMultipartUploadIfNecessary(key, executor);
        }
        finally {
            executor.shutdown(); // Always release the worker threads, even on success.
        }
    }
}

// Splits the local file into fixed-size parts, uploads them concurrently on the
// given executor, and completes the multipart upload once every part succeeds.
private void executeMultipartUpload(Path from, String key, ExecutorService executor)
{
    File file = from.toFile();
    long fileSize = file.length();
    long fileOffset = 0;
    long partSize = multipartUpload.partSize;
    int partNumber = 1;
    // Ceiling division: one extra part when the file size is not a multiple of partSize.
    int totalParts = (int) (fileSize / partSize) + (fileSize % partSize == 0 ? 0 : 1);
    // NOTE(review): a zero-byte file yields zero parts and would call
    // completeMultipartUpload with an empty part list — confirm upstream never
    // flushes an empty temp file.
    List<Future<PartETag>> partETags = new ArrayList<>();
    // The upload id is stored in a field so the abort path can see it; it is
    // cleared only after S3 confirms completion, which is what makes
    // abortMultipartUploadIfNecessary able to distinguish success from failure.
    multipartUploadId = client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
    for (; fileOffset < fileSize; fileOffset += partSize, partNumber++) {
        partETags.add(submitUploadPart(key, file, fileSize, fileOffset, partSize, partNumber, totalParts, executor));
    }
    // collect() blocks until every part future resolves; any part failure
    // surfaces here as a RuntimeException, leaving multipartUploadId set so the
    // caller's finally block aborts the upload.
    client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, multipartUploadId, collect(partETags)));
    multipartUploadId = null; // Successfully completed
}

/**
 * Schedules one part upload on the executor and returns a future for its ETag.
 * The UploadPart is constructed inside the task so its setup work (including
 * the MD5 digest of the part) runs on the worker thread, exactly as before.
 */
private Future<PartETag> submitUploadPart(String key, File file, long fileSize, long fileOffset, long partSize, int partNumber, int totalParts, ExecutorService executor)
{
    return executor.submit(() -> {
        UploadPart part = new UploadPart(key, file, fileSize, fileOffset, partSize, partNumber, totalParts);
        return part.runInterruptible();
    });
}

/**
 * Retryable task that uploads one part of a multipart upload.
 * Created and executed on a single upload worker thread, so the
 * non-thread-safe DecimalFormat below is safe as an instance field.
 */
private class UploadPart implements Retryable<PartETag>
{
    // Retries only this part, up to multipart_upload.retry_limit times.
    final RetryExecutor re = RetryExecutor.builder().withRetryLimit(multipartUpload.retryLimit).build();
    final DecimalFormat df = new DecimalFormat("#,###"); // Not thread safe
    final String key;
    final File file;
    final long fileSize;
    final long fileOffset;
    // Clamped in the constructor so the last part covers only the remaining bytes.
    final long partSize;
    final int partNumber;
    final int totalParts;
    final boolean isLastPart;
    final String md5Digest;

    UploadPart(String key, File file, long fileSize, long fileOffset, long partSize, int partNumber, int totalParts)
    {
        this.key = key;
        this.file = file;
        this.fileSize = fileSize;
        this.fileOffset = fileOffset;
        this.partSize = Math.min(partSize, fileSize - fileOffset);
        this.partNumber = partNumber;
        this.totalParts = totalParts;
        isLastPart = partNumber >= totalParts;
        // NOTE(review): digest is computed with the unclamped partSize argument,
        // not this.partSize; FilePartInputStream stops at EOF so the digest still
        // covers exactly the clamped range — confirm, or pass this.partSize.
        md5Digest = md5AsBase64(file, fileOffset, partSize);
    }

    // Runs the upload with retries; blocks until the part succeeds or retries are exhausted.
    PartETag runInterruptible() throws InterruptedException, RetryGiveupException
    {
        logger.info("Uploading a part {} / {}. bucket '{}', key '{}', upload id '{}'", partNumber, totalParts, bucket, key, multipartUploadId);
        PartETag partETag = re.runInterruptible(this);
        logger.info("Uploaded {} / {} bytes of the file. entity tag '{}'", df.format(fileOffset + partSize), df.format(fileSize), partETag.getETag());
        return partETag;
    }

    // One upload attempt; invoked (and re-invoked on retry) by the RetryExecutor.
    @Override
    public PartETag call()
    {
        return uploadPart(key, file, fileOffset, partSize, partNumber, isLastPart, md5Digest);
    }

    // Only S3-originated errors are retried; anything else fails fast.
    @Override
    public boolean isRetryableException(Exception exception)
    {
        return exception instanceof AmazonS3Exception;
    }

    @Override
    public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
    {
        logger.info("An error occurred while uploading a part {} / {}, will retry {} / {} after {} milliseconds.", partNumber, totalParts, retryCount, retryLimit, df.format(retryWait), exception);
    }

    @Override
    public void onGiveup(Exception firstException, Exception lastException)
    {
        logger.warn("An error occurred while uploading a part {} / {}, give up on retries.", partNumber, totalParts, lastException);
    }
}

/**
 * Performs the UploadPart request for one part and returns its ETag.
 * The MD5 digest lets S3 verify the part body was not corrupted in transit.
 */
private PartETag uploadPart(String key, File file, long fileOffset, long partSize, int partNumber, boolean isLastPart, String md5Digest)
{
    UploadPartRequest request = new UploadPartRequest();
    request.setBucketName(bucket);
    request.setKey(key);
    request.setUploadId(multipartUploadId);
    request.setFile(file);
    request.setFileOffset(fileOffset);
    request.setPartSize(partSize);
    request.setPartNumber(partNumber);
    request.setLastPart(isLastPart);
    request.setMd5Digest(md5Digest);
    return client.uploadPart(request).getPartETag();
}

/**
 * Aborts the in-flight multipart upload, if any, so incomplete parts do not
 * linger on S3. A null upload id means the upload completed successfully and
 * there is nothing to abort.
 */
private void abortMultipartUploadIfNecessary(String key, ExecutorService executor)
{
    if (multipartUploadId != null) {
        try {
            abortMultipartUpload(key, executor);
            logger.info("Aborts a multipart upload. bucket '{}', key '{}', upload id '{}'", bucket, key, multipartUploadId);
        }
        catch (RuntimeException e) {
            // Best effort: a failed abort is logged, never rethrown, so it
            // cannot mask the exception that triggered the abort.
            logger.warn("An error occurred while aborting a multipart upload.", e);
            logger.warn("An incomplete multipart upload may remain. bucket '{}', key '{}', upload id '{}'", bucket, key, multipartUploadId);
        }
    }
}

// Stops any part uploads that are still queued or running, then tells S3 to
// discard the parts uploaded so far for this upload id.
private void abortMultipartUpload(String key, ExecutorService executor)
{
    executor.shutdownNow(); // Attempts to terminate if possible
    client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, multipartUploadId));
}

private void putFile(Path from, String key)
{
PutObjectRequest request = new PutObjectRequest(bucket, key, from.toFile());
Expand All @@ -315,7 +487,7 @@ private void closeCurrent()
}

try {
putFile(tempFilePath, buildCurrentKey());
multipartUploadOrPutFile(tempFilePath, buildCurrentKey());
fileIndex++;
}
finally {
Expand Down Expand Up @@ -393,6 +565,87 @@ public TaskReport commit()
TaskReport report = CONFIG_MAPPER_FACTORY.newTaskReport();
return report;
}

/**
 * Computes the Base64-encoded MD5 of up to partSize bytes of the file,
 * starting at fileOffset, for S3's part-integrity check.
 */
private static String md5AsBase64(File file, long fileOffset, long partSize)
{
    try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
        raf.seek(fileOffset);
        FilePartInputStream partStream = new FilePartInputStream(raf, partSize);
        return Md5Utils.md5AsBase64(partStream);
    }
    catch (IOException e) {
        throw new RuntimeException(e);
    }
}

/**
 * Bounded view over a RandomAccessFile's channel stream that yields at most
 * partSize bytes from the file's current position.
 *
 * Fix: the single-byte read() is now overridden too. FilterInputStream.read()
 * delegates directly to the underlying stream, so without this override a
 * caller using single-byte reads could read past the part boundary.
 */
private static class FilePartInputStream extends FilterInputStream
{
    final long partSize;  // maximum number of bytes this stream may yield
    long partOffset;      // bytes yielded so far

    FilePartInputStream(RandomAccessFile raf, long partSize)
    {
        super(Channels.newInputStream(raf.getChannel()));
        this.partSize = partSize;
    }

    @Override
    public int read() throws IOException
    {
        if (partOffset >= partSize) {
            return -1; // End of part reached
        }
        int b = super.read();
        if (b >= 0) {
            partOffset++;
        }
        return b; // -1 on end of file
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException
    {
        if (partOffset >= partSize) {
            return -1; // End of part reached
        }
        // Never request more than the bytes remaining in this part.
        int bytesRead = super.read(b, off, (int) Math.min(len, partSize - partOffset));
        if (bytesRead <= -1) {
            return -1; // End of file reached
        }
        partOffset += bytesRead;
        return bytesRead;
    }
}

// Waits for every future in order and gathers the results into a list.
private static <T> List<T> collect(List<Future<T>> futures)
{
    List<T> results = new ArrayList<>(futures.size());
    for (Future<T> future : futures) {
        results.add(get(future));
    }
    return results;
}

/**
 * Blocks on a future and unwraps its checked exceptions as RuntimeException.
 *
 * Fix: on InterruptedException the thread's interrupt status is restored
 * before rethrowing, so upstream code (and the executor) can still observe
 * that an interrupt occurred.
 */
private static <T> T get(Future<T> future)
{
    try {
        return future.get();
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status for callers
        throw new RuntimeException(e);
    }
    catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
}
}

/**
 * Immutable holder for the "multipart_upload" configuration section.
 * Deserialized by Jackson from the plugin config; all settings are optional
 * and fall back to the documented defaults (part_size '5g', max_threads 4,
 * retry_limit 3).
 */
public static class MultipartUpload
{
    // Size in bytes of each part to upload.
    @JsonProperty("part_size")
    public final long partSize;
    // Maximum number of concurrent part-upload threads.
    @JsonProperty("max_threads")
    public final int maxThreads;
    // Maximum number of retries per part upload.
    @JsonProperty("retry_limit")
    public final int retryLimit;

    @JsonCreator
    public MultipartUpload(@JsonProperty("part_size") String partSize, @JsonProperty("max_threads") Integer maxThreads, @JsonProperty("retry_limit") Integer retryLimit)
    {
        this.partSize = parseLong(partSize != null ? partSize : "5g");
        this.maxThreads = maxThreads != null ? maxThreads : 4;
        this.retryLimit = retryLimit != null ? retryLimit : 3;
    }

    private static final long K = 1024;
    private static final long M = K * K;
    private static final long G = M * K;

    /**
     * Parses a human-readable size such as "5g", "100m" or "42" into bytes.
     *
     * Fix: the input is trimmed and an optional trailing "b" is accepted, so
     * common spellings like "5gb", "5GB" or "5 g" now parse as intended
     * instead of silently falling back to plain bytes. A missing or unknown
     * unit still means bytes, preserving the original behavior.
     */
    private static long parseLong(String valueWithUnit)
    {
        final String normalized = valueWithUnit.trim().toLowerCase(Locale.ROOT);
        final String digits = normalized.replaceFirst("[^0-9]*$", "");
        final long value = Long.parseLong(digits);
        String unit = normalized.substring(digits.length()).trim();
        if (unit.length() > 1 && unit.endsWith("b")) {
            unit = unit.substring(0, unit.length() - 1); // "gb" -> "g", etc.
        }
        switch (unit) {
            case "g":
                return value * G;
            case "m":
                return value * M;
            case "k":
                return value * K;
            default:
                return value; // absent/unknown unit means plain bytes (original behavior)
        }
    }
}

private void validateSequenceFormat(PluginTask task)
Expand Down

0 comments on commit 2077c09

Please sign in to comment.