Skip to content

Commit

Permalink
feat(storage): update multipart operations to support multiple bucket…
Browse files Browse the repository at this point in the history
…s and regions
  • Loading branch information
phantumcode committed Aug 20, 2024
1 parent 8fcfd7e commit 764b9a9
Show file tree
Hide file tree
Showing 24 changed files with 263 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
import com.amplifyframework.storage.s3.request.AWSS3StorageRemoveRequest;
import com.amplifyframework.storage.s3.request.AWSS3StorageUploadRequest;
import com.amplifyframework.storage.s3.service.AWSS3StorageService;
import com.amplifyframework.storage.s3.service.StorageService;
import com.amplifyframework.storage.s3.transfer.S3StorageTransferClientProvider;
import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider;
import com.amplifyframework.storage.s3.transfer.TransferObserver;
import com.amplifyframework.storage.s3.transfer.TransferRecord;
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater;
Expand Down Expand Up @@ -126,20 +127,33 @@ public final class AWSS3StoragePlugin extends StoragePlugin<S3Client> {

private static final int DEFAULT_URL_EXPIRATION_DAYS = 7;

private final StorageService.Factory storageServiceFactory;
private final AWSS3StorageService.Factory storageServiceFactory;
private final ExecutorService executorService;
private final AuthCredentialsProvider authCredentialsProvider;
private AuthCredentialsProvider authCredentialsProvider;
private final AWSS3StoragePluginConfiguration awsS3StoragePluginConfiguration;
private AWSS3StorageService defaultStorageService;
@SuppressWarnings("deprecation")
private StorageAccessLevel defaultAccessLevel;
private int defaultUrlExpiration;

private Map<String, AWSS3StorageService> awsS3StorageServicesByBucketName = new HashMap<>();
private final Map<String, AWSS3StorageService> awsS3StorageServicesByBucketName = new HashMap<>();
private Context context;
@SuppressLint("UnsafeOptInUsageError")
private List<AmplifyOutputsData.StorageBucket> configuredBuckets;

@SuppressLint("UnsafeOptInUsageError")
private StorageTransferClientProvider clientProvider = new S3StorageTransferClientProvider((region, bucketName) -> {
if (region != null && bucketName != null) {
StorageBucket bucket = StorageBucket.fromBucketInfo(new BucketInfo(region, bucketName));
return getAWSS3StorageService((ResolvedStorageBucket) bucket).getClient();
}

if (region != null) {
return AWSS3StorageService.getS3Client(region, authCredentialsProvider);
}
return defaultStorageService.getClient();
});

/**
* Constructs the AWS S3 Storage Plugin initializing the executor service.
*/
Expand All @@ -162,13 +176,14 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf

@VisibleForTesting
AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider) {
this((context, region, bucket) ->
this((context, region, bucket, clientProvider) ->
new AWSS3StorageService(
context,
region,
bucket,
authCredentialsProvider,
AWS_S3_STORAGE_PLUGIN_KEY
AWS_S3_STORAGE_PLUGIN_KEY,
clientProvider
),
authCredentialsProvider,
new AWSS3StoragePluginConfiguration.Builder().build());
Expand All @@ -177,21 +192,22 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf
@VisibleForTesting
AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider,
AWSS3StoragePluginConfiguration awss3StoragePluginConfiguration) {
this((context, region, bucket) ->
this((context, region, bucket, clientProvider) ->
new AWSS3StorageService(
context,
region,
bucket,
authCredentialsProvider,
AWS_S3_STORAGE_PLUGIN_KEY
AWS_S3_STORAGE_PLUGIN_KEY,
clientProvider
),
authCredentialsProvider,
awss3StoragePluginConfiguration);
}

@VisibleForTesting
AWSS3StoragePlugin(
StorageService.Factory storageServiceFactory,
AWSS3StorageService.Factory storageServiceFactory,
AuthCredentialsProvider authCredentialsProvider,
AWSS3StoragePluginConfiguration awss3StoragePluginConfiguration
) {
Expand Down Expand Up @@ -282,10 +298,11 @@ private void configure(
) throws StorageException {
try {
this.context = context;
this.defaultStorageService = (AWSS3StorageService) storageServiceFactory.create(
this.defaultStorageService = storageServiceFactory.create(
context,
region,
bucket.getBucketInfo().getName());
bucket.getBucketInfo().getName(),
clientProvider);
this.awsS3StorageServicesByBucketName.clear();
this.awsS3StorageServicesByBucketName.put(bucket.getBucketInfo().getName(), this.defaultStorageService);
} catch (RuntimeException exception) {
Expand Down Expand Up @@ -935,7 +952,8 @@ public StorageRemoveOperation<?> remove(

return operation;
}


@SuppressLint("UnsafeOptInUsageError")
@Override
@SuppressWarnings("deprecation")
public void getTransfer(
Expand All @@ -951,18 +969,23 @@ public void getTransfer(
transferRecord.getId(),
defaultStorageService.getTransferManager().getTransferStatusUpdater(),
transferRecord.getBucketName(),
transferRecord.getRegion(),
transferRecord.getKey(),
transferRecord.getFile(),
null,
transferRecord.getState() != null ? transferRecord.getState() : TransferState.UNKNOWN);
TransferType transferType = transferRecord.getType();

AWSS3StorageService storageService
= getAwss3StorageServiceFromTransferRecord(onError, transferRecord);

switch (Objects.requireNonNull(transferType)) {
case UPLOAD:
if (transferRecord.getFile().startsWith(TransferStatusUpdater.TEMP_FILE_PREFIX)) {
AWSS3StorageUploadInputStreamOperation operation =
new AWSS3StorageUploadInputStreamOperation(
transferId,
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -973,7 +996,7 @@ public void getTransfer(
AWSS3StorageUploadFileOperation operation =
new AWSS3StorageUploadFileOperation(
transferId,
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -987,7 +1010,7 @@ public void getTransfer(
downloadFileOperation = new AWSS3StorageDownloadFileOperation(
transferId,
new File(transferRecord.getFile()),
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -1009,6 +1032,25 @@ public void getTransfer(
});
}

private AWSS3StorageService getAwss3StorageServiceFromTransferRecord(
@NonNull Consumer<StorageException> onError,
TransferRecord transferRecord
) {
AWSS3StorageService storageService = defaultStorageService;
if (transferRecord.getRegion() != null && transferRecord.getBucketName() != null) {
try {
BucketInfo bucketInfo = new BucketInfo(
transferRecord.getBucketName(),
transferRecord.getRegion());
StorageBucket bucket = StorageBucket.fromBucketInfo(bucketInfo);
storageService = getStorageService(bucket);
} catch (StorageException exception) {
onError.accept(exception);
}
}
return storageService;
}

@NonNull
@SuppressWarnings("deprecation")
@Override
Expand Down Expand Up @@ -1133,7 +1175,7 @@ private AWSS3StorageService getAWSS3StorageService(OutputsStorageBucket outputsS
AWSS3StorageService service = awsS3StorageServicesByBucketName.get(bucketName);
if (service == null) {
String region = configuredBucket.getAwsRegion();
service = (AWSS3StorageService) storageServiceFactory.create(context, region, bucketName);
service = storageServiceFactory.create(context, region, bucketName, clientProvider);
awsS3StorageServicesByBucketName.put(bucketName, service);
}

Expand All @@ -1150,7 +1192,7 @@ private AWSS3StorageService getAWSS3StorageService(ResolvedStorageBucket resolve
AWSS3StorageService service = awsS3StorageServicesByBucketName.get(bucketName);
if (service == null) {
String region = resolvedStorageBucket.getBucketInfo().getRegion();
service = (AWSS3StorageService) storageServiceFactory.create(context, region, bucketName);
service = storageServiceFactory.create(context, region, bucketName, clientProvider);
awsS3StorageServicesByBucketName.put(bucketName, service);
}
return service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ internal object TransferOperations {
transferRecord.id,
transferStatusUpdater,
transferRecord.bucketName,
transferRecord.region,
transferRecord.key,
transferRecord.file,
listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amplifyframework.storage.StorageItem
import com.amplifyframework.storage.options.SubpathStrategy
import com.amplifyframework.storage.options.SubpathStrategy.Exclude
import com.amplifyframework.storage.result.StorageListResult
import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider
import com.amplifyframework.storage.s3.transfer.TransferManager
import com.amplifyframework.storage.s3.transfer.TransferObserver
import com.amplifyframework.storage.s3.transfer.TransferRecord
Expand All @@ -46,7 +47,6 @@ import java.util.Date
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.runBlocking

/**
* A representation of an S3 backend service endpoint.
*/
Expand All @@ -55,16 +55,27 @@ internal class AWSS3StorageService(
private val awsRegion: String,
private val s3BucketName: String,
private val authCredentialsProvider: AuthCredentialsProvider,
private val awsS3StoragePluginKey: String
private val awsS3StoragePluginKey: String,
private val clientProvider: StorageTransferClientProvider
) : StorageService {

companion object {
@JvmStatic
fun getS3Client(region: String, authCredentialsProvider: AuthCredentialsProvider): S3Client {
return S3Client {
this.region = region
this.credentialsProvider = authCredentialsProvider
}
}
}

private var s3Client: S3Client = S3Client {
region = awsRegion
credentialsProvider = authCredentialsProvider
}

val transferManager: TransferManager =
TransferManager(context, s3Client, awsS3StoragePluginKey)
TransferManager(context, clientProvider, awsS3StoragePluginKey)

/**
* Generate pre-signed URL for an object.
Expand Down Expand Up @@ -130,6 +141,7 @@ internal class AWSS3StorageService(
return transferManager.download(
transferId,
s3BucketName,
awsRegion,
serviceKey,
file,
useAccelerateEndpoint = useAccelerateEndpoint
Expand All @@ -153,6 +165,7 @@ internal class AWSS3StorageService(
return transferManager.upload(
transferId,
s3BucketName,
awsRegion,
serviceKey,
file,
metadata,
Expand All @@ -175,7 +188,7 @@ internal class AWSS3StorageService(
metadata: ObjectMetadata,
useAccelerateEndpoint: Boolean
): TransferObserver {
val uploadOptions = UploadOptions(s3BucketName, metadata)
val uploadOptions = UploadOptions(s3BucketName, awsRegion, metadata)
return transferManager.upload(transferId, serviceKey, inputStream, uploadOptions, useAccelerateEndpoint)
}

Expand Down Expand Up @@ -420,4 +433,21 @@ internal class AWSS3StorageService(
fun getClient(): S3Client {
return s3Client
}

interface Factory {
/**
* Factory interface to instantiate [StorageService] object.
*
* @param context Android context
* @param region S3 bucket region
* @param bucketName Name of the bucket where the items are stored
* @return An instantiated storage service instance
*/
fun create(
context: Context,
region: String,
bucketName: String,
clientProvider: StorageTransferClientProvider
): AWSS3StorageService
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.storage.s3.transfer

import aws.sdk.kotlin.services.s3.S3Client
import com.amplifyframework.auth.AuthCredentialsProvider
import com.amplifyframework.storage.StorageException

internal class S3StorageTransferClientProvider(
private val createS3Client: (region: String?, bucketName: String?) -> S3Client
) : StorageTransferClientProvider {
override fun getStorageTransferClient(region: String?, bucketName: String?): S3Client {
return createS3Client(region, bucketName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.storage.s3.transfer

import aws.sdk.kotlin.services.s3.S3Client
import com.amplifyframework.annotations.InternalApiWarning

@InternalApiWarning
internal interface StorageTransferClientProvider {
fun getStorageTransferClient(region: String?, bucketName: String?): S3Client
}
Loading

0 comments on commit 764b9a9

Please sign in to comment.