From 9010fcf6af95ca2d0b087130f760d10d1cf54f92 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 20 Dec 2024 11:59:32 -0600 Subject: [PATCH] Update DocumentReindexer to support 1:many transformations Signed-off-by: Andre Kurait --- .../migrations/RfsMigrateDocuments.java | 4 ++- .../bulkload/common/DocumentReindexer.java | 10 +++--- .../bulkload/common/RfsDocument.java | 32 +++++++++++++++---- .../common/DocumentReindexerTest.java | 2 +- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index 667a1d8ce..593bc0060 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -472,9 +472,11 @@ private static ArrayList getSuccessorWorkItemIds(IWorkCoordinator.WorkIt throw new IllegalStateException("Unexpected worker coordination state. Expected workItem set when progressCursor not null."); } var workItem = workItemAndDuration.getWorkItem(); + // Set successor as same last docId, this will ensure we process every document fully in cases where there is a 1:many doc split + var successorStartingDocId = progressCursor.getDocId(); var successorWorkItem = new IWorkCoordinator.WorkItemAndDuration .WorkItem(workItem.getIndexName(), workItem.getShardNumber(), - progressCursor.getDocId() + 1); + successorStartingDocId); ArrayList successorWorkItemIds = new ArrayList<>(); successorWorkItemIds.add(successorWorkItem.toString()); return successorWorkItemIds; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java index 1e0aed8f1..94ee11933 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java @@ -31,7 +31,7 @@ public Flux reindex(String indexName, Flux do var scheduler = Schedulers.newParallel("DocumentBulkAggregator"); var rfsDocs = documentStream .publishOn(scheduler, 1) - .map(doc -> transformDocument(doc, indexName)); + .concatMapIterable(doc -> transformDocument(doc, indexName)); return this.reindexDocsInParallelBatches(rfsDocs, indexName, context) .doFinally(s -> scheduler.dispose()); @@ -52,12 +52,12 @@ Flux reindexDocsInParallelBatches(Flux docs, String } @SneakyThrows - RfsDocument transformDocument(RfsLuceneDocument doc, String indexName) { - var finalDocument = RfsDocument.fromLuceneDocument(doc, indexName); + List transformDocument(RfsLuceneDocument doc, String indexName) { + var originalDoc = RfsDocument.fromLuceneDocument(doc, indexName); if (transformer != null) { - finalDocument = RfsDocument.transform(transformer::transformJson, finalDocument); + return RfsDocument.transform(transformer, originalDoc); } - return finalDocument; + return List.of(originalDoc); } /* diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java index cf775823e..0b2ff5fb6 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java @@ -1,7 +1,10 @@ package org.opensearch.migrations.bulkload.common; +import java.util.List; import java.util.Map; -import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +import org.opensearch.migrations.transform.IJsonTransformer; import lombok.AllArgsConstructor; @@ -32,10 +35,27 @@ public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String index ); } - public static RfsDocument transform(UnaryOperator> transformer, RfsDocument doc) { - return new RfsDocument( - doc.luceneDocNumber, - BulkDocSection.fromMap(transformer.apply(doc.document.toMap())) - ); + @SuppressWarnings("unchecked") + public static List transform(IJsonTransformer transformer, RfsDocument doc) { + var transformedObject = transformer.transformJson(doc.document.toMap()); + if (transformedObject instanceof Map) { + Map transformedMap = (Map) transformedObject; + return List.of(new RfsDocument( + doc.luceneDocNumber, + BulkDocSection.fromMap(transformedMap) + )); + } else if (transformedObject instanceof List) { + var transformedList = (List>) transformedObject; + return transformedList.stream() + .map(item -> new RfsDocument( + doc.luceneDocNumber, + BulkDocSection.fromMap(item) + )) + .collect(Collectors.toList()); + } else { + throw new IllegalArgumentException( + "Unsupported transformed document type: " + transformedObject.getClass().getName() + ); + } } } diff --git a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java index eb3a31b03..fc7ddad00 100644 --- a/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java +++ b/RFS/src/test/java/org/opensearch/migrations/bulkload/common/DocumentReindexerTest.java @@ -132,7 +132,7 @@ void reindex_shouldBufferByTransformedSize() throws JsonProcessingException { // Set up the transformer that replaces the sourceDoc from the document var replacedSourceDoc = Map.of("simpleKey", "simpleValue"); IJsonTransformer transformer = originalJson -> { - originalJson.put("source", replacedSourceDoc); + ((Map) originalJson).put("source", replacedSourceDoc); return originalJson; }; int numDocs = 5;