Skip to content

Commit

Permalink
feat(Storage): Implementing support for multiple buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
ruisebas committed Aug 14, 2024
1 parent f012197 commit 5513b43
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 45 deletions.
19 changes: 19 additions & 0 deletions Amplify/Core/Configuration/AmplifyOutputsData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,25 @@ public struct AmplifyOutputsData: Codable {
public struct Storage: Codable {
public let awsRegion: AWSRegion
public let bucketName: String
public let buckets: [Bucket]?

@_spi(InternalAmplifyConfiguration)
public struct Bucket: Codable {
public let name: String
public let bucketName: String
public let awsRegion: AWSRegion
}

// Internal init used for testing
init(
awsRegion: AWSRegion,
bucketName: String,
buckets: [Bucket]? = nil
) {
self.awsRegion = awsRegion
self.bucketName = bucketName
self.buckets = buckets
}
}

@_spi(InternalAmplifyConfiguration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extension AWSS3StoragePlugin {
let prefix = try await prefixResolver.resolvePrefix(for: options.accessLevel,
targetIdentityId: options.targetIdentityId)
let serviceKey = prefix + request.key

let storageService = try storageService(for: options.bucket)
if let pluginOptions = options.pluginOptions as? AWSStorageGetURLOptions, pluginOptions.validateObjectExistence {
try await storageService.validateObjectExistence(serviceKey: serviceKey)
}
Expand All @@ -51,6 +53,7 @@ extension AWSS3StoragePlugin {
) async throws -> URL {
let options = options ?? StorageGetURLRequest.Options()
let request = StorageGetURLRequest(path: path, options: options)
let storageService = try storageService(for: options.bucket)
let task = AWSS3StorageGetURLTask(
request,
storageBehaviour: storageService)
Expand All @@ -65,7 +68,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadDataRequest(path: path, options: options)
let operation = AWSS3StorageDownloadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -82,7 +85,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadDataRequest(key: key, options: options)
let operation = AWSS3StorageDownloadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -100,7 +103,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadFileRequest(key: key, local: local, options: options)
let operation = AWSS3StorageDownloadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -118,7 +121,7 @@ extension AWSS3StoragePlugin {
let request = StorageDownloadFileRequest(path: path, local: local, options: options)
let operation = AWSS3StorageDownloadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -136,7 +139,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadDataRequest(key: key, data: data, options: options)
let operation = AWSS3StorageUploadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -154,7 +157,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadDataRequest(path: path, data: data, options: options)
let operation = AWSS3StorageUploadDataOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -172,7 +175,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadFileRequest(key: key, local: local, options: options)
let operation = AWSS3StorageUploadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -190,7 +193,7 @@ extension AWSS3StoragePlugin {
let request = StorageUploadFileRequest(path: path, local: local, options: options)
let operation = AWSS3StorageUploadFileOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
storageServiceProvider: storageServiceProvider(for: options.bucket),
authService: authService)
let taskAdapter = AmplifyInProcessReportingOperationTaskAdapter(operation: operation)
queue.addOperation(operation)
Expand All @@ -205,6 +208,7 @@ extension AWSS3StoragePlugin {
) async throws -> String {
let options = options ?? StorageRemoveRequest.Options()
let request = StorageRemoveRequest(key: key, options: options)
let storageService = try storageService(for: options.bucket)
let operation = AWSS3StorageRemoveOperation(request,
storageConfiguration: storageConfiguration,
storageService: storageService,
Expand All @@ -222,6 +226,7 @@ extension AWSS3StoragePlugin {
) async throws -> String {
let options = options ?? StorageRemoveRequest.Options()
let request = StorageRemoveRequest(path: path, options: options)
let storageService = try storageService(for: options.bucket)
let task = AWSS3StorageRemoveTask(
request,
storageConfiguration: storageConfiguration,
Expand All @@ -233,6 +238,7 @@ extension AWSS3StoragePlugin {
options: StorageListRequest.Options? = nil
) async throws -> StorageListResult {
let options = options ?? StorageListRequest.Options()
let storageService = try storageService(for: options.bucket)
let prefixResolver = storageConfiguration.prefixResolver ?? StorageAccessLevelAwarePrefixResolver(authService: authService)
let prefix = try await prefixResolver.resolvePrefix(for: options.accessLevel, targetIdentityId: options.targetIdentityId)
let result = try await storageService.list(prefix: prefix, options: options)
Expand All @@ -250,6 +256,7 @@ extension AWSS3StoragePlugin {
) async throws -> StorageListResult {
let options = options ?? StorageListRequest.Options()
let request = StorageListRequest(path: path, options: options)
let storageService = try storageService(for: options.bucket)
let task = AWSS3StorageListObjectsTask(
request,
storageConfiguration: storageConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import Foundation
@_spi(InternalAmplifyConfiguration) import Amplify
import AWSPluginsCore
import InternalAmplifyCredentials

extension AWSS3StoragePlugin {

Expand All @@ -27,6 +28,7 @@ extension AWSS3StoragePlugin {
let configClosures: ConfigurationClosures
if let config = configuration as? AmplifyOutputsData {
configClosures = try retrieveConfiguration(config)
configuredBuckets = retrieveConfiguredBuckets(from: config.storage)
} else if let config = configuration as? JSONValue {
configClosures = try retrieveConfiguration(config)
} else {
Expand All @@ -38,15 +40,22 @@ extension AWSS3StoragePlugin {
do {
let authService = AWSAuthService()
let defaultAccessLevel = try configClosures.retrieveDefaultAccessLevel()
let storageService = try AWSS3StorageService(authService: authService,
region: configClosures.retrieveRegion(),
bucket: configClosures.retrieveBucket(),
httpClientEngineProxy: self.httpClientEngineProxy)
storageService.urlRequestDelegate = self.urlRequestDelegate

configure(storageService: storageService,
authService: authService,
defaultAccessLevel: defaultAccessLevel)
let defaultRegion = try configClosures.retrieveRegion()
let defaultBucket = try configClosures.retrieveBucket()
let storageService = try createStorageService(
authService: authService,
bucketInfo: .init(
bucketName: defaultBucket,
region: defaultRegion
)
)

configure(
defaultBucket: defaultBucket,
storageService: storageService,
authService: authService,
defaultAccessLevel: defaultAccessLevel
)
} catch let storageError as StorageError {
throw storageError
} catch {
Expand All @@ -67,18 +76,35 @@ extension AWSS3StoragePlugin {
/// Called from the configure method which implements the Plugin protocol. Useful for testing by passing in mocks.
///
/// - Parameters:
/// - storageService: The S3 storage service object.
/// - defaultBucket: The bucket name to be used for all API calls by default.
/// - storageService: The S3 storage service object associated with the default bucket
/// - authService: The authentication service object.
/// - defaultAccessLevel: The access level to be used for all API calls by default.
/// - queue: The queue which operations are stored and dispatched for asychronous processing.
func configure(storageService: AWSS3StorageServiceBehavior,
authService: AWSAuthServiceBehavior,
func configure(defaultBucket: String,
storageService: AWSS3StorageServiceBehavior,
authService: AWSAuthCredentialsProviderBehavior,
defaultAccessLevel: StorageAccessLevel,
queue: OperationQueue = OperationQueue()) {
self.storageService = storageService
self.defaultBucket = defaultBucket
self.authService = authService
self.queue = queue
self.defaultAccessLevel = defaultAccessLevel
self.storageServices[defaultBucket] = storageService
}

func createStorageService(
authService: AWSAuthCredentialsProviderBehavior,
bucketInfo: BucketInfo
) throws -> AWSS3StorageServiceBehavior {
let storageService = try AWSS3StorageService(
authService: authService,
region: bucketInfo.region,
bucket: bucketInfo.bucketName,
httpClientEngineProxy: httpClientEngineProxy
)
storageService.urlRequestDelegate = urlRequestDelegate
return storageService
}

// MARK: Private helper methods
Expand Down Expand Up @@ -127,6 +153,21 @@ extension AWSS3StoragePlugin {
retrieveDefaultAccessLevel: defaultAccessLevelClosure)
}

/// Retrieves the configured buckets from the configuration grouped by their names.
/// If no buckets are provided in the configuration, an empty dictionary is returned instead.
private func retrieveConfiguredBuckets(
from configuration: AmplifyOutputsData.Storage?
) -> [String: AmplifyOutputsData.Storage.Bucket] {
guard let configuration,
let buckets = configuration.buckets else {
return [:]
}

return buckets.reduce(into: [:]) { dictionary, bucket in
dictionary[bucket.name] = bucket
}
}

/// Retrieves the region from configuration, validates, and returns it.
private static func getRegion(_ configuration: [String: JSONValue]) throws -> String {
guard let region = configuration[PluginConstants.region] else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@ extension AWSS3StoragePlugin {
///
/// - Tag: AWSS3StoragePlugin.reset
public func reset() async {
if storageService != nil {
if defaultBucket != nil {
defaultBucket = nil
}

for storageService in storageServices.values {
storageService.reset()
storageService = nil
}
storageServices.removeAll()

if configuredBuckets != nil {
configuredBuckets = nil
}

if authService != nil {
authService = nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

@_spi(InternalAmplifyConfiguration) import Amplify
import Foundation

extension AWSS3StoragePlugin {
/// Returns a AWSS3StorageServiceBehavior instance for the given StorageBucket
func storageService(for bucket: (any StorageBucket)?) throws -> AWSS3StorageServiceBehavior {
guard let bucket else {
// When no bucket is provided, use the default one
return storageService
}

let bucketInfo = try bucketInfo(from: bucket)
guard let storageService = storageServices[bucketInfo.bucketName] else {
let storageService = try createStorageService(
authService: authService,
bucketInfo: bucketInfo
)
storageServices[bucketInfo.bucketName] = storageService
return storageService
}

return storageService
}

/// Returns a AWSS3StorageServiceProvider callback for the given StorageBucket
func storageServiceProvider(for bucket: (any StorageBucket)?) -> AWSS3StorageServiceProvider {
let storageServiceResolver: () throws -> AWSS3StorageServiceBehavior = { [weak self] in
guard let self = self else {
throw StorageError.unknown("AWSS3StoragePlugin was deallocated", nil)
}
return try self.storageService(for: bucket)
}
return storageServiceResolver
}

/// Returns a valid `BucketInfo` instance from the given StorageBucket
private func bucketInfo(from bucket: any StorageBucket) throws -> BucketInfo {
switch bucket {
case let outputsBucket as OutputsStorageBucket:
guard let configuredBuckets else {
let errorDescription = "Amplify was not configured using an Amplify Outputs file"
let recoverySuggestion = "Make sure that `Amplify.configure(with:)` is invoked"
throw StorageError.validation("bucket", errorDescription, recoverySuggestion, nil)
}

guard let awsBucket = configuredBuckets[outputsBucket.name] else {
let errorDescription = "Unable to lookup bucket from provided name in Amplify Outputs"
let recoverySuggestion = "Make sure the bucket name exists in the Amplify Outputs file"
throw StorageError.validation("bucket", errorDescription, recoverySuggestion, nil)
}

return .init(
bucketName: awsBucket.bucketName,
region: awsBucket.awsRegion
)

case let resolvedBucket as ResolvedStorageBucket:
return resolvedBucket.bucketInfo

default:
let errorDescription = "The specified StorageBucket is not supported"
let recoverySuggestion = "Please specify a StorageBucket from the Outputs file or from BucketInfo"
throw StorageError.validation("bucket", errorDescription, recoverySuggestion, nil)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,36 @@
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
@_spi(InternalAmplifyConfiguration) import Amplify
import Foundation
import AWSPluginsCore
import InternalAmplifyCredentials

/// The AWSS3StoragePlugin which conforms to the Amplify plugin protocols and implements the Storage
/// Plugin APIs for AWS S3.
///
/// - Tag: AWSS3StoragePlugin
final public class AWSS3StoragePlugin: StorageCategoryPlugin {

/// An instance of the S3 storage service.
var storageService: AWSS3StorageServiceBehavior!
/// The default S3 storage service.
var storageService: AWSS3StorageServiceBehavior! {
guard let defaultBucket else {
return nil
}
return storageServices[defaultBucket]
}

/// The default bucket name
var defaultBucket: String!

/// A dictionary of S3 storage service instances grouped by a specific bucket
var storageServices: AtomicDictionary<String, AWSS3StorageServiceBehavior> = [:]

/// A dictionary of AmplifyOutputs-configured buckets grouped by their names
var configuredBuckets: [String: AmplifyOutputsData.Storage.Bucket]?

/// An instance of the authentication service.
var authService: AWSAuthServiceBehavior!
var authService: AWSAuthCredentialsProviderBehavior!

/// A queue that regulates the execution of operations.
var queue: OperationQueue!
Expand Down
Loading

0 comments on commit 5513b43

Please sign in to comment.