From 43685a207701916d81a4f6d38692afb5f6b1d980 Mon Sep 17 00:00:00 2001 From: Di Wu Date: Thu, 4 Jan 2024 14:36:54 -0800 Subject: [PATCH] fix(datastore-v1): update pending mutation events version from mutation response --- .../OutgoingMutationQueue.swift | 8 +- .../SyncMutationToCloudOperation.swift | 35 +- .../Support/MutationEvent+Extensions.swift | 103 +++++ .../MutationEventExtensionsTests.swift | 408 ++++++++++++++++++ .../project.pbxproj | 8 + AmplifyPlugins/DataStore/Podfile.lock | 2 +- 6 files changed, 558 insertions(+), 6 deletions(-) create mode 100644 AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/Support/MutationEvent+Extensions.swift create mode 100644 AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/Support/MutationEventExtensionsTests.swift diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 0c1905288c..9d9bcd5d2c 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -262,7 +262,13 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { return } reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName) - completeProcessingEvent(mutationEvent, mutationSync: mutationSync) + MutationEvent.reconcilePendingMutationEventsVersion( + sent: mutationEvent, + received: mutationSync, + storageAdapter: storageAdapter + ) { _ in + self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync) + } } else { completeProcessingEvent(mutationEvent) } diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift index 0be465f6b0..6f7001f306 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift @@ -102,6 +102,33 @@ class SyncMutationToCloudOperation: AsynchronousOperation { } } + /// Always retrieve and use the largest version when available. The source of the version comes + /// from either the MutationEvent itself, which represents the queue request, or the persisted version + /// from the metadata table. + /// + /// **Version in the Mutation Event**. If there are mulitple mutation events pending, each outgoing + /// mutation processing will result in synchronously updating the pending mutation's version + /// before enqueuing the mutation response for reconciliation. + /// + /// **Version persisted in the metadata table**: Reconciliation will persist the latest version in the + /// metadata table. In cases of quick consecutive updates, the MutationEvent's version could + /// be greater than the persisted since the MutationEvent is updated from the original thread that + /// processed the outgoing mutation. + private func getLatestVersion(_ mutationEvent: MutationEvent) -> Int? { + let latestSyncedMetadataVersion = getLatestSyncMetadata()?.version + let mutationEventVersion = mutationEvent.version + switch (latestSyncedMetadataVersion, mutationEventVersion) { + case let (.some(syncedVersion), .some(version)): + return max(syncedVersion, version) + case let (.some(syncedVersion), .none): + return syncedVersion + case let (.none, .some(version)): + return version + case (.none, .none): + return nil + } + } + /// Creates a GraphQLRequest based on given `mutationType` /// - Parameters: /// - mutationType: mutation type @@ -112,7 +139,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { authType: AWSAuthorizationType? = nil ) -> GraphQLRequest>? { var request: GraphQLRequest> - let latestSyncMetadata = getLatestSyncMetadata() + let version = getLatestVersion(mutationEvent) do { var graphQLFilter: GraphQLFilter? if let graphQLFilterJSON = mutationEvent.graphQLFilterJSON { @@ -131,7 +158,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { request = GraphQLRequest.deleteMutation(of: model, modelSchema: modelSchema, where: graphQLFilter, - version: latestSyncMetadata?.version) + version: version) case .update: let model = try mutationEvent.decodeModel() guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else { @@ -143,7 +170,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { request = GraphQLRequest.updateMutation(of: model, modelSchema: modelSchema, where: graphQLFilter, - version: latestSyncMetadata?.version) + version: version) case .create: let model = try mutationEvent.decodeModel() guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else { @@ -154,7 +181,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { } request = GraphQLRequest.createMutation(of: model, modelSchema: modelSchema, - version: latestSyncMetadata?.version) + version: version) } } catch { let apiError = APIError.unknown("Couldn't decode model", "", error) diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/Support/MutationEvent+Extensions.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/Support/MutationEvent+Extensions.swift new file mode 100644 index 0000000000..3d99be0a03 --- /dev/null +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/Support/MutationEvent+Extensions.swift @@ -0,0 +1,103 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Amplify +import Dispatch +import AWSPluginsCore + +extension MutationEvent { + // Consecutive operations that modify a model results in a sequence of pending mutation events that + // have the current version of the model. The first mutation event has the correct version of the model, + // while the subsequent events will have lower versions if the first mutation event is successfully synced + // to the cloud. By reconciling the pending mutation events after syncing the first mutation event, + // we attempt to update the pending version to the latest version from the response. + // The before and after conditions for consecutive update scenarios are as below: + // - Save, then immediately update + // Queue Before - [(version: nil, inprocess: true, type: .create), + // (version: nil, inprocess: false, type: .update)] + // Response - [version: 1, type: .create] + // Queue After - [(version: 1, inprocess: false, type: .update)] + // - Save, then immediately delete + // Queue Before - [(version: nil, inprocess: true, type: .create), + // (version: nil, inprocess: false, type: .delete)] + // Response - [version: 1, type: .create] + // Queue After - [(version: 1, inprocess: false, type: .delete)] + // - Save, sync, then immediately update and delete + // Queue Before (After save, sync) + // - [(version: 1, inprocess: true, type: .update), (version: 1, inprocess: false, type: .delete)] + // Response - [version: 2, type: .update] + // Queue After - [(version: 2, inprocess: false, type: .delete)] + // + // For a given model `id`, checks the version of the head of pending mutation event queue + // against the API response version in `mutationSync` and saves it in the mutation event table if + // the response version is a newer one + static func reconcilePendingMutationEventsVersion(sent mutationEvent: MutationEvent, + received mutationSync: MutationSync, + storageAdapter: StorageEngineAdapter, + completion: @escaping DataStoreCallback) { + MutationEvent.pendingMutationEvents( + forMutationEvent: mutationEvent, + storageAdapter: storageAdapter) { queryResult in + switch queryResult { + case .failure(let dataStoreError): + completion(.failure(dataStoreError)) + case .success(let localMutationEvents): + guard let existingEvent = localMutationEvents.first else { + completion(.success(())) + return + } + + guard let reconciledEvent = reconcile(pendingMutationEvent: existingEvent, + with: mutationEvent, + responseMutationSync: mutationSync) else { + completion(.success(())) + return + } + + storageAdapter.save(reconciledEvent, condition: nil) { result in + switch result { + case .failure(let dataStoreError): + completion(.failure(dataStoreError)) + case .success: + completion(.success(())) + } + } + } + } + } + + static func reconcile(pendingMutationEvent: MutationEvent, + with requestMutationEvent: MutationEvent, + responseMutationSync: MutationSync) -> MutationEvent? { + // return if version of the pending mutation event is not nil and + // is >= version contained in the response + if pendingMutationEvent.version != nil && + pendingMutationEvent.version! >= responseMutationSync.syncMetadata.version { + return nil + } + + do { + let responseModel = responseMutationSync.model.instance + let requestModel = try requestMutationEvent.decodeModel() + + // check if the data sent in the request is the same as the response + // if it is, update the pending mutation event version to the response version + guard let modelSchema = ModelRegistry.modelSchema(from: requestMutationEvent.modelName), + modelSchema.compare(responseModel, requestModel) else { + return nil + } + + var pendingMutationEvent = pendingMutationEvent + pendingMutationEvent.version = responseMutationSync.syncMetadata.version + return pendingMutationEvent + } catch { + Amplify.log.verbose("Error decoding models: \(error)") + return nil + } + } + +} diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/Support/MutationEventExtensionsTests.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/Support/MutationEventExtensionsTests.swift new file mode 100644 index 0000000000..82b76780bf --- /dev/null +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/Support/MutationEventExtensionsTests.swift @@ -0,0 +1,408 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Foundation +import SQLite +import XCTest + +@testable import Amplify +@testable import AmplifyTestCommon +@testable import AWSDataStoreCategoryPlugin +@testable import AWSPluginsCore + +// swiftlint:disable type_body_length +class MutationEventExtensionsTest: BaseDataStoreTests { + + /// - Given: A pending mutation events queue with event containing `nil` version, a sent mutation + /// event model that matches the received mutation sync model. The received mutation sync has version 1. + /// - When: The sent model matches the received model and the first pending mutation event version is `nil`. + /// - Then: The pending mutation event version should be updated to the received model version of 1. + func testSentModelWithNilVersion_Reconciled() throws { + let modelId = UUID().uuidString + let post = Post(id: modelId, title: "title", content: "content", createdAt: .now()) + let requestMutationEvent = try createMutationEvent(model: post, + mutationType: .create, + createdAt: .now(), + version: nil, + inProcess: true) + let pendingMutationEvent = try createMutationEvent(model: post, + mutationType: .update, + createdAt: .now().add(value: 1, to: .second), + version: nil) + let responseMutationSync = createMutationSync(model: post, version: 1) + + setUpPendingMutationQueue(post, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) + + let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, + with: requestMutationEvent, + responseMutationSync: responseMutationSync) + XCTAssertNotNil(reconciledEvent) + XCTAssertEqual(reconciledEvent?.version, responseMutationSync.syncMetadata.version) + + let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should be latest version") + let updatingVersionExpectation = expectation(description: "update latest mutation event with response version") + + // update the version of head of mutation event table for given model id to the version of `mutationSync` + MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, + received: responseMutationSync, + storageAdapter: storageAdapter) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success: + updatingVersionExpectation.fulfill() + } + } + wait(for: [updatingVersionExpectation], timeout: 1) + + // query for head of mutation event table for given model id and check if it has the updated version + MutationEvent.pendingMutationEvents( + forModel: post, + storageAdapter: storageAdapter + ) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success(let mutationEvents): + guard !mutationEvents.isEmpty, let head = mutationEvents.first else { + XCTFail("Failure while updating version") + return + } + XCTAssertEqual(head.version, responseMutationSync.syncMetadata.version) + XCTAssertEqual(head.mutationType, MutationEvent.MutationType.update.rawValue) + queryAfterUpdatingVersionExpectation.fulfill() + } + } + wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) + } + + /// - Given: A pending mutation events queue with two events(update and delete) containing `nil` version, + /// a sent mutation event model that matches the received mutation sync model. The received mutation + /// sync has version 1. + /// - When: The sent model matches the received model, the first pending mutation event(update) version is `nil` and + /// the second pending mutation event(delete) version is `nil`. + /// - Then: The first pending mutation event(update) version should be updated to the received model version of 1 + /// and the second pending mutation event version(delete) should not be updated. + func testSentModelWithNilVersion_SecondPendingEventNotReconciled() throws { + let modelId = UUID().uuidString + let post = Post(id: modelId, title: "title", content: "content", createdAt: .now()) + let requestMutationEvent = try createMutationEvent(model: post, + mutationType: .create, + createdAt: .now(), + version: nil, + inProcess: true) + let pendingUpdateMutationEvent = try createMutationEvent(model: post, + mutationType: .update, + createdAt: .now().add(value: 1, to: .second), + version: nil) + let pendingDeleteMutationEvent = try createMutationEvent(model: post, + mutationType: .delete, + createdAt: .now().add(value: 2, to: .second), + version: nil) + let responseMutationSync = createMutationSync(model: post, version: 1) + + setUpPendingMutationQueue(post, + [requestMutationEvent, pendingUpdateMutationEvent, pendingDeleteMutationEvent], + pendingUpdateMutationEvent) + + let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingUpdateMutationEvent, + with: requestMutationEvent, + responseMutationSync: responseMutationSync) + XCTAssertNotNil(reconciledEvent) + XCTAssertEqual(reconciledEvent?.version, responseMutationSync.syncMetadata.version) + + let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should be latest version") + let updatingVersionExpectation = expectation(description: "update latest mutation event with response version") + + // update the version of head of mutation event table for given model id to the version of `mutationSync` + MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, + received: responseMutationSync, + storageAdapter: storageAdapter) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success: + updatingVersionExpectation.fulfill() + } + } + wait(for: [updatingVersionExpectation], timeout: 1) + + // query for head of mutation event table for given model id and check if it has the updated version + MutationEvent.pendingMutationEvents( + forModel: post, + storageAdapter: storageAdapter + ) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success(let mutationEvents): + guard !mutationEvents.isEmpty, let head = mutationEvents.first, let last = mutationEvents.last else { + XCTFail("Failure while updating version") + return + } + XCTAssertEqual(head.version, responseMutationSync.syncMetadata.version) + XCTAssertEqual(head.mutationType, MutationEvent.MutationType.update.rawValue) + XCTAssertEqual(last, pendingDeleteMutationEvent) + queryAfterUpdatingVersionExpectation.fulfill() + } + } + wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) + } + + /// - Given: A pending mutation events queue with event containing version 2, a sent mutation event model + /// that matches the received mutation sync model having version 2. The received mutation sync has + /// version 1. + /// - When: The sent model matches the received model and the first pending mutation event version is 2. + /// - Then: The first pending mutation event version should NOT be updated. + func testSentModelVersionNewerThanResponseVersion_PendingEventNotReconciled() throws { + let modelId = UUID().uuidString + let post1 = Post(id: modelId, title: "title1", content: "content1", createdAt: .now()) + let post2 = Post(id: modelId, title: "title2", content: "content2", createdAt: .now()) + let requestMutationEvent = try createMutationEvent(model: post1, + mutationType: .create, + createdAt: .now(), + version: 2, + inProcess: true) + let pendingMutationEvent = try createMutationEvent(model: post2, + mutationType: .update, + createdAt: .now().add(value: 1, to: .second), + version: 2) + let responseMutationSync = createMutationSync(model: post1, version: 1) + + setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) + + let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, + with: requestMutationEvent, + responseMutationSync: responseMutationSync) + XCTAssertNil(reconciledEvent) + + let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should have version 2") + let updatingVersionExpectation = + expectation(description: "don't update latest mutation event with response version") + + MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, + received: responseMutationSync, + storageAdapter: storageAdapter) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success: + updatingVersionExpectation.fulfill() + } + } + wait(for: [updatingVersionExpectation], timeout: 1) + + // query for head of mutation event table for given model id and check if it has the correct version + MutationEvent.pendingMutationEvents( + forModel: post1, + storageAdapter: storageAdapter + ) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success(let mutationEvents): + guard !mutationEvents.isEmpty, let head = mutationEvents.first else { + XCTFail("Failure while updating version") + return + } + XCTAssertNotEqual(head.version, responseMutationSync.syncMetadata.version) + XCTAssertEqual(head, pendingMutationEvent) + queryAfterUpdatingVersionExpectation.fulfill() + } + } + wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) + } + + /// - Given: A pending mutation events queue with event containing version 1, a sent mutation event model + /// that doesn't match the received mutation sync model having version 1. The received mutation + /// sync has version 2. + /// - When: The sent model doesn't match the received model and the first pending mutation event version is 1. + /// - Then: The first pending mutation event version should NOT be updated. + func testSentModelNotEqualToResponseModel_PendingEventNotReconciled() throws { + let modelId = UUID().uuidString + let post1 = Post(id: modelId, title: "title1", content: "content1", createdAt: .now()) + let post2 = Post(id: modelId, title: "title2", content: "content2", createdAt: .now()) + let post3 = Post(id: modelId, title: "title3", content: "content3", createdAt: .now()) + let requestMutationEvent = try createMutationEvent(model: post1, + mutationType: .update, + createdAt: .now(), + version: 1, + inProcess: true) + let pendingMutationEvent = try createMutationEvent(model: post2, + mutationType: .update, + createdAt: .now().add(value: 1, to: .second), + version: 1) + let responseMutationSync = createMutationSync(model: post3, version: 2) + + setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) + + let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, + with: requestMutationEvent, + responseMutationSync: responseMutationSync) + XCTAssertNil(reconciledEvent) + + let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should have version 1") + let updatingVersionExpectation = + expectation(description: "don't update latest mutation event with response version") + + MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, + received: responseMutationSync, + storageAdapter: storageAdapter) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success: + updatingVersionExpectation.fulfill() + } + } + wait(for: [updatingVersionExpectation], timeout: 1) + + // query for head of mutation event table for given model id and check if it has the correct version + MutationEvent.pendingMutationEvents( + forModel: post1, + storageAdapter: storageAdapter + ) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success(let mutationEvents): + guard !mutationEvents.isEmpty, let head = mutationEvents.first else { + XCTFail("Failure while updating version") + return + } + XCTAssertNotEqual(head.version, responseMutationSync.syncMetadata.version) + XCTAssertEqual(head, pendingMutationEvent) + queryAfterUpdatingVersionExpectation.fulfill() + } + } + wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) + } + + /// - Given: A pending mutation events queue with event containing version 1, a sent mutation event model + /// that matches the received mutation sync model having version 1. The received mutation sync + /// has version 2. + /// - When: The sent model matches the received model and the first pending mutation event version is 1. + /// - Then: The first pending mutation event version should be updated to received mutation sync version i.e. 2. + func testPendingVersionReconciledSuccess() throws { + let modelId = UUID().uuidString + let post1 = Post(id: modelId, title: "title1", content: "content1", createdAt: .now()) + let post2 = Post(id: modelId, title: "title2", content: "content2", createdAt: .now()) + let requestMutationEvent = try createMutationEvent(model: post1, + mutationType: .update, + createdAt: .now(), + version: 1, + inProcess: true) + let pendingMutationEvent = try createMutationEvent(model: post2, + mutationType: .update, + createdAt: .now().add(value: 1, to: .second), + version: 1) + let responseMutationSync = createMutationSync(model: post1, version: 2) + + setUpPendingMutationQueue(post1, [requestMutationEvent, pendingMutationEvent], pendingMutationEvent) + + let reconciledEvent = MutationEvent.reconcile(pendingMutationEvent: pendingMutationEvent, + with: requestMutationEvent, + responseMutationSync: responseMutationSync) + XCTAssertNotNil(reconciledEvent) + XCTAssertEqual(reconciledEvent?.version, responseMutationSync.syncMetadata.version) + + let queryAfterUpdatingVersionExpectation = expectation(description: "update mutation should have version 2") + let updatingVersionExpectation = expectation(description: "update latest mutation event with response version") + + MutationEvent.reconcilePendingMutationEventsVersion(sent: requestMutationEvent, + received: responseMutationSync, + storageAdapter: storageAdapter) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success: + updatingVersionExpectation.fulfill() + } + } + wait(for: [updatingVersionExpectation], timeout: 1) + + // query for head of mutation event table for given model id and check if it has the correct version + MutationEvent.pendingMutationEvents( + forModel: post1, + storageAdapter: storageAdapter + ) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success(let mutationEvents): + guard !mutationEvents.isEmpty, let head = mutationEvents.first else { + XCTFail("Failure while updating version") + return + } + XCTAssertEqual(head.version, responseMutationSync.syncMetadata.version) + XCTAssertEqual(head.mutationType, MutationEvent.MutationType.update.rawValue) + queryAfterUpdatingVersionExpectation.fulfill() + } + } + wait(for: [queryAfterUpdatingVersionExpectation], timeout: 1) + } + + private func createMutationEvent(model: Model, + mutationType: MutationEvent.MutationType, + createdAt: Temporal.DateTime, + version: Int? = nil, + inProcess: Bool = false) throws -> MutationEvent { + return MutationEvent(id: UUID().uuidString, + modelId: model.identifier(schema: MutationEvent.schema).stringValue, + modelName: model.modelName, + json: try model.toJSON(), + mutationType: mutationType, + createdAt: createdAt, + version: version, + inProcess: inProcess) + } + + private func createMutationSync(model: Model, version: Int = 1) -> MutationSync { + let metadata = MutationSyncMetadata(modelId: model.identifier(schema: MutationEvent.schema).stringValue, + modelName: model.modelName, + deleted: false, + lastChangedAt: Int(Date().timeIntervalSince1970), + version: version) + return MutationSync(model: AnyModel(model), syncMetadata: metadata) + } + + private func setUpPendingMutationQueue(_ model: Model, + _ mutationEvents: [MutationEvent], + _ expectedHeadOfQueue: MutationEvent) { + for mutationEvent in mutationEvents { + let mutationEventSaveExpectation = expectation(description: "save mutation event success") + storageAdapter.save(mutationEvent) { result in + guard case .success = result else { + XCTFail("Failed to save metadata") + return + } + mutationEventSaveExpectation.fulfill() + } + wait(for: [mutationEventSaveExpectation], timeout: 1) + } + + // verify the head of queue is expected + let headOfQueueExpectation = expectation(description: "head of mutation event queue is as expected") + MutationEvent.pendingMutationEvents( + forModel: model, + storageAdapter: storageAdapter + ) { result in + switch result { + case .failure(let error): + XCTFail("Error : \(error)") + case .success(let events): + guard !events.isEmpty, let head = events.first else { + XCTFail("Failure while fetching mutation events") + return + } + XCTAssertEqual(head, expectedHeadOfQueue) + headOfQueueExpectation.fulfill() + } + } + wait(for: [headOfQueueExpectation], timeout: 1) + } +} // swiftlint:disable:this file_length diff --git a/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj b/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj index b9b8078505..1fed9a912a 100644 --- a/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj +++ b/AmplifyPlugins/DataStore/DataStoreCategoryPlugin.xcodeproj/project.pbxproj @@ -126,6 +126,8 @@ 21FDBBDB2587DB7A0086FCDC /* DataStoreConnectionScenario6Tests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 21FDBBDA2587DB7A0086FCDC /* DataStoreConnectionScenario6Tests.swift */; }; 21FE027E25890B2F00B81D72 /* DataStoreListProvider.swift in Sources */ = {isa = PBXBuildFile; fileRef = 21FE027D25890B2F00B81D72 /* DataStoreListProvider.swift */; }; 3AC75F1100634561C7A480BE /* Pods_HostApp_AWSDataStoreCategoryPluginFlutterIntegrationTests.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 3689EBBD76A66F01A7ABFF16 /* Pods_HostApp_AWSDataStoreCategoryPluginFlutterIntegrationTests.framework */; }; + 6037DBF22B4763A900DB0742 /* MutationEvent+Extensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6037DBF12B4763A900DB0742 /* MutationEvent+Extensions.swift */; }; + 6037DBF42B4763E300DB0742 /* MutationEventExtensionsTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6037DBF32B4763E300DB0742 /* MutationEventExtensionsTests.swift */; }; 6B01B71D23A4615900AD0E97 /* SyncMutationToCloudOperationTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B01B71C23A4615900AD0E97 /* SyncMutationToCloudOperationTests.swift */; }; 6B01B72023A4672500AD0E97 /* RequestRetryable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B01B71E23A4672500AD0E97 /* RequestRetryable.swift */; }; 6B01B72223A4672500AD0E97 /* RequestRetryablePolicy.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6B01B71F23A4672500AD0E97 /* RequestRetryablePolicy.swift */; }; @@ -568,6 +570,8 @@ 32AD9436C9FB473423CA9786 /* Pods-AWSDataStoreCategoryPlugin.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-AWSDataStoreCategoryPlugin.release.xcconfig"; path = "Target Support Files/Pods-AWSDataStoreCategoryPlugin/Pods-AWSDataStoreCategoryPlugin.release.xcconfig"; sourceTree = ""; }; 3689EBBD76A66F01A7ABFF16 /* Pods_HostApp_AWSDataStoreCategoryPluginFlutterIntegrationTests.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_HostApp_AWSDataStoreCategoryPluginFlutterIntegrationTests.framework; sourceTree = BUILT_PRODUCTS_DIR; }; 5B62423E457D3149264ADA1F /* Pods-HostApp-AWSDataStoreCategoryPluginFlutterIntegrationTests.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-HostApp-AWSDataStoreCategoryPluginFlutterIntegrationTests.debug.xcconfig"; path = "Target Support Files/Pods-HostApp-AWSDataStoreCategoryPluginFlutterIntegrationTests/Pods-HostApp-AWSDataStoreCategoryPluginFlutterIntegrationTests.debug.xcconfig"; sourceTree = ""; }; + 6037DBF12B4763A900DB0742 /* MutationEvent+Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "MutationEvent+Extensions.swift"; sourceTree = ""; }; + 6037DBF32B4763E300DB0742 /* MutationEventExtensionsTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = MutationEventExtensionsTests.swift; sourceTree = ""; }; 6A1D332BE6CF885805360B3D /* Pods_AWSDataStoreCategoryPlugin.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_AWSDataStoreCategoryPlugin.framework; sourceTree = BUILT_PRODUCTS_DIR; }; 6B01B71C23A4615900AD0E97 /* SyncMutationToCloudOperationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SyncMutationToCloudOperationTests.swift; sourceTree = ""; }; 6B01B71E23A4672500AD0E97 /* RequestRetryable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RequestRetryable.swift; sourceTree = ""; }; @@ -1133,6 +1137,7 @@ 214B6B66264B156200A9311D /* Support */ = { isa = PBXGroup; children = ( + 6037DBF32B4763E300DB0742 /* MutationEventExtensionsTests.swift */, 210E218126601C1C00D90ED8 /* MutationEventQueryTests.swift */, 214B6B67264B157500A9311D /* StopwatchTests.swift */, 973AF1AE26E016EC00BED353 /* ModelCompareTests.swift */, @@ -1595,6 +1600,7 @@ FA8F4D1C2395AF5E00861D91 /* Support */ = { isa = PBXGroup; children = ( + 6037DBF12B4763A900DB0742 /* MutationEvent+Extensions.swift */, FAAA588F2396BC5A008A4DB6 /* CancelAwareBlockOperation.swift */, FA8F4D1D2395AF7600861D91 /* DataStoreError+Plugin.swift */, FA8F4D212395B11700861D91 /* MutationEvent+Query.swift */, @@ -2600,6 +2606,7 @@ 2149E5CD2388684F00873955 /* SQLStatement+Update.swift in Sources */, FAF7CEC9238C6A940095547B /* SyncMutationToCloudOperation.swift in Sources */, 762383A527501EBC00EAF1C7 /* RemoteSyncEngine+AuthModeStrategyDelegate.swift in Sources */, + 6037DBF22B4763A900DB0742 /* MutationEvent+Extensions.swift in Sources */, 21DDCDFE272C861F00D9B297 /* MutationSyncMetadataMigrationDelegate+SQLiteValidation.swift in Sources */, FA8F4D1E2395AF7600861D91 /* DataStoreError+Plugin.swift in Sources */, FA6C3FEE239890B500A73110 /* AWSMutationEventPublisher.swift in Sources */, @@ -2710,6 +2717,7 @@ 21DDCE0427303E5300D9B297 /* AWSDataStorePluginBaseBehaviorTests.swift in Sources */, 2149E5FF238869CF00873955 /* SQLStatementTests.swift in Sources */, 6B64027923E3584300001FD7 /* MockAWSIncomingEventReconciliationQueue.swift in Sources */, + 6037DBF42B4763E300DB0742 /* MutationEventExtensionsTests.swift in Sources */, 7638898F26AB56580061AF0B /* StorageEngineSyncRequirementsTests.swift in Sources */, FA0427D02396CDD800D25AB0 /* DataStoreHubTests.swift in Sources */, FA5113AE26A9E808007B1F25 /* OutgoingMutationQueueNetworkTests.swift in Sources */, diff --git a/AmplifyPlugins/DataStore/Podfile.lock b/AmplifyPlugins/DataStore/Podfile.lock index a7f876e58b..49bfdb23bc 100644 --- a/AmplifyPlugins/DataStore/Podfile.lock +++ b/AmplifyPlugins/DataStore/Podfile.lock @@ -128,4 +128,4 @@ SPEC CHECKSUMS: PODFILE CHECKSUM: 0bab7193bebdf470839514f327440893b0d26090 -COCOAPODS: 1.11.3 +COCOAPODS: 1.14.3