Skip to content

Commit

Permalink
feat(DataStore): DisableSubscriptions flag for watchOS (#3368)
Browse files Browse the repository at this point in the history
* fix(DataStore): Store larger than 32-bit values in Int64 over Int

* fix: Removing ModelFieldType.int64

* feat(DataStore): DisableSubscriptions flag for watchOS

* fix 'default' DataStoreConfiguration to non-watchOS. introduce 'testDefault' for shared testing. force plugin initializer to take in configuration for watchOS

* move integration tests under watchOS flag

* fix integration tests code

* fix integration test code

* Add verbose logging when subsriptions enabled, watchOS simulator

* add doc comments
  • Loading branch information
lawmicha authored Dec 5, 2023
1 parent 88e871f commit 459aaed
Show file tree
Hide file tree
Showing 63 changed files with 585 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1430"
version = "1.7">
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
}
}

/// No-argument init that uses defaults for all providers
#if os(watchOS)
/// Initializer
/// - Parameters:
/// - modelRegistration: Register DataStore models.
/// - dataStoreConfiguration: Configuration object for DataStore
public init(modelRegistration: AmplifyModelRegistration,
configuration dataStoreConfiguration: DataStoreConfiguration = .default) {
configuration dataStoreConfiguration: DataStoreConfiguration) {
self.modelRegistration = modelRegistration
self.configuration = InternalDatastoreConfiguration(
isSyncEnabled: false,
Expand All @@ -77,10 +81,37 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
self.dataStorePublisher = DataStorePublisher()
self.dispatchedModelSyncedEvents = [:]
}
#else
/// Initializer
/// - Parameters:
/// - modelRegistration: Register DataStore models.
/// - dataStoreConfiguration: Configuration object for DataStore
public init(modelRegistration: AmplifyModelRegistration,
configuration dataStoreConfiguration: DataStoreConfiguration = .default) {
self.modelRegistration = modelRegistration
self.configuration = InternalDatastoreConfiguration(
isSyncEnabled: false,
validAPIPluginKey: "awsAPIPlugin",
validAuthPluginKey: "awsCognitoAuthPlugin",
pluginConfiguration: dataStoreConfiguration)

self.storageEngineBehaviorFactory =
StorageEngine.init(
isSyncEnabled:
dataStoreConfiguration:
validAPIPluginKey:
validAuthPluginKey:
modelRegistryVersion:
userDefault:
)
self.dataStorePublisher = DataStorePublisher()
self.dispatchedModelSyncedEvents = [:]
}
#endif

/// Internal initializer for testing
init(modelRegistration: AmplifyModelRegistration,
configuration dataStoreConfiguration: DataStoreConfiguration = .default,
configuration dataStoreConfiguration: DataStoreConfiguration = .testDefault(),
storageEngineBehaviorFactory: StorageEngineBehaviorFactory? = nil,
dataStorePublisher: ModelSubcriptionBehavior,
operationQueue: OperationQueue = OperationQueue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,42 @@ extension DataStoreConfiguration {
public static let defaultSyncMaxRecords: UInt = 10_000
public static let defaultSyncPageSize: UInt = 1_000

#if os(watchOS)
/// Creates a custom configuration. The only required property is `conflictHandler`.
///
/// - Parameters:
/// - errorHandler: a callback function called on unhandled errors
/// - conflictHandler: a callback called when a conflict could not be resolved by the service
/// - syncInterval: how often the sync engine will run (in seconds)
/// - syncMaxRecords: the number of records to sync per execution
/// - syncPageSize: the page size of each sync execution
/// - authModeStrategy: authorization strategy (.default | multiauth)
/// - disableSubscriptions: called before establishing subscriptions. Return true to disable subscriptions.
/// - Returns: an instance of `DataStoreConfiguration` with the passed parameters.
public static func custom(
errorHandler: @escaping DataStoreErrorHandler = { error in
Amplify.Logging.error(error: error)
},
conflictHandler: @escaping DataStoreConflictHandler = { _, resolve in
resolve(.applyRemote)
},
syncInterval: TimeInterval = DataStoreConfiguration.defaultSyncInterval,
syncMaxRecords: UInt = DataStoreConfiguration.defaultSyncMaxRecords,
syncPageSize: UInt = DataStoreConfiguration.defaultSyncPageSize,
syncExpressions: [DataStoreSyncExpression] = [],
authModeStrategy: AuthModeStrategyType = .default,
disableSubscriptions: @escaping () -> Bool
) -> DataStoreConfiguration {
return DataStoreConfiguration(errorHandler: errorHandler,
conflictHandler: conflictHandler,
syncInterval: syncInterval,
syncMaxRecords: syncMaxRecords,
syncPageSize: syncPageSize,
syncExpressions: syncExpressions,
authModeStrategy: authModeStrategy,
disableSubscriptions: disableSubscriptions)
}
#else
/// Creates a custom configuration. The only required property is `conflictHandler`.
///
/// - Parameters:
Expand Down Expand Up @@ -46,10 +82,32 @@ extension DataStoreConfiguration {
syncExpressions: syncExpressions,
authModeStrategy: authModeStrategy)
}

#endif

#if os(watchOS)
/// Default configuration with subscriptions disabled for watchOS. DataStore uses subscriptions via websockets,
/// which work on the watchOS simulator but not on the device. Running DataStore on watchOS with subscriptions
/// enabled is only possible during special circumstances such as actively streaming audio.
/// See https://github.com/aws-amplify/amplify-swift/pull/3368 for more details.
public static var subscriptionsDisabled: DataStoreConfiguration {
.custom(disableSubscriptions: { false })
}
#else
/// The default configuration.
public static var `default`: DataStoreConfiguration {
.custom()
}

#endif

#if os(watchOS)
/// Internal method for testing
static func testDefault(disableSubscriptions: @escaping () -> Bool = { false }) -> DataStoreConfiguration {
.custom(disableSubscriptions: disableSubscriptions)
}
#else
/// Internal method for testing
static func testDefault() -> DataStoreConfiguration {
.custom()
}
#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ public struct DataStoreConfiguration {
/// Authorization mode strategy
public var authModeStrategyType: AuthModeStrategyType

public let disableSubscriptions: () -> Bool

#if os(watchOS)
init(errorHandler: @escaping DataStoreErrorHandler,
conflictHandler: @escaping DataStoreConflictHandler,
syncInterval: TimeInterval,
syncMaxRecords: UInt,
syncPageSize: UInt,
syncExpressions: [DataStoreSyncExpression],
authModeStrategy: AuthModeStrategyType = .default,
disableSubscriptions: @escaping () -> Bool) {
self.errorHandler = errorHandler
self.conflictHandler = conflictHandler
self.syncInterval = syncInterval
self.syncMaxRecords = syncMaxRecords
self.syncPageSize = syncPageSize
self.syncExpressions = syncExpressions
self.authModeStrategyType = authModeStrategy
self.disableSubscriptions = disableSubscriptions
}
#else
init(errorHandler: @escaping DataStoreErrorHandler,
conflictHandler: @escaping DataStoreConflictHandler,
syncInterval: TimeInterval,
Expand All @@ -84,6 +105,7 @@ public struct DataStoreConfiguration {
self.syncPageSize = syncPageSize
self.syncExpressions = syncExpressions
self.authModeStrategyType = authModeStrategy
self.disableSubscriptions = { false }
}

#endif
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
authModeStrategy: resolvedAuthStrategy)

let reconciliationQueueFactory = reconciliationQueueFactory ??
AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:)
AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:disableSubscriptions:)

let initialSyncOrchestratorFactory = initialSyncOrchestratorFactory ??
AWSInitialSyncOrchestrator.init(dataStoreConfiguration:authModeStrategy:api:reconciliationQueue:storageAdapter:)
Expand Down Expand Up @@ -289,7 +289,8 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
dataStoreConfiguration.syncExpressions,
auth,
authModeStrategy,
nil)
nil,
dataStoreConfiguration.disableSubscriptions)
reconciliationQueueSink = reconciliationQueue?
.publisher
.sink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import AWSPluginsCore
import Combine
import Foundation

typealias DisableSubscriptions = () -> Bool

// Used for testing:
typealias IncomingEventReconciliationQueueFactory =
([ModelSchema],
Expand All @@ -18,7 +20,8 @@ typealias IncomingEventReconciliationQueueFactory =
[DataStoreSyncExpression],
AuthCategoryBehavior?,
AuthModeStrategy,
ModelReconciliationQueueFactory?
ModelReconciliationQueueFactory?,
@escaping DisableSubscriptions
) async -> IncomingEventReconciliationQueue

final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueue {
Expand Down Expand Up @@ -48,7 +51,8 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
syncExpressions: [DataStoreSyncExpression],
auth: AuthCategoryBehavior? = nil,
authModeStrategy: AuthModeStrategy,
modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil) async {
modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil,
disableSubscriptions: @escaping () -> Bool = { false } ) async {
self.modelSchemasCount = modelSchemas.count
self.modelReconciliationQueueSinks.set([:])
self.eventReconciliationQueueTopic = CurrentValueSubject<IncomingEventReconciliationQueueEvent, DataStoreError>(.idle)
Expand All @@ -67,6 +71,19 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
self.connectionStatusSerialQueue
= DispatchQueue(label: "com.amazonaws.DataStore.AWSIncomingEventReconciliationQueue")

let subscriptionsDisabled = disableSubscriptions()

#if targetEnvironment(simulator) && os(watchOS)
if !subscriptionsDisabled {
let message = """
DataStore uses subscriptions via websockets, which work on the watchOS simulator but not on the device.
Running DataStore on watchOS with subscriptions enabled is only possible during special circumstances
such as actively streaming audio. See https://github.com/aws-amplify/amplify-swift/pull/3368 for more details.
"""
self.log.verbose(message)
}
#endif

for modelSchema in modelSchemas {
let modelName = modelSchema.name
let syncExpression = syncExpressions.first(where: {
Expand All @@ -78,13 +95,13 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
continue
}
let queue = await self.modelReconciliationQueueFactory(modelSchema,
storageAdapter,
api,
reconcileAndSaveQueue,
modelPredicate,
auth,
authModeStrategy,
nil)
storageAdapter,
api,
reconcileAndSaveQueue,
modelPredicate,
auth,
authModeStrategy,
subscriptionsDisabled ? OperationDisabledIncomingSubscriptionEventPublisher() : nil)

reconciliationQueues.with { reconciliationQueues in
reconciliationQueues[modelName] = queue
Expand Down Expand Up @@ -190,14 +207,15 @@ extension AWSIncomingEventReconciliationQueue: DefaultLogger {

// MARK: - Static factory
extension AWSIncomingEventReconciliationQueue {
static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _ in
static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _, disableSubscriptions in
await AWSIncomingEventReconciliationQueue(modelSchemas: modelSchemas,
api: api,
storageAdapter: storageAdapter,
syncExpressions: syncExpressions,
auth: auth,
authModeStrategy: authModeStrategy,
modelReconciliationQueueFactory: nil)
api: api,
storageAdapter: storageAdapter,
syncExpressions: syncExpressions,
auth: auth,
authModeStrategy: authModeStrategy,
modelReconciliationQueueFactory: nil,
disableSubscriptions: disableSubscriptions)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
import AWSPluginsCore
import Combine

final class OperationDisabledIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPublisher {

private let subscriptionEventSubject: PassthroughSubject<IncomingSubscriptionEventPublisherEvent, DataStoreError>

var publisher: AnyPublisher<IncomingSubscriptionEventPublisherEvent, DataStoreError> {
return subscriptionEventSubject.eraseToAnyPublisher()
}

init() {
self.subscriptionEventSubject = PassthroughSubject<IncomingSubscriptionEventPublisherEvent, DataStoreError>()

let apiError = APIError.operationError(AppSyncErrorType.operationDisabled.rawValue, "", nil)
let dataStoreError = DataStoreError.api(apiError, nil)
subscriptionEventSubject.send(completion: .failure(dataStoreError))

}

func cancel() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
return
}
if case let .api(error, _) = dataStoreError,
case let APIError.operationError(_, _, underlyingError) = error,
isOperationDisabledError(underlyingError) {
case let APIError.operationError(errorMessage, _, underlyingError) = error,
isOperationDisabledError(errorMessage, underlyingError) {
log.verbose("[InitializeSubscription.3] AWSModelReconciliationQueue determined isOperationDisabledError \(modelSchema.name)")
modelReconciliationQueueSubject.send(.disconnected(modelName: modelSchema.name, reason: .operationDisabled))
return
Expand Down Expand Up @@ -284,7 +284,12 @@ extension AWSModelReconciliationQueue {
return false
}

private func isOperationDisabledError(_ error: Error?) -> Bool {
private func isOperationDisabledError(_ errorMessage: String?, _ error: Error?) -> Bool {
if let errorMessage = errorMessage,
case .operationDisabled = AppSyncErrorType(errorMessage) {
return true
}

if let responseError = error as? GraphQLResponseError<ResponseType>,
let graphQLError = graphqlErrors(from: responseError)?.first,
let errorTypeValue = errorTypeValueFrom(graphQLError: graphQLError),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ import AWSDataStorePlugin
class AWSDataStorePluginAmplifyVersionableTests: XCTestCase {

func testVersionExists() {
#if os(watchOS)
let plugin = AWSDataStorePlugin(modelRegistration: AmplifyModels(),
configuration: .subscriptionsDisabled)
#else
let plugin = AWSDataStorePlugin(modelRegistration: AmplifyModels())
#endif
XCTAssertNotNil(plugin.version)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ class AWSDataStorePluginConfigurationTests: XCTestCase {
}

func testDoesNotThrowOnMissingConfig() throws {
#if os(watchOS)
let plugin = AWSDataStorePlugin(modelRegistration: TestModelRegistration(),
configuration: .subscriptionsDisabled)
#else
let plugin = AWSDataStorePlugin(modelRegistration: TestModelRegistration())

#endif
try Amplify.add(plugin: plugin)

let categoryConfig = DataStoreCategoryConfiguration(plugins: ["NonExistentPlugin": true])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class SQLiteStorageEngineAdapterJsonTests: XCTestCase {
try storageAdapter.setUp(modelSchemas: StorageEngine.systemModelSchemas)

let syncEngine = try RemoteSyncEngine(storageAdapter: storageAdapter,
dataStoreConfiguration: .default)
dataStoreConfiguration: .testDefault())
storageEngine = StorageEngine(storageAdapter: storageAdapter,
dataStoreConfiguration: .default,
dataStoreConfiguration: .testDefault(),
syncEngine: syncEngine,
validAPIPluginKey: validAPIPluginKey,
validAuthPluginKey: validAuthPluginKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CascadeDeleteOperationTests: StorageEngineTestsBase {

syncEngine = MockRemoteSyncEngine()
storageEngine = StorageEngine(storageAdapter: storageAdapter,
dataStoreConfiguration: .default,
dataStoreConfiguration: .testDefault(),
syncEngine: syncEngine,
validAPIPluginKey: validAPIPluginKey,
validAuthPluginKey: validAuthPluginKey)
Expand Down
Loading

0 comments on commit 459aaed

Please sign in to comment.