From c085ca004203d508417592d33a9f24e831fef8de Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 7 May 2024 12:03:26 -0400 Subject: [PATCH 01/13] If no models are connected, complete sync hydrations in parallel --- .../datastore/syncengine/SyncProcessor.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java index 2acf4acd7..5119f37f9 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java @@ -126,16 +126,27 @@ Completable hydrate() { TopologicalOrdering.forRegisteredModels(schemaRegistry, modelProvider); Collections.sort(modelSchemas, ordering::compare); ArrayList toBeSyncedModelArray = new ArrayList<>(); + boolean syncInParallel = true; for (ModelSchema schema : modelSchemas) { //Check to see if query predicate for this schema is not equal to none. This means customer does // not want to sync the data for this model. if (!QueryPredicates.none().equals(queryPredicateProvider.getPredicate(schema.getName()))) { hydrationTasks.add(createHydrationTask(schema)); toBeSyncedModelArray.add(schema.getName()); + if (schema.getAssociations().size() > 0) { + syncInParallel = false; + } } } - return Completable.concat(hydrationTasks) + Completable syncCompletable; + if (syncInParallel) { + syncCompletable = Completable.mergeDelayError(hydrationTasks); + } else { + syncCompletable = Completable.concat(hydrationTasks); + } + + return syncCompletable .doOnSubscribe(ignore -> { // This is where we trigger the syncQueriesStarted event since // doOnSubscribe means that all upstream hydration tasks From 7ef8c9909318a319076443efe5ba46b534fc7714 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 7 May 2024 12:31:06 -0400 Subject: [PATCH 02/13] User configurable sync parallelization limit --- .../datastore/DataStoreConfiguration.java | 48 ++++++++++++++++--- .../datastore/syncengine/SyncProcessor.java | 16 ++++++- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index f24444ad0..286bf477a 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -48,6 +48,8 @@ public final class DataStoreConfiguration { static final int DEFAULT_SYNC_PAGE_SIZE = 1_000; @VisibleForTesting static final boolean DEFAULT_DO_SYNC_RETRY = false; + @VisibleForTesting + static final int DEFAULT_SYNC_CONCURRENCY_LIMIT = 1; static final int MAX_RECORDS = 1000; static final long MAX_TIME_SEC = 2; @@ -58,6 +60,7 @@ public final class DataStoreConfiguration { private final Integer syncMaxRecords; private final Integer syncPageSize; private final boolean doSyncRetry; + private final Integer syncConcurrencyLimit; private final Map syncExpressions; private final Long syncIntervalInMinutes; private final Long maxTimeLapseForObserveQuery; @@ -71,6 +74,7 @@ private DataStoreConfiguration(Builder builder) { this.syncIntervalInMinutes = builder.syncIntervalInMinutes; this.syncExpressions = builder.syncExpressions; this.doSyncRetry = builder.doSyncRetry; + this.syncConcurrencyLimit = builder.syncConcurrencyLimit; this.maxTimeLapseForObserveQuery = builder.maxTimeLapseForObserveQuery; this.observeQueryMaxRecords = builder.observeQueryMaxRecords; } @@ -126,9 +130,10 @@ public static DataStoreConfiguration defaults() throws DataStoreException { .syncInterval(DEFAULT_SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES) .syncPageSize(DEFAULT_SYNC_PAGE_SIZE) .syncMaxRecords(DEFAULT_SYNC_MAX_RECORDS) - .doSyncRetry(DEFAULT_DO_SYNC_RETRY) - .observeQueryMaxTime(MAX_TIME_SEC) - .observeQueryMaxRecords(MAX_RECORDS) + .doSyncRetry(DEFAULT_DO_SYNC_RETRY) + .observeQueryMaxTime(MAX_TIME_SEC) + .observeQueryMaxRecords(MAX_RECORDS) + .syncConcurrencyLimit(DEFAULT_SYNC_CONCURRENCY_LIMIT) .build(); } @@ -201,6 +206,16 @@ public Boolean getDoSyncRetry() { return this.doSyncRetry; } + /** + * Gets the number of models that are allowed to concurrently sync. + * NOTE: This value will not be used if any models have associations, instead, the default (1) + * will be used. + * @return Limit to the number of models that can sync concurrently + */ + public Integer getSyncConcurrencyLimit() { + return syncConcurrencyLimit; + } + /** * Returns the Map of all {@link DataStoreSyncExpression}s used to filter data received from AppSync, either during * a sync or over the real-time subscription. @@ -247,6 +262,9 @@ public boolean equals(@Nullable Object thatObject) { if (!ObjectsCompat.equals(getObserveQueryMaxRecords(), that.getObserveQueryMaxRecords())) { return false; } + if (!ObjectsCompat.equals(getSyncConcurrencyLimit(), that.getSyncConcurrencyLimit())) { + return false; + } return true; } @@ -261,6 +279,7 @@ public int hashCode() { result = 31 * result + getDoSyncRetry().hashCode(); result = 31 * result + (getObserveQueryMaxRecords() != null ? getObserveQueryMaxRecords().hashCode() : 0); result = 31 * result + getMaxTimeLapseForObserveQuery().hashCode(); + result = 31 * result + getSyncConcurrencyLimit().hashCode(); return result; } @@ -273,9 +292,10 @@ public String toString() { ", syncPageSize=" + syncPageSize + ", syncIntervalInMinutes=" + syncIntervalInMinutes + ", syncExpressions=" + syncExpressions + - ", doSyncRetry=" + doSyncRetry + - ", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery + - ", observeQueryMaxRecords=" + observeQueryMaxRecords + + ", doSyncRetry=" + doSyncRetry + + ", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery + + ", observeQueryMaxRecords=" + observeQueryMaxRecords + + ", syncConcurrencyLimit=" + syncConcurrencyLimit + '}'; } @@ -309,6 +329,7 @@ public static final class Builder { private Integer syncMaxRecords; private Integer syncPageSize; private boolean doSyncRetry; + private Integer syncConcurrencyLimit; private Map syncExpressions; private boolean ensureDefaults; private JSONObject pluginJson; @@ -429,6 +450,19 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) { return Builder.this; } + /** + * Sets the concurrency limit for model syncing. Default is 1 + * NOTE: If any sync models have associations, this value will be unused and the default (1) + * will be used. + * @param syncConcurrencyLimit Number of models that can sync concurrently + * @return Current builder + */ + @NonNull + public Builder syncConcurrencyLimit(@IntRange(from = 0) Integer syncConcurrencyLimit) { + this.syncConcurrencyLimit = syncConcurrencyLimit; + return Builder.this; + } + /** * Sets a sync expression for a particular model to filter which data is synced locally. * The expression is evaluated each time DataStore is started. @@ -518,6 +552,7 @@ private void applyUserProvidedConfiguration() { syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize); syncExpressions = userProvidedConfiguration.getSyncExpressions(); doSyncRetry = getValueOrDefault(userProvidedConfiguration.getDoSyncRetry(), doSyncRetry); + syncConcurrencyLimit = getValueOrDefault(userProvidedConfiguration.getSyncConcurrencyLimit(), syncConcurrencyLimit); observeQueryMaxRecords = getValueOrDefault(userProvidedConfiguration.getObserveQueryMaxRecords(), observeQueryMaxRecords); maxTimeLapseForObserveQuery = userProvidedConfiguration.getMaxTimeLapseForObserveQuery() @@ -548,6 +583,7 @@ public DataStoreConfiguration build() throws DataStoreException { syncIntervalInMinutes = getValueOrDefault(syncIntervalInMinutes, DEFAULT_SYNC_INTERVAL_MINUTES); syncMaxRecords = getValueOrDefault(syncMaxRecords, DEFAULT_SYNC_MAX_RECORDS); syncPageSize = getValueOrDefault(syncPageSize, DEFAULT_SYNC_PAGE_SIZE); + syncConcurrencyLimit = getValueOrDefault(syncConcurrencyLimit, DEFAULT_SYNC_CONCURRENCY_LIMIT); observeQueryMaxRecords = getValueOrDefault(observeQueryMaxRecords, MAX_RECORDS); maxTimeLapseForObserveQuery = maxTimeLapseForObserveQuery == 0 ? MAX_TIME_SEC : maxTimeLapseForObserveQuery; diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java index 5119f37f9..41168dfb6 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java @@ -139,9 +139,21 @@ Completable hydrate() { } } + int syncConcurrencyLimit; + try { + syncConcurrencyLimit = dataStoreConfigurationProvider + .getConfiguration() + .getSyncConcurrencyLimit(); + } catch (DataStoreException exception) { + syncConcurrencyLimit = 1; + } + Completable syncCompletable; - if (syncInParallel) { - syncCompletable = Completable.mergeDelayError(hydrationTasks); + if (syncInParallel && syncConcurrencyLimit > 1) { + syncCompletable = Completable.mergeDelayError( + Flowable.fromIterable(hydrationTasks), + syncConcurrencyLimit + ); } else { syncCompletable = Completable.concat(hydrationTasks); } From 9d7aacba664ea2e5f8a96455f76d3b165d919169 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 7 May 2024 12:32:36 -0400 Subject: [PATCH 03/13] api dump --- aws-datastore/api/aws-datastore.api | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aws-datastore/api/aws-datastore.api b/aws-datastore/api/aws-datastore.api index 2232919d6..2d38ba22f 100644 --- a/aws-datastore/api/aws-datastore.api +++ b/aws-datastore/api/aws-datastore.api @@ -61,6 +61,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration { public fun getErrorHandler ()Lcom/amplifyframework/datastore/DataStoreErrorHandler; public fun getMaxTimeLapseForObserveQuery ()Ljava/lang/Long; public fun getObserveQueryMaxRecords ()Ljava/lang/Integer; + public fun getSyncConcurrencyLimit ()Ljava/lang/Integer; public fun getSyncExpressions ()Ljava/util/Map; public fun getSyncIntervalInMinutes ()Ljava/lang/Long; public fun getSyncIntervalMs ()Ljava/lang/Long; @@ -77,6 +78,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration$Builder public fun errorHandler (Lcom/amplifyframework/datastore/DataStoreErrorHandler;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun observeQueryMaxRecords (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun observeQueryMaxTime (J)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; + public fun syncConcurrencyLimit (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncExpression (Ljava/lang/Class;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncExpression (Ljava/lang/String;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncInterval (JLjava/util/concurrent/TimeUnit;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; From 16199c28648ef0148999bf94f9621778887be513 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 7 May 2024 12:43:00 -0400 Subject: [PATCH 04/13] safety explanation --- .../datastore/DataStoreConfiguration.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index 286bf477a..adf9dd44c 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -210,6 +210,11 @@ public Boolean getDoSyncRetry() { * Gets the number of models that are allowed to concurrently sync. * NOTE: This value will not be used if any models have associations, instead, the default (1) * will be used. + * Setting this number to a high value requires that the developer ensure app memory is not a + * concern. If the expected sync data contains a large number of models, with a large number + * of records per model, the concurrency limit should be set to a conservative value. However, + * if the expected sync data contains a large number of models, with a small amount of data in + * each model, setting this limit to a high value will greatly improve sync speeds. * @return Limit to the number of models that can sync concurrently */ public Integer getSyncConcurrencyLimit() { @@ -454,6 +459,11 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) { * Sets the concurrency limit for model syncing. Default is 1 * NOTE: If any sync models have associations, this value will be unused and the default (1) * will be used. + * Setting this number to a high value requires that the developer ensure app memory is not a + * concern. If the expected sync data contains a large number of models, with a large number + * of records per model, the concurrency limit should be set to a conservative value. However, + * if the expected sync data contains a large number of models, with a small amount of data in + * each model, setting this limit to a high value will greatly improve sync speeds. * @param syncConcurrencyLimit Number of models that can sync concurrently * @return Current builder */ From 8de25f7175942e836ab3d140bac6f37b9e233e41 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 7 May 2024 12:53:14 -0400 Subject: [PATCH 05/13] checkstyle --- .../amplifyframework/datastore/DataStoreConfiguration.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index adf9dd44c..b9bec1371 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -562,7 +562,10 @@ private void applyUserProvidedConfiguration() { syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize); syncExpressions = userProvidedConfiguration.getSyncExpressions(); doSyncRetry = getValueOrDefault(userProvidedConfiguration.getDoSyncRetry(), doSyncRetry); - syncConcurrencyLimit = getValueOrDefault(userProvidedConfiguration.getSyncConcurrencyLimit(), syncConcurrencyLimit); + syncConcurrencyLimit = getValueOrDefault( + userProvidedConfiguration.getSyncConcurrencyLimit(), + syncConcurrencyLimit + ); observeQueryMaxRecords = getValueOrDefault(userProvidedConfiguration.getObserveQueryMaxRecords(), observeQueryMaxRecords); maxTimeLapseForObserveQuery = userProvidedConfiguration.getMaxTimeLapseForObserveQuery() From 79e750bc3aa6f5febbcf74922d63348b6eeeef66 Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 7 May 2024 17:09:19 -0400 Subject: [PATCH 06/13] fix default --- .../com/amplifyframework/datastore/DataStoreConfiguration.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index b9bec1371..7eaa97039 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -74,7 +74,8 @@ private DataStoreConfiguration(Builder builder) { this.syncIntervalInMinutes = builder.syncIntervalInMinutes; this.syncExpressions = builder.syncExpressions; this.doSyncRetry = builder.doSyncRetry; - this.syncConcurrencyLimit = builder.syncConcurrencyLimit; + this.syncConcurrencyLimit = builder.syncConcurrencyLimit != null ? + builder.syncConcurrencyLimit : DEFAULT_SYNC_CONCURRENCY_LIMIT; this.maxTimeLapseForObserveQuery = builder.maxTimeLapseForObserveQuery; this.observeQueryMaxRecords = builder.observeQueryMaxRecords; } From 9237e1041775549a9cf39f5adcd9900bde78a6ab Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 7 May 2024 17:10:17 -0400 Subject: [PATCH 07/13] fix annotations --- .../amplifyframework/datastore/DataStoreConfiguration.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index 7eaa97039..dca8306c9 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -218,6 +218,8 @@ public Boolean getDoSyncRetry() { * each model, setting this limit to a high value will greatly improve sync speeds. * @return Limit to the number of models that can sync concurrently */ + @IntRange(from = 1) + @NonNull public Integer getSyncConcurrencyLimit() { return syncConcurrencyLimit; } @@ -469,7 +471,7 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) { * @return Current builder */ @NonNull - public Builder syncConcurrencyLimit(@IntRange(from = 0) Integer syncConcurrencyLimit) { + public Builder syncConcurrencyLimit(@IntRange(from = 1) Integer syncConcurrencyLimit) { this.syncConcurrencyLimit = syncConcurrencyLimit; return Builder.this; } From fcb45eb51d2849b5a7c4660ea914cd7c512cfad6 Mon Sep 17 00:00:00 2001 From: tjroach Date: Thu, 23 May 2024 14:08:24 -0400 Subject: [PATCH 08/13] rename per API review --- .../datastore/DataStoreConfiguration.java | 36 +++++++++---------- .../datastore/syncengine/SyncProcessor.java | 12 +++---- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index dca8306c9..952b80912 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -60,7 +60,7 @@ public final class DataStoreConfiguration { private final Integer syncMaxRecords; private final Integer syncPageSize; private final boolean doSyncRetry; - private final Integer syncConcurrencyLimit; + private final Integer syncMaxConcurrentModels; private final Map syncExpressions; private final Long syncIntervalInMinutes; private final Long maxTimeLapseForObserveQuery; @@ -74,8 +74,8 @@ private DataStoreConfiguration(Builder builder) { this.syncIntervalInMinutes = builder.syncIntervalInMinutes; this.syncExpressions = builder.syncExpressions; this.doSyncRetry = builder.doSyncRetry; - this.syncConcurrencyLimit = builder.syncConcurrencyLimit != null ? - builder.syncConcurrencyLimit : DEFAULT_SYNC_CONCURRENCY_LIMIT; + this.syncMaxConcurrentModels = builder.syncMaxConcurrentModels != null ? + builder.syncMaxConcurrentModels : DEFAULT_SYNC_CONCURRENCY_LIMIT; this.maxTimeLapseForObserveQuery = builder.maxTimeLapseForObserveQuery; this.observeQueryMaxRecords = builder.observeQueryMaxRecords; } @@ -134,7 +134,7 @@ public static DataStoreConfiguration defaults() throws DataStoreException { .doSyncRetry(DEFAULT_DO_SYNC_RETRY) .observeQueryMaxTime(MAX_TIME_SEC) .observeQueryMaxRecords(MAX_RECORDS) - .syncConcurrencyLimit(DEFAULT_SYNC_CONCURRENCY_LIMIT) + .syncMaxConcurrentModels(DEFAULT_SYNC_CONCURRENCY_LIMIT) .build(); } @@ -220,8 +220,8 @@ public Boolean getDoSyncRetry() { */ @IntRange(from = 1) @NonNull - public Integer getSyncConcurrencyLimit() { - return syncConcurrencyLimit; + public Integer getSyncMaxConcurrentModels() { + return syncMaxConcurrentModels; } /** @@ -270,7 +270,7 @@ public boolean equals(@Nullable Object thatObject) { if (!ObjectsCompat.equals(getObserveQueryMaxRecords(), that.getObserveQueryMaxRecords())) { return false; } - if (!ObjectsCompat.equals(getSyncConcurrencyLimit(), that.getSyncConcurrencyLimit())) { + if (!ObjectsCompat.equals(getSyncMaxConcurrentModels(), that.getSyncMaxConcurrentModels())) { return false; } return true; @@ -287,7 +287,7 @@ public int hashCode() { result = 31 * result + getDoSyncRetry().hashCode(); result = 31 * result + (getObserveQueryMaxRecords() != null ? getObserveQueryMaxRecords().hashCode() : 0); result = 31 * result + getMaxTimeLapseForObserveQuery().hashCode(); - result = 31 * result + getSyncConcurrencyLimit().hashCode(); + result = 31 * result + getSyncMaxConcurrentModels().hashCode(); return result; } @@ -303,7 +303,7 @@ public String toString() { ", doSyncRetry=" + doSyncRetry + ", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery + ", observeQueryMaxRecords=" + observeQueryMaxRecords + - ", syncConcurrencyLimit=" + syncConcurrencyLimit + + ", syncMaxConcurrentModels=" + syncMaxConcurrentModels + '}'; } @@ -337,7 +337,7 @@ public static final class Builder { private Integer syncMaxRecords; private Integer syncPageSize; private boolean doSyncRetry; - private Integer syncConcurrencyLimit; + private Integer syncMaxConcurrentModels; private Map syncExpressions; private boolean ensureDefaults; private JSONObject pluginJson; @@ -459,7 +459,7 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) { } /** - * Sets the concurrency limit for model syncing. Default is 1 + * Sets the max concurrency limit for model syncing. Default is 1 * NOTE: If any sync models have associations, this value will be unused and the default (1) * will be used. * Setting this number to a high value requires that the developer ensure app memory is not a @@ -467,12 +467,12 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) { * of records per model, the concurrency limit should be set to a conservative value. However, * if the expected sync data contains a large number of models, with a small amount of data in * each model, setting this limit to a high value will greatly improve sync speeds. - * @param syncConcurrencyLimit Number of models that can sync concurrently + * @param syncMaxConcurrentModels Number of models that can sync concurrently * @return Current builder */ @NonNull - public Builder syncConcurrencyLimit(@IntRange(from = 1) Integer syncConcurrencyLimit) { - this.syncConcurrencyLimit = syncConcurrencyLimit; + public Builder syncMaxConcurrentModels(@IntRange(from = 1) Integer syncMaxConcurrentModels) { + this.syncMaxConcurrentModels = syncMaxConcurrentModels; return Builder.this; } @@ -565,9 +565,9 @@ private void applyUserProvidedConfiguration() { syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize); syncExpressions = userProvidedConfiguration.getSyncExpressions(); doSyncRetry = getValueOrDefault(userProvidedConfiguration.getDoSyncRetry(), doSyncRetry); - syncConcurrencyLimit = getValueOrDefault( - userProvidedConfiguration.getSyncConcurrencyLimit(), - syncConcurrencyLimit + syncMaxConcurrentModels = getValueOrDefault( + userProvidedConfiguration.getSyncMaxConcurrentModels(), + syncMaxConcurrentModels ); observeQueryMaxRecords = getValueOrDefault(userProvidedConfiguration.getObserveQueryMaxRecords(), observeQueryMaxRecords); @@ -599,7 +599,7 @@ public DataStoreConfiguration build() throws DataStoreException { syncIntervalInMinutes = getValueOrDefault(syncIntervalInMinutes, DEFAULT_SYNC_INTERVAL_MINUTES); syncMaxRecords = getValueOrDefault(syncMaxRecords, DEFAULT_SYNC_MAX_RECORDS); syncPageSize = getValueOrDefault(syncPageSize, DEFAULT_SYNC_PAGE_SIZE); - syncConcurrencyLimit = getValueOrDefault(syncConcurrencyLimit, DEFAULT_SYNC_CONCURRENCY_LIMIT); + syncMaxConcurrentModels = getValueOrDefault(syncMaxConcurrentModels, DEFAULT_SYNC_CONCURRENCY_LIMIT); observeQueryMaxRecords = getValueOrDefault(observeQueryMaxRecords, MAX_RECORDS); maxTimeLapseForObserveQuery = maxTimeLapseForObserveQuery == 0 ? MAX_TIME_SEC : maxTimeLapseForObserveQuery; diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java index 41168dfb6..6e11004f1 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java @@ -139,20 +139,20 @@ Completable hydrate() { } } - int syncConcurrencyLimit; + int syncMaxConcurrentModels; try { - syncConcurrencyLimit = dataStoreConfigurationProvider + syncMaxConcurrentModels = dataStoreConfigurationProvider .getConfiguration() - .getSyncConcurrencyLimit(); + .getSyncMaxConcurrentModels(); } catch (DataStoreException exception) { - syncConcurrencyLimit = 1; + syncMaxConcurrentModels = 1; } Completable syncCompletable; - if (syncInParallel && syncConcurrencyLimit > 1) { + if (syncInParallel && syncMaxConcurrentModels > 1) { syncCompletable = Completable.mergeDelayError( Flowable.fromIterable(hydrationTasks), - syncConcurrencyLimit + syncMaxConcurrentModels ); } else { syncCompletable = Completable.concat(hydrationTasks); From 15da6ac9157fd7d7a168eb445bf588af3ad10e2e Mon Sep 17 00:00:00 2001 From: tjroach Date: Thu, 23 May 2024 14:33:53 -0400 Subject: [PATCH 09/13] rename per API review --- aws-datastore/api/aws-datastore.api | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws-datastore/api/aws-datastore.api b/aws-datastore/api/aws-datastore.api index 2d38ba22f..b319bee8d 100644 --- a/aws-datastore/api/aws-datastore.api +++ b/aws-datastore/api/aws-datastore.api @@ -61,10 +61,10 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration { public fun getErrorHandler ()Lcom/amplifyframework/datastore/DataStoreErrorHandler; public fun getMaxTimeLapseForObserveQuery ()Ljava/lang/Long; public fun getObserveQueryMaxRecords ()Ljava/lang/Integer; - public fun getSyncConcurrencyLimit ()Ljava/lang/Integer; public fun getSyncExpressions ()Ljava/util/Map; public fun getSyncIntervalInMinutes ()Ljava/lang/Long; public fun getSyncIntervalMs ()Ljava/lang/Long; + public fun getSyncMaxConcurrentModels ()Ljava/lang/Integer; public fun getSyncMaxRecords ()Ljava/lang/Integer; public fun getSyncPageSize ()Ljava/lang/Integer; public fun hashCode ()I @@ -78,10 +78,10 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration$Builder public fun errorHandler (Lcom/amplifyframework/datastore/DataStoreErrorHandler;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun observeQueryMaxRecords (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun observeQueryMaxTime (J)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; - public fun syncConcurrencyLimit (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncExpression (Ljava/lang/Class;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncExpression (Ljava/lang/String;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncInterval (JLjava/util/concurrent/TimeUnit;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; + public fun syncMaxConcurrentModels (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncMaxRecords (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; public fun syncPageSize (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder; } From 4705505aa0f1b775d16faad62faedd5eadf0be0d Mon Sep 17 00:00:00 2001 From: tjroach Date: Tue, 4 Jun 2024 10:45:38 -0400 Subject: [PATCH 10/13] max concurrent models config test --- .../datastore/DataStoreConfiguration.java | 11 +++++++---- .../datastore/DataStoreConfigurationTest.java | 7 ++++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java index 952b80912..97ce66618 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/DataStoreConfiguration.java @@ -49,7 +49,7 @@ public final class DataStoreConfiguration { @VisibleForTesting static final boolean DEFAULT_DO_SYNC_RETRY = false; @VisibleForTesting - static final int DEFAULT_SYNC_CONCURRENCY_LIMIT = 1; + static final int DEFAULT_SYNC_MAX_CONCURRENT_MODELS = 1; static final int MAX_RECORDS = 1000; static final long MAX_TIME_SEC = 2; @@ -75,7 +75,7 @@ private DataStoreConfiguration(Builder builder) { this.syncExpressions = builder.syncExpressions; this.doSyncRetry = builder.doSyncRetry; this.syncMaxConcurrentModels = builder.syncMaxConcurrentModels != null ? - builder.syncMaxConcurrentModels : DEFAULT_SYNC_CONCURRENCY_LIMIT; + builder.syncMaxConcurrentModels : DEFAULT_SYNC_MAX_CONCURRENT_MODELS; this.maxTimeLapseForObserveQuery = builder.maxTimeLapseForObserveQuery; this.observeQueryMaxRecords = builder.observeQueryMaxRecords; } @@ -134,7 +134,7 @@ public static DataStoreConfiguration defaults() throws DataStoreException { .doSyncRetry(DEFAULT_DO_SYNC_RETRY) .observeQueryMaxTime(MAX_TIME_SEC) .observeQueryMaxRecords(MAX_RECORDS) - .syncMaxConcurrentModels(DEFAULT_SYNC_CONCURRENCY_LIMIT) + .syncMaxConcurrentModels(DEFAULT_SYNC_MAX_CONCURRENT_MODELS) .build(); } @@ -599,7 +599,10 @@ public DataStoreConfiguration build() throws DataStoreException { syncIntervalInMinutes = getValueOrDefault(syncIntervalInMinutes, DEFAULT_SYNC_INTERVAL_MINUTES); syncMaxRecords = getValueOrDefault(syncMaxRecords, DEFAULT_SYNC_MAX_RECORDS); syncPageSize = getValueOrDefault(syncPageSize, DEFAULT_SYNC_PAGE_SIZE); - syncMaxConcurrentModels = getValueOrDefault(syncMaxConcurrentModels, DEFAULT_SYNC_CONCURRENCY_LIMIT); + syncMaxConcurrentModels = getValueOrDefault( + syncMaxConcurrentModels, + DEFAULT_SYNC_MAX_CONCURRENT_MODELS + ); observeQueryMaxRecords = getValueOrDefault(observeQueryMaxRecords, MAX_RECORDS); maxTimeLapseForObserveQuery = maxTimeLapseForObserveQuery == 0 ? MAX_TIME_SEC : maxTimeLapseForObserveQuery; diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java b/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java index 4eaf9bdf2..c4a546a46 100644 --- a/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/DataStoreConfigurationTest.java @@ -62,6 +62,8 @@ public void testDefaultConfiguration() throws DataStoreException { dataStoreConfiguration.getSyncMaxRecords().intValue()); assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE, dataStoreConfiguration.getSyncPageSize().intValue()); + assertEquals(DataStoreConfiguration.DEFAULT_SYNC_MAX_CONCURRENT_MODELS, + dataStoreConfiguration.getSyncMaxConcurrentModels().intValue()); assertTrue(dataStoreConfiguration.getConflictHandler() instanceof AlwaysApplyRemoteHandler); assertTrue(dataStoreConfiguration.getErrorHandler() instanceof DefaultDataStoreErrorHandler); @@ -107,6 +109,7 @@ public void testDefaultOverriddenFromConfigurationAndObject() long expectedSyncIntervalMinutes = 6L; Long expectedSyncIntervalMs = TimeUnit.MINUTES.toMillis(expectedSyncIntervalMinutes); Integer expectedSyncMaxRecords = 3; + Integer expectedSyncMaxConcurrentModels = 5; DummyConflictHandler dummyConflictHandler = new DummyConflictHandler(); DataStoreErrorHandler errorHandler = DefaultDataStoreErrorHandler.instance(); @@ -121,7 +124,8 @@ public void testDefaultOverriddenFromConfigurationAndObject() .errorHandler(errorHandler) .syncExpression(BlogOwner.class, ownerSyncExpression) .syncExpression("Post", postSyncExpression) - .doSyncRetry(true) + .doSyncRetry(true) + .syncMaxConcurrentModels(expectedSyncMaxConcurrentModels) .build(); JSONObject jsonConfigFromFile = new JSONObject() @@ -132,6 +136,7 @@ public void testDefaultOverriddenFromConfigurationAndObject() assertEquals(expectedSyncIntervalMs, dataStoreConfiguration.getSyncIntervalMs()); assertEquals(expectedSyncMaxRecords, dataStoreConfiguration.getSyncMaxRecords()); + assertEquals(expectedSyncMaxConcurrentModels, dataStoreConfiguration.getSyncMaxConcurrentModels()); assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE, dataStoreConfiguration.getSyncPageSize().longValue()); assertTrue(dataStoreConfiguration.getDoSyncRetry()); From c3e783f719386b639e8ff1c8b7de65f3bc62a78f Mon Sep 17 00:00:00 2001 From: tjroach Date: Wed, 5 Jun 2024 14:36:45 -0400 Subject: [PATCH 11/13] add datastore model concurrency tests --- .../datastore/syncengine/SyncProcessor.java | 2 +- .../syncengine/ConcurrentSyncProcessorTest.kt | 287 ++++++++++++++++++ .../testmodels/flat/AmplifyModelProvider.java | 53 ++++ .../testmodels/flat/Model1.java | 185 +++++++++++ .../testmodels/flat/Model2.java | 179 +++++++++++ 5 files changed, 705 insertions(+), 1 deletion(-) create mode 100644 aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt create mode 100644 testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java create mode 100644 testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java create mode 100644 testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java index 6e11004f1..8531a57a2 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java @@ -133,7 +133,7 @@ Completable hydrate() { if (!QueryPredicates.none().equals(queryPredicateProvider.getPredicate(schema.getName()))) { hydrationTasks.add(createHydrationTask(schema)); toBeSyncedModelArray.add(schema.getName()); - if (schema.getAssociations().size() > 0) { + if (!schema.getAssociations().isEmpty()) { syncInParallel = false; } } diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt new file mode 100644 index 000000000..ee2775e47 --- /dev/null +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt @@ -0,0 +1,287 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.datastore.syncengine + +import androidx.test.core.app.ApplicationProvider +import com.amplifyframework.core.model.Model +import com.amplifyframework.core.model.ModelProvider +import com.amplifyframework.core.model.SchemaRegistry +import com.amplifyframework.core.model.temporal.Temporal +import com.amplifyframework.datastore.DataStoreConfiguration +import com.amplifyframework.datastore.DataStoreConfigurationProvider +import com.amplifyframework.datastore.DataStoreErrorHandler +import com.amplifyframework.datastore.DataStoreException +import com.amplifyframework.datastore.appsync.AppSync +import com.amplifyframework.datastore.appsync.AppSyncMocking +import com.amplifyframework.datastore.appsync.ModelMetadata +import com.amplifyframework.datastore.appsync.ModelWithMetadata +import com.amplifyframework.datastore.model.SystemModelsProviderFactory +import com.amplifyframework.datastore.storage.SynchronousStorageAdapter +import com.amplifyframework.datastore.storage.sqlite.SQLiteStorageAdapter +import com.amplifyframework.testmodels.flat.AmplifyModelProvider +import com.amplifyframework.testmodels.flat.Model1 +import com.amplifyframework.testmodels.flat.Model2 +import io.mockk.mockk +import io.mockk.verify +import io.reactivex.rxjava3.observers.TestObserver +import java.util.concurrent.TimeUnit +import junit.framework.TestCase.assertEquals +import junit.framework.TestCase.assertTrue +import kotlin.random.Random +import org.junit.After +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.mockito.Mockito.mock +import org.robolectric.RobolectricTestRunner + +/** + * Tests the [SyncProcessor] with concurrency enabled. + */ +@RunWith(RobolectricTestRunner::class) +class ConcurrentSyncProcessorTest { + private val appSync = mock(AppSync::class.java) // using Mockito due to existing AppSyncMocking setup + private val errorHandler = mockk(relaxed = true) + + private lateinit var modelProvider: ModelProvider + private lateinit var storageAdapter: SynchronousStorageAdapter + + private lateinit var syncProcessor: SyncProcessor + + @Before + fun setup() { + modelProvider = AmplifyModelProvider.getInstance() + val schemaRegistry = SchemaRegistry.instance().apply { + clear() + register(modelProvider.models()) + } + + val dataStoreConfiguration = DataStoreConfiguration.builder() + .errorHandler(errorHandler) + .syncInterval(1, TimeUnit.MINUTES) + .syncPageSize(100) + .syncMaxRecords(1000) + .syncMaxConcurrentModels(3) + .build() + + val sqliteStorageAdapter = SQLiteStorageAdapter.forModels( + schemaRegistry, + modelProvider + ) + + storageAdapter = SynchronousStorageAdapter.delegatingTo(sqliteStorageAdapter).apply { + initialize(ApplicationProvider.getApplicationContext(), dataStoreConfiguration) + } + + val syncTimeRegistry = SyncTimeRegistry(sqliteStorageAdapter) + val mutationOutbox: MutationOutbox = PersistentMutationOutbox(sqliteStorageAdapter) + val versionRepository = VersionRepository(sqliteStorageAdapter) + val merger = Merger(mutationOutbox, versionRepository, sqliteStorageAdapter) + + val dataStoreConfigurationProvider = DataStoreConfigurationProvider { dataStoreConfiguration } + + this.syncProcessor = SyncProcessor.builder() + .modelProvider(modelProvider) + .schemaRegistry(schemaRegistry) + .syncTimeRegistry(syncTimeRegistry) + .appSync(appSync) + .merger(merger) + .dataStoreConfigurationProvider(dataStoreConfigurationProvider) + .queryPredicateProvider( + QueryPredicateProvider(dataStoreConfigurationProvider).apply { + resolvePredicates() + } + ) + .retryHandler(RetryHandler()) + .isSyncRetryEnabled(false) + .build() + } + + /** + * Test Cleanup. + * @throws DataStoreException On storage adapter terminate failure + */ + @After + fun tearDown() { + storageAdapter.terminate() + } + + @Test + fun `sync with concurrency`() { + // Arrange a subscription to the storage adapter. We're going to watch for changes. + // We expect to see content here as a result of the SyncProcessor applying updates. + val adapterObserver = storageAdapter.observe().test() + + // Arrange: return some responses for the sync() call on the RemoteModelState + val configurator = AppSyncMocking.sync(appSync) + + val model1 = Model1.builder().name("M1_1").build() + val model1Metadata = ModelMetadata(model1.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model1WithMetadata = ModelWithMetadata(model1, model1Metadata) + val expectedModel1Response = mutableListOf(model1WithMetadata) + + val model2 = Model2.builder().name("M2_1").build() + val model2Metadata = ModelMetadata(model2.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model2WithMetadata = ModelWithMetadata(model2, model2Metadata) + val expectedModel2Response = mutableListOf(model2WithMetadata) + + val allExpectedModels = expectedModel1Response + expectedModel2Response + + configurator.mockSuccessResponse(Model1::class.java, model1WithMetadata) + configurator.mockSuccessResponse(Model2::class.java, model2WithMetadata) + + // Act: Call hydrate, and await its completion - assert it completed without error + val hydrationObserver = TestObserver.create>() + syncProcessor.hydrate().subscribe(hydrationObserver) + + assertTrue(hydrationObserver.await(2, TimeUnit.SECONDS)) + hydrationObserver.assertNoErrors() + hydrationObserver.assertComplete() + + // Since hydrate() completed, the storage adapter observer should see some values. + // The number should match expectedResponseItems * 2 (1 for model, 1 for metadata) + // Additionally, there should be 1 LastSyncMetadata record for each model in the provider + adapterObserver.awaitCount(allExpectedModels.size * 2 + AmplifyModelProvider.getInstance().models().size) + + // Validate the changes emitted from the storage adapter's observe(). Sorted to compare lists + assertEquals( + allExpectedModels.flatMap { + listOf(it.model, it.syncMetadata) + }.sortedBy { it.primaryKeyString }, + adapterObserver.values() + .map { it.item() } + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + ) + + // Lastly: validate the current contents of the storage adapter. + val itemsInStorage = storageAdapter.query(modelProvider) + assertEquals(allExpectedModels.size, itemsInStorage.size) + + val expectedModels = allExpectedModels.map { it.model }.sortedBy { it.primaryKeyString } + val expectedMetadata = allExpectedModels.map { it.syncMetadata }.sortedBy { it.primaryKeyString } + // system model size excluding metadata should = number of models and + // + 1 (LastSyncMetadata for each + PersistentModelVersion) + val expectedSystemModelsSize = modelProvider.models().size + 1 + + val actualModels = itemsInStorage + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + val actualMetadata = + storageAdapter.query(SystemModelsProviderFactory.create()) + .filter { ModelMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + + val actualSystemModels = storageAdapter.query(SystemModelsProviderFactory.create()).filter { + !ModelMetadata::class.java.isAssignableFrom(it.javaClass) + } + + assertEquals(expectedModels, actualModels) + assertEquals(expectedMetadata, actualMetadata) + assertEquals(expectedSystemModelsSize, actualSystemModels.size) + + adapterObserver.dispose() + hydrationObserver.dispose() + } + + @Test + fun `sync with concurrency continues when single model fails`() { + // Arrange a subscription to the storage adapter. We're going to watch for changes. + // We expect to see content here as a result of the SyncProcessor applying updates. + val adapterObserver = storageAdapter.observe().test() + + // Arrange: return some responses for the sync() call on the RemoteModelState + val configurator = AppSyncMocking.sync(appSync) + + val expectedModel1Exception = DataStoreException("Failed to sync Model1", "Failed to sync Model1") + + val model2Item1 = Model2.builder().name("M2_1").build() + val model2Item1Metadata = ModelMetadata(model2Item1.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model2Item1WithMetadata = ModelWithMetadata(model2Item1, model2Item1Metadata) + val expectedModel2Response1 = mutableListOf(model2Item1WithMetadata) + + val model2Item2 = Model2.builder().name("M2_2").build() + val model2Item2Metadata = ModelMetadata(model2Item2.id, null, Random.nextInt(), Temporal.Timestamp.now()) + val model2Item2WithMetadata = ModelWithMetadata(model2Item2, model2Item2Metadata) + val expectedModel2Response2 = mutableListOf(model2Item2WithMetadata) + + val allExpectedModels = expectedModel2Response1 + expectedModel2Response2 + + configurator.mockFailure(expectedModel1Exception) + configurator.mockSuccessResponse(Model2::class.java, null, "page2", model2Item1WithMetadata) + configurator.mockSuccessResponse(Model2::class.java, "page2", null, model2Item2WithMetadata) + + // Act: Call hydrate, and await its completion - assert it completed without error + val hydrationObserver = TestObserver.create>() + syncProcessor.hydrate().subscribe(hydrationObserver) + + assertTrue(hydrationObserver.await(2, TimeUnit.SECONDS)) + hydrationObserver.assertError { it == expectedModel1Exception } + verify { + errorHandler.accept( + DataStoreException( + "Initial cloud sync failed for Model1.", + expectedModel1Exception, + "Check your internet connection." + ) + ) + } + + // Since hydrate() completed, the storage adapter observer should see some values. + // The number should match expectedResponseItems * 2 (1 for model, 1 for metadata) + // Additionally, there should be 1 LastSyncMetadata for model2 sync success. + // Model1 will not have a last sync record due to failure + adapterObserver.awaitCount(allExpectedModels.size * 2 + 1) + + // Validate the changes emitted from the storage adapter's observe(). Sorted to compare lists + assertEquals( + allExpectedModels.flatMap { + listOf(it.model, it.syncMetadata) + }.sortedBy { it.primaryKeyString }, + adapterObserver.values() + .map { it.item() } + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + ) + + // Lastly: validate the current contents of the storage adapter. + val itemsInStorage = storageAdapter.query(modelProvider) + assertEquals(allExpectedModels.size, itemsInStorage.size) + + val expectedModels = allExpectedModels.map { it.model }.sortedBy { it.primaryKeyString } + val expectedMetadata = allExpectedModels.map { it.syncMetadata }.sortedBy { it.primaryKeyString } + // LastSyncMetadata and PersistentModel version for Model2 success + val expectedSystemModelsSize = 2 + + val actualModels = itemsInStorage + .filter { !LastSyncMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + val actualMetadata = + storageAdapter.query(SystemModelsProviderFactory.create()) + .filter { ModelMetadata::class.java.isAssignableFrom(it.javaClass) } + .sortedBy { it.primaryKeyString } + + val actualSystemModels = storageAdapter.query(SystemModelsProviderFactory.create()).filter { + !ModelMetadata::class.java.isAssignableFrom(it.javaClass) + } + + assertEquals(expectedModels, actualModels) + assertEquals(expectedMetadata, actualMetadata) + assertEquals(expectedSystemModelsSize, actualSystemModels.size) + + adapterObserver.dispose() + hydrationObserver.dispose() + } +} diff --git a/testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java new file mode 100644 index 000000000..5c177bd2d --- /dev/null +++ b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/AmplifyModelProvider.java @@ -0,0 +1,53 @@ +package com.amplifyframework.testmodels.flat; + +import com.amplifyframework.core.model.Model; +import com.amplifyframework.core.model.ModelProvider; +import com.amplifyframework.util.Immutable; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +/** + * Contains the set of model classes that implement {@link Model} + * interface. + */ + +public final class AmplifyModelProvider implements ModelProvider { + private static final String AMPLIFY_MODEL_VERSION = "a5218086c100c39df9b1bc3dd3e87c93"; + private static AmplifyModelProvider amplifyGeneratedModelInstance; + private AmplifyModelProvider() { + + } + + public static synchronized AmplifyModelProvider getInstance() { + if (amplifyGeneratedModelInstance == null) { + amplifyGeneratedModelInstance = new AmplifyModelProvider(); + } + return amplifyGeneratedModelInstance; + } + + /** + * Get a set of the model classes. + * + * @return a set of the model classes. + */ + @Override + public Set> models() { + final Set> modifiableSet = new HashSet<>( + Arrays.>asList(Model1.class, Model2.class) + ); + + return Immutable.of(modifiableSet); + + } + + /** + * Get the version of the models. + * + * @return the version string of the models. + */ + @Override + public String version() { + return AMPLIFY_MODEL_VERSION; + } +} diff --git a/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java new file mode 100644 index 000000000..72a5425bc --- /dev/null +++ b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model1.java @@ -0,0 +1,185 @@ +package com.amplifyframework.testmodels.flat; + +import androidx.core.util.ObjectsCompat; + +import com.amplifyframework.core.model.Model; +import com.amplifyframework.core.model.ModelIdentifier; +import com.amplifyframework.core.model.annotations.ModelConfig; +import com.amplifyframework.core.model.annotations.ModelField; +import com.amplifyframework.core.model.query.predicate.QueryField; +import com.amplifyframework.core.model.temporal.Temporal; + +import java.util.Objects; +import java.util.UUID; + +import static com.amplifyframework.core.model.query.predicate.QueryField.field; + +/** This is an auto generated class representing the Model1 type in your schema. */ +@SuppressWarnings("all") +@ModelConfig(pluralName = "Model1s", type = Model.Type.USER, version = 1) +public final class Model1 implements Model { + public static final QueryField ID = field("Model1", "id"); + public static final QueryField NAME = field("Model1", "name"); + private final @ModelField(targetType="ID", isRequired = true) String id; + private final @ModelField(targetType="String", isRequired = true) String name; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime createdAt; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime updatedAt; + /** @deprecated This API is internal to Amplify and should not be used. */ + @Deprecated + public String resolveIdentifier() { + return id; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public Temporal.DateTime getCreatedAt() { + return createdAt; + } + + public Temporal.DateTime getUpdatedAt() { + return updatedAt; + } + + private Model1(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if(obj == null || getClass() != obj.getClass()) { + return false; + } else { + Model1 model1 = (Model1) obj; + return ObjectsCompat.equals(getId(), model1.getId()) && + ObjectsCompat.equals(getName(), model1.getName()) && + ObjectsCompat.equals(getCreatedAt(), model1.getCreatedAt()) && + ObjectsCompat.equals(getUpdatedAt(), model1.getUpdatedAt()); + } + } + + @Override + public int hashCode() { + return new StringBuilder() + .append(getId()) + .append(getName()) + .append(getCreatedAt()) + .append(getUpdatedAt()) + .toString() + .hashCode(); + } + + @Override + public String toString() { + return new StringBuilder() + .append("Model1 {") + .append("id=" + String.valueOf(getId()) + ", ") + .append("name=" + String.valueOf(getName()) + ", ") + .append("createdAt=" + String.valueOf(getCreatedAt()) + ", ") + .append("updatedAt=" + String.valueOf(getUpdatedAt())) + .append("}") + .toString(); + } + + public static NameStep builder() { + return new Builder(); + } + + /** + * WARNING: This method should not be used to build an instance of this object for a CREATE mutation. + * This is a convenience method to return an instance of the object with only its ID populated + * to be used in the context of a parameter in a delete mutation or referencing a foreign key + * in a relationship. + * @param id the id of the existing item this instance will represent + * @return an instance of this model with only ID populated + */ + public static Model1 justId(String id) { + return new Model1( + id, + null + ); + } + + public CopyOfBuilder copyOfBuilder() { + return new CopyOfBuilder(id, + name); + } + public interface NameStep { + BuildStep name(String name); + } + + + public interface BuildStep { + Model1 build(); + BuildStep id(String id); + } + + + public static class Builder implements NameStep, BuildStep { + private String id; + private String name; + public Builder() { + + } + + private Builder(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public Model1 build() { + String id = this.id != null ? this.id : UUID.randomUUID().toString(); + + return new Model1( + id, + name); + } + + @Override + public BuildStep name(String name) { + Objects.requireNonNull(name); + this.name = name; + return this; + } + + /** + * @param id id + * @return Current Builder instance, for fluent method chaining + */ + public BuildStep id(String id) { + this.id = id; + return this; + } + } + + + public final class CopyOfBuilder extends Builder { + private CopyOfBuilder(String id, String name) { + super(id, name); + Objects.requireNonNull(name); + } + + @Override + public CopyOfBuilder name(String name) { + return (CopyOfBuilder) super.name(name); + } + } + + + public static class Model1Identifier extends ModelIdentifier { + private static final long serialVersionUID = 1L; + public Model1Identifier(String id) { + super(id); + } + } + +} diff --git a/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java new file mode 100644 index 000000000..329d12c1f --- /dev/null +++ b/testmodels/src/main/java/com/amplifyframework/testmodels/flat/Model2.java @@ -0,0 +1,179 @@ +package com.amplifyframework.testmodels.flat; + +import androidx.core.util.ObjectsCompat; + +import com.amplifyframework.core.model.Model; +import com.amplifyframework.core.model.ModelIdentifier; +import com.amplifyframework.core.model.annotations.ModelConfig; +import com.amplifyframework.core.model.annotations.ModelField; +import com.amplifyframework.core.model.query.predicate.QueryField; +import com.amplifyframework.core.model.temporal.Temporal; + +import java.util.UUID; + +import static com.amplifyframework.core.model.query.predicate.QueryField.field; + +/** This is an auto generated class representing the Model2 type in your schema. */ +@SuppressWarnings("all") +@ModelConfig(pluralName = "Model2s", type = Model.Type.USER, version = 1) +public final class Model2 implements Model { + public static final QueryField ID = field("Model2", "id"); + public static final QueryField NAME = field("Model2", "name"); + private final @ModelField(targetType="ID", isRequired = true) String id; + private final @ModelField(targetType="String") String name; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime createdAt; + private @ModelField(targetType="AWSDateTime", isReadOnly = true) Temporal.DateTime updatedAt; + /** @deprecated This API is internal to Amplify and should not be used. */ + @Deprecated + public String resolveIdentifier() { + return id; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public Temporal.DateTime getCreatedAt() { + return createdAt; + } + + public Temporal.DateTime getUpdatedAt() { + return updatedAt; + } + + private Model2(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if(obj == null || getClass() != obj.getClass()) { + return false; + } else { + Model2 model2 = (Model2) obj; + return ObjectsCompat.equals(getId(), model2.getId()) && + ObjectsCompat.equals(getName(), model2.getName()) && + ObjectsCompat.equals(getCreatedAt(), model2.getCreatedAt()) && + ObjectsCompat.equals(getUpdatedAt(), model2.getUpdatedAt()); + } + } + + @Override + public int hashCode() { + return new StringBuilder() + .append(getId()) + .append(getName()) + .append(getCreatedAt()) + .append(getUpdatedAt()) + .toString() + .hashCode(); + } + + @Override + public String toString() { + return new StringBuilder() + .append("Model2 {") + .append("id=" + String.valueOf(getId()) + ", ") + .append("name=" + String.valueOf(getName()) + ", ") + .append("createdAt=" + String.valueOf(getCreatedAt()) + ", ") + .append("updatedAt=" + String.valueOf(getUpdatedAt())) + .append("}") + .toString(); + } + + public static BuildStep builder() { + return new Builder(); + } + + /** + * WARNING: This method should not be used to build an instance of this object for a CREATE mutation. + * This is a convenience method to return an instance of the object with only its ID populated + * to be used in the context of a parameter in a delete mutation or referencing a foreign key + * in a relationship. + * @param id the id of the existing item this instance will represent + * @return an instance of this model with only ID populated + */ + public static Model2 justId(String id) { + return new Model2( + id, + null + ); + } + + public CopyOfBuilder copyOfBuilder() { + return new CopyOfBuilder(id, + name); + } + public interface BuildStep { + Model2 build(); + BuildStep id(String id); + BuildStep name(String name); + } + + + public static class Builder implements BuildStep { + private String id; + private String name; + public Builder() { + + } + + private Builder(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public Model2 build() { + String id = this.id != null ? this.id : UUID.randomUUID().toString(); + + return new Model2( + id, + name); + } + + @Override + public BuildStep name(String name) { + this.name = name; + return this; + } + + /** + * @param id id + * @return Current Builder instance, for fluent method chaining + */ + public BuildStep id(String id) { + this.id = id; + return this; + } + } + + + public final class CopyOfBuilder extends Builder { + private CopyOfBuilder(String id, String name) { + super(id, name); + + } + + @Override + public CopyOfBuilder name(String name) { + return (CopyOfBuilder) super.name(name); + } + } + + + public static class Model2Identifier extends ModelIdentifier { + private static final long serialVersionUID = 1L; + public Model2Identifier(String id) { + super(id); + } + } + +} From 8e65be9e062f8ebb4a550636ed716612db6a4126 Mon Sep 17 00:00:00 2001 From: tjroach Date: Thu, 6 Jun 2024 13:48:57 -0400 Subject: [PATCH 12/13] pr comments --- .../datastore/syncengine/SyncProcessor.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java index 8531a57a2..43bd39e08 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SyncProcessor.java @@ -126,7 +126,7 @@ Completable hydrate() { TopologicalOrdering.forRegisteredModels(schemaRegistry, modelProvider); Collections.sort(modelSchemas, ordering::compare); ArrayList toBeSyncedModelArray = new ArrayList<>(); - boolean syncInParallel = true; + boolean canSyncConcurrently = true; for (ModelSchema schema : modelSchemas) { //Check to see if query predicate for this schema is not equal to none. This means customer does // not want to sync the data for this model. @@ -134,7 +134,7 @@ Completable hydrate() { hydrationTasks.add(createHydrationTask(schema)); toBeSyncedModelArray.add(schema.getName()); if (!schema.getAssociations().isEmpty()) { - syncInParallel = false; + canSyncConcurrently = false; } } } @@ -149,12 +149,16 @@ Completable hydrate() { } Completable syncCompletable; - if (syncInParallel && syncMaxConcurrentModels > 1) { + if (canSyncConcurrently && syncMaxConcurrentModels > 1) { syncCompletable = Completable.mergeDelayError( Flowable.fromIterable(hydrationTasks), syncMaxConcurrentModels ); } else { + // The reason we don't do mergeDelayError here with maxConcurrency = 1 is because it would create a + // behavioral difference. If a failure is encountered in concat, sync immediately stops. This would be + // the wrong behavior when concurrency is enabled, but in the single concurrency use case, this matches + // previous behavior syncCompletable = Completable.concat(hydrationTasks); } From eafe454336e3c37c0f8a9875c3caeef0ea35994f Mon Sep 17 00:00:00 2001 From: tjroach Date: Thu, 6 Jun 2024 15:25:41 -0400 Subject: [PATCH 13/13] wait a bit longer for hydration in test --- .../datastore/syncengine/ConcurrentSyncProcessorTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt index ee2775e47..9b1562ee9 100644 --- a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt +++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ConcurrentSyncProcessorTest.kt @@ -146,7 +146,7 @@ class ConcurrentSyncProcessorTest { val hydrationObserver = TestObserver.create>() syncProcessor.hydrate().subscribe(hydrationObserver) - assertTrue(hydrationObserver.await(2, TimeUnit.SECONDS)) + assertTrue(hydrationObserver.await(5, TimeUnit.SECONDS)) hydrationObserver.assertNoErrors() hydrationObserver.assertComplete() @@ -227,7 +227,7 @@ class ConcurrentSyncProcessorTest { val hydrationObserver = TestObserver.create>() syncProcessor.hydrate().subscribe(hydrationObserver) - assertTrue(hydrationObserver.await(2, TimeUnit.SECONDS)) + assertTrue(hydrationObserver.await(5, TimeUnit.SECONDS)) hydrationObserver.assertError { it == expectedModel1Exception } verify { errorHandler.accept(