Skip to content

Commit

Permalink
feat(storage): Gen2 Storage Download/Upload (#2741)
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerjroach authored Apr 4, 2024
1 parent 8b1d6f1 commit 63bc45d
Show file tree
Hide file tree
Showing 23 changed files with 1,788 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.storage.s3

import android.content.Context
import androidx.test.core.app.ApplicationProvider
import com.amplifyframework.auth.AuthPlugin
import com.amplifyframework.auth.cognito.AWSCognitoAuthPlugin
import com.amplifyframework.core.Amplify
import com.amplifyframework.core.async.Cancelable
import com.amplifyframework.core.async.Resumable
import com.amplifyframework.hub.HubChannel
import com.amplifyframework.hub.HubEvent
import com.amplifyframework.hub.SubscriptionToken
import com.amplifyframework.storage.StorageCategory
import com.amplifyframework.storage.StorageChannelEventName
import com.amplifyframework.storage.StoragePath
import com.amplifyframework.storage.TransferState
import com.amplifyframework.storage.TransferState.Companion.getState
import com.amplifyframework.storage.operation.StorageDownloadFileOperation
import com.amplifyframework.storage.options.StorageDownloadFileOptions
import com.amplifyframework.storage.options.StorageUploadFileOptions
import com.amplifyframework.storage.s3.options.AWSS3StorageDownloadFileOptions
import com.amplifyframework.storage.s3.test.R
import com.amplifyframework.storage.s3.util.WorkmanagerTestUtils.initializeWorkmanagerTestUtil
import com.amplifyframework.testutils.FileAssert
import com.amplifyframework.testutils.random.RandomTempFile
import com.amplifyframework.testutils.sync.SynchronousAuth
import com.amplifyframework.testutils.sync.SynchronousStorage
import java.io.File
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import org.junit.After
import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
import org.junit.BeforeClass
import org.junit.Test

/**
* Instrumentation test for operational work on download.
*/
class AWSS3StoragePathDownloadTest {
// Create a file to download to
private val downloadFile: File = RandomTempFile()
private val options = StorageDownloadFileOptions.defaultInstance()
// Create a set to remember all the subscriptions
private val subscriptions = mutableSetOf<SubscriptionToken>()

companion object {
private val EXTENDED_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60)
private const val LARGE_FILE_SIZE = 10 * 1024 * 1024L // 10 MB
private const val SMALL_FILE_SIZE = 100L
private val LARGE_FILE_NAME = "large-${System.currentTimeMillis()}"
private val LARGE_FILE_PATH = StoragePath.fromString("public/$LARGE_FILE_NAME")
private val SMALL_FILE_NAME = "small-${System.currentTimeMillis()}"
private val SMALL_FILE_PATH = StoragePath.fromString("public/$SMALL_FILE_NAME")

lateinit var storageCategory: StorageCategory
lateinit var synchronousStorage: SynchronousStorage
lateinit var largeFile: File
lateinit var smallFile: File

/**
* Initialize mobile client and configure the storage.
* Upload the test files ahead of time.
*/
@JvmStatic
@BeforeClass
fun setUpOnce() {
val context = ApplicationProvider.getApplicationContext<Context>()
initializeWorkmanagerTestUtil(context)
SynchronousAuth.delegatingToCognito(context, AWSCognitoAuthPlugin() as AuthPlugin<*>)

// Get a handle to storage
storageCategory = TestStorageCategory.create(context, R.raw.amplifyconfiguration)
synchronousStorage = SynchronousStorage.delegatingTo(storageCategory)

val uploadOptions = StorageUploadFileOptions.defaultInstance()

// Upload large test file
largeFile = RandomTempFile(LARGE_FILE_NAME, LARGE_FILE_SIZE)
synchronousStorage.uploadFile(LARGE_FILE_PATH, largeFile, uploadOptions, EXTENDED_TIMEOUT_MS)

// Upload small test file
smallFile = RandomTempFile(SMALL_FILE_NAME, SMALL_FILE_SIZE)
synchronousStorage.uploadFile(SMALL_FILE_PATH, smallFile, uploadOptions)
}
}

/**
* Unsubscribe from everything after each test.
*/
@After
fun tearDown() {
// Unsubscribe from everything
for (token in subscriptions) {
Amplify.Hub.unsubscribe(token)
}
}

@Test
fun testDownloadSmallFile() {
synchronousStorage.downloadFile(SMALL_FILE_PATH, downloadFile, options)
FileAssert.assertEquals(smallFile, downloadFile)
}

@Test
fun testDownloadLargeFile() {
synchronousStorage.downloadFile(
LARGE_FILE_PATH,
downloadFile,
options,
EXTENDED_TIMEOUT_MS
)
FileAssert.assertEquals(largeFile, downloadFile)
}

@Test
fun testDownloadFileIsCancelable() {
val canceled = CountDownLatch(1)
val opContainer = AtomicReference<Cancelable>()
val errorContainer = AtomicReference<Throwable>()

// Listen to Hub events for cancel
val cancelToken = Amplify.Hub.subscribe(HubChannel.STORAGE) { hubEvent: HubEvent<*> ->
if (StorageChannelEventName.DOWNLOAD_STATE.toString() == hubEvent.name) {
val state = getState(hubEvent.data as String)
if (TransferState.CANCELED == state) {
canceled.countDown()
}
}
}
subscriptions.add(cancelToken)

// Begin downloading a large file
val op = storageCategory.downloadFile(
LARGE_FILE_PATH,
downloadFile,
options,
{
if (it.currentBytes > 0 && canceled.count > 0) {
opContainer.get().cancel()
}
},
{ errorContainer.set(RuntimeException("Download completed without canceling.")) },
{ newValue -> errorContainer.set(newValue) }
)
opContainer.set(op)

// Assert that the required conditions have been met
assertTrue(canceled.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS))
assertNull(errorContainer.get())
}

@Test
fun testDownloadFileIsResumable() {
val completed = CountDownLatch(1)
val resumed = CountDownLatch(1)
val opContainer = AtomicReference<Resumable>()
val errorContainer = AtomicReference<Throwable>()

// Listen to Hub events to resume when operation has been paused
val resumeToken = Amplify.Hub.subscribe(HubChannel.STORAGE) { hubEvent: HubEvent<*> ->
if (StorageChannelEventName.DOWNLOAD_STATE.toString() == hubEvent.name) {
val state = getState(hubEvent.data as String)
if (TransferState.PAUSED == state) {
opContainer.get().resume()
resumed.countDown()
}
}
}
subscriptions.add(resumeToken)

// Begin downloading a large file
val op = storageCategory.downloadFile(
LARGE_FILE_PATH,
downloadFile,
options,
{
if (it.currentBytes > 0 && resumed.count > 0) {
opContainer.get().pause()
}
},
{ completed.countDown() },
{ errorContainer.set(it) }
)
opContainer.set(op)

// Assert that all the required conditions have been met
assertTrue(resumed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS))
assertTrue(completed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS))
assertNull(errorContainer.get())
FileAssert.assertEquals(largeFile, downloadFile)
}

@Test
fun testGetTransferOnPause() {
val completed = CountDownLatch(1)
val resumed = CountDownLatch(1)
val opContainer = AtomicReference<StorageDownloadFileOperation<*>>()
val transferId = AtomicReference<String>()
val errorContainer = AtomicReference<Throwable>()
// Listen to Hub events to resume when operation has been paused
val resumeToken = Amplify.Hub.subscribe(HubChannel.STORAGE) { hubEvent: HubEvent<*> ->
if (StorageChannelEventName.DOWNLOAD_STATE.toString() == hubEvent.name) {
val state = getState(hubEvent.data as String)
if (TransferState.PAUSED == state) {
opContainer.get().clearAllListeners()
storageCategory.getTransfer(
transferId.get(),
{
val getOp = it as StorageDownloadFileOperation<*>
getOp.resume()
resumed.countDown()
getOp.setOnSuccess { completed.countDown() }
},
{ errorContainer.set(it) }
)
}
}
}
subscriptions.add(resumeToken)

// Begin downloading a large file
val op = storageCategory.downloadFile(
LARGE_FILE_PATH,
downloadFile,
options,
{
if (it.currentBytes > 0 && resumed.count > 0) {
opContainer.get().pause()
}
},
{ },
{ errorContainer.set(it) }
)

opContainer.set(op)
transferId.set(op.transferId)

// Assert that all the required conditions have been met
assertTrue(resumed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS))
assertTrue(completed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS))
assertNull(errorContainer.get())
FileAssert.assertEquals(largeFile, downloadFile)
}

@Test
fun testDownloadLargeFileWithAccelerationEnabled() {
val awsS3Options = AWSS3StorageDownloadFileOptions.builder().setUseAccelerateEndpoint(true).build()
synchronousStorage.downloadFile(
LARGE_FILE_PATH,
downloadFile,
awsS3Options,
EXTENDED_TIMEOUT_MS
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import com.amplifyframework.storage.s3.operation.AWSS3StorageDownloadFileOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StorageGetPresignedUrlOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StorageListOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StoragePathDownloadFileOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StoragePathUploadFileOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StoragePathUploadInputStreamOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StorageRemoveOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StorageUploadFileOperation;
import com.amplifyframework.storage.s3.operation.AWSS3StorageUploadInputStreamOperation;
Expand All @@ -68,6 +71,8 @@
import com.amplifyframework.storage.s3.request.AWSS3StorageDownloadFileRequest;
import com.amplifyframework.storage.s3.request.AWSS3StorageGetPresignedUrlRequest;
import com.amplifyframework.storage.s3.request.AWSS3StorageListRequest;
import com.amplifyframework.storage.s3.request.AWSS3StoragePathDownloadFileRequest;
import com.amplifyframework.storage.s3.request.AWSS3StoragePathUploadRequest;
import com.amplifyframework.storage.s3.request.AWSS3StorageRemoveRequest;
import com.amplifyframework.storage.s3.request.AWSS3StorageUploadRequest;
import com.amplifyframework.storage.s3.service.AWSS3StorageService;
Expand Down Expand Up @@ -271,7 +276,7 @@ public StorageGetUrlOperation<?> getUrl(
) {
return getUrl(path, StorageGetUrlOptions.defaultInstance(), onSuccess, onError);
}

@NonNull
@Override
@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -419,8 +424,28 @@ public StorageDownloadFileOperation<?> downloadFile(
@NonNull Consumer<StorageDownloadFileResult> onSuccess,
@NonNull Consumer<StorageException> onError
) {
// TODO
return null;
boolean useAccelerateEndpoint =
options instanceof AWSS3StorageDownloadFileOptions &&
((AWSS3StorageDownloadFileOptions) options).useAccelerateEndpoint();

AWSS3StoragePathDownloadFileRequest request = new AWSS3StoragePathDownloadFileRequest(
path,
local,
useAccelerateEndpoint
);

AWSS3StoragePathDownloadFileOperation operation = new AWSS3StoragePathDownloadFileOperation(
request,
storageService,
executorService,
authCredentialsProvider,
onProgress,
onSuccess,
onError
);
operation.start();

return operation;
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -526,8 +551,31 @@ public StorageUploadFileOperation<?> uploadFile(
@NonNull Consumer<StorageUploadFileResult> onSuccess,
@NonNull Consumer<StorageException> onError
) {
// TODO: Implement
return null;
boolean useAccelerateEndpoint = options instanceof AWSS3StorageUploadFileOptions &&
((AWSS3StorageUploadFileOptions) options).useAccelerateEndpoint();
AWSS3StoragePathUploadRequest<File> request = new AWSS3StoragePathUploadRequest<>(
path,
local,
options.getContentType(),
options instanceof AWSS3StorageUploadFileOptions
? ((AWSS3StorageUploadFileOptions) options).getServerSideEncryption()
: ServerSideEncryption.NONE,
options.getMetadata(),
useAccelerateEndpoint
);

AWSS3StoragePathUploadFileOperation operation = new AWSS3StoragePathUploadFileOperation(
request,
storageService,
executorService,
authCredentialsProvider,
onProgress,
onSuccess,
onError
);
operation.start();

return operation;
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -631,8 +679,32 @@ public StorageUploadInputStreamOperation<?> uploadInputStream(
@NonNull Consumer<StorageUploadInputStreamResult> onSuccess,
@NonNull Consumer<StorageException> onError
) {
// TODO: Implement
return null;
boolean useAccelerateEndpoint = options instanceof AWSS3StorageUploadInputStreamOptions &&
((AWSS3StorageUploadInputStreamOptions) options).useAccelerateEndpoint();
AWSS3StoragePathUploadRequest<InputStream> request = new AWSS3StoragePathUploadRequest<>(
path,
local,
options.getContentType(),
options instanceof AWSS3StorageUploadInputStreamOptions
? ((AWSS3StorageUploadInputStreamOptions) options).getServerSideEncryption()
: ServerSideEncryption.NONE,
options.getMetadata(),
useAccelerateEndpoint
);

AWSS3StoragePathUploadInputStreamOperation operation =
new AWSS3StoragePathUploadInputStreamOperation(
request,
storageService,
executorService,
authCredentialsProvider,
onProgress,
onSuccess,
onError
);
operation.start();

return operation;
}

@SuppressWarnings("deprecation")
Expand Down
Loading

0 comments on commit 63bc45d

Please sign in to comment.