Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): update multipart operations to support multiple buckets/regions #2899

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.junit.Test

open class TransferDBTest {
private val bucketName = "bucket_name"
private val region = "us-east-1"
private val fileKey = "file_key"
private lateinit var transferDB: TransferDB
private lateinit var tempFile: File
Expand All @@ -55,6 +56,7 @@ open class TransferDBTest {
transferId,
TransferType.UPLOAD,
bucketName,
region,
fileKey,
tempFile,
null,
Expand All @@ -67,6 +69,7 @@ open class TransferDBTest {
Assert.assertEquals(tempFile, File(this.file))
Assert.assertEquals(fileKey, this.key)
Assert.assertEquals(bucketName, this.bucketName)
Assert.assertEquals(region, this.region)
} ?: Assert.fail("InsertedRecord is null")
}

Expand All @@ -76,6 +79,7 @@ open class TransferDBTest {
val uri = transferDB.insertMultipartUploadRecord(
uploadID,
bucketName,
region,
fileKey,
tempFile,
1L,
Expand All @@ -91,6 +95,7 @@ open class TransferDBTest {
Assert.assertEquals(fileKey, this.key)
Assert.assertEquals(bucketName, this.bucketName)
Assert.assertEquals(uploadID, this.multipartId)
Assert.assertEquals(region, this.region)
} ?: Assert.fail("InsertedRecord is null")
}

Expand All @@ -104,6 +109,7 @@ open class TransferDBTest {
contentValues[0] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand Down Expand Up @@ -137,6 +143,7 @@ open class TransferDBTest {
contentValues[0] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand All @@ -151,6 +158,7 @@ open class TransferDBTest {
contentValues[1] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand All @@ -165,6 +173,7 @@ open class TransferDBTest {
contentValues[2] = transferDB.generateContentValuesForMultiPartUpload(
key,
bucketName,
region,
key,
tempFile,
0L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
import com.amplifyframework.storage.s3.request.AWSS3StorageRemoveRequest;
import com.amplifyframework.storage.s3.request.AWSS3StorageUploadRequest;
import com.amplifyframework.storage.s3.service.AWSS3StorageService;
import com.amplifyframework.storage.s3.service.StorageService;
import com.amplifyframework.storage.s3.transfer.S3StorageTransferClientProvider;
import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider;
import com.amplifyframework.storage.s3.transfer.TransferObserver;
import com.amplifyframework.storage.s3.transfer.TransferRecord;
import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater;
Expand Down Expand Up @@ -126,20 +127,36 @@ public final class AWSS3StoragePlugin extends StoragePlugin<S3Client> {

private static final int DEFAULT_URL_EXPIRATION_DAYS = 7;

private final StorageService.Factory storageServiceFactory;
private final AWSS3StorageService.Factory storageServiceFactory;
private final ExecutorService executorService;
private final AuthCredentialsProvider authCredentialsProvider;
private AuthCredentialsProvider authCredentialsProvider;
private final AWSS3StoragePluginConfiguration awsS3StoragePluginConfiguration;
private AWSS3StorageService defaultStorageService;
@SuppressWarnings("deprecation")
private StorageAccessLevel defaultAccessLevel;
private int defaultUrlExpiration;

private Map<String, AWSS3StorageService> awsS3StorageServicesByBucketName = new HashMap<>();
private final Map<String, AWSS3StorageService> awsS3StorageServicesByBucketName = new HashMap<>();
private Context context;
@SuppressLint("UnsafeOptInUsageError")
private List<AmplifyOutputsData.StorageBucket> configuredBuckets;

@SuppressLint("UnsafeOptInUsageError")
private StorageTransferClientProvider clientProvider
= new S3StorageTransferClientProvider((region, bucketName) -> {
if (region != null && bucketName != null) {
StorageBucket bucket = StorageBucket.fromBucketInfo(new BucketInfo(bucketName, region));
return getAWSS3StorageService((ResolvedStorageBucket) bucket).getClient();
}

if (region != null) {
// unable to create a new S3Client from java code,
// redirecting to AWSS3Service to create S3 Client from kotlin
phantumcode marked this conversation as resolved.
Show resolved Hide resolved
return AWSS3StorageService.getS3Client(region, authCredentialsProvider);
}
return defaultStorageService.getClient();
});

/**
* Constructs the AWS S3 Storage Plugin initializing the executor service.
*/
Expand All @@ -162,13 +179,14 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf

@VisibleForTesting
AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider) {
this((context, region, bucket) ->
this((context, region, bucket, clientProvider) ->
new AWSS3StorageService(
context,
region,
bucket,
authCredentialsProvider,
AWS_S3_STORAGE_PLUGIN_KEY
AWS_S3_STORAGE_PLUGIN_KEY,
clientProvider
),
authCredentialsProvider,
new AWSS3StoragePluginConfiguration.Builder().build());
Expand All @@ -177,21 +195,22 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf
@VisibleForTesting
AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider,
AWSS3StoragePluginConfiguration awss3StoragePluginConfiguration) {
this((context, region, bucket) ->
this((context, region, bucket, clientProvider) ->
new AWSS3StorageService(
context,
region,
bucket,
authCredentialsProvider,
AWS_S3_STORAGE_PLUGIN_KEY
AWS_S3_STORAGE_PLUGIN_KEY,
clientProvider
),
authCredentialsProvider,
awss3StoragePluginConfiguration);
}

@VisibleForTesting
AWSS3StoragePlugin(
StorageService.Factory storageServiceFactory,
AWSS3StorageService.Factory storageServiceFactory,
AuthCredentialsProvider authCredentialsProvider,
AWSS3StoragePluginConfiguration awss3StoragePluginConfiguration
) {
Expand Down Expand Up @@ -282,10 +301,11 @@ private void configure(
) throws StorageException {
try {
this.context = context;
this.defaultStorageService = (AWSS3StorageService) storageServiceFactory.create(
this.defaultStorageService = storageServiceFactory.create(
context,
region,
bucket.getBucketInfo().getName());
bucket.getBucketInfo().getName(),
clientProvider);
this.awsS3StorageServicesByBucketName.clear();
this.awsS3StorageServicesByBucketName.put(bucket.getBucketInfo().getName(), this.defaultStorageService);
} catch (RuntimeException exception) {
Expand Down Expand Up @@ -935,7 +955,8 @@ public StorageRemoveOperation<?> remove(

return operation;
}


@SuppressLint("UnsafeOptInUsageError")
@Override
@SuppressWarnings("deprecation")
public void getTransfer(
Expand All @@ -951,18 +972,23 @@ public void getTransfer(
transferRecord.getId(),
defaultStorageService.getTransferManager().getTransferStatusUpdater(),
transferRecord.getBucketName(),
transferRecord.getRegion(),
transferRecord.getKey(),
transferRecord.getFile(),
null,
transferRecord.getState() != null ? transferRecord.getState() : TransferState.UNKNOWN);
TransferType transferType = transferRecord.getType();

AWSS3StorageService storageService
= getAwss3StorageServiceFromTransferRecord(onError, transferRecord);

switch (Objects.requireNonNull(transferType)) {
case UPLOAD:
if (transferRecord.getFile().startsWith(TransferStatusUpdater.TEMP_FILE_PREFIX)) {
AWSS3StorageUploadInputStreamOperation operation =
new AWSS3StorageUploadInputStreamOperation(
transferId,
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -973,7 +999,7 @@ public void getTransfer(
AWSS3StorageUploadFileOperation operation =
new AWSS3StorageUploadFileOperation(
transferId,
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -987,7 +1013,7 @@ public void getTransfer(
downloadFileOperation = new AWSS3StorageDownloadFileOperation(
transferId,
new File(transferRecord.getFile()),
defaultStorageService,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
Expand All @@ -1009,6 +1035,25 @@ public void getTransfer(
});
}

private AWSS3StorageService getAwss3StorageServiceFromTransferRecord(
@NonNull Consumer<StorageException> onError,
TransferRecord transferRecord
) {
AWSS3StorageService storageService = defaultStorageService;
if (transferRecord.getRegion() != null && transferRecord.getBucketName() != null) {
try {
BucketInfo bucketInfo = new BucketInfo(
transferRecord.getBucketName(),
transferRecord.getRegion());
StorageBucket bucket = StorageBucket.fromBucketInfo(bucketInfo);
storageService = getStorageService(bucket);
} catch (StorageException exception) {
onError.accept(exception);
}
}
return storageService;
}

@NonNull
@SuppressWarnings("deprecation")
@Override
Expand Down Expand Up @@ -1133,7 +1178,7 @@ private AWSS3StorageService getAWSS3StorageService(OutputsStorageBucket outputsS
AWSS3StorageService service = awsS3StorageServicesByBucketName.get(bucketName);
if (service == null) {
String region = configuredBucket.getAwsRegion();
service = (AWSS3StorageService) storageServiceFactory.create(context, region, bucketName);
service = storageServiceFactory.create(context, region, bucketName, clientProvider);
awsS3StorageServicesByBucketName.put(bucketName, service);
}

Expand All @@ -1150,7 +1195,7 @@ private AWSS3StorageService getAWSS3StorageService(ResolvedStorageBucket resolve
AWSS3StorageService service = awsS3StorageServicesByBucketName.get(bucketName);
if (service == null) {
String region = resolvedStorageBucket.getBucketInfo().getRegion();
service = (AWSS3StorageService) storageServiceFactory.create(context, region, bucketName);
service = storageServiceFactory.create(context, region, bucketName, clientProvider);
awsS3StorageServicesByBucketName.put(bucketName, service);
}
return service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ internal object TransferOperations {
transferRecord.id,
transferStatusUpdater,
transferRecord.bucketName,
transferRecord.region,
transferRecord.key,
transferRecord.file,
listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.amplifyframework.storage.StorageItem
import com.amplifyframework.storage.options.SubpathStrategy
import com.amplifyframework.storage.options.SubpathStrategy.Exclude
import com.amplifyframework.storage.result.StorageListResult
import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider
import com.amplifyframework.storage.s3.transfer.TransferManager
import com.amplifyframework.storage.s3.transfer.TransferObserver
import com.amplifyframework.storage.s3.transfer.TransferRecord
Expand All @@ -46,7 +47,6 @@ import java.util.Date
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.runBlocking

/**
* A representation of an S3 backend service endpoint.
*/
Expand All @@ -55,16 +55,27 @@ internal class AWSS3StorageService(
private val awsRegion: String,
private val s3BucketName: String,
private val authCredentialsProvider: AuthCredentialsProvider,
private val awsS3StoragePluginKey: String
private val awsS3StoragePluginKey: String,
private val clientProvider: StorageTransferClientProvider
) : StorageService {

companion object {
@JvmStatic
fun getS3Client(region: String, authCredentialsProvider: AuthCredentialsProvider): S3Client {
phantumcode marked this conversation as resolved.
Show resolved Hide resolved
return S3Client {
this.region = region
this.credentialsProvider = authCredentialsProvider
}
}
}

private var s3Client: S3Client = S3Client {
region = awsRegion
credentialsProvider = authCredentialsProvider
}

val transferManager: TransferManager =
TransferManager(context, s3Client, awsS3StoragePluginKey)
TransferManager(context, clientProvider, awsS3StoragePluginKey)

/**
* Generate pre-signed URL for an object.
Expand Down Expand Up @@ -130,6 +141,7 @@ internal class AWSS3StorageService(
return transferManager.download(
transferId,
s3BucketName,
awsRegion,
serviceKey,
file,
useAccelerateEndpoint = useAccelerateEndpoint
Expand All @@ -153,6 +165,7 @@ internal class AWSS3StorageService(
return transferManager.upload(
transferId,
s3BucketName,
awsRegion,
serviceKey,
file,
metadata,
Expand All @@ -175,7 +188,7 @@ internal class AWSS3StorageService(
metadata: ObjectMetadata,
useAccelerateEndpoint: Boolean
): TransferObserver {
val uploadOptions = UploadOptions(s3BucketName, metadata)
val uploadOptions = UploadOptions(s3BucketName, awsRegion, metadata)
return transferManager.upload(transferId, serviceKey, inputStream, uploadOptions, useAccelerateEndpoint)
}

Expand Down Expand Up @@ -420,4 +433,21 @@ internal class AWSS3StorageService(
fun getClient(): S3Client {
return s3Client
}

interface Factory {
/**
* Factory interface to instantiate [StorageService] object.
*
* @param context Android context
* @param region S3 bucket region
* @param bucketName Name of the bucket where the items are stored
* @return An instantiated storage service instance
*/
fun create(
context: Context,
region: String,
bucketName: String,
clientProvider: StorageTransferClientProvider
): AWSS3StorageService
}
}
Loading