From a0ee2574eecff1fe77500705d744d710b661bfc4 Mon Sep 17 00:00:00 2001 From: Di Wu Date: Tue, 2 Jan 2024 10:47:45 -0800 Subject: [PATCH] fix(datastore): sync pending mutation events with latest synced metadata (#3377) * apply metadata of incoming mutation event which has pending mutations * sync mutation event with latest synced version * add doc for reconcile * resolve comments * remove redundant logic * update test case to verify the latest synced version is applied to api request * resolve comments --- ...atabaseAdapter+MutationEventIngester.swift | 17 - .../OutgoingMutationQueue.swift | 8 +- .../SyncMutationToCloudOperation.swift | 11 +- .../ReconcileAndLocalSaveOperation.swift | 161 ++++--- .../RemoteSyncReconciler.swift | 31 +- .../Support/MutationEvent+Extensions.swift | 104 ----- .../SyncMutationToCloudOperationTests.swift | 64 ++- .../ReconcileAndLocalSaveOperationTests.swift | 30 +- .../RemoteSyncReconcilerTests.swift | 31 -- .../MutationEventExtensionsTests.swift | 404 ------------------ .../StorageCategoryConfigurationTests.swift | 2 +- 11 files changed, 186 insertions(+), 677 deletions(-) delete mode 100644 AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/MutationEvent+Extensions.swift delete mode 100644 AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/Support/MutationEventExtensionsTests.swift diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift index 88851515f7..528c4f9210 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventIngester.swift @@ -33,21 +33,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester { func resolveConflictsThenSave(mutationEvent: MutationEvent, storageAdapter: StorageEngineAdapter, completion: @escaping (Result)->Void) { - - // We don't want to query MutationSync because a) we already have the model, and b) delete mutations - // are submitted *after* the delete has already been applied to the local data store, meaning there is no model - // to query. - var mutationEvent = mutationEvent - do { - // TODO: Refactor this so that it's clear that the storage engine is not responsible for setting the version - // perhaps as simple as renaming to `submit(unversionedMutationEvent:)` or similar - let syncMetadata = try storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId, - modelName: mutationEvent.modelName) - mutationEvent.version = syncMetadata?.version - } catch { - completion(.failure(DataStoreError(error: error))) - } - MutationEvent.pendingMutationEvents( forMutationEvent: mutationEvent, storageAdapter: storageAdapter) { result in @@ -208,8 +193,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester { } resolvedEvent.mutationType = updatedMutationType - resolvedEvent.version = candidate.version - return resolvedEvent } diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 5e833b41a5..ef982dff25 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -204,6 +204,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { Task { let syncMutationToCloudOperation = await SyncMutationToCloudOperation( mutationEvent: mutationEvent, + getLatestSyncMetadata: { try? self.storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId, modelName: mutationEvent.modelName) }, api: api, authModeStrategy: authModeStrategy ) { [weak self] result in @@ -257,12 +258,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { return } reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName) - MutationEvent.reconcilePendingMutationEventsVersion( - sent: mutationEvent, - received: mutationSync, - storageAdapter: storageAdapter) { _ in - self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync) - } + completeProcessingEvent(mutationEvent, mutationSync: mutationSync) } else { completeProcessingEvent(mutationEvent) } diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift index dac31dd0bf..77434f51ef 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift @@ -19,6 +19,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { private weak var api: APICategoryGraphQLBehaviorExtended? private let mutationEvent: MutationEvent + private let getLatestSyncMetadata: () -> MutationSyncMetadata? private let completion: GraphQLOperation>.ResultListener private let requestRetryablePolicy: RequestRetryablePolicy @@ -31,6 +32,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { private var authTypesIterator: AWSAuthorizationTypeIterator? init(mutationEvent: MutationEvent, + getLatestSyncMetadata: @escaping () -> MutationSyncMetadata?, api: APICategoryGraphQLBehaviorExtended, authModeStrategy: AuthModeStrategy, networkReachabilityPublisher: AnyPublisher? = nil, @@ -38,6 +40,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { requestRetryablePolicy: RequestRetryablePolicy? = RequestRetryablePolicy(), completion: @escaping GraphQLOperation>.ResultListener) async { self.mutationEvent = mutationEvent + self.getLatestSyncMetadata = getLatestSyncMetadata self.api = api self.networkReachabilityPublisher = networkReachabilityPublisher self.completion = completion @@ -57,6 +60,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { override func main() { log.verbose(#function) + sendMutationToCloud(withAuthType: authTypesIterator?.next()) } @@ -108,6 +112,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { mutationType: GraphQLMutationType, authType: AWSAuthorizationType? = nil ) -> GraphQLRequest>? { + let latestSyncMetadata = getLatestSyncMetadata() var request: GraphQLRequest> do { @@ -128,7 +133,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { request = GraphQLRequest.deleteMutation(of: model, modelSchema: modelSchema, where: graphQLFilter, - version: mutationEvent.version) + version: latestSyncMetadata?.version) case .update: let model = try mutationEvent.decodeModel() guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else { @@ -140,7 +145,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { request = GraphQLRequest.updateMutation(of: model, modelSchema: modelSchema, where: graphQLFilter, - version: mutationEvent.version) + version: latestSyncMetadata?.version) case .create: let model = try mutationEvent.decodeModel() guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else { @@ -151,7 +156,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { } request = GraphQLRequest.createMutation(of: model, modelSchema: modelSchema, - version: mutationEvent.version) + version: latestSyncMetadata?.version) } } catch { let apiError = APIError.unknown("Couldn't decode model", "", error) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift index 9afaf2e8a1..1ee6004c70 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift @@ -113,6 +113,14 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { // MARK: - Responder methods + /// The reconcile function incorporates incoming mutation events into the local database through the following steps: + /// 1. Retrieve the local metadata of the models. + /// 2. Generate dispositions based on incoming mutation events and local metadata. + /// 3. Categorize dispositions into: + /// 3.1 Apply metadata only for those with existing pending mutations. + /// 3.1.1 Notify the count of these incoming mutation events as dropped items. + /// 3.2 Apply incoming mutation and metadata for those without existing pending mutations. + /// 4. Notify the final result. func reconcile(remoteModels: [RemoteModel]) { guard !isCancelled else { log.info("\(#function) - cancelled, aborting") @@ -133,16 +141,21 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { do { try storageAdapter.transaction { - queryPendingMutations(withModels: remoteModels.map(\.model)) + self.queryLocalMetadata(remoteModels) .subscribe(on: workQueue) - .flatMap { mutationEvents -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> in - let remoteModelsToApply = self.reconcile(remoteModels, pendingMutations: mutationEvents) - return self.queryLocalMetadata(remoteModelsToApply) + .map { (remoteModels, localMetadatas) in + self.getDispositions(for: remoteModels, localMetadatas: localMetadatas) } - .flatMap { (remoteModelsToApply, localMetadatas) -> Future in - let dispositions = self.getDispositions(for: remoteModelsToApply, - localMetadatas: localMetadatas) - return self.applyRemoteModelsDispositions(dispositions) + .flatMap { dispositions in + self.queryPendingMutations(withModels: dispositions.map(\.remoteModel.model)) + .map { pendingMutations in (pendingMutations, dispositions) } + } + .map { (pendingMutations, dispositions) in + self.separateDispositions(pendingMutations: pendingMutations, dispositions: dispositions) + } + .flatMap { (dispositions, dispositionOnlyApplyMetadata) in + self.waitAllPublisherFinishes(publishers: dispositionOnlyApplyMetadata.map(self.saveMetadata(disposition:))) + .flatMap { _ in self.applyRemoteModelsDispositions(dispositions) } } .sink( receiveCompletion: { @@ -203,15 +216,27 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { } } - func reconcile(_ remoteModels: [RemoteModel], pendingMutations: [MutationEvent]) -> [RemoteModel] { - guard !remoteModels.isEmpty else { - return [] + func separateDispositions( + pendingMutations: [MutationEvent], + dispositions: [RemoteSyncReconciler.Disposition] + ) -> ([RemoteSyncReconciler.Disposition], [RemoteSyncReconciler.Disposition]) { + guard !dispositions.isEmpty else { + return ([], []) } - let remoteModelsToApply = RemoteSyncReconciler.filter(remoteModels, - pendingMutations: pendingMutations) - notifyDropped(count: remoteModels.count - remoteModelsToApply.count) - return remoteModelsToApply + + let pendingMutationModelIds = Set(pendingMutations.map(\.modelId)) + + let dispositionsToApply = dispositions.filter { + !pendingMutationModelIds.contains($0.remoteModel.model.identifier) + } + + let dispositionsOnlyApplyMetadata = dispositions.filter { + pendingMutationModelIds.contains($0.remoteModel.model.identifier) + } + + notifyDropped(count: dispositionsOnlyApplyMetadata.count) + return (dispositionsToApply, dispositionsOnlyApplyMetadata) } func queryLocalMetadata(_ remoteModels: [RemoteModel]) -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> { @@ -269,24 +294,16 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { disposition: RemoteSyncReconciler.Disposition ) -> AnyPublisher, Never> { let operation: Future - let mutationType: MutationEvent.MutationType switch disposition { - case .create(let remoteModel): - operation = self.save(storageAdapter: storageAdapter, remoteModel: remoteModel) - mutationType = .create - case .update(let remoteModel): - operation = self.save(storageAdapter: storageAdapter, remoteModel: remoteModel) - mutationType = .update - case .delete(let remoteModel): - operation = self.delete(storageAdapter: storageAdapter, remoteModel: remoteModel) - mutationType = .delete + case .create, .update: + operation = self.save(storageAdapter: storageAdapter, remoteModel: disposition.remoteModel) + case .delete: + operation = self.delete(storageAdapter: storageAdapter, remoteModel: disposition.remoteModel) } return operation - .flatMap { applyResult in - self.saveMetadata(storageAdapter: storageAdapter, applyResult: applyResult, mutationType: mutationType) - } - .map {_ in Result.success(()) } + .flatMap { self.saveMetadata(storageAdapter: storageAdapter, result: $0, mutationType: disposition.mutationType) } + .map { _ in Result.success(()) } .catch { Just>(.failure($0))} .eraseToAnyPublisher() } @@ -315,15 +332,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { applyRemoteModelsDisposition(storageAdapter: storageAdapter, disposition: $0) } - return Future { promise in - Publishers.MergeMany(publishers) - .collect() - .sink { _ in - // This stream will never fail, as we wrapped error in the result type. - promise(.successfulVoid) - } receiveValue: { _ in } - .store(in: &self.cancellables) - } + return self.waitAllPublisherFinishes(publishers: publishers) } enum ApplyRemoteModelResult { @@ -359,8 +368,10 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { } } - private func save(storageAdapter: StorageEngineAdapter, - remoteModel: RemoteModel) -> Future { + private func save( + storageAdapter: StorageEngineAdapter, + remoteModel: RemoteModel + ) -> Future { Future { promise in storageAdapter.save(untypedModel: remoteModel.model.instance, eagerLoad: self.isEagerLoad) { response in switch response { @@ -388,27 +399,50 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { } } - private func saveMetadata(storageAdapter: StorageEngineAdapter, - applyResult: ApplyRemoteModelResult, - mutationType: MutationEvent.MutationType) -> Future { - Future { promise in - guard case let .applied(inProcessModel) = applyResult else { - promise(.successfulVoid) - return - } + private func saveMetadata( + disposition: RemoteSyncReconciler.Disposition + ) -> AnyPublisher { + guard let storageAdapter = self.storageAdapter else { + return Just(()).eraseToAnyPublisher() + } + return saveMetadata(storageAdapter: storageAdapter, remoteModel: disposition.remoteModel, mutationType: disposition.mutationType) + .map { _ in () } + .catch { _ in Just(()) } + .eraseToAnyPublisher() + } - storageAdapter.save(inProcessModel.syncMetadata, - condition: nil, - eagerLoad: self.isEagerLoad) { result in - switch result { - case .failure(let dataStoreError): - self.notifyDropped(error: dataStoreError) - promise(.failure(dataStoreError)) - case .success(let syncMetadata): + private func saveMetadata( + storageAdapter: StorageEngineAdapter, + result: ApplyRemoteModelResult, + mutationType: MutationEvent.MutationType + ) -> AnyPublisher { + if case let .applied(inProcessModel) = result { + return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: inProcessModel, mutationType: mutationType) + .handleEvents( receiveOutput: { syncMetadata in let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata) self.notify(savedModel: appliedModel, mutationType: mutationType) - promise(.successfulVoid) - } + }, receiveCompletion: { completion in + if case .failure(let error) = completion { + self.notifyDropped(error: error) + } + }) + .map { _ in () } + .eraseToAnyPublisher() + + } + return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher() + } + + private func saveMetadata( + storageAdapter: StorageEngineAdapter, + remoteModel: RemoteModel, + mutationType: MutationEvent.MutationType + ) -> Future { + Future { promise in + storageAdapter.save(remoteModel.syncMetadata, + condition: nil, + eagerLoad: self.isEagerLoad) { result in + promise(result) } } } @@ -454,6 +488,17 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { private static func unfulfilledDataStoreError(name: String = #function) -> DataStoreError { .unknown("\(name) did not fulfill promise", AmplifyErrorMessages.shouldNotHappenReportBugToAWS(), nil) } + + private func waitAllPublisherFinishes(publishers: [AnyPublisher]) -> Future { + Future { promise in + Publishers.MergeMany(publishers) + .collect() + .sink(receiveCompletion: { _ in + promise(.successfulVoid) + }, receiveValue: { _ in }) + .store(in: &self.cancellables) + } + } } extension ReconcileAndLocalSaveOperation: DefaultLogger { diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/RemoteSyncReconciler.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/RemoteSyncReconciler.swift index 9115e96cd5..6efae1270d 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/RemoteSyncReconciler.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/RemoteSyncReconciler.swift @@ -16,31 +16,24 @@ struct RemoteSyncReconciler { case create(RemoteModel) case update(RemoteModel) case delete(RemoteModel) - } - /// Filter the incoming `remoteModels` against the pending mutations. - /// If there is a matching pending mutation, drop the remote model. - /// - /// - Parameters: - /// - remoteModels: models retrieved from the remote store - /// - pendingMutations: pending mutations from the outbox - /// - Returns: remote models to be applied - static func filter(_ remoteModels: [RemoteModel], - pendingMutations: [MutationEvent]) -> [RemoteModel] { - guard !pendingMutations.isEmpty else { - return remoteModels + var remoteModel: RemoteModel { + switch self { + case .create(let model), .update(let model), .delete(let model): + return model + } } - let pendingMutationModelIdsArr = pendingMutations.map { mutationEvent in - mutationEvent.modelId - } - let pendingMutationModelIds = Set(pendingMutationModelIdsArr) - - return remoteModels.filter { remoteModel in - !pendingMutationModelIds.contains(remoteModel.model.identifier) + var mutationType: MutationEvent.MutationType { + switch self { + case .create: return .create + case .update: return .update + case .delete: return .delete + } } } + /// Reconciles the incoming `remoteModels` against the local metadata to get the disposition /// /// - Parameters: diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/MutationEvent+Extensions.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/MutationEvent+Extensions.swift deleted file mode 100644 index 7d2057528e..0000000000 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/MutationEvent+Extensions.swift +++ /dev/null @@ -1,104 +0,0 @@ -// -// Copyright Amazon.com Inc. or its affiliates. -// All Rights Reserved. -// -// SPDX-License-Identifier: Apache-2.0 -// - -import Amplify -import Dispatch -import AWSPluginsCore - -extension MutationEvent { - // Consecutive operations that modify a model results in a sequence of pending mutation events that - // have the current version of the model. The first mutation event has the correct version of the model, - // while the subsequent events will have lower versions if the first mutation event is successfully synced - // to the cloud. By reconciling the pending mutation events after syncing the first mutation event, - // we attempt to update the pending version to the latest version from the response. - // The before and after conditions for consecutive update scenarios are as below: - // - Save, then immediately update - // Queue Before - [(version: nil, inprocess: true, type: .create), - // (version: nil, inprocess: false, type: .update)] - // Response - [version: 1, type: .create] - // Queue After - [(version: 1, inprocess: false, type: .update)] - // - Save, then immediately delete - // Queue Before - [(version: nil, inprocess: true, type: .create), - // (version: nil, inprocess: false, type: .delete)] - // Response - [version: 1, type: .create] - // Queue After - [(version: 1, inprocess: false, type: .delete)] - // - Save, sync, then immediately update and delete - // Queue Before (After save, sync) - // - [(version: 1, inprocess: true, type: .update), (version: 1, inprocess: false, type: .delete)] - // Response - [version: 2, type: .update] - // Queue After - [(version: 2, inprocess: false, type: .delete)] - // - // For a given model `id`, checks the version of the head of pending mutation event queue - // against the API response version in `mutationSync` and saves it in the mutation event table if - // the response version is a newer one - static func reconcilePendingMutationEventsVersion(sent mutationEvent: MutationEvent, - received mutationSync: MutationSync, - storageAdapter: StorageEngineAdapter, - completion: @escaping DataStoreCallback) { - MutationEvent.pendingMutationEvents( - forMutationEvent: mutationEvent, - storageAdapter: storageAdapter - ) { queryResult in - switch queryResult { - case .failure(let dataStoreError): - completion(.failure(dataStoreError)) - case .success(let localMutationEvents): - guard let existingEvent = localMutationEvents.first else { - completion(.success(())) - return - } - - guard let reconciledEvent = reconcile(pendingMutationEvent: existingEvent, - with: mutationEvent, - responseMutationSync: mutationSync) else { - completion(.success(())) - return - } - - storageAdapter.save(reconciledEvent, condition: nil, eagerLoad: true) { result in - switch result { - case .failure(let dataStoreError): - completion(.failure(dataStoreError)) - case .success: - completion(.success(())) - } - } - } - } - } - - static func reconcile(pendingMutationEvent: MutationEvent, - with requestMutationEvent: MutationEvent, - responseMutationSync: MutationSync) -> MutationEvent? { - // return if version of the pending mutation event is not nil and - // is >= version contained in the response - if pendingMutationEvent.version != nil && - pendingMutationEvent.version! >= responseMutationSync.syncMetadata.version { - return nil - } - - do { - let responseModel = responseMutationSync.model.instance - let requestModel = try requestMutationEvent.decodeModel() - - // check if the data sent in the request is the same as the response - // if it is, update the pending mutation event version to the response version - guard let modelSchema = ModelRegistry.modelSchema(from: requestMutationEvent.modelName), - modelSchema.compare(responseModel, requestModel) else { - return nil - } - - var pendingMutationEvent = pendingMutationEvent - pendingMutationEvent.version = responseMutationSync.syncMetadata.version - return pendingMutationEvent - } catch { - Amplify.log.verbose("Error decoding models: \(error)") - return nil - } - } - -} diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/SyncMutationToCloudOperationTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/SyncMutationToCloudOperationTests.swift index 263a6b52f6..96a0ef2252 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/SyncMutationToCloudOperationTests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/SyncMutationToCloudOperationTests.swift @@ -45,8 +45,10 @@ class SyncMutationToCloudOperationTests: XCTestCase { var listenerFromSecondRequestOptional: GraphQLOperation>.ResultListener? var numberOfTimesEntered = 0 - let responder = MutateRequestListenerResponder> { _, eventListener in + let responder = MutateRequestListenerResponder> { request, eventListener in if numberOfTimesEntered == 0 { + let requestInputVersion = request.variables.flatMap { $0["input"] as? [String: Any] }.flatMap { $0["_version"] as? Int } + XCTAssertEqual(requestInputVersion, 10) listenerFromFirstRequestOptional = eventListener expectFirstCallToAPIMutate.fulfill() } else if numberOfTimesEntered == 1 { @@ -67,12 +69,24 @@ class SyncMutationToCloudOperationTests: XCTestCase { expectMutationRequestCompletion.fulfill() } - let operation = await SyncMutationToCloudOperation(mutationEvent: mutationEvent, - api: mockAPIPlugin, - authModeStrategy: AWSDefaultAuthModeStrategy(), - networkReachabilityPublisher: publisher, - currentAttemptNumber: 1, - completion: completion) + let model = MockSynced(id: "id-1") + let operation = await SyncMutationToCloudOperation( + mutationEvent: mutationEvent, + getLatestSyncMetadata: { + MutationSyncMetadata( + modelId: model.id, + modelName: model.modelName, + deleted: false, + lastChangedAt: Date().unixSeconds, + version: 10 + ) + }, + api: mockAPIPlugin, + authModeStrategy: AWSDefaultAuthModeStrategy(), + networkReachabilityPublisher: publisher, + currentAttemptNumber: 1, + completion: completion + ) let queue = OperationQueue() queue.addOperation(operation) await fulfillment(of: [expectFirstCallToAPIMutate], timeout: defaultAsyncWaitTimeout) @@ -90,7 +104,7 @@ class SyncMutationToCloudOperationTests: XCTestCase { return } - let model = MockSynced(id: "id-1") + let anyModel = try model.eraseToAnyModel() let remoteSyncMetadata = MutationSyncMetadata(modelId: model.id, modelName: model.modelName, @@ -139,13 +153,16 @@ class SyncMutationToCloudOperationTests: XCTestCase { let completion: GraphQLOperation>.ResultListener = { _ in expectMutationRequestCompletion.fulfill() } - let operation = await SyncMutationToCloudOperation(mutationEvent: mutationEvent, - api: mockAPIPlugin, - authModeStrategy: AWSDefaultAuthModeStrategy(), - networkReachabilityPublisher: publisher, - currentAttemptNumber: 1, - requestRetryablePolicy: mockRequestRetryPolicy, - completion: completion) + let operation = await SyncMutationToCloudOperation( + mutationEvent: mutationEvent, + getLatestSyncMetadata: { nil }, + api: mockAPIPlugin, + authModeStrategy: AWSDefaultAuthModeStrategy(), + networkReachabilityPublisher: publisher, + currentAttemptNumber: 1, + requestRetryablePolicy: mockRequestRetryPolicy, + completion: completion + ) let queue = OperationQueue() queue.addOperation(operation) await fulfillment(of: [expectFirstCallToAPIMutate], timeout: defaultAsyncWaitTimeout) @@ -212,13 +229,16 @@ class SyncMutationToCloudOperationTests: XCTestCase { break } } - let operation = await SyncMutationToCloudOperation(mutationEvent: mutationEvent, - api: mockAPIPlugin, - authModeStrategy: AWSDefaultAuthModeStrategy(), - networkReachabilityPublisher: publisher, - currentAttemptNumber: 1, - requestRetryablePolicy: mockRequestRetryPolicy, - completion: completion) + let operation = await SyncMutationToCloudOperation( + mutationEvent: mutationEvent, + getLatestSyncMetadata: { nil }, + api: mockAPIPlugin, + authModeStrategy: AWSDefaultAuthModeStrategy(), + networkReachabilityPublisher: publisher, + currentAttemptNumber: 1, + requestRetryablePolicy: mockRequestRetryPolicy, + completion: completion + ) let queue = OperationQueue() queue.addOperation(operation) await fulfillment(of: [expectFirstCallToAPIMutate], timeout: defaultAsyncWaitTimeout) diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift index d0d19eeb4e..610c58ba31 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift @@ -314,23 +314,25 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { // MARK: - reconcile(remoteModels:pendingMutations) - func testReconcilePendingMutations_emptyModels() { - let result = operation.reconcile([], pendingMutations: [anyPostMutationEvent]) - - XCTAssertTrue(result.isEmpty) + func testSeparateDispositions_emptyModels() { + let result = operation.separateDispositions(pendingMutations: [], dispositions: []) + XCTAssertTrue(result.0.isEmpty) + XCTAssertTrue(result.1.isEmpty) } func testReconcilePendingMutations_emptyPendingMutations() { - let result = operation.reconcile([anyPostMutationSync], pendingMutations: []) + let result = operation.separateDispositions(pendingMutations: [], dispositions: [.update(anyPostMutationSync)]) - guard let remoteModelToApply = result.first else { + guard let remoteModelToApply = result.0.first else { XCTFail("Missing models to apply") return } - XCTAssertEqual(remoteModelToApply.model.id, anyPostMutationSync.model.id) + + XCTAssertTrue(result.1.isEmpty) + XCTAssertEqual(remoteModelToApply.remoteModel.model.id, anyPostMutationSync.model.id) } - func testReconcilePendingMutations_notifyDropped() { + func testSeparateDispositions_notifyDropped () async { let expect = expectation(description: "notify dropped twice") expect.expectedFulfillmentCount = 2 let model1 = AnyModel(Post(title: "post1", content: "content", createdAt: .now())) @@ -377,11 +379,15 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { } }.store(in: &cancellables) - let result = operation.reconcile([remoteModel1, remoteModel2], - pendingMutations: [mutationEvent1, mutationEvent2]) + let result = operation.separateDispositions( + pendingMutations: [mutationEvent1, mutationEvent2], + dispositions: [.update(remoteModel1), .update(remoteModel2)] + ) - XCTAssertTrue(result.isEmpty) - waitForExpectations(timeout: 1) + XCTAssertTrue(result.0.isEmpty) + XCTAssertEqual(result.1, [.update(remoteModel1), .update(remoteModel2)]) + + await fulfillment(of: [expect], timeout: 1) } // MARK: - queryLocalMetadata diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/RemoteSyncReconcilerTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/RemoteSyncReconcilerTests.swift index 7d734697fa..612947d04d 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/RemoteSyncReconcilerTests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/RemoteSyncReconcilerTests.swift @@ -20,37 +20,6 @@ class RemoteSyncReconcilerTests: XCTestCase { ModelRegistry.register(modelType: MockSynced.self) } - // MARK: reconcile(pendingMutations) - - func testFilter_EmptyRemoteModels() { - let pendingMutations: [MutationEvent] = [makeMutationEvent()] - let results = RemoteSyncReconciler.filter([], pendingMutations: pendingMutations) - - XCTAssertTrue(results.isEmpty) - } - - func testFilter_EmptyPendingMutations() { - let remoteModel = makeRemoteModel() - let results = RemoteSyncReconciler.filter([remoteModel], pendingMutations: []) - - XCTAssertEqual(results.first?.model.id, remoteModel.model.id) - } - - func testFilter_pendingMutationMatchRemoteModel() { - let remoteModel = makeRemoteModel() - let pendingMutation = makeMutationEvent(modelId: remoteModel.model.id) - let results = RemoteSyncReconciler.filter([remoteModel], pendingMutations: [pendingMutation]) - - XCTAssertTrue(results.isEmpty) - } - - func testFilter_pendingMutationDoesNotMatchRemoteModel() { - let remoteModel = makeRemoteModel(modelId: "1") - let pendingMutation = makeMutationEvent(modelId: "2") - let results = RemoteSyncReconciler.filter([remoteModel], pendingMutations: [pendingMutation]) - - XCTAssertEqual(results.first?.model.id, remoteModel.model.id) - } // MARK: - reconcile(remoteModel:localMetadata) diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/Support/MutationEventExtensionsTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/Support/MutationEventExtensionsTests.swift deleted file mode 100644 index 7938a953ff..0000000000 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/Support/MutationEventExtensionsTests.swift +++ /dev/null @@ -1,404 +0,0 @@ -// -// Copyright Amazon.com Inc. or its affiliates. -// All Rights Reserved. -// -// SPDX-License-Identifier: Apache-2.0 -// - -import Foundation -import SQLite -import XCTest - -@testable import Amplify -@testable import AmplifyTestCommon -@testable import AWSDataStorePlugin -@testable import AWSPluginsCore - -// TODO: This flaky test has been disabled, tracking issue: https://github.com/aws-amplify/amplify-ios/issues/1831 -// swiftlint:disable type_body_length -class MutationEventExtensionsTest: BaseDataStoreTests { - - /// - Given: A pending mutation events queue with event containing `nil` version, a sent mutation - /// event model that matches the received mutation sync model. The received mutation sync has version 1. - /// - When: The sent model matches the received model and the first pending mutation event version is `nil`. - /// - Then: The pending mutation event version should be updated to the received model version of 1. - func testSentModelWithNilVersion_Reconciled() throws { - throw XCTSkip("TODO: fix this test") - let modelId = UUID().uuidString - let post = Post(id: modelId, title: "title", content: "content", createdAt: .now()) - let requestMutationEvent = try createMutationEvent(model: post, - mutationType: .create, - createdAt: .now(), - version: nil, - inProcess: true) - let pendingMutationEvent = try createMutationEvent(model: post, - mutationType: .update, - createdAt: .now().add(value: 1, to: .second), - version: nil) - let responseMutationSync = createMutationSync(model: post, version: 1) - - setUpPendingMutationQueue(post, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) - - let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, - with: requestMutationEvent, - responseMutationSync: responseMutationSync) - XCTAssertNotNil(reconciledEvent) - XCTAssertEqual(reconciledEvent?.version, responseMutationSync.syncMetadata.version) - - let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should be latest version") - let updatingVersionExpectation = expectation(description: "update latest mutation event with response version") - - // update the version of head of mutation event table for given model id to the version of `mutationSync` - MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, - received: responseMutationSync, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success: - updatingVersionExpectation.fulfill() - } - } - wait(for: [updatingVersionExpectation], timeout: 1) - - // query for head of mutation event table for given model id and check if it has the updated version - MutationEvent.pendingMutationEvents(forModel: post, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success(let mutationEvents): - guard !mutationEvents.isEmpty, let head = mutationEvents.first else { - XCTFail("Failure while updating version") - return - } - XCTAssertEqual(head.version, responseMutationSync.syncMetadata.version) - XCTAssertEqual(head.mutationType, MutationEvent.MutationType.update.rawValue) - queryAfterUpdatingVersionExpectation.fulfill() - } - } - wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) - } - - /// - Given: A pending mutation events queue with two events(update and delete) containing `nil` version, - /// a sent mutation event model that matches the received mutation sync model. The received mutation - /// sync has version 1. - /// - When: The sent model matches the received model, the first pending mutation event(update) version is `nil` and - /// the second pending mutation event(delete) version is `nil`. - /// - Then: The first pending mutation event(update) version should be updated to the received model version of 1 - /// and the second pending mutation event version(delete) should not be updated. - func testSentModelWithNilVersion_SecondPendingEventNotReconciled() throws { - throw XCTSkip("TODO: fix this test") - let modelId = UUID().uuidString - let post = Post(id: modelId, title: "title", content: "content", createdAt: .now()) - let requestMutationEvent = try createMutationEvent(model: post, - mutationType: .create, - createdAt: .now(), - version: nil, - inProcess: true) - let pendingUpdateMutationEvent = try createMutationEvent(model: post, - mutationType: .update, - createdAt: .now().add(value: 1, to: .second), - version: nil) - let pendingDeleteMutationEvent = try createMutationEvent(model: post, - mutationType: .delete, - createdAt: .now().add(value: 2, to: .second), - version: nil) - let responseMutationSync = createMutationSync(model: post, version: 1) - - setUpPendingMutationQueue(post, - [requestMutationEvent, pendingUpdateMutationEvent, pendingDeleteMutationEvent], - pendingUpdateMutationEvent) - - let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingUpdateMutationEvent, - with: requestMutationEvent, - responseMutationSync: responseMutationSync) - XCTAssertNotNil(reconciledEvent) - XCTAssertEqual(reconciledEvent?.version, responseMutationSync.syncMetadata.version) - - let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should be latest version") - let updatingVersionExpectation = expectation(description: "update latest mutation event with response version") - - // update the version of head of mutation event table for given model id to the version of `mutationSync` - MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, - received: responseMutationSync, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success: - updatingVersionExpectation.fulfill() - } - } - wait(for: [updatingVersionExpectation], timeout: 1) - - // query for head of mutation event table for given model id and check if it has the updated version - MutationEvent.pendingMutationEvents(forModel: post, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success(let mutationEvents): - guard !mutationEvents.isEmpty, let head = mutationEvents.first, let last = mutationEvents.last else { - XCTFail("Failure while updating version") - return - } - XCTAssertEqual(head.version, responseMutationSync.syncMetadata.version) - XCTAssertEqual(head.mutationType, MutationEvent.MutationType.update.rawValue) - XCTAssertEqual(last, pendingDeleteMutationEvent) - queryAfterUpdatingVersionExpectation.fulfill() - } - } - wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) - } - - /// - Given: A pending mutation events queue with event containing version 2, a sent mutation event model - /// that matches the received mutation sync model having version 2. The received mutation sync has - /// version 1. - /// - When: The sent model matches the received model and the first pending mutation event version is 2. - /// - Then: The first pending mutation event version should NOT be updated. - func testSentModelVersionNewerThanResponseVersion_PendingEventNotReconciled() throws { - throw XCTSkip("TODO: fix this test") - let modelId = UUID().uuidString - let post1 = Post(id: modelId, title: "title1", content: "content1", createdAt: .now()) - let post2 = Post(id: modelId, title: "title2", content: "content2", createdAt: .now()) - let requestMutationEvent = try createMutationEvent(model: post1, - mutationType: .create, - createdAt: .now(), - version: 2, - inProcess: true) - let pendingMutationEvent = try createMutationEvent(model: post2, - mutationType: .update, - createdAt: .now().add(value: 1, to: .second), - version: 2) - let responseMutationSync = createMutationSync(model: post1, version: 1) - - setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) - - let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, - with: requestMutationEvent, - responseMutationSync: responseMutationSync) - XCTAssertNil(reconciledEvent) - - let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should have version 2") - let updatingVersionExpectation = - expectation(description: "don't update latest mutation event with response version") - - MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, - received: responseMutationSync, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success: - updatingVersionExpectation.fulfill() - } - } - wait(for: [updatingVersionExpectation], timeout: 1) - - // query for head of mutation event table for given model id and check if it has the correct version - MutationEvent.pendingMutationEvents(forModel: post1, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success(let mutationEvents): - guard !mutationEvents.isEmpty, let head = mutationEvents.first else { - XCTFail("Failure while updating version") - return - } - XCTAssertNotEqual(head.version, responseMutationSync.syncMetadata.version) - XCTAssertEqual(head, pendingMutationEvent) - queryAfterUpdatingVersionExpectation.fulfill() - } - } - wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) - } - - /// - Given: A pending mutation events queue with event containing version 1, a sent mutation event model - /// that doesn't match the received mutation sync model having version 1. The received mutation - /// sync has version 2. - /// - When: The sent model doesn't match the received model and the first pending mutation event version is 1. - /// - Then: The first pending mutation event version should NOT be updated. - func testSentModelNotEqualToResponseModel_PendingEventNotReconciled() throws { - throw XCTSkip("TODO: fix this test") - let modelId = UUID().uuidString - let post1 = Post(id: modelId, title: "title1", content: "content1", createdAt: .now()) - let post2 = Post(id: modelId, title: "title2", content: "content2", createdAt: .now()) - let post3 = Post(id: modelId, title: "title3", content: "content3", createdAt: .now()) - let requestMutationEvent = try createMutationEvent(model: post1, - mutationType: .update, - createdAt: .now(), - version: 1, - inProcess: true) - let pendingMutationEvent = try createMutationEvent(model: post2, - mutationType: .update, - createdAt: .now().add(value: 1, to: .second), - version: 1) - let responseMutationSync = createMutationSync(model: post3, version: 2) - - setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) - - let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, - with: requestMutationEvent, - responseMutationSync: responseMutationSync) - XCTAssertNil(reconciledEvent) - - let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should have version 1") - let updatingVersionExpectation = - expectation(description: "don't update latest mutation event with response version") - - MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, - received: responseMutationSync, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success: - updatingVersionExpectation.fulfill() - } - } - wait(for: [updatingVersionExpectation], timeout: 1) - - // query for head of mutation event table for given model id and check if it has the correct version - MutationEvent.pendingMutationEvents(forModel: post1, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success(let mutationEvents): - guard !mutationEvents.isEmpty, let head = mutationEvents.first else { - XCTFail("Failure while updating version") - return - } - XCTAssertNotEqual(head.version, responseMutationSync.syncMetadata.version) - XCTAssertEqual(head, pendingMutationEvent) - queryAfterUpdatingVersionExpectation.fulfill() - } - } - wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) - } - - /// - Given: A pending mutation events queue with event containing version 1, a sent mutation event model - /// that matches the received mutation sync model having version 1. The received mutation sync - /// has version 2. - /// - When: The sent model matches the received model and the first pending mutation event version is 1. - /// - Then: The first pending mutation event version should be updated to received mutation sync version i.e. 2. - func testPendingVersionReconciledSuccess() throws { - throw XCTSkip("TODO: fix this test") - let modelId = UUID().uuidString - let post1 = Post(id: modelId, title: "title1", content: "content1", createdAt: .now()) - let post2 = Post(id: modelId, title: "title2", content: "content2", createdAt: .now()) - let requestMutationEvent = try createMutationEvent(model: post1, - mutationType: .update, - createdAt: .now(), - version: 1, - inProcess: true) - let pendingMutationEvent = try createMutationEvent(model: post2, - mutationType: .update, - createdAt: .now().add(value: 1, to: .second), - version: 1) - let responseMutationSync = createMutationSync(model: post1, version: 2) - - setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) - - let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, - with: requestMutationEvent, - responseMutationSync: responseMutationSync) - XCTAssertNotNil(reconciledEvent) - XCTAssertEqual(reconciledEvent?.version, responseMutationSync.syncMetadata.version) - - let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should have version 2") - let updatingVersionExpectation = expectation(description: "update latest mutation event with response version") - - MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, - received: responseMutationSync, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success: - updatingVersionExpectation.fulfill() - } - } - wait(for: [updatingVersionExpectation], timeout: 1) - - // query for head of mutation event table for given model id and check if it has the correct version - MutationEvent.pendingMutationEvents(forModel: post1, - storageAdapter: storageAdapter) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success(let mutationEvents): - guard !mutationEvents.isEmpty, let head = mutationEvents.first else { - XCTFail("Failure while updating version") - return - } - XCTAssertEqual(head.version, responseMutationSync.syncMetadata.version) - XCTAssertEqual(head.mutationType, MutationEvent.MutationType.update.rawValue) - queryAfterUpdatingVersionExpectation.fulfill() - } - } - wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) - } - - private func createMutationEvent(model: Model, - mutationType: MutationEvent.MutationType, - createdAt: Temporal.DateTime, - version: Int? = nil, - inProcess: Bool = false) throws -> MutationEvent { - return MutationEvent(id: UUID().uuidString, - modelId: model.identifier(schema: MutationEvent.schema).stringValue, - modelName: model.modelName, - json: try model.toJSON(), - mutationType: mutationType, - createdAt: createdAt, - version: version, - inProcess: inProcess) - } - - private func createMutationSync(model: Model, version: Int = 1) -> MutationSync { - let metadata = MutationSyncMetadata(modelId: model.identifier(schema: MutationEvent.schema).stringValue, - modelName: model.modelName, - deleted: false, - lastChangedAt: Int64(Date().timeIntervalSince1970), - version: version) - return MutationSync(model: AnyModel(model), syncMetadata: metadata) - } - - private func setUpPendingMutationQueue(_ model: Model, - _ mutationEvents: [MutationEvent], - _ expectedHeadOfQueue: MutationEvent) { - for mutationEvent in mutationEvents { - let mutationEventSaveExpectation = expectation(description: "save mutation event success") - storageAdapter.save(mutationEvent) { result in - guard case .success = result else { - XCTFail("Failed to save metadata") - return - } - mutationEventSaveExpectation.fulfill() - } - wait(for: [mutationEventSaveExpectation], timeout: 1) - } - - // verify the head of queue is expected - let headOfQueueExpectation = expectation(description: "head of mutation event queue is as expected") - MutationEvent.pendingMutationEvents( - forModel: model, - storageAdapter: storageAdapter - ) { result in - switch result { - case .failure(let error): - XCTFail("Error : \(error)") - case .success(let events): - guard !events.isEmpty, let head = events.first else { - XCTFail("Failure while fetching mutation events") - return - } - XCTAssertEqual(head, expectedHeadOfQueue) - headOfQueueExpectation.fulfill() - } - } - wait(for: [headOfQueueExpectation], timeout: 1) - } -} diff --git a/AmplifyTests/CategoryTests/Storage/StorageCategoryConfigurationTests.swift b/AmplifyTests/CategoryTests/Storage/StorageCategoryConfigurationTests.swift index f653699f38..cccf40b9a1 100644 --- a/AmplifyTests/CategoryTests/Storage/StorageCategoryConfigurationTests.swift +++ b/AmplifyTests/CategoryTests/Storage/StorageCategoryConfigurationTests.swift @@ -157,7 +157,7 @@ class StorageCategoryConfigurationTests: XCTestCase { _ = try Amplify.Storage.getPlugin(for: "MockSecondStorageCategoryPlugin") .downloadData(key: "", options: nil) - await fulfillment(of: [methodShouldNotBeInvokedOnDefaultPlugin, methodShouldBeInvokedOnSecondPlugin]) + await fulfillment(of: [methodShouldNotBeInvokedOnDefaultPlugin, methodShouldBeInvokedOnSecondPlugin], timeout: 10) } func testPreconditionFailureInvokingWithMultiplePlugins() async throws {