diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift index d027edc0aa..9afaf2e8a1 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift @@ -264,79 +264,64 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation { return dispositions } + func applyRemoteModelsDisposition( + storageAdapter: StorageEngineAdapter, + disposition: RemoteSyncReconciler.Disposition + ) -> AnyPublisher, Never> { + let operation: Future + let mutationType: MutationEvent.MutationType + switch disposition { + case .create(let remoteModel): + operation = self.save(storageAdapter: storageAdapter, remoteModel: remoteModel) + mutationType = .create + case .update(let remoteModel): + operation = self.save(storageAdapter: storageAdapter, remoteModel: remoteModel) + mutationType = .update + case .delete(let remoteModel): + operation = self.delete(storageAdapter: storageAdapter, remoteModel: remoteModel) + mutationType = .delete + } + + return operation + .flatMap { applyResult in + self.saveMetadata(storageAdapter: storageAdapter, applyResult: applyResult, mutationType: mutationType) + } + .map {_ in Result.success(()) } + .catch { Just>(.failure($0))} + .eraseToAnyPublisher() + } + // TODO: refactor - move each the publisher constructions to its own utility method for readability of the // `switch` and a single method that you can invoke in the `map` func applyRemoteModelsDispositions( - _ dispositions: [RemoteSyncReconciler.Disposition]) -> Future { - Future { promise in - var result: Result = .failure(Self.unfulfilledDataStoreError()) - defer { - promise(result) - } - guard !self.isCancelled else { - self.log.info("\(#function) - cancelled, aborting") - result = .successfulVoid - return - } - guard let storageAdapter = self.storageAdapter else { - let error = DataStoreError.nilStorageAdapter() - self.notifyDropped(count: dispositions.count, error: error) - result = .failure(error) - return - } + _ dispositions: [RemoteSyncReconciler.Disposition] + ) -> Future { + guard !self.isCancelled else { + self.log.info("\(#function) - cancelled, aborting") + return Future { $0(.successfulVoid) } + } - guard !dispositions.isEmpty else { - result = .successfulVoid - return - } + guard let storageAdapter = self.storageAdapter else { + let error = DataStoreError.nilStorageAdapter() + self.notifyDropped(count: dispositions.count, error: error) + return Future { $0(.failure(error)) } + } - let publishers = dispositions.map { disposition -> - Publishers.FlatMap, - Future> in - - switch disposition { - case .create(let remoteModel): - let publisher = self.save(storageAdapter: storageAdapter, - remoteModel: remoteModel) - .flatMap { applyResult in - self.saveMetadata(storageAdapter: storageAdapter, - applyResult: applyResult, - mutationType: .create) - } - return publisher - case .update(let remoteModel): - let publisher = self.save(storageAdapter: storageAdapter, - remoteModel: remoteModel) - .flatMap { applyResult in - self.saveMetadata(storageAdapter: storageAdapter, - applyResult: applyResult, - mutationType: .update) - } - return publisher - case .delete(let remoteModel): - let publisher = self.delete(storageAdapter: storageAdapter, - remoteModel: remoteModel) - .flatMap { applyResult in - self.saveMetadata(storageAdapter: storageAdapter, - applyResult: applyResult, - mutationType: .delete) - } - return publisher - } - } + guard !dispositions.isEmpty else { + return Future { $0(.successfulVoid) } + } + + let publishers = dispositions.map { + applyRemoteModelsDisposition(storageAdapter: storageAdapter, disposition: $0) + } + return Future { promise in Publishers.MergeMany(publishers) .collect() - .sink( - receiveCompletion: { - if case .failure(let error) = $0 { - result = .failure(error) - } - }, - receiveValue: { _ in - result = .successfulVoid - } - ) + .sink { _ in + // This stream will never fail, as we wrapped error in the result type. + promise(.successfulVoid) + } receiveValue: { _ in } .store(in: &self.cancellables) } } diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift index ac7d3d0af8..2b7d4220e6 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/ReconcileAndLocalSaveOperationTests.swift @@ -832,11 +832,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { waitForExpectations(timeout: 1) } - func testApplyRemoteModels_saveFail() throws { - if skipBrokenTests { - throw XCTSkip("TODO: fix this test") - } - + func testApplyRemoteModels_skipFailedOperations() throws { let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync), .create(anyPostMutationSync), .update(anyPostMutationSync), @@ -846,7 +842,7 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { .create(anyPostMutationSync), .update(anyPostMutationSync), .delete(anyPostMutationSync)] - let expect = expectation(description: "should fail") + let expect = expectation(description: "should complete") let expectedDeleteSuccess = expectation(description: "delete should be successful") expectedDeleteSuccess.expectedFulfillmentCount = 3 // 3 delete depositions let expectedDropped = expectation(description: "mutationEventDropped received") @@ -881,12 +877,12 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { .sink(receiveCompletion: { completion in switch completion { case .failure: - expect.fulfill() + XCTFail("Unexpected failure completion") case .finished: - XCTFail("Unexpected successfully completion") + expect.fulfill() } }, receiveValue: { _ in - XCTFail("Unexpected value received") + }).store(in: &cancellables) waitForExpectations(timeout: 1) } @@ -949,20 +945,18 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { } func testApplyRemoteModels_deleteFail() throws { - if skipBrokenTests { - throw XCTSkip("TODO: fix this test") - } - - let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync), - .create(anyPostMutationSync), - .update(anyPostMutationSync), - .update(anyPostMutationSync), - .delete(anyPostMutationSync), - .delete(anyPostMutationSync), - .create(anyPostMutationSync), - .update(anyPostMutationSync), - .delete(anyPostMutationSync)] - let expect = expectation(description: "should fail") + let dispositions: [RemoteSyncReconciler.Disposition] = [ + .create(anyPostMutationSync), + .create(anyPostMutationSync), + .update(anyPostMutationSync), + .update(anyPostMutationSync), + .delete(anyPostMutationSync), + .delete(anyPostMutationSync), + .create(anyPostMutationSync), + .update(anyPostMutationSync), + .delete(anyPostMutationSync) + ] + let expect = expectation(description: "should success") let expectedCreateAndUpdateSuccess = expectation(description: "create and updates should be successful") expectedCreateAndUpdateSuccess.expectedFulfillmentCount = 6 // 3 creates and 3 updates let expectedDropped = expectation(description: "mutationEventDropped received") @@ -997,31 +991,29 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { .sink(receiveCompletion: { completion in switch completion { case .failure: - expect.fulfill() + XCTFail("Unexpected failure completion") case .finished: - XCTFail("Unexpected successfully completion") + expect.fulfill() } }, receiveValue: { _ in - XCTFail("Unexpected value received") + }).store(in: &cancellables) waitForExpectations(timeout: 1) } func testApplyRemoteModels_saveMetadataFail() throws { - if skipBrokenTests { - throw XCTSkip("TODO: fix this test") - } - - let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync), - .create(anyPostMutationSync), - .update(anyPostMutationSync), - .update(anyPostMutationSync), - .delete(anyPostMutationSync), - .delete(anyPostMutationSync), - .create(anyPostMutationSync), - .update(anyPostMutationSync), - .delete(anyPostMutationSync)] - let expect = expectation(description: "should fail") + let dispositions: [RemoteSyncReconciler.Disposition] = [ + .create(anyPostMutationSync), + .create(anyPostMutationSync), + .update(anyPostMutationSync), + .update(anyPostMutationSync), + .delete(anyPostMutationSync), + .delete(anyPostMutationSync), + .create(anyPostMutationSync), + .update(anyPostMutationSync), + .delete(anyPostMutationSync) + ] + let expect = expectation(description: "should success") let expectedDropped = expectation(description: "mutationEventDropped received") expectedDropped.expectedFulfillmentCount = 9 // 1 for each of the 9 dispositions let saveResponder = SaveUntypedModelResponder { _, completion in @@ -1053,12 +1045,12 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase { .sink(receiveCompletion: { completion in switch completion { case .failure: - expect.fulfill() + XCTFail("Unexpected failure completion") case .finished: - XCTFail("Unexpected successfully completion") + expect.fulfill() } }, receiveValue: { _ in - XCTFail("Unexpected value received") + }).store(in: &cancellables) waitForExpectations(timeout: 1) }