Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataStore Model Sync Parallelization #2808

Merged
merged 16 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions aws-datastore/api/aws-datastore.api
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration {
public fun getSyncExpressions ()Ljava/util/Map;
public fun getSyncIntervalInMinutes ()Ljava/lang/Long;
public fun getSyncIntervalMs ()Ljava/lang/Long;
public fun getSyncMaxConcurrentModels ()Ljava/lang/Integer;
public fun getSyncMaxRecords ()Ljava/lang/Integer;
public fun getSyncPageSize ()Ljava/lang/Integer;
public fun hashCode ()I
Expand All @@ -80,6 +81,7 @@ public final class com/amplifyframework/datastore/DataStoreConfiguration$Builder
public fun syncExpression (Ljava/lang/Class;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
public fun syncExpression (Ljava/lang/String;Lcom/amplifyframework/datastore/DataStoreSyncExpression;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
public fun syncInterval (JLjava/util/concurrent/TimeUnit;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
public fun syncMaxConcurrentModels (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
public fun syncMaxRecords (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
public fun syncPageSize (Ljava/lang/Integer;)Lcom/amplifyframework/datastore/DataStoreConfiguration$Builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class DataStoreConfiguration {
static final int DEFAULT_SYNC_PAGE_SIZE = 1_000;
@VisibleForTesting
static final boolean DEFAULT_DO_SYNC_RETRY = false;
@VisibleForTesting
static final int DEFAULT_SYNC_MAX_CONCURRENT_MODELS = 1;
static final int MAX_RECORDS = 1000;
static final long MAX_TIME_SEC = 2;

Expand All @@ -58,6 +60,7 @@ public final class DataStoreConfiguration {
private final Integer syncMaxRecords;
private final Integer syncPageSize;
private final boolean doSyncRetry;
private final Integer syncMaxConcurrentModels;
private final Map<String, DataStoreSyncExpression> syncExpressions;
private final Long syncIntervalInMinutes;
private final Long maxTimeLapseForObserveQuery;
Expand All @@ -71,6 +74,8 @@ private DataStoreConfiguration(Builder builder) {
this.syncIntervalInMinutes = builder.syncIntervalInMinutes;
this.syncExpressions = builder.syncExpressions;
this.doSyncRetry = builder.doSyncRetry;
this.syncMaxConcurrentModels = builder.syncMaxConcurrentModels != null ?
tylerjroach marked this conversation as resolved.
Show resolved Hide resolved
builder.syncMaxConcurrentModels : DEFAULT_SYNC_MAX_CONCURRENT_MODELS;
this.maxTimeLapseForObserveQuery = builder.maxTimeLapseForObserveQuery;
this.observeQueryMaxRecords = builder.observeQueryMaxRecords;
}
Expand Down Expand Up @@ -126,9 +131,10 @@ public static DataStoreConfiguration defaults() throws DataStoreException {
.syncInterval(DEFAULT_SYNC_INTERVAL_MINUTES, TimeUnit.MINUTES)
.syncPageSize(DEFAULT_SYNC_PAGE_SIZE)
.syncMaxRecords(DEFAULT_SYNC_MAX_RECORDS)
.doSyncRetry(DEFAULT_DO_SYNC_RETRY)
.observeQueryMaxTime(MAX_TIME_SEC)
.observeQueryMaxRecords(MAX_RECORDS)
.doSyncRetry(DEFAULT_DO_SYNC_RETRY)
.observeQueryMaxTime(MAX_TIME_SEC)
.observeQueryMaxRecords(MAX_RECORDS)
.syncMaxConcurrentModels(DEFAULT_SYNC_MAX_CONCURRENT_MODELS)
.build();
}

Expand Down Expand Up @@ -201,6 +207,23 @@ public Boolean getDoSyncRetry() {
return this.doSyncRetry;
}

/**
* Gets the number of models that are allowed to concurrently sync.
* NOTE: This value will not be used if any models have associations, instead, the default (1)
* will be used.
* Setting this number to a high value requires that the developer ensure app memory is not a
* concern. If the expected sync data contains a large number of models, with a large number
* of records per model, the concurrency limit should be set to a conservative value. However,
* if the expected sync data contains a large number of models, with a small amount of data in
* each model, setting this limit to a high value will greatly improve sync speeds.
* @return Limit to the number of models that can sync concurrently
*/
@IntRange(from = 1)
@NonNull
public Integer getSyncMaxConcurrentModels() {
return syncMaxConcurrentModels;
}

/**
* Returns the Map of all {@link DataStoreSyncExpression}s used to filter data received from AppSync, either during
* a sync or over the real-time subscription.
Expand Down Expand Up @@ -247,6 +270,9 @@ public boolean equals(@Nullable Object thatObject) {
if (!ObjectsCompat.equals(getObserveQueryMaxRecords(), that.getObserveQueryMaxRecords())) {
return false;
}
if (!ObjectsCompat.equals(getSyncMaxConcurrentModels(), that.getSyncMaxConcurrentModels())) {
return false;
}
return true;
}

Expand All @@ -261,6 +287,7 @@ public int hashCode() {
result = 31 * result + getDoSyncRetry().hashCode();
result = 31 * result + (getObserveQueryMaxRecords() != null ? getObserveQueryMaxRecords().hashCode() : 0);
result = 31 * result + getMaxTimeLapseForObserveQuery().hashCode();
result = 31 * result + getSyncMaxConcurrentModels().hashCode();
return result;
}

Expand All @@ -273,9 +300,10 @@ public String toString() {
", syncPageSize=" + syncPageSize +
", syncIntervalInMinutes=" + syncIntervalInMinutes +
", syncExpressions=" + syncExpressions +
", doSyncRetry=" + doSyncRetry +
", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery +
", observeQueryMaxRecords=" + observeQueryMaxRecords +
", doSyncRetry=" + doSyncRetry +
", maxTimeRelapseForObserveQuery=" + maxTimeLapseForObserveQuery +
", observeQueryMaxRecords=" + observeQueryMaxRecords +
", syncMaxConcurrentModels=" + syncMaxConcurrentModels +
'}';
}

Expand Down Expand Up @@ -309,6 +337,7 @@ public static final class Builder {
private Integer syncMaxRecords;
private Integer syncPageSize;
private boolean doSyncRetry;
private Integer syncMaxConcurrentModels;
private Map<String, DataStoreSyncExpression> syncExpressions;
private boolean ensureDefaults;
private JSONObject pluginJson;
Expand Down Expand Up @@ -429,6 +458,24 @@ public Builder syncPageSize(@IntRange(from = 0) Integer syncPageSize) {
return Builder.this;
}

/**
* Sets the max concurrency limit for model syncing. Default is 1
* NOTE: If any sync models have associations, this value will be unused and the default (1)
* will be used.
* Setting this number to a high value requires that the developer ensure app memory is not a
* concern. If the expected sync data contains a large number of models, with a large number
* of records per model, the concurrency limit should be set to a conservative value. However,
* if the expected sync data contains a large number of models, with a small amount of data in
* each model, setting this limit to a high value will greatly improve sync speeds.
* @param syncMaxConcurrentModels Number of models that can sync concurrently
* @return Current builder
*/
@NonNull
public Builder syncMaxConcurrentModels(@IntRange(from = 1) Integer syncMaxConcurrentModels) {
this.syncMaxConcurrentModels = syncMaxConcurrentModels;
return Builder.this;
}

/**
* Sets a sync expression for a particular model to filter which data is synced locally.
* The expression is evaluated each time DataStore is started.
Expand Down Expand Up @@ -518,6 +565,10 @@ private void applyUserProvidedConfiguration() {
syncPageSize = getValueOrDefault(userProvidedConfiguration.getSyncPageSize(), syncPageSize);
syncExpressions = userProvidedConfiguration.getSyncExpressions();
doSyncRetry = getValueOrDefault(userProvidedConfiguration.getDoSyncRetry(), doSyncRetry);
syncMaxConcurrentModels = getValueOrDefault(
userProvidedConfiguration.getSyncMaxConcurrentModels(),
syncMaxConcurrentModels
);
observeQueryMaxRecords = getValueOrDefault(userProvidedConfiguration.getObserveQueryMaxRecords(),
observeQueryMaxRecords);
maxTimeLapseForObserveQuery = userProvidedConfiguration.getMaxTimeLapseForObserveQuery()
Expand Down Expand Up @@ -548,6 +599,10 @@ public DataStoreConfiguration build() throws DataStoreException {
syncIntervalInMinutes = getValueOrDefault(syncIntervalInMinutes, DEFAULT_SYNC_INTERVAL_MINUTES);
syncMaxRecords = getValueOrDefault(syncMaxRecords, DEFAULT_SYNC_MAX_RECORDS);
syncPageSize = getValueOrDefault(syncPageSize, DEFAULT_SYNC_PAGE_SIZE);
syncMaxConcurrentModels = getValueOrDefault(
syncMaxConcurrentModels,
DEFAULT_SYNC_MAX_CONCURRENT_MODELS
);
observeQueryMaxRecords = getValueOrDefault(observeQueryMaxRecords, MAX_RECORDS);
maxTimeLapseForObserveQuery = maxTimeLapseForObserveQuery == 0 ? MAX_TIME_SEC :
maxTimeLapseForObserveQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,39 @@ Completable hydrate() {
TopologicalOrdering.forRegisteredModels(schemaRegistry, modelProvider);
Collections.sort(modelSchemas, ordering::compare);
ArrayList<String> toBeSyncedModelArray = new ArrayList<>();
boolean syncInParallel = true;
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
for (ModelSchema schema : modelSchemas) {
//Check to see if query predicate for this schema is not equal to none. This means customer does
// not want to sync the data for this model.
if (!QueryPredicates.none().equals(queryPredicateProvider.getPredicate(schema.getName()))) {
hydrationTasks.add(createHydrationTask(schema));
toBeSyncedModelArray.add(schema.getName());
if (!schema.getAssociations().isEmpty()) {
syncInParallel = false;
}
}
}

return Completable.concat(hydrationTasks)
int syncMaxConcurrentModels;
try {
syncMaxConcurrentModels = dataStoreConfigurationProvider
.getConfiguration()
.getSyncMaxConcurrentModels();
} catch (DataStoreException exception) {
syncMaxConcurrentModels = 1;
}
lawmicha marked this conversation as resolved.
Show resolved Hide resolved

Completable syncCompletable;
if (syncInParallel && syncMaxConcurrentModels > 1) {
syncCompletable = Completable.mergeDelayError(
Flowable.fromIterable(hydrationTasks),
syncMaxConcurrentModels
);
} else {
syncCompletable = Completable.concat(hydrationTasks);
lawmicha marked this conversation as resolved.
Show resolved Hide resolved
}

return syncCompletable
.doOnSubscribe(ignore -> {
// This is where we trigger the syncQueriesStarted event since
// doOnSubscribe means that all upstream hydration tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void testDefaultConfiguration() throws DataStoreException {
dataStoreConfiguration.getSyncMaxRecords().intValue());
assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE,
dataStoreConfiguration.getSyncPageSize().intValue());
assertEquals(DataStoreConfiguration.DEFAULT_SYNC_MAX_CONCURRENT_MODELS,
dataStoreConfiguration.getSyncMaxConcurrentModels().intValue());

assertTrue(dataStoreConfiguration.getConflictHandler() instanceof AlwaysApplyRemoteHandler);
assertTrue(dataStoreConfiguration.getErrorHandler() instanceof DefaultDataStoreErrorHandler);
Expand Down Expand Up @@ -107,6 +109,7 @@ public void testDefaultOverriddenFromConfigurationAndObject()
long expectedSyncIntervalMinutes = 6L;
Long expectedSyncIntervalMs = TimeUnit.MINUTES.toMillis(expectedSyncIntervalMinutes);
Integer expectedSyncMaxRecords = 3;
Integer expectedSyncMaxConcurrentModels = 5;
DummyConflictHandler dummyConflictHandler = new DummyConflictHandler();
DataStoreErrorHandler errorHandler = DefaultDataStoreErrorHandler.instance();

Expand All @@ -121,7 +124,8 @@ public void testDefaultOverriddenFromConfigurationAndObject()
.errorHandler(errorHandler)
.syncExpression(BlogOwner.class, ownerSyncExpression)
.syncExpression("Post", postSyncExpression)
.doSyncRetry(true)
.doSyncRetry(true)
.syncMaxConcurrentModels(expectedSyncMaxConcurrentModels)
.build();

JSONObject jsonConfigFromFile = new JSONObject()
Expand All @@ -132,6 +136,7 @@ public void testDefaultOverriddenFromConfigurationAndObject()

assertEquals(expectedSyncIntervalMs, dataStoreConfiguration.getSyncIntervalMs());
assertEquals(expectedSyncMaxRecords, dataStoreConfiguration.getSyncMaxRecords());
assertEquals(expectedSyncMaxConcurrentModels, dataStoreConfiguration.getSyncMaxConcurrentModels());
assertEquals(DataStoreConfiguration.DEFAULT_SYNC_PAGE_SIZE,
dataStoreConfiguration.getSyncPageSize().longValue());
assertTrue(dataStoreConfiguration.getDoSyncRetry());
Expand Down
Loading
Loading