Skip to content

Commit

Permalink
fix(datastore): propagate remote mutationEvents to Hub for sync received
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed May 14, 2024
1 parent 9df1524 commit 86bf872
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}

enum ApplyRemoteModelResult {
case applied(RemoteModel)
case applied(RemoteModel, AppliedModel)
case dropped
}

Expand All @@ -363,7 +363,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
promise(.failure(dataStoreError))
}
case .success:
promise(.success(.applied(remoteModel)))
promise(.success(.applied(remoteModel, remoteModel)))
}
}
}
Expand All @@ -387,14 +387,13 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
let anyModel: AnyModel
do {
anyModel = try savedModel.eraseToAnyModel()
let appliedModel = MutationSync(model: anyModel, syncMetadata: remoteModel.syncMetadata)
promise(.success(.applied(remoteModel, appliedModel)))
} catch {
let dataStoreError = DataStoreError(error: error)
self.notifyDropped(error: dataStoreError)
promise(.failure(dataStoreError))
return
}
let inProcessModel = MutationSync(model: anyModel, syncMetadata: remoteModel.syncMetadata)
promise(.success(.applied(inProcessModel)))
}
}
}
Expand All @@ -417,21 +416,15 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
result: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType
) -> AnyPublisher<Void, DataStoreError> {
if case let .applied(inProcessModel) = result {
return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: inProcessModel, mutationType: mutationType)
.handleEvents( receiveOutput: { syncMetadata in
let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata)
self.notify(savedModel: appliedModel, mutationType: mutationType)
}, receiveCompletion: { completion in
if case .failure(let error) = completion {
self.notifyDropped(error: error)
}
})
.map { _ in () }
switch result {
case .applied(let remoteModel, let appliedModel):
return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: remoteModel, mutationType: mutationType)
.map { MutationSync(model: appliedModel.model, syncMetadata: $0) }
.map { [weak self] in self?.notify(appliedModel: $0, mutationType: mutationType) }
.eraseToAnyPublisher()

case .dropped:
return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher()
}
return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher()
}

private func saveMetadata(
Expand All @@ -440,9 +433,17 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
mutationType: MutationEvent.MutationType
) -> Future<MutationSyncMetadata, DataStoreError> {
Future { promise in
storageAdapter.save(remoteModel.syncMetadata,
condition: nil,
eagerLoad: self.isEagerLoad) { result in
storageAdapter.save(
remoteModel.syncMetadata,
condition: nil,
eagerLoad: self.isEagerLoad
) { result in
switch result {
case .failure(let error):
self.notifyDropped(error: error)
case .success:
self.notifyHub(remoteModel: remoteModel, mutationType: mutationType)
}
promise(result)
}
}
Expand All @@ -454,28 +455,46 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func notify(savedModel: AppliedModel,
mutationType: MutationEvent.MutationType) {
let version = savedModel.syncMetadata.version
/// Inform the mutationEvents subscribers about the updated model,
/// which incorporates lazy loading information if applicable.
private func notify(appliedModel: AppliedModel, mutationType: MutationEvent.MutationType) {
guard let json = try? appliedModel.model.instance.toJSON() else {
log.error("Could not notify mutation event")
return

Check warning on line 463 in AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift

View check run for this annotation

Codecov / codecov/patch

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift#L462-L463

Added lines #L462 - L463 were not covered by tests
}

let modelIdentifier = appliedModel.model.instance.identifier(schema: modelSchema).stringValue
let mutationEvent = MutationEvent(modelId: modelIdentifier,
modelName: modelSchema.name,
json: json,
mutationType: mutationType,
version: appliedModel.syncMetadata.version)
mutationEventPublisher.send(.mutationEvent(mutationEvent))
}

/// Inform the remote mutationEvents to Hub event subscribers,
/// which only contains information received from AppSync server.
private func notifyHub(
remoteModel: RemoteModel,
mutationType: MutationEvent.MutationType
) {
// TODO: Dispatch/notify error if we can't erase to any model? Would imply an error in JSON decoding,
// which shouldn't be possible this late in the process. Possibly notify global conflict/error handler?
guard let json = try? savedModel.model.instance.toJSON() else {
log.error("Could not notify mutation event")
guard let json = try? remoteModel.model.instance.toJSON() else {
log.error("Could not notify Hub mutation event")

Check warning on line 484 in AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift

View check run for this annotation

Codecov / codecov/patch

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ReconcileAndLocalSaveOperation.swift#L484

Added line #L484 was not covered by tests
return
}
let modelIdentifier = savedModel.model.instance.identifier(schema: modelSchema).stringValue

let modelIdentifier = remoteModel.model.instance.identifier(schema: modelSchema).stringValue
let mutationEvent = MutationEvent(modelId: modelIdentifier,
modelName: modelSchema.name,
json: json,
mutationType: mutationType,
version: version)
version: remoteModel.syncMetadata.version)

let payload = HubPayload(eventName: HubPayload.EventName.DataStore.syncReceived,
data: mutationEvent)
Amplify.Hub.dispatch(to: .dataStore, payload: payload)

mutationEventPublisher.send(.mutationEvent(mutationEvent))
}

private func notifyFinished() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,16 +705,16 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
waitForExpectations(timeout: 1)
}

func testApplyRemoteModels_deleteDisposition() {
func testApplyRemoteModels_deleteDisposition() async {
let expect = expectation(description: "operation should send value and complete successfully")
expect.expectedFulfillmentCount = 2
let stoargeExpect = expectation(description: "storage delete should be called")
let storageExpect = expectation(description: "storage delete should be called")
let storageMetadataExpect = expectation(description: "storage save metadata should be called")
let notifyExpect = expectation(description: "mutation event should be emitted")
let hubExpect = expectation(description: "Hub is notified")
let deleteResponder = DeleteUntypedModelCompletionResponder { _, id in
XCTAssertEqual(id, self.anyPostMutationSync.model.id)
stoargeExpect.fulfill()
storageExpect.fulfill()
return .emptyResult
}
storageAdapter.responders[.deleteUntypedModel] = deleteResponder
Expand Down Expand Up @@ -758,24 +758,33 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
}, receiveValue: { _ in
expect.fulfill()
}).store(in: &cancellables)
waitForExpectations(timeout: 1)


await fulfillment(of: [
expect,
storageExpect,
storageMetadataExpect,
notifyExpect,
hubExpect
], timeout: 1)
}

func testApplyRemoteModels_multipleDispositions() {
let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync),
.delete(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync)]
func testApplyRemoteModels_multipleDispositions() async {
let dispositions: [RemoteSyncReconciler.Disposition] = [
.create(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync),
.delete(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync)
]

let expect = expectation(description: "should complete successfully")
expect.expectedFulfillmentCount = 2
let stoargeExpect = expectation(description: "storage save/delete should be called")
stoargeExpect.expectedFulfillmentCount = dispositions.count
let storageExpect = expectation(description: "storage save/delete should be called")
storageExpect.expectedFulfillmentCount = dispositions.count
let storageMetadataExpect = expectation(description: "storage save metadata should be called")
storageMetadataExpect.expectedFulfillmentCount = dispositions.count
let notifyExpect = expectation(description: "mutation event should be emitted")
Expand All @@ -784,14 +793,14 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
hubExpect.expectedFulfillmentCount = dispositions.count

let saveResponder = SaveUntypedModelResponder { _, completion in
stoargeExpect.fulfill()
storageExpect.fulfill()
completion(.success(self.anyPostMutationSync.model))
}
storageAdapter.responders[.saveUntypedModel] = saveResponder

let deleteResponder = DeleteUntypedModelCompletionResponder { _, id in
XCTAssertEqual(id, self.anyPostMutationSync.model.id)
stoargeExpect.fulfill()
storageExpect.fulfill()
return .emptyResult
}
storageAdapter.responders[.deleteUntypedModel] = deleteResponder
Expand Down Expand Up @@ -835,10 +844,16 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
}, receiveValue: { _ in
expect.fulfill()
}).store(in: &cancellables)
waitForExpectations(timeout: 1)
await fulfillment(of: [
expect,
storageExpect,
storageMetadataExpect,
notifyExpect,
hubExpect
], timeout: 1)
}

func testApplyRemoteModels_skipFailedOperations() throws {
func testApplyRemoteModels_skipFailedOperations() async throws {
let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
Expand Down Expand Up @@ -890,7 +905,12 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
}, receiveValue: { _ in

}).store(in: &cancellables)
waitForExpectations(timeout: 1)

await fulfillment(of: [
expect,
expectedDropped,
expectedDeleteSuccess
], timeout: 1)
}

func testApplyRemoteModels_failWithConstraintViolationShouldBeSuccessful() {
Expand Down

0 comments on commit 86bf872

Please sign in to comment.