Skip to content

Commit

Permalink
fix(datastore-v1): sync pending mutation events with latest synced me…
Browse files Browse the repository at this point in the history
…tadata
  • Loading branch information
5d committed Dec 12, 2023
1 parent d06905f commit f5259a1
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 590 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ extension Model where Self: Codable {
resolvedEncoder = encoder
} else {
resolvedEncoder = JSONEncoder(dateEncodingStrategy: ModelDateFormatting.encodingStrategy)
resolvedEncoder.outputFormatting = .sortedKeys
}

if isKnownUniquelyReferenced(&resolvedEncoder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
func resolveConflictsThenSave(mutationEvent: MutationEvent,
storageAdapter: StorageEngineAdapter,
completionPromise: @escaping Future<MutationEvent, DataStoreError>.Promise) {

// We don't want to query MutationSync<AnyModel> because a) we already have the model, and b) delete mutations
// are submitted *after* the delete has already been applied to the local data store, meaning there is no model
// to query.
var mutationEvent = mutationEvent
do {
// swiftlint:disable:next todo
// TODO: Refactor this so that it's clear that the storage engine is not responsible for setting the version
// perhaps as simple as renaming to `submit(unversionedMutationEvent:)` or similar
let syncMetadata = try storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId,
modelName: mutationEvent.modelName)
mutationEvent.version = syncMetadata?.version
} catch {
completionPromise(.failure(DataStoreError(error: error)))
}

MutationEvent.pendingMutationEvents(
forMutationEvent: mutationEvent,
storageAdapter: storageAdapter) { result in
Expand Down Expand Up @@ -201,8 +185,6 @@ extension AWSMutationDatabaseAdapter: MutationEventIngester {
}
resolvedEvent.mutationType = updatedMutationType

resolvedEvent.version = candidate.version

return resolvedEvent
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {

let syncMutationToCloudOperation = SyncMutationToCloudOperation(
mutationEvent: mutationEvent,
getLatestSyncMetadata: {
try? self.storageAdapter.queryMutationSyncMetadata(for: mutationEvent.modelId, modelName: mutationEvent.modelName)
},
api: api,
authModeStrategy: authModeStrategy
) { [weak self] result in
Expand Down Expand Up @@ -259,12 +262,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
return
}
reconciliationQueue.offer([mutationSync], modelName: mutationEvent.modelName)
MutationEvent.reconcilePendingMutationEventsVersion(
sent: mutationEvent,
received: mutationSync,
storageAdapter: storageAdapter) { _ in
self.completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
}
completeProcessingEvent(mutationEvent, mutationSync: mutationSync)
} else {
completeProcessingEvent(mutationEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
typealias MutationSyncCloudResult = GraphQLOperation<MutationSync<AnyModel>>.OperationResult

private weak var api: APICategoryGraphQLBehavior?
private let getLatestSyncMetadata: () -> MutationSyncMetadata?
private let mutationEvent: MutationEvent
private let completion: GraphQLOperation<MutationSync<AnyModel>>.ResultListener
private let requestRetryablePolicy: RequestRetryablePolicy
Expand All @@ -32,13 +33,15 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
private var authTypesIterator: AWSAuthorizationTypeIterator?

init(mutationEvent: MutationEvent,
getLatestSyncMetadata: @escaping () -> MutationSyncMetadata?,
api: APICategoryGraphQLBehavior,
authModeStrategy: AuthModeStrategy,
networkReachabilityPublisher: AnyPublisher<ReachabilityUpdate, Never>? = nil,
currentAttemptNumber: Int = 1,
requestRetryablePolicy: RequestRetryablePolicy? = RequestRetryablePolicy(),
completion: @escaping GraphQLOperation<MutationSync<AnyModel>>.ResultListener) {
self.mutationEvent = mutationEvent
self.getLatestSyncMetadata = getLatestSyncMetadata
self.api = api
self.networkReachabilityPublisher = networkReachabilityPublisher
self.completion = completion
Expand Down Expand Up @@ -109,7 +112,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
authType: AWSAuthorizationType? = nil
) -> GraphQLRequest<MutationSync<AnyModel>>? {
var request: GraphQLRequest<MutationSync<AnyModel>>

let latestSyncMetadata = getLatestSyncMetadata()
do {
var graphQLFilter: GraphQLFilter?
if let graphQLFilterJSON = mutationEvent.graphQLFilterJSON {
Expand All @@ -128,7 +131,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.deleteMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: mutationEvent.version)
version: latestSyncMetadata?.version)
case .update:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -140,7 +143,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
request = GraphQLRequest<MutationSyncResult>.updateMutation(of: model,
modelSchema: modelSchema,
where: graphQLFilter,
version: mutationEvent.version)
version: latestSyncMetadata?.version)
case .create:
let model = try mutationEvent.decodeModel()
guard let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName) else {
Expand All @@ -151,7 +154,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation {
}
request = GraphQLRequest<MutationSyncResult>.createMutation(of: model,
modelSchema: modelSchema,
version: mutationEvent.version)
version: latestSyncMetadata?.version)
}
} catch {
let apiError = APIError.unknown("Couldn't decode model", "", error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {

// MARK: - Responder methods

/// The reconcile function incorporates incoming mutation events into the local database through the following steps:
/// 1. Retrieve the local metadata of the models.
/// 2. Generate dispositions based on incoming mutation events and local metadata.
/// 3. Categorize dispositions into:
/// 3.1 Apply metadata only for those with existing pending mutations.
/// 3.1.1 Notify the count of these incoming mutation events as dropped items.
/// 3.2 Apply incoming mutation and metadata for those without existing pending mutations.
/// 4. Notify the final result.
func reconcile(remoteModels: [RemoteModel]) {
guard !isCancelled else {
log.info("\(#function) - cancelled, aborting")
Expand All @@ -126,16 +134,24 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {

do {
try storageAdapter.transaction {
queryPendingMutations(forModels: remoteModels.map(\.model))
queryLocalMetadata(remoteModels)
.subscribe(on: workQueue)
.flatMap { mutationEvents -> Future<([RemoteModel], [LocalMetadata]), DataStoreError> in
let remoteModelsToApply = self.reconcile(remoteModels, pendingMutations: mutationEvents)
return self.queryLocalMetadata(remoteModelsToApply)
.map { remoteModels, localMetadatas in
self.getDispositions(for: remoteModels, localMetadatas: localMetadatas)
}
.flatMap { (remoteModelsToApply, localMetadatas) -> Future<Void, DataStoreError> in
let dispositions = self.getDispositions(for: remoteModelsToApply,
localMetadatas: localMetadatas)
return self.applyRemoteModelsDispositions(dispositions)
.flatMap { dispositions in
self.queryPendingMutations(forModels: dispositions.map(\.remoteModel.model)).map { pendingMutations in
(pendingMutations, dispositions)
}
}
.map { pendingMutations, dispositions in
self.separateDispositions(pendingMutations: pendingMutations, dispositions: dispositions)
}
.flatMap { dispositions, dispositionsOnlyApplyMetadata in
self.waitAllPublisherFinishes(publishers: dispositionsOnlyApplyMetadata.map(self.saveMetadata(disposition:)))
.flatMap { _ in
self.applyRemoteModelsDispositions(dispositions)
}
}
.sink(
receiveCompletion: {
Expand Down Expand Up @@ -195,6 +211,28 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

func separateDispositions(
pendingMutations: [MutationEvent],
dispositions: [RemoteSyncReconciler.Disposition]
) -> ([RemoteSyncReconciler.Disposition], [RemoteSyncReconciler.Disposition]) {
if dispositions.isEmpty {
return ([], [])
}

let pendingMutationModelIds = Set(pendingMutations.map(\.modelId))

let dispositionsToApply = dispositions.filter {
!pendingMutationModelIds.contains($0.remoteModel.model.identifier)
}

let dispositionsOnlyApplyMetadata = dispositions.filter {
pendingMutationModelIds.contains($0.remoteModel.model.identifier)
}

notifyDropped(count: dispositionsOnlyApplyMetadata.count)
return (dispositionsToApply, dispositionsOnlyApplyMetadata)
}

func reconcile(_ remoteModels: [RemoteModel], pendingMutations: [MutationEvent]) -> [RemoteModel] {
guard !remoteModels.isEmpty else {
return []
Expand Down Expand Up @@ -284,8 +322,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}

let publishers = dispositions.map { disposition ->
Publishers.FlatMap<Future<Void, DataStoreError>,
Future<ReconcileAndLocalSaveOperation.ApplyRemoteModelResult, DataStoreError>> in
AnyPublisher<Void, DataStoreError> in

switch disposition {
case .create(let remoteModel):
Expand All @@ -296,7 +333,8 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
applyResult: applyResult,
mutationType: .create)
}
return publisher

return publisher.eraseToAnyPublisher()
case .update(let remoteModel):
let publisher = self.save(storageAdapter: storageAdapter,
remoteModel: remoteModel)
Expand All @@ -305,7 +343,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
applyResult: applyResult,
mutationType: .update)
}
return publisher
return publisher.eraseToAnyPublisher()
case .delete(let remoteModel):
let publisher = self.delete(storageAdapter: storageAdapter,
remoteModel: remoteModel)
Expand All @@ -314,7 +352,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
applyResult: applyResult,
mutationType: .delete)
}
return publisher
return publisher.eraseToAnyPublisher()
}
}

Expand Down Expand Up @@ -367,8 +405,10 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func save(storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel) -> Future<ApplyRemoteModelResult, DataStoreError> {
private func save(
storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel
) -> Future<ApplyRemoteModelResult, DataStoreError> {
Future<ApplyRemoteModelResult, DataStoreError> { promise in
storageAdapter.save(untypedModel: remoteModel.model.instance) { response in
switch response {
Expand Down Expand Up @@ -396,29 +436,69 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func saveMetadata(storageAdapter: StorageEngineAdapter,
applyResult: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType) -> Future<Void, DataStoreError> {
Future<Void, DataStoreError> { promise in
guard case let .applied(inProcessModel) = applyResult else {
promise(.successfulVoid)
return
}
private func saveMetadata(
disposition: RemoteSyncReconciler.Disposition
) -> AnyPublisher<Void, Never> {
guard let storageAdapter = self.storageAdapter else {
return Just(()).eraseToAnyPublisher()
}

storageAdapter.save(inProcessModel.syncMetadata, condition: nil) { result in
switch result {
case .failure(let dataStoreError):
self.notifyDropped(error: dataStoreError)
promise(.failure(dataStoreError))
case .success(let syncMetadata):
return saveMetadata(
storageAdapter: storageAdapter,
remoteModel: disposition.remoteModel,
mutationType: disposition.mutationType
)
.map { _ in () }
.catch { _ in Just(()) }
.eraseToAnyPublisher()
}

private func saveMetadata(
storageAdapter: StorageEngineAdapter,
remoteModel: RemoteModel,
mutationType: MutationEvent.MutationType
) -> Future<MutationSyncMetadata, DataStoreError> {
Future { promise in
storageAdapter.save(
remoteModel.syncMetadata,
condition: nil) { result in
promise(result)
}
}
}

private func saveMetadata(
storageAdapter: StorageEngineAdapter,
applyResult: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType
) -> AnyPublisher<Void, DataStoreError> {
if case .applied(let inProcessModel) = applyResult {
return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: inProcessModel, mutationType: mutationType)
.handleEvents(receiveOutput: { syncMetadata in
let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata)
self.notify(savedModel: appliedModel, mutationType: mutationType)
promise(.successfulVoid)
}
}
}, receiveCompletion: { completion in
if case .failure(let error) = completion {
self.notifyDropped(error: error)
}
})
.map { _ in () }
.eraseToAnyPublisher()
}
return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher()
}

private func waitAllPublisherFinishes<T>(publishers: [AnyPublisher<T, Never>]) -> Future<Void, DataStoreError> {
Future { promise in
Publishers.MergeMany(publishers)
.collect()
.sink(receiveCompletion: { _ in
promise(.successfulVoid)
}, receiveValue: { _ in })
.store(in: &self.cancellables)
}
}

private func notifyDropped(count: Int = 1, error: DataStoreError? = nil) {
for _ in 0 ..< count {
mutationEventPublisher.send(.mutationEventDropped(modelName: modelSchema.name, error: error))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ struct RemoteSyncReconciler {
case create(RemoteModel)
case update(RemoteModel)
case delete(RemoteModel)

var remoteModel: RemoteModel {
switch self {
case .create(let model), .update(let model), .delete(let model):
return model
}
}

var mutationType: MutationEvent.MutationType {
switch self {
case .create: return .create
case .update: return .update
case .delete: return .delete
}
}
}

/// Filter the incoming `remoteModels` against the pending mutations.
Expand Down
Loading

0 comments on commit f5259a1

Please sign in to comment.