Skip to content

Commit

Permalink
feat(Storage): Implementing support for multiple buckets (#3817)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruisebas authored Aug 19, 2024
1 parent f012197 commit 0425e51
Show file tree
Hide file tree
Showing 21 changed files with 723 additions and 60 deletions.
19 changes: 19 additions & 0 deletions Amplify/Core/Configuration/AmplifyOutputsData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,25 @@ public struct AmplifyOutputsData: Codable {
public struct Storage: Codable {
public let awsRegion: AWSRegion
public let bucketName: String
public let buckets: [Bucket]?

@_spi(InternalAmplifyConfiguration)
public struct Bucket: Codable {
public let name: String
public let bucketName: String
public let awsRegion: AWSRegion
}

// Internal init used for testing
init(
awsRegion: AWSRegion,
bucketName: String,
buckets: [Bucket]? = nil
) {
self.awsRegion = awsRegion
self.bucketName = bucketName
self.buckets = buckets
}
}

@_spi(InternalAmplifyConfiguration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extension AWSS3StoragePlugin {
let prefix = try await prefixResolver.resolvePrefix(for: options.accessLevel,
targetIdentityId: options.targetIdentityId)
let serviceKey = prefix + request.key

let storageService = try storageService(for: options.bucket)
if let pluginOptions = options.pluginOptions as? AWSStorageGetURLOptions, pluginOptions.validateObjectExistence {
try await storageService.validateObjectExistence(serviceKey: serviceKey)
}
Expand All @@ -51,6 +53,7 @@ extension AWSS3StoragePlugin {
) async throws -> URL {
let options = options ?? StorageGetURLRequest.Options()
let request = StorageGetURLRequest(path: path, options: options)
let storageService = try storageService(for: options.bucket)
let task = AWSS3StorageGetURLTask(
request,
storageBehaviour: storageService)
Expand All @@ -65,7 +68,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadDataRequest(path: path, options: options)
let operation = AWSS3StorageDownloadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -82,7 +85,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadDataRequest(key: key, options: options)
let operation = AWSS3StorageDownloadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -100,7 +103,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadFileRequest(key: key, local: local, options: options)
let operation = AWSS3StorageDownloadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -118,7 +121,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadFileRequest(path: path, local: local, options: options)
let operation = AWSS3StorageDownloadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -136,7 +139,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadDataRequest(key: key, data: data, options: options)
let operation = AWSS3StorageUploadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -154,7 +157,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadDataRequest(path: path, data: data, options: options)
let operation = AWSS3StorageUploadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -172,7 +175,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadFileRequest(key: key, local: local, options: options)
let operation = AWSS3StorageUploadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -190,7 +193,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadFileRequest(path: path, local: local, options: options)
let operation = AWSS3StorageUploadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -205,6 +208,7 @@ extension AWSS3StoragePlugin {
) async throws -> String {
let options = options ?? StorageRemoveRequest.Options()
let request = StorageRemoveRequest(key: key, options: options)
let storageService = try storageService(for: options.bucket)
let operation = AWSS3StorageRemoveOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
Expand All @@ -222,6 +226,7 @@ extension AWSS3StoragePlugin {
) async throws -> String {
let options = options ?? StorageRemoveRequest.Options()
let request = StorageRemoveRequest(path: path, options: options)
let storageService = try storageService(for: options.bucket)
let task = AWSS3StorageRemoveTask(
request,
storageConfiguration: storageConfiguration,
Expand All @@ -233,6 +238,7 @@ extension AWSS3StoragePlugin {
options: StorageListRequest.Options? = nil
) async throws -> StorageListResult {
let options = options ?? StorageListRequest.Options()
let storageService = try storageService(for: options.bucket)
let prefixResolver = storageConfiguration.prefixResolver ?? StorageAccessLevelAwarePrefixResolver(authService: authService)
let prefix = try await prefixResolver.resolvePrefix(for: options.accessLevel, targetIdentityId: options.targetIdentityId)
let result = try await storageService.list(prefix: prefix, options: options)
Expand All @@ -250,6 +256,7 @@ extension AWSS3StoragePlugin {
) async throws -> StorageListResult {
let options = options ?? StorageListRequest.Options()
let request = StorageListRequest(path: path, options: options)
let storageService = try storageService(for: options.bucket)
let task = AWSS3StorageListObjectsTask(
request,
storageConfiguration: storageConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ extension AWSS3StoragePlugin {
///
/// - Tag: AWSS3StoragePlugin.getEscapeHatch
public func getEscapeHatch() -> S3Client {
return storageService.getEscapeHatch()
return defaultStorageService.getEscapeHatch()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import Foundation
@_spi(InternalAmplifyConfiguration) import Amplify
import AWSPluginsCore
import InternalAmplifyCredentials

extension AWSS3StoragePlugin {

Expand All @@ -27,6 +28,7 @@ extension AWSS3StoragePlugin {
let configClosures: ConfigurationClosures
if let config = configuration as? AmplifyOutputsData {
configClosures = try retrieveConfiguration(config)
additionalBucketsByName = retrieveAdditionalBucketsByName(from: config.storage)
} else if let config = configuration as? JSONValue {
configClosures = try retrieveConfiguration(config)
} else {
Expand All @@ -38,15 +40,24 @@ extension AWSS3StoragePlugin {
do {
let authService = AWSAuthService()
let defaultAccessLevel = try configClosures.retrieveDefaultAccessLevel()
let storageService = try AWSS3StorageService(authService: authService,
region: configClosures.retrieveRegion(),
bucket: configClosures.retrieveBucket(),
httpClientEngineProxy: self.httpClientEngineProxy)
storageService.urlRequestDelegate = self.urlRequestDelegate

configure(storageService: storageService,
authService: authService,
defaultAccessLevel: defaultAccessLevel)
let defaultBucket: ResolvedStorageBucket = try .fromBucketInfo(
.init(
bucketName: configClosures.retrieveBucket(),
region: configClosures.retrieveRegion()
)
)

let storageService = try createStorageService(
authService: authService,
bucketInfo: defaultBucket.bucketInfo
)

configure(
defaultBucket: defaultBucket,
storageService: storageService,
authService: authService,
defaultAccessLevel: defaultAccessLevel
)
} catch let storageError as StorageError {
throw storageError
} catch {
Expand All @@ -67,18 +78,38 @@ extension AWSS3StoragePlugin {
/// Called from the configure method which implements the Plugin protocol. Useful for testing by passing in mocks.
///
/// - Parameters:
/// - storageService: The S3 storage service object.
/// - defaultBucket: The bucket to be used for all API calls by default.
/// - storageService: The S3 storage service object associated with the default bucket
/// - authService: The authentication service object.
/// - defaultAccessLevel: The access level to be used for all API calls by default.
/// - queue: The queue which operations are stored and dispatched for asychronous processing.
func configure(storageService: AWSS3StorageServiceBehavior,
authService: AWSAuthServiceBehavior,
defaultAccessLevel: StorageAccessLevel,
queue: OperationQueue = OperationQueue()) {
self.storageService = storageService
func configure(
defaultBucket: ResolvedStorageBucket,
storageService: AWSS3StorageServiceBehavior,
authService: AWSAuthCredentialsProviderBehavior,
defaultAccessLevel: StorageAccessLevel,
queue: OperationQueue = OperationQueue()
) {
self.defaultBucket = defaultBucket
self.authService = authService
self.queue = queue
self.defaultAccessLevel = defaultAccessLevel
self.storageServicesByBucket[defaultBucket.bucketInfo.bucketName] = storageService
}

/// Creates a new AWSS3StorageServiceBehavior for the given BucketInfo
func createStorageService(
authService: AWSAuthCredentialsProviderBehavior,
bucketInfo: BucketInfo
) throws -> AWSS3StorageServiceBehavior {
let storageService = try AWSS3StorageService(
authService: authService,
region: bucketInfo.region,
bucket: bucketInfo.bucketName,
httpClientEngineProxy: httpClientEngineProxy
)
storageService.urlRequestDelegate = urlRequestDelegate
return storageService
}

// MARK: Private helper methods
Expand Down Expand Up @@ -127,6 +158,21 @@ extension AWSS3StoragePlugin {
retrieveDefaultAccessLevel: defaultAccessLevelClosure)
}

/// Retrieves the configured buckets from the configuration grouped by their names.
/// If no buckets are provided in the configuration, an empty dictionary is returned instead.
private func retrieveAdditionalBucketsByName(
from configuration: AmplifyOutputsData.Storage?
) -> [String: AmplifyOutputsData.Storage.Bucket] {
guard let configuration,
let buckets = configuration.buckets else {
return [:]
}

return buckets.reduce(into: [:]) { dictionary, bucket in
dictionary[bucket.name] = bucket
}
}

/// Retrieves the region from configuration, validates, and returns it.
private static func getRegion(_ configuration: [String: JSONValue]) throws -> String {
guard let region = configuration[PluginConstants.region] else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@ extension AWSS3StoragePlugin {
///
/// - Tag: AWSS3StoragePlugin.reset
public func reset() async {
if storageService != nil {
if defaultBucket != nil {
defaultBucket = nil
}

for storageService in storageServicesByBucket.values {
storageService.reset()
storageService = nil
}
storageServicesByBucket.removeAll()

if additionalBucketsByName != nil {
additionalBucketsByName = nil
}

if authService != nil {
authService = nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

@_spi(InternalAmplifyConfiguration) import Amplify
import Foundation

extension AWSS3StoragePlugin {
/// Returns a AWSS3StorageServiceBehavior instance for the given StorageBucket
func storageService(for bucket: (any StorageBucket)?) throws -> AWSS3StorageServiceBehavior {
guard let bucket else {
// When no bucket is provided, use the default one
return defaultStorageService
}

let bucketInfo = try bucketInfo(from: bucket)
guard let storageService = storageServicesByBucket[bucketInfo.bucketName] else {
// If no service was found for the bucket, create one
let storageService = try createStorageService(
authService: authService,
bucketInfo: bucketInfo
)
storageServicesByBucket[bucketInfo.bucketName] = storageService
return storageService
}

return storageService
}

/// Returns a AWSS3StorageServiceProvider callback for the given StorageBucket
func storageServiceProvider(for bucket: (any StorageBucket)?) -> AWSS3StorageServiceProvider {
let storageServiceResolver: () throws -> AWSS3StorageServiceBehavior = { [weak self] in
guard let self = self else {
throw StorageError.unknown("AWSS3StoragePlugin was deallocated", nil)
}
return try self.storageService(for: bucket)
}
return storageServiceResolver
}

/// Returns a valid `BucketInfo` instance from the given StorageBucket
private func bucketInfo(from bucket: any StorageBucket) throws -> BucketInfo {
switch bucket {
case let outputsBucket as OutputsStorageBucket:
guard let additionalBucketsByName else {
let errorDescription = "Amplify was not configured using an Amplify Outputs file"
let recoverySuggestion = "Make sure that `Amplify.configure(with:)` is invoked"
throw StorageError.validation("bucket", errorDescription, recoverySuggestion, nil)
}

guard let awsBucket = additionalBucketsByName[outputsBucket.name] else {
let errorDescription = "Unable to lookup bucket from provided name in Amplify Outputs"
let recoverySuggestion = "Make sure the bucket name exists in the Amplify Outputs file"
throw StorageError.validation("bucket", errorDescription, recoverySuggestion, nil)
}

return .init(
bucketName: awsBucket.bucketName,
region: awsBucket.awsRegion
)

case let resolvedBucket as ResolvedStorageBucket:
return resolvedBucket.bucketInfo

default:
let errorDescription = "The specified StorageBucket is not supported"
let recoverySuggestion = "Please specify a StorageBucket from the Outputs file or from BucketInfo"
throw StorageError.validation("bucket", errorDescription, recoverySuggestion, nil)
}
}
}
Loading

0 comments on commit 0425e51

Please sign in to comment.