diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift index 474f76666e..7ba4449c50 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift @@ -337,7 +337,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { } enum ApplyRemoteModelResult { - case applied(RemoteModel) + case applied(RemoteModel, AppliedModel) case dropped } @@ -363,7 +363,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { promise(.failure(dataStoreError)) } case .success: - promise(.success(.applied(remoteModel))) + promise(.success(.applied(remoteModel, remoteModel))) } } } @@ -387,14 +387,13 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { let anyModel: AnyModel do { anyModel = try savedModel.eraseToAnyModel() + let appliedModel = MutationSync(model: anyModel, syncMetadata: remoteModel.syncMetadata) + promise(.success(.applied(remoteModel, appliedModel))) } catch { let dataStoreError = DataStoreError(error: error) self.notifyDropped(error: dataStoreError) promise(.failure(dataStoreError)) - return } - let inProcessModel = MutationSync(model: anyModel, syncMetadata: remoteModel.syncMetadata) - promise(.success(.applied(inProcessModel))) } } } @@ -417,21 +416,15 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { result: ApplyRemoteModelResult, mutationType: MutationEvent.MutationType ) -> AnyPublisher { - if case let .applied(inProcessModel) = result { - return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: inProcessModel, mutationType: mutationType) - .handleEvents( receiveOutput: { syncMetadata in - let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata) - self.notify(savedModel: appliedModel, mutationType: mutationType) - }, receiveCompletion: { completion in - if case .failure(let error) = completion { - self.notifyDropped(error: error) - } - }) - .map { _ in () } + switch result { + case .applied(let remoteModel, let appliedModel): + return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: remoteModel, mutationType: mutationType) + .map { MutationSync(model: appliedModel.model, syncMetadata: $0) } + .map { [weak self] in self?.notify(appliedModel: $0, mutationType: mutationType) } .eraseToAnyPublisher() - + case .dropped: + return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher() } - return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher() } private func saveMetadata( @@ -440,9 +433,17 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { mutationType: MutationEvent.MutationType ) -> Future { Future { promise in - storageAdapter.save(remoteModel.syncMetadata, - condition: nil, - eagerLoad: self.isEagerLoad) { result in + storageAdapter.save( + remoteModel.syncMetadata, + condition: nil, + eagerLoad: self.isEagerLoad + ) { result in + switch result { + case .failure(let error): + self.notifyDropped(error: error) + case .success: + self.notifyHub(remoteModel: remoteModel, mutationType: mutationType) + } promise(result) } } @@ -454,28 +455,46 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { } } - private func notify(savedModel: AppliedModel, - mutationType: MutationEvent.MutationType) { - let version = savedModel.syncMetadata.version + /// Inform the mutationEvents subscribers about the updated model, + /// which incorporates lazy loading information if applicable. + private func notify(appliedModel: AppliedModel, mutationType: MutationEvent.MutationType) { + guard let json = try? appliedModel.model.instance.toJSON() else { + log.error("Could not notify mutation event") + return + } + + let modelIdentifier = appliedModel.model.instance.identifier(schema: modelSchema).stringValue + let mutationEvent = MutationEvent(modelId: modelIdentifier, + modelName: modelSchema.name, + json: json, + mutationType: mutationType, + version: appliedModel.syncMetadata.version) + mutationEventPublisher.send(.mutationEvent(mutationEvent)) + } + /// Inform the remote mutationEvents to Hub event subscribers, + /// which only contains information received from AppSync server. + private func notifyHub( + remoteModel: RemoteModel, + mutationType: MutationEvent.MutationType + ) { // TODO: Dispatch/notify error if we can't erase to any model? Would imply an error in JSON decoding, // which shouldn't be possible this late in the process. Possibly notify global conflict/error handler? - guard let json = try? savedModel.model.instance.toJSON() else { - log.error("Could not notify mutation event") + guard let json = try? remoteModel.model.instance.toJSON() else { + log.error("Could not notify Hub mutation event") return } - let modelIdentifier = savedModel.model.instance.identifier(schema: modelSchema).stringValue + + let modelIdentifier = remoteModel.model.instance.identifier(schema: modelSchema).stringValue let mutationEvent = MutationEvent(modelId: modelIdentifier, modelName: modelSchema.name, json: json, mutationType: mutationType, - version: version) + version: remoteModel.syncMetadata.version) let payload = HubPayload(eventName: HubPayload.EventName.DataStore.syncReceived, data: mutationEvent) Amplify.Hub.dispatch(to: .dataStore, payload: payload) - - mutationEventPublisher.send(.mutationEvent(mutationEvent)) } private func notifyFinished() { diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift index 610c58ba31..961adb38d7 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift @@ -705,16 +705,16 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { waitForExpectations(timeout: 1) } - func testApplyRemoteModels_deleteDisposition() { + func testApplyRemoteModels_deleteDisposition() async { let expect = expectation(description: "operation should send value and complete successfully") expect.expectedFulfillmentCount = 2 - let stoargeExpect = expectation(description: "storage delete should be called") + let storageExpect = expectation(description: "storage delete should be called") let storageMetadataExpect = expectation(description: "storage save metadata should be called") let notifyExpect = expectation(description: "mutation event should be emitted") let hubExpect = expectation(description: "Hub is notified") let deleteResponder = DeleteUntypedModelCompletionResponder { _, id in XCTAssertEqual(id, self.anyPostMutationSync.model.id) - stoargeExpect.fulfill() + storageExpect.fulfill() return .emptyResult } storageAdapter.responders[.deleteUntypedModel] = deleteResponder @@ -758,24 +758,33 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { }, receiveValue: { _ in expect.fulfill() }).store(in: &cancellables) - waitForExpectations(timeout: 1) - + + await fulfillment(of: [ + expect, + storageExpect, + storageMetadataExpect, + notifyExpect, + hubExpect + ], timeout: 1) } - func testApplyRemoteModels_multipleDispositions() { - let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync), - .create(anyPostMutationSync), - .update(anyPostMutationSync), - .update(anyPostMutationSync), - .delete(anyPostMutationSync), - .delete(anyPostMutationSync), - .create(anyPostMutationSync), - .update(anyPostMutationSync), - .delete(anyPostMutationSync)] + func testApplyRemoteModels_multipleDispositions() async { + let dispositions: [RemoteSyncReconciler.Disposition] = [ + .create(anyPostMutationSync), + .create(anyPostMutationSync), + .update(anyPostMutationSync), + .update(anyPostMutationSync), + .delete(anyPostMutationSync), + .delete(anyPostMutationSync), + .create(anyPostMutationSync), + .update(anyPostMutationSync), + .delete(anyPostMutationSync) + ] + let expect = expectation(description: "should complete successfully") expect.expectedFulfillmentCount = 2 - let stoargeExpect = expectation(description: "storage save/delete should be called") - stoargeExpect.expectedFulfillmentCount = dispositions.count + let storageExpect = expectation(description: "storage save/delete should be called") + storageExpect.expectedFulfillmentCount = dispositions.count let storageMetadataExpect = expectation(description: "storage save metadata should be called") storageMetadataExpect.expectedFulfillmentCount = dispositions.count let notifyExpect = expectation(description: "mutation event should be emitted") @@ -784,14 +793,14 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { hubExpect.expectedFulfillmentCount = dispositions.count let saveResponder = SaveUntypedModelResponder { _, completion in - stoargeExpect.fulfill() + storageExpect.fulfill() completion(.success(self.anyPostMutationSync.model)) } storageAdapter.responders[.saveUntypedModel] = saveResponder let deleteResponder = DeleteUntypedModelCompletionResponder { _, id in XCTAssertEqual(id, self.anyPostMutationSync.model.id) - stoargeExpect.fulfill() + storageExpect.fulfill() return .emptyResult } storageAdapter.responders[.deleteUntypedModel] = deleteResponder @@ -835,10 +844,16 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { }, receiveValue: { _ in expect.fulfill() }).store(in: &cancellables) - waitForExpectations(timeout: 1) + await fulfillment(of: [ + expect, + storageExpect, + storageMetadataExpect, + notifyExpect, + hubExpect + ], timeout: 1) } - func testApplyRemoteModels_skipFailedOperations() throws { + func testApplyRemoteModels_skipFailedOperations() async throws { let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync), .create(anyPostMutationSync), .update(anyPostMutationSync), @@ -890,7 +905,12 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { }, receiveValue: { _ in }).store(in: &cancellables) - waitForExpectations(timeout: 1) + + await fulfillment(of: [ + expect, + expectedDropped, + expectedDeleteSuccess + ], timeout: 1) } func testApplyRemoteModels_failWithConstraintViolationShouldBeSuccessful() {