Skip to content

Commit

Permalink
Merge pull request #1164 from kanatti/main
Browse files Browse the repository at this point in the history
[RFS] Use shard routing from source while bulk indexing.
  • Loading branch information
AndreKurait authored Nov 25, 2024
2 parents 6217e1c + b5a21d6 commit 1c23ab4
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.http.SearchClusterRequests;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@Tag("isolatedTest")
public class EndToEndTest extends SourceTestBase {
Expand Down Expand Up @@ -75,8 +77,31 @@ private void migrationDocumentsWithClusters(
bothClustersStarted.join();

var indexName = "blog_2023";
var numberOfShards = 3;
var sourceClusterOperations = new ClusterOperations(sourceCluster.getUrl());
var targetClusterOperations = new ClusterOperations(targetCluster.getUrl());

// The default number of shards differs across versions of ES/OS,
// so we set it explicitly.
String body = String.format(
"{" +
" \"settings\": {" +
" \"index\": {" +
" \"number_of_shards\": %d," +
" \"number_of_replicas\": 0" +
" }" +
" }" +
"}",
numberOfShards
);
sourceClusterOperations.createIndex(indexName, body);
targetClusterOperations.createIndex(indexName, body);


sourceClusterOperations.createDocument(indexName, "222", "{\"author\":\"Tobias Funke\"}");
sourceClusterOperations.createDocument(indexName, "223", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1");
sourceClusterOperations.createDocument(indexName, "224", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1");
sourceClusterOperations.createDocument(indexName, "225", "{\"author\":\"Tobias Funke\", \"category\": \"tech\"}", "2");

// === ACTION: Take a snapshot ===
var snapshotName = "my_snap";
Expand All @@ -97,24 +122,57 @@ private void migrationDocumentsWithClusters(
var sourceRepo = new FileSystemRepo(localDirectory.toPath());

// === ACTION: Migrate the documents ===
var runCounter = new AtomicInteger();
final var clockJitter = new Random(1);
var result = migrateDocumentsWithOneWorker(
sourceRepo,
snapshotName,
List.of(),
targetCluster.getUrl(),
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
false

// ExpectedMigrationWorkTerminationException is thrown on completion.
var expectedTerminationException = Assertions.assertThrows(
ExpectedMigrationWorkTerminationException.class,
() -> migrateDocumentsSequentially(
sourceRepo,
snapshotName,
List.of(),
targetCluster.getUrl(),
runCounter,
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
false
)
);
assertThat(result, equalTo(DocumentsRunner.CompletionStatus.WORK_COMPLETED));

Assertions.assertEquals(numberOfShards + 1, expectedTerminationException.numRuns);

// Check that the docs were migrated
checkClusterMigrationOnFinished(sourceCluster, targetCluster, testDocMigrationContext);

// Check that the docs were migrated with routing
checkDocsWithRouting(sourceCluster, testDocMigrationContext);
checkDocsWithRouting(targetCluster, testDocMigrationContext);
} finally {
deleteTree(localDirectory.toPath());
}
}

/**
 * Verifies that documents indexed with a custom routing value kept that routing
 * after migration, by searching the given cluster with an explicit routing
 * parameter and checking the {@code _routing} field on every hit.
 *
 * @param clusterContainer cluster to query (applied to both source and target)
 * @param context          test context used to build the search request
 */
private void checkDocsWithRouting(
    SearchClusterContainer clusterContainer,
    DocumentMigrationTestContext context) {
    var clusterClient = new RestClient(ConnectionContextTestParams.builder()
        .host(clusterContainer.getUrl())
        .build()
        .toConnectionContext()
    );

    // Check that search by routing works as expected.
    var requests = new SearchClusterRequests(context);
    var hits = requests.searchIndexByQueryString(clusterClient, "blog_2023", "category:cooking", "1");

    // Separate assertions instead of assertTrue(isArray && size == 2):
    // a combined boolean hides which condition failed.
    Assertions.assertTrue(hits.isArray(), () -> "Expected an array of hits, got: " + hits);
    Assertions.assertEquals(2, hits.size(), () -> "Unexpected number of hits: " + hits);

    // Both matching docs were created with routing "1"; it must survive migration.
    for (JsonNode hit : hits) {
        String routing = hit.path("_routing").asText();
        Assertions.assertEquals("1", routing);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ public class BulkDocSection {
private final BulkIndex bulkIndex;

public BulkDocSection(String id, String indexName, String type, String docBody) {
this(id, indexName, type, docBody, null);
}

public BulkDocSection(String id, String indexName, String type, String docBody, String routing) {
this.docId = id;
this.bulkIndex = new BulkIndex(new BulkIndex.Metadata(id, type, indexName), parseSource(docBody));
this.bulkIndex = new BulkIndex(new BulkIndex.Metadata(id, type, indexName, routing), parseSource(docBody));
}

private BulkDocSection(BulkIndex bulkIndex) {
Expand Down Expand Up @@ -124,6 +128,8 @@ private static class Metadata {
private final String type;
@JsonProperty("_index")
private final String index;
@JsonProperty("routing")
private final String routing;
}

public static class BulkIndexRequestSerializer extends JsonSerializer<BulkIndex> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexN

@SneakyThrows
BulkDocSection transformDocument(RfsLuceneDocument doc, String indexName) {
var original = new BulkDocSection(doc.id, indexName, doc.type, doc.source);
var original = new BulkDocSection(doc.id, indexName, doc.type, doc.source, doc.routing);
if (transformer != null) {
final Map<String,Object> transformedDoc = transformer.transformJson(original.toMap());
return BulkDocSection.fromMap(transformedDoc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
String id = null;
String type = null;
BytesRef sourceBytes = null;
String routing = null;

try {
for (var field : document.getFields()) {
String fieldName = field.name();
Expand All @@ -196,6 +198,10 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
sourceBytes = field.binaryValue();
break;
}
case "_routing": {
routing = field.stringValue();
break;
}
default:
break;
}
Expand Down Expand Up @@ -227,6 +233,6 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
}

log.atDebug().setMessage("Document {} read successfully").addArgument(id).log();
return new RfsLuceneDocument(id, type, sourceBytes.utf8ToString());
return new RfsLuceneDocument(id, type, sourceBytes.utf8ToString(), routing);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package org.opensearch.migrations.bulkload.common;

import lombok.RequiredArgsConstructor;
import lombok.AllArgsConstructor;

@RequiredArgsConstructor
@AllArgsConstructor
// Immutable value object for a document read from a Lucene index segment
// during reindex-from-snapshot. The four-arg constructor (including routing)
// is generated by the @AllArgsConstructor annotation on this class.
public class RfsLuceneDocument {
// Document id (_id field from the Lucene document).
public final String id;
// Mapping type (_type); relevant for pre-7.x ES sources.
public final String type;
// Original document body (_source), as a JSON string.
public final String source;
// Custom shard-routing value (_routing); null when the document was
// indexed without explicit routing.
public final String routing;

// Backward-compatible constructor for callers that predate routing support;
// delegates with a null routing value.
public RfsLuceneDocument(String id, String type, String source) {
this(id, type, source, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,94 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class BulkDocSectionTest {
static final Map<String, Object> METADATA_1 = Map.of(
"_id", "test-id",
"_index", "test-index",
"_type", "_doc");

@Test
void testConvertToBulkRequestBody() {
String id1 = "id1";
String indexName1 = "index1";
String type1 = "_doc";
String docBody1 = "{\"field\":\"value1\"}";
static final Map<String, Object> METADATA_2 = Map.of(
"_id", "test-id",
"_index", "test-index",
"_type", "_doc",
"routing", "routing1");

String id2 = "id2";
String indexName2 = "index2";
String type2 = "_doc";
String docBody2 = "{\"field\":\"value2\"}";
static final Map<String, Object> SOURCE_DOC_1 = Map.of("field", "value");

BulkDocSection section1 = new BulkDocSection(id1, indexName1, type1, docBody1);
BulkDocSection section2 = new BulkDocSection(id2, indexName2, type2, docBody2);
static final BulkDocSection BULK_DOC_SECTION_1 = new BulkDocSection("test-id", "test-index", "_doc",
"{\"field\":\"value\"}");

Collection<BulkDocSection> bulkSections = Arrays.asList(section1, section2);
static final BulkDocSection BULK_DOC_SECTION_2 = new BulkDocSection("test-id", "test-index", "_doc",
"{\"field\":\"value\"}", "routing1");

String bulkRequestBody = BulkDocSection.convertToBulkRequestBody(bulkSections);
static final String BULK_DOC_SECTION_1_STRING = "{\"index\":{\"_id\":\"test-id\",\"_type\":\"_doc\",\"_index\":\"test-index\"}}\n"
+ "{\"field\":\"value\"}";

static final String BULK_DOC_SECTION_2_STRING = "{\"index\":{\"_id\":\"test-id\",\"_type\":\"_doc\",\"_index\":\"test-index\",\"routing\":\"routing1\"}}\n"
+ "{\"field\":\"value\"}";

static Stream<Arguments> provideFromMapArgs() {
return Stream.of(
Arguments.of(METADATA_1, SOURCE_DOC_1),
Arguments.of(METADATA_2, SOURCE_DOC_1));
}

static Stream<BulkDocSection> provideSerializedLengthArgs() {
return Stream.of(
BULK_DOC_SECTION_1,
BULK_DOC_SECTION_2);
}

assertEquals("{\"index\":{\"_id\":\"id1\",\"_type\":\"_doc\",\"_index\":\"index1\"}}\n" +
"{\"field\":\"value1\"}\n" +
"{\"index\":{\"_id\":\"id2\",\"_type\":\"_doc\",\"_index\":\"index2\"}}\n" +
"{\"field\":\"value2\"}\n", bulkRequestBody);
static Stream<Arguments> provideBulkIndexStringArgs() {
return Stream.of(
Arguments.of(BULK_DOC_SECTION_1, BULK_DOC_SECTION_1_STRING),
Arguments.of(BULK_DOC_SECTION_2, BULK_DOC_SECTION_2_STRING));
}

static Stream<Arguments> provideToMapArgs() {
return Stream.of(
Arguments.of(BULK_DOC_SECTION_1, METADATA_1, SOURCE_DOC_1),
Arguments.of(BULK_DOC_SECTION_2, METADATA_2, SOURCE_DOC_1));
}

@Test
void testFromMap() {
Map<String, Object> metadata = new HashMap<>();
metadata.put("_id", "test-id");
metadata.put("_index", "test-index");
metadata.put("_type", "_doc");
void testConvertToBulkRequestBody() {
BulkDocSection section1 = new BulkDocSection("id1", "index1", "_doc", "{\"field\":\"value1\"}");
BulkDocSection section2 = new BulkDocSection("id2", "index2", "_doc", "{\"field\":\"value2\"}");
BulkDocSection section3 = new BulkDocSection("id3", "index3", "_doc", "{\"field\":\"value3\"}", "routing1");

Map<String, Object> sourceDoc = new HashMap<>();
sourceDoc.put("field", "value");
Collection<BulkDocSection> bulkSections = Arrays.asList(section1, section2, section3);

String bulkRequestBody = BulkDocSection.convertToBulkRequestBody(bulkSections);

String expectedRequestBody = "{"
+ "\"index\":{\"_id\":\"id1\",\"_type\":\"_doc\",\"_index\":\"index1\"}}\n"
+ "{\"field\":\"value1\"}\n"
+ "{\"index\":{\"_id\":\"id2\",\"_type\":\"_doc\",\"_index\":\"index2\"}}\n"
+ "{\"field\":\"value2\"}\n"
+ "{\"index\":{\"_id\":\"id3\",\"_type\":\"_doc\",\"_index\":\"index3\",\"routing\":\"routing1\"}}\n"
+ "{\"field\":\"value3\"}\n";

assertEquals(expectedRequestBody, bulkRequestBody);
}

@ParameterizedTest
@MethodSource("provideFromMapArgs")
void testFromMap(Map<String, Object> metadata, Map<String, Object> sourceDoc) {
Map<String, Object> indexMap = new HashMap<>();
indexMap.put("index", metadata);
indexMap.put("source", sourceDoc);
Expand All @@ -64,48 +105,29 @@ void testFromMap() {
assertEquals(sourceDoc, bulkDocSection.toMap().get("source"));
}

@Test
void testGetSerializedLength() {
String id = "test-id";
String indexName = "test-index";
String type = "_doc";
String docBody = "{\"field\":\"value\"}";

BulkDocSection bulkDocSection = new BulkDocSection(id, indexName, type, docBody);
@ParameterizedTest
@MethodSource("provideSerializedLengthArgs")
void testGetSerializedLength(BulkDocSection bulkDocSection) {
assertEquals(bulkDocSection.asBulkIndexString().length(), bulkDocSection.getSerializedLength());
}

@Test
void testAsBulkIndexString() {
String id = "test-id";
String indexName = "test-index";
String type = "_doc";
String docBody = "{\"field\":\"value\"}";

BulkDocSection bulkDocSection = new BulkDocSection(id, indexName, type, docBody);

@ParameterizedTest
@MethodSource("provideBulkIndexStringArgs")
void testAsBulkIndexString(BulkDocSection bulkDocSection, String expected) {
String asString = bulkDocSection.asBulkIndexString();

assertEquals("{\"index\":{\"_id\":\"test-id\",\"_type\":\"_doc\",\"_index\":\"test-index\"}}\n" +
"{\"field\":\"value\"}", asString);
assertEquals(expected, asString);
}

@Test
void testToMap() {
String id = "test-id";
String indexName = "test-index";
String type = "_doc";
String docBody = "{\"field\":\"value\"}";

BulkDocSection bulkDocSection = new BulkDocSection(id, indexName, type, docBody);

@ParameterizedTest
@MethodSource("provideToMapArgs")
void testToMap(BulkDocSection bulkDocSection, Map<String, Object> metaData, Map<String, Object> source) {
Map<String, Object> map = bulkDocSection.toMap();

assertNotNull(map);
assertEquals(Map.of("_index",indexName,
"_type", type,
"_id", id ), map.get("index"));
assertEquals(Map.of("field","value"), map.get("source"));
assertEquals(metaData, map.get("index"));
assertEquals(source, map.get("source"));
}

@Test
Expand Down
Loading

0 comments on commit 1c23ab4

Please sign in to comment.