Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More consistent index template allow list behavior #1174

Merged
merged 6 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion MetadataMigration/DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ If your target cluster has basic auth enabled on it, you can supply those creden

### Allowlisting the templates and indices to migrate

By default, the tool has an empty allowlist for templates, meaning none will be migrated. In contrast, the default allowlist for indices is open, meaning all non-system indices (those not prefixed with `.`) will be migrated. You can tweak these allowlists with a comma-separated list of items you specifically with to migrate. If you specify an custom allowlist for the templates or indices, the default allowlist is disregarded and **only** the items you have in your allowlist will be moved.
By default, allowlist for indices and all templates types is open, meaning all non-system indices (those not prefixed with `.`) will be migrated. You can tweak these allowlists with a comma-separated list of items you specifically with to migrate. If you specify an custom allowlist for the templates or indices, the default allowlist is disregarded and **only** the items you have in your allowlist will be moved.

```shell
./gradlew MetadataMigration:run --args='--snapshot-name reindex-from-snapshot --s3-local-dir /tmp/s3_files --s3-repo-uri s3://your-s3-uri --s3-region us-fake-1 --target-host http://hostname:9200 --index-allowlist Index1,.my_system_index,logs-2023 --index-template-allowlist logs_template --component-template-allowlist component2,component7'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
package org.opensearch.migrations.bulkload.common;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public class FilterScheme {
private FilterScheme() {}

public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(
List<String> indexAllowlist,
BiConsumer<String, Boolean> indexNameAcceptanceObserver
) {
return index -> {
public static Predicate<String> filterByAllowList(List<String> allowlist) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the logging features, IMO much cleaner to have the source of the filter know how/when to log directly.

return item -> {
boolean accepted;
if (indexAllowlist.isEmpty()) {
accepted = !index.getName().startsWith(".");
// By default allow all items except 'system' items that start with a period
if (allowlist == null || allowlist.isEmpty()) {
accepted = !item.startsWith(".");
} else {
accepted = indexAllowlist.contains(index.getName());
accepted = allowlist.contains(item);
}

indexNameAcceptanceObserver.accept(index.getName(), accepted);

return accepted;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ public class DataFilterArgs {

@Parameter(names = {
"--index-template-allowlist" }, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> indexTemplateAllowlist = List.of();

@Parameter(names = {
"--component-template-allowlist" }, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
+ " (e.g. 'posts_template1, posts_template2'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> componentTemplateAllowlist = List.of();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.opensearch.migrations.bulkload.version_os_2_11;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.metadata.CreationResult;
Expand Down Expand Up @@ -35,14 +36,13 @@ public GlobalMetadataCreatorResults create(
log.info("Setting Global Metadata");

var results = GlobalMetadataCreatorResults.builder();
GlobalMetadataData_OS_2_11 globalMetadata = new GlobalMetadataData_OS_2_11(root.toObjectNode());
results.legacyTemplates(createLegacyTemplates(globalMetadata, mode, context));
results.componentTemplates(createComponentTemplates(globalMetadata, mode, context));
results.indexTemplates(createIndexTemplates(globalMetadata, mode, context));
results.legacyTemplates(createLegacyTemplates(root, mode, context));
results.componentTemplates(createComponentTemplates(root, mode, context));
results.indexTemplates(createIndexTemplates(root, mode, context));
return results.build();
}

public List<CreationResult> createLegacyTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createLegacyTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getTemplates(),
legacyTemplateAllowlist,
Expand All @@ -52,7 +52,7 @@ public List<CreationResult> createLegacyTemplates(GlobalMetadataData_OS_2_11 met
);
}

public List<CreationResult> createComponentTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createComponentTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getComponentTemplates(),
componentTemplateAllowlist,
Expand All @@ -62,7 +62,7 @@ public List<CreationResult> createComponentTemplates(GlobalMetadataData_OS_2_11
);
}

public List<CreationResult> createIndexTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createIndexTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getIndexTemplates(),
indexTemplateAllowlist,
Expand All @@ -73,7 +73,7 @@ public List<CreationResult> createIndexTemplates(GlobalMetadataData_OS_2_11 meta
}

@AllArgsConstructor
private enum TemplateTypes {
enum TemplateTypes {
INDEX_TEMPLATE(
(targetClient, name, body, context) -> targetClient.createIndexTemplate(name, body, context.createMigrateTemplateContext()),
(targetClient, name) -> targetClient.hasIndexTemplate(name)
Expand Down Expand Up @@ -118,49 +118,41 @@ private List<CreationResult> createTemplates(
return List.of();
}

if (templateAllowlist != null && templateAllowlist.isEmpty()) {
log.info("No {} in specified allowlist", templateType);
return List.of();
}
var templatesToCreate = getAllTemplates(templates, templateType);

var templatesToCreate = getTemplatesToCreate(templates, templateAllowlist, templateType);

return processTemplateCreation(templatesToCreate, templateType, mode, context);
return processTemplateCreation(templatesToCreate, templateType, templateAllowlist, mode, context);
}

private Map<String, ObjectNode> getTemplatesToCreate(ObjectNode templates, List<String> templateAllowlist, TemplateTypes templateType) {
Map<String, ObjectNode> getAllTemplates(ObjectNode templates, TemplateTypes templateType) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the filtering from this method because it prevented reporting templates that were not migrated.

var templatesToCreate = new HashMap<String, ObjectNode>();

if (templateAllowlist != null) {
for (String templateName : templateAllowlist) {
if (!templates.has(templateName) || templates.get(templateName) == null) {
log.warn("{} not found: {}", templateType, templateName);
continue;
}
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
}
} else {
templates.fieldNames().forEachRemaining(templateName -> {
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
});
}
templates.fieldNames().forEachRemaining(templateName -> {
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
});

return templatesToCreate;
}

private List<CreationResult> processTemplateCreation(
Map<String, ObjectNode> templatesToCreate,
TemplateTypes templateType,
List<String> templateAllowList,
MigrationMode mode,
IClusterMetadataContext context
) {
var skipCreation = FilterScheme.filterByAllowList(templateAllowList).negate();

List<CreationResult> templateList = new ArrayList<>();

templatesToCreate.forEach((templateName, templateBody) -> {
return templatesToCreate.entrySet().stream().map((kvp) -> {
var templateName = kvp.getKey();
var templateBody = kvp.getValue();
var creationResult = CreationResult.builder().name(templateName);

if (skipCreation.test(templateName)) {
log.atInfo().setMessage("Template {} was skipped due to allowlist filter {}").addArgument(templateName).addArgument(templateAllowList).log();
return creationResult.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER).build();
}

log.info("Creating {}: {}", templateType, templateName);
try {
if (mode == MigrationMode.SIMULATE) {
Expand All @@ -179,9 +171,7 @@ private List<CreationResult> processTemplateCreation(
creationResult.failureType(CreationFailureType.TARGET_CLUSTER_FAILURE);
creationResult.exception(e);
}
templateList.add(creationResult.build());
});

return templateList;
return creationResult.build();
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.opensearch.migrations.bulkload.worker;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
Expand All @@ -29,52 +27,50 @@ public class IndexRunner {

public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexContext context) {
var repoDataProvider = metadataFactory.getRepoDataProvider();
BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
if (Boolean.FALSE.equals(accepted)) {
log.atInfo().setMessage("Index {} rejected by allowlist").addArgument(indexName).log();
}
};
var results = IndexMetadataResults.builder();

// Set results for filtered items
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(Predicate.not(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger)))
.forEach(index -> results.index(CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build()));

var skipCreation = FilterScheme.filterByAllowList(indexAllowlist).negate();

repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
.forEach(index -> {
var indexName = index.getName();
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);

CreationResult indexResult = null;
var indexMetadata = originalIndexMetadata.deepCopy();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
indexResult = indexCreator.create(indexMetadata, mode, context);
} catch (Throwable t) {
indexResult = CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, t))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
CreationResult creationResult;
if (skipCreation.test(index.getName())) {
log.atInfo()
.setMessage("Index {} was not part of the allowlist and will not be migrated.")
.addArgument(index.getName())
.log();
creationResult = CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build();
} else {
creationResult = createIndex(index.getName(), mode, context);
}

var finalResult = indexResult;
results.index(finalResult);
results.index(creationResult);

var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
aliasResult.failureType(finalResult.getFailureType());
aliasResult.failureType(creationResult.getFailureType());
results.alias(aliasResult.build());
});
});
return results.build();
}

private CreationResult createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);
var indexMetadata = originalIndexMetadata.deepCopy();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
return indexCreator.create(indexMetadata, mode, context);
} catch (Throwable t) {
return CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, t))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

import org.opensearch.migrations.bulkload.common.FilterScheme;
Expand Down Expand Up @@ -99,24 +98,37 @@ private static void prepareShardWorkItems(
List<String> indexAllowlist,
IDocumentMigrationContexts.IShardSetupAttemptContext context
) {
log.info("Setting up the Documents Work Items...");
log.atInfo()
.setMessage("Setting up the Documents Work Items...")
.log();
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider();

BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
if (Boolean.FALSE.equals(accepted)) {
log.info("Index " + indexName + " rejected by allowlist");
}
};
var allowedIndexes = FilterScheme.filterByAllowList(indexAllowlist);
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
.filter(index -> {
var accepted = allowedIndexes.test(index.getName());
if (!accepted) {
log.atInfo()
.setMessage("None of the documents in index {} will be reindexed, it was not included in the allowlist: {} ")
.addArgument(index.getName())
.addArgument(indexAllowlist)
.log();
}
return accepted;
})
.forEach(index -> {
IndexMetadata indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
log.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards");
log.atInfo()
.setMessage("Index {} has {} shards")
.addArgument(indexMetadata.getName())
.addArgument(indexMetadata.getNumberOfShards())
.log();
IntStream.range(0, indexMetadata.getNumberOfShards()).forEach(shardId -> {
log.info(
"Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId
);
log.atInfo()
.setMessage("Creating Documents Work Item for index: {}, shard: {}")
.addArgument(indexMetadata.getName())
.addArgument(shardId)
.log();
try (var shardSetupContext = context.createShardWorkItemContext()) {
workCoordinator.createUnassignedWorkItem(
IndexAndShardCursor.formatAsWorkItemString(indexMetadata.getName(), shardId),
Expand All @@ -128,6 +140,8 @@ private static void prepareShardWorkItems(
});
});

log.info("Finished setting up the Documents Work Items.");
log.atInfo()
.setMessage("Finished setting up the Documents Work Items.")
.log();
}
}
Loading
Loading