Skip to content

Commit

Permalink
fix(datastore): update pending mutation events version from mutation …
Browse files Browse the repository at this point in the history
…response
  • Loading branch information
5d committed Jan 3, 2024
1 parent a0ee257 commit c60f716
Show file tree
Hide file tree
Showing 4 changed files with 538 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,22 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
guard let reconciliationQueue = reconciliationQueue else {
let dataStoreError = DataStoreError.configuration(
"reconciliationQueue is unexpectedly nil",
"""
The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
\(AmplifyErrorMessages.reportBugToAWS())
"""
"""
The reference to reconciliationQueue has been released while an ongoing mutation was being processed.
\(AmplifyErrorMessages.reportBugToAWS())
"""
)
stateMachine.notify(action: .errored(dataStoreError))
return
}
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
MutationEvent.reconcilePendingMutationEventsVersion(
sent: mutationEvent,
received: mutationSync,
storageAdapter: storageAdapter
) { _ in
self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
}
} else {
completeProcessingEvent(mutationEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
}
}

private func getLatestVersion(_ mutationEvent: MutationEvent) -> Int? {
let latestSyncedMetadataVersion = getLatestSyncMetadata()?.version
let mutationEventVersion = mutationEvent.version
switch (latestSyncedMetadataVersion, mutationEventVersion) {
case let (.some(syncedVersion), .some(version)):
return max(syncedVersion, version)
case let (.some(syncedVersion), .none):
return syncedVersion
case let (.none, .some(version)):
return version
case (.none, .none):
return nil
}
}

/// Creates a GraphQLRequest based on given `mutationType`
/// - Parameters:
/// - mutationType: mutation type
Expand All @@ -112,7 +127,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
mutationType: GraphQLMutationType,
authType: AWSAuthorizationType? = nil
) -> GraphQLRequest<MutationSync<AnyModel>>? {
let latestSyncMetadata = getLatestSyncMetadata()
let version = getLatestVersion(mutationEvent)
var request: GraphQLRequest<MutationSync<AnyModel>>

do {
Expand All @@ -133,7 +148,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.deleteMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: latestSyncMetadata?.version)
version: version)
case .update:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -145,7 +160,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.updateMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: latestSyncMetadata?.version)
version: version)
case .create:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -156,7 +171,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
}
request = GraphQLRequest<MutationSyncResult>.createMutation(of: model,
modelSchema: modelSchema,
version: latestSyncMetadata?.version)
version: version)
}
} catch {
let apiError = APIError.unknown("Couldn't decode model", "", error)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
import Dispatch
import AWSPluginsCore

extension MutationEvent {
// Consecutive operations that modify a model results in a sequence of pending mutation events that
// have the current version of the model. The first mutation event has the correct version of the model,
// while the subsequent events will have lower versions if the first mutation event is successfully synced
// to the cloud. By reconciling the pending mutation events after syncing the first mutation event,
// we attempt to update the pending version to the latest version from the response.
// The before and after conditions for consecutive update scenarios are as below:
// - Save, then immediately update
// Queue Before - [(version: nil, inprocess: true, type: .create),
// (version: nil, inprocess: false, type: .update)]
// Response - [version: 1, type: .create]
// Queue After - [(version: 1, inprocess: false, type: .update)]
// - Save, then immediately delete
// Queue Before - [(version: nil, inprocess: true, type: .create),
// (version: nil, inprocess: false, type: .delete)]
// Response - [version: 1, type: .create]
// Queue After - [(version: 1, inprocess: false, type: .delete)]
// - Save, sync, then immediately update and delete
// Queue Before (After save, sync)
// - [(version: 1, inprocess: true, type: .update), (version: 1, inprocess: false, type: .delete)]
// Response - [version: 2, type: .update]
// Queue After - [(version: 2, inprocess: false, type: .delete)]
//
// For a given model `id`, checks the version of the head of pending mutation event queue
// against the API response version in `mutationSync` and saves it in the mutation event table if
// the response version is a newer one
static func reconcilePendingMutationEventsVersion(sent mutationEvent: MutationEvent,
received mutationSync: MutationSync<AnyModel>,
storageAdapter: StorageEngineAdapter,
completion: @escaping DataStoreCallback<Void>) {
MutationEvent.pendingMutationEvents(
forMutationEvent: mutationEvent,
storageAdapter: storageAdapter
) { queryResult in
switch queryResult {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let localMutationEvents):
guard let existingEvent = localMutationEvents.first else {
completion(.success(()))
return
}

guard let reconciledEvent = reconcile(pendingMutationEvent: existingEvent,
with: mutationEvent,
responseMutationSync: mutationSync) else {
completion(.success(()))
return
}

storageAdapter.save(reconciledEvent, condition: nil, eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success:
completion(.success(()))
}
}
}
}
}

static func reconcile(pendingMutationEvent: MutationEvent,
with requestMutationEvent: MutationEvent,
responseMutationSync: MutationSync<AnyModel>) -> MutationEvent? {
// return if version of the pending mutation event is not nil and
// is >= version contained in the response
if pendingMutationEvent.version != nil &&
pendingMutationEvent.version! >= responseMutationSync.syncMetadata.version {
return nil
}

do {
let responseModel = responseMutationSync.model.instance
let requestModel = try requestMutationEvent.decodeModel()

// check if the data sent in the request is the same as the response
// if it is, update the pending mutation event version to the response version
guard let modelSchema = ModelRegistry.modelSchema(from: requestMutationEvent.modelName),
modelSchema.compare(responseModel, requestModel) else {
return nil
}

var pendingMutationEvent = pendingMutationEvent
pendingMutationEvent.version = responseMutationSync.syncMetadata.version
return pendingMutationEvent
} catch {
Amplify.log.verbose("Error decoding models: \(error)")
return nil
}
}

}
Loading

0 comments on commit c60f716

Please sign in to comment.