Skip to content

Commit

Permalink
feat(storage): update multipart operations to support multiple bucket…
Browse files Browse the repository at this point in the history
…s/regions (#2899)
  • Loading branch information
phantumcode authored Aug 27, 2024
1 parent 44ad870 commit 5125555
Show file tree
Hide file tree
Showing 27 changed files with 501 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.junit.Test

open class TransferDBTest {
private val bucketName = "bucket_name"
private val region = "us-east-1"
private val fileKey = "file_key"
private lateinit var transferDB: TransferDB
private lateinit var tempFile: File
Expand All @@ -55,6 +56,7 @@ open class TransferDBTest {
transferId,
TransferType.UPLOAD,
bucketName,
region,
fileKey,
tempFile,
null,
Expand All @@ -67,6 +69,7 @@ open class TransferDBTest {
Assert.assertEquals(tempFile, File(this.file))
Assert.assertEquals(fileKey, this.key)
Assert.assertEquals(bucketName, this.bucketName)
Assert.assertEquals(region, this.region)
} ?: Assert.fail("InsertedRecord is null")
}

Expand All @@ -76,6 +79,7 @@ open class TransferDBTest {
val uri = transferDB.insertMultipartUploadRecord(
uploadID,
bucketName,
region,
fileKey,
tempFile,
1L,
Expand All @@ -91,6 +95,7 @@ open class TransferDBTest {
Assert.assertEquals(fileKey, this.key)
Assert.assertEquals(bucketName, this.bucketName)
Assert.assertEquals(uploadID, this.multipartId)
Assert.assertEquals(region, this.region)
} ?: Assert.fail("InsertedRecord is null")
}

Expand All @@ -104,6 +109,7 @@ open class TransferDBTest {
contentValues[0] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand Down Expand Up @@ -137,6 +143,7 @@ open class TransferDBTest {
contentValues[0] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand All @@ -151,6 +158,7 @@ open class TransferDBTest {
contentValues[1] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand All @@ -165,6 +173,7 @@ open class TransferDBTest {
contentValues[2] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@
import com.amplifyframework.storage.s3.request.AWSS3StorageRemoveRequest;
import com.amplifyframework.storage.s3.request.AWSS3StorageUploadRequest;
import com.amplifyframework.storage.s3.service.AWSS3StorageService;
import com.amplifyframework.storage.s3.service.StorageService;
import com.amplifyframework.storage.s3.service.AWSS3StorageServiceContainer;
import com.amplifyframework.storage.s3.transfer.S3StorageTransferClientProvider;
import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider;
import com.amplifyframework.storage.s3.transfer.TransferObserver;
import com.amplifyframework.storage.s3.transfer.TransferRecord;
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater;
Expand All @@ -101,9 +103,7 @@

import java.io.File;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -126,20 +126,33 @@ public final class AWSS3StoragePlugin extends StoragePlugin<S3Client> {

private static final int DEFAULT_URL_EXPIRATION_DAYS = 7;

private final StorageService.Factory storageServiceFactory;
private final AWSS3StorageService.Factory storageServiceFactory;
private final ExecutorService executorService;
private final AuthCredentialsProvider authCredentialsProvider;
private AuthCredentialsProvider authCredentialsProvider;
private final AWSS3StoragePluginConfiguration awsS3StoragePluginConfiguration;
private AWSS3StorageService defaultStorageService;
@SuppressWarnings("deprecation")
private StorageAccessLevel defaultAccessLevel;
private int defaultUrlExpiration;

private Map<String, AWSS3StorageService> awsS3StorageServicesByBucketName = new HashMap<>();
private Context context;
private AWSS3StorageServiceContainer awss3StorageServiceContainer;
@SuppressLint("UnsafeOptInUsageError")
private List<AmplifyOutputsData.StorageBucket> configuredBuckets;

@SuppressLint("UnsafeOptInUsageError")
private StorageTransferClientProvider clientProvider
= new S3StorageTransferClientProvider((region, bucketName) -> {
if (region != null && bucketName != null) {
StorageBucket bucket = StorageBucket.fromBucketInfo(new BucketInfo(bucketName, region));
return awss3StorageServiceContainer.get((ResolvedStorageBucket) bucket).getClient();

Check warning on line 147 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L146-L147

Added lines #L146 - L147 were not covered by tests
}

if (region != null) {
return S3StorageTransferClientProvider.getS3Client(region, authCredentialsProvider);

Check warning on line 151 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L151

Added line #L151 was not covered by tests
}
return defaultStorageService.getClient();

Check warning on line 153 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L153

Added line #L153 was not covered by tests
});

/**
* Constructs the AWS S3 Storage Plugin initializing the executor service.
*/
Expand All @@ -162,13 +175,14 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf

@VisibleForTesting
AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider) {
this((context, region, bucket) ->
this((context, region, bucket, clientProvider) ->

Check warning on line 178 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L178

Added line #L178 was not covered by tests
new AWSS3StorageService(
context,
region,
bucket,
authCredentialsProvider,
AWS_S3_STORAGE_PLUGIN_KEY
AWS_S3_STORAGE_PLUGIN_KEY,
clientProvider
),
authCredentialsProvider,
new AWSS3StoragePluginConfiguration.Builder().build());
Expand All @@ -177,21 +191,23 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf
@VisibleForTesting
AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider,
AWSS3StoragePluginConfiguration awss3StoragePluginConfiguration) {
this((context, region, bucket) ->

this((context, region, bucket, clientProvider) ->

Check warning on line 195 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L195

Added line #L195 was not covered by tests
new AWSS3StorageService(
context,
region,
bucket,
authCredentialsProvider,
AWS_S3_STORAGE_PLUGIN_KEY
AWS_S3_STORAGE_PLUGIN_KEY,
clientProvider
),
authCredentialsProvider,
awss3StoragePluginConfiguration);
}

@VisibleForTesting
AWSS3StoragePlugin(
StorageService.Factory storageServiceFactory,
AWSS3StorageService.Factory storageServiceFactory,
AuthCredentialsProvider authCredentialsProvider,
AWSS3StoragePluginConfiguration awss3StoragePluginConfiguration
) {
Expand Down Expand Up @@ -281,13 +297,15 @@ private void configure(
@NonNull ResolvedStorageBucket bucket
) throws StorageException {
try {
this.context = context;
this.defaultStorageService = (AWSS3StorageService) storageServiceFactory.create(
this.defaultStorageService = storageServiceFactory.create(
context,
region,
bucket.getBucketInfo().getName());
this.awsS3StorageServicesByBucketName.clear();
this.awsS3StorageServicesByBucketName.put(bucket.getBucketInfo().getName(), this.defaultStorageService);
bucket.getBucketInfo().getName(),
clientProvider);
this.awss3StorageServiceContainer = new AWSS3StorageServiceContainer(
context, storageServiceFactory,
(S3StorageTransferClientProvider) clientProvider);
this.awss3StorageServiceContainer.put(bucket.getBucketInfo().getName(), this.defaultStorageService);
} catch (RuntimeException exception) {
throw new StorageException(
"Failed to create storage service.",
Expand Down Expand Up @@ -935,7 +953,8 @@ public StorageRemoveOperation<?> remove(

return operation;
}


@SuppressLint("UnsafeOptInUsageError")
@Override
@SuppressWarnings("deprecation")
public void getTransfer(
Expand All @@ -951,18 +970,23 @@ public void getTransfer(
transferRecord.getId(),
defaultStorageService.getTransferManager().getTransferStatusUpdater(),

Check warning on line 971 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L971

Added line #L971 was not covered by tests
transferRecord.getBucketName(),
transferRecord.getRegion(),

Check warning on line 973 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L973

Added line #L973 was not covered by tests
transferRecord.getKey(),
transferRecord.getFile(),
null,
transferRecord.getState() != null ? transferRecord.getState() : TransferState.UNKNOWN);
TransferType transferType = transferRecord.getType();

AWSS3StorageService storageService
= getAwss3StorageServiceFromTransferRecord(onError, transferRecord);

Check warning on line 981 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L980-L981

Added lines #L980 - L981 were not covered by tests

switch (Objects.requireNonNull(transferType)) {
case UPLOAD:
if (transferRecord.getFile().startsWith(TransferStatusUpdater.TEMP_FILE_PREFIX)) {
AWSS3StorageUploadInputStreamOperation operation =
new AWSS3StorageUploadInputStreamOperation(
transferId,
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -973,7 +997,7 @@ public void getTransfer(
AWSS3StorageUploadFileOperation operation =
new AWSS3StorageUploadFileOperation(
transferId,
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -987,7 +1011,7 @@ public void getTransfer(
downloadFileOperation = new AWSS3StorageDownloadFileOperation(
transferId,
new File(transferRecord.getFile()),
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -1009,6 +1033,25 @@ public void getTransfer(
});
}

private AWSS3StorageService getAwss3StorageServiceFromTransferRecord(
@NonNull Consumer<StorageException> onError,
TransferRecord transferRecord
) {
AWSS3StorageService storageService = defaultStorageService;

Check warning on line 1040 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L1040

Added line #L1040 was not covered by tests
if (transferRecord.getRegion() != null && transferRecord.getBucketName() != null) {
try {
BucketInfo bucketInfo = new BucketInfo(
transferRecord.getBucketName(),
transferRecord.getRegion());
StorageBucket bucket = StorageBucket.fromBucketInfo(bucketInfo);
storageService = getStorageService(bucket);
} catch (StorageException exception) {
onError.accept(exception);
}

Check warning on line 1050 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L1043-L1050

Added lines #L1043 - L1050 were not covered by tests
}
return storageService;

Check warning on line 1052 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L1052

Added line #L1052 was not covered by tests
}

@NonNull
@SuppressWarnings("deprecation")
@Override
Expand Down Expand Up @@ -1105,55 +1148,27 @@ AWSS3StorageService getStorageService(@Nullable StorageBucket bucket) throws Sto
}

if (bucket instanceof OutputsStorageBucket) {
AWSS3StorageService service = getAWSS3StorageService((OutputsStorageBucket) bucket);
if (service == null) {
throw new StorageException(
"Unable to find bucket from name in Amplify Outputs.",
new InvalidStorageBucketException(),
"Ensure the bucket name used is available in Amplify Outputs.");
} else {
return service;
}
}

if (bucket instanceof ResolvedStorageBucket) {
return getAWSS3StorageService((ResolvedStorageBucket) bucket);
}

return defaultStorageService;
}

@SuppressLint("UnsafeOptInUsageError")
private AWSS3StorageService getAWSS3StorageService(OutputsStorageBucket outputsStorageBucket) {
if (configuredBuckets != null && !configuredBuckets.isEmpty()) {
String name = outputsStorageBucket.getName();
for (AmplifyOutputsData.StorageBucket configuredBucket : configuredBuckets) {
if (configuredBucket.getName().equals(name)) {
String bucketName = configuredBucket.getBucketName();
AWSS3StorageService service = awsS3StorageServicesByBucketName.get(bucketName);
if (service == null) {
if (configuredBuckets != null && !configuredBuckets.isEmpty()) {
String name = ((OutputsStorageBucket) bucket).getName();
for (AmplifyOutputsData.StorageBucket configuredBucket : configuredBuckets) {
if (configuredBucket.getName().equals(name)) {
String bucketName = configuredBucket.getBucketName();
String region = configuredBucket.getAwsRegion();
service = (AWSS3StorageService) storageServiceFactory.create(context, region, bucketName);
awsS3StorageServicesByBucketName.put(bucketName, service);
return awss3StorageServiceContainer.get(bucketName, region);
}

return service;
}
}
throw new StorageException(
"Unable to find bucket from name in Amplify Outputs.",
new InvalidStorageBucketException(),
"Ensure the bucket name used is available in Amplify Outputs.");
}
return null;
}

@SuppressLint("UnsafeOptInUsageError")
private AWSS3StorageService getAWSS3StorageService(ResolvedStorageBucket resolvedStorageBucket) {
String bucketName = resolvedStorageBucket.getBucketInfo().getName();
AWSS3StorageService service = awsS3StorageServicesByBucketName.get(bucketName);
if (service == null) {
String region = resolvedStorageBucket.getBucketInfo().getRegion();
service = (AWSS3StorageService) storageServiceFactory.create(context, region, bucketName);
awsS3StorageServicesByBucketName.put(bucketName, service);
if (bucket instanceof ResolvedStorageBucket) {
return awss3StorageServiceContainer.get((ResolvedStorageBucket) bucket);
}
return service;

return defaultStorageService;

Check warning on line 1171 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java#L1171

Added line #L1171 was not covered by tests
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ internal object TransferOperations {
transferRecord.id,
transferStatusUpdater,
transferRecord.bucketName,
transferRecord.region,

Check warning on line 73 in aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/TransferOperations.kt

View check run for this annotation

Codecov / codecov/patch

aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/TransferOperations.kt#L73

Added line #L73 was not covered by tests
transferRecord.key,
transferRecord.file,
listener
Expand Down
Loading

0 comments on commit 5125555

Please sign in to comment.