Modify Jenkins Full E2E Integ Test to perform Transformations (#1182)
Modifies the integ test to add a transformation to the RFS and Metadata migrations that renames an index and modifies the affected RFS documents so they are written to a new index on the target.

After facing a lot of difficulty crafting a transformation and passing it as a command argument, this change also adds an E2E test for DocumentMigration based on a structure recently introduced by Andre, which makes top-level transformation testing in DocumentMigration much easier to run locally.
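For reference, the transformation string the new test builds and passes via `--doc-transformer-config` serializes to the single-line equivalent of the JSON below (pretty-printed here for readability):

[
  {
    "JsonConditionalTransformerProvider": [
      { "JsonJMESPathPredicateProvider": { "script": "index._index == 'geonames'" } },
      [
        { "JsonJoltTransformerProvider": { "script": {
            "operation": "modify-overwrite-beta",
            "spec": { "index": { "\\_index": "geonames_transformed" } }
        } } }
      ]
    ]
  }
]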

---------

Signed-off-by: Tanner Lewis <[email protected]>
lewijacn authored Jan 3, 2025
1 parent 6ba12ba commit c45b35e
Showing 10 changed files with 436 additions and 108 deletions.
1 change: 1 addition & 0 deletions DocumentsFromSnapshotMigration/build.gradle
@@ -60,6 +60,7 @@ dependencies {
testImplementation group: 'org.testcontainers', name: 'toxiproxy'
testImplementation group: 'org.mockito', name: 'mockito-core'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
testImplementation group: 'org.json', name: 'json'

testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
215 changes: 215 additions & 0 deletions CustomRfsTransformationTest.java (new file)
@@ -0,0 +1,215 @@
package org.opensearch.migrations.bulkload;

import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.http.SearchClusterRequests;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Network;


@Slf4j
@Tag("isolatedTest")
public class CustomRfsTransformationTest extends SourceTestBase {

    public static final String TARGET_DOCKER_HOSTNAME = "target";
    public static final String SNAPSHOT_NAME = "test_snapshot";

    @Test
    public void testCustomTransformationProducesDesiredTargetClusterState() {
        String nameTransformation = createIndexNameTransformation("geonames", "geonames_transformed");
        var expectedSourceMap = new HashMap<String, Integer>();
        expectedSourceMap.put("geonames", 1);
        var expectedTargetMap = new HashMap<String, Integer>();
        expectedTargetMap.put("geonames_transformed", 1);
        String[] transformationArgs = {
            "--doc-transformer-config",
            nameTransformation,
        };
        int totalSourceShards = 1;
        Consumer<ClusterOperations> loadDataIntoSource = cluster -> {
            // The number of default shards differs across versions of ES/OS,
            // so we explicitly set it.
            String body = String.format(
                "{" +
                " \"settings\": {" +
                " \"index\": {" +
                " \"number_of_shards\": %d," +
                " \"number_of_replicas\": 0" +
                " }" +
                " }" +
                "}",
                totalSourceShards
            );
            cluster.createIndex("geonames", body);
            cluster.createDocument("geonames", "111", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}");
        };
        runTestProcess(
            transformationArgs,
            expectedSourceMap,
            expectedTargetMap,
            loadDataIntoSource,
            totalSourceShards,
            SourceTestBase::runProcessAgainstTarget
        );
    }

    @SneakyThrows
    private void runTestProcess(
        String[] transformationArgs,
        Map<String, Integer> expectedSourceDocs,
        Map<String, Integer> expectedTargetDocs,
        Consumer<ClusterOperations> preloadDataOperations,
        Integer numberOfShards,
        Function<String[], Integer> processRunner)
    {
        final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

        var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
        var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");

        try (
            var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2)
                .withAccessToHost(true);
            var network = Network.newNetwork();
            var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
                .withAccessToHost(true)
                .withNetwork(network)
                .withNetworkAliases(TARGET_DOCKER_HOSTNAME);
        ) {
            CompletableFuture.allOf(
                CompletableFuture.runAsync(esSourceContainer::start),
                CompletableFuture.runAsync(osTargetContainer::start)
            ).join();

            var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl());
            preloadDataOperations.accept(sourceClusterOperations);

            // Create the snapshot from the source cluster
            var args = new CreateSnapshot.Args();
            args.snapshotName = SNAPSHOT_NAME;
            args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR;
            args.sourceArgs.host = esSourceContainer.getUrl();

            var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext());
            snapshotCreator.run();
            esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

            String[] processArgs = {
                "--snapshot-name",
                SNAPSHOT_NAME,
                "--snapshot-local-dir",
                tempDirSnapshot.toString(),
                "--lucene-dir",
                tempDirLucene.toString(),
                "--target-host",
                osTargetContainer.getUrl(),
                "--documents-per-bulk-request",
                "5",
                "--max-connections",
                "4",
                "--source-version",
                "ES_7_10"
            };
            String[] completeArgs = Stream.concat(Arrays.stream(processArgs), Arrays.stream(transformationArgs)).toArray(String[]::new);

            // Perform the RFS process once per shard
            for (int i = 0; i < numberOfShards; i++) {
                int exitCode = processRunner.apply(completeArgs);
                log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log();
            }

            // Assert that the doc counts on the source and target clusters match the expected maps
            validateFinalClusterDocs(
                esSourceContainer,
                osTargetContainer,
                DocumentMigrationTestContext.factory().noOtelTracking(),
                expectedSourceDocs,
                expectedTargetDocs
            );
        } finally {
            deleteTree(tempDirSnapshot);
        }
    }

    // Creates a simple Jolt transform that matches documents of a given index name in a snapshot and changes that
    // index name to the desired index name when they are migrated to the target cluster.
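    // For example, createIndexNameTransformation("geonames", "geonames_transformed") returns the
    // single-line equivalent of:
    //   [{"JsonConditionalTransformerProvider": [
    //       {"JsonJMESPathPredicateProvider": {"script": "index._index == 'geonames'"}},
    //       [{"JsonJoltTransformerProvider": {"script": {
    //           "operation": "modify-overwrite-beta",
    //           "spec": {"index": {"\\_index": "geonames_transformed"}}}}}]
    //   ]}]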
    private static String createIndexNameTransformation(String existingIndexName, String newIndexName) {
        JSONArray rootArray = new JSONArray();
        JSONObject firstObject = new JSONObject();
        JSONArray jsonConditionalTransformerProvider = new JSONArray();

        // JsonJMESPathPredicateProvider object
        JSONObject jsonJMESPathPredicateProvider = new JSONObject();
        jsonJMESPathPredicateProvider.put("script", String.format("index._index == '%s'", existingIndexName));
        JSONObject jsonJMESPathPredicateWrapper = new JSONObject();
        jsonJMESPathPredicateWrapper.put("JsonJMESPathPredicateProvider", jsonJMESPathPredicateProvider);
        jsonConditionalTransformerProvider.put(jsonJMESPathPredicateWrapper);

        JSONArray transformerList = new JSONArray();

        // First JsonJoltTransformerProvider
        JSONObject firstJoltTransformer = new JSONObject();
        JSONObject firstJoltScript = new JSONObject();
        firstJoltScript.put("operation", "modify-overwrite-beta");
        firstJoltScript.put("spec", new JSONObject().put("index", new JSONObject().put("\\_index", newIndexName)));
        firstJoltTransformer.put("JsonJoltTransformerProvider", new JSONObject().put("script", firstJoltScript));
        transformerList.put(firstJoltTransformer);

        jsonConditionalTransformerProvider.put(transformerList);
        firstObject.put("JsonConditionalTransformerProvider", jsonConditionalTransformerProvider);
        rootArray.put(firstObject);
        return rootArray.toString();
    }

    private static void validateFinalClusterDocs(
        SearchClusterContainer esSourceContainer,
        SearchClusterContainer osTargetContainer,
        DocumentMigrationTestContext context,
        Map<String, Integer> expectedSourceDocs,
        Map<String, Integer> expectedTargetDocs
    ) {
        var targetClient = new RestClient(ConnectionContextTestParams.builder()
            .host(osTargetContainer.getUrl())
            .build()
            .toConnectionContext()
        );
        var sourceClient = new RestClient(ConnectionContextTestParams.builder()
            .host(esSourceContainer.getUrl())
            .build()
            .toConnectionContext()
        );

        var requests = new SearchClusterRequests(context);
        var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient);
        var refreshResponse = targetClient.get("_refresh", context.createUnboundRequestContext());
        Assertions.assertEquals(200, refreshResponse.statusCode);
        var targetMap = requests.getMapOfIndexAndDocCount(targetClient);

        MatcherAssert.assertThat(sourceMap, Matchers.equalTo(expectedSourceDocs));
        MatcherAssert.assertThat(targetMap, Matchers.equalTo(expectedTargetDocs));
    }

}
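Design note: the JsonConditionalTransformerProvider entry pairs a JMESPath predicate with a list of transformers, so the Jolt rename is applied only to documents whose index._index matches the predicate. That is what lets the test assert an exact index-to-doc-count map on both clusters: {geonames=1} on the source and {geonames_transformed=1} on the target.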
@@ -1,12 +1,7 @@
package org.opensearch.migrations.bulkload;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -27,7 +22,6 @@
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -124,8 +118,8 @@ private void testProcess(int expectedExitCode, Function<RunData, Integer> proces
var proxyContainer = new ToxiProxyWrapper(network)
) {
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> esSourceContainer.start()),
CompletableFuture.runAsync(() -> osTargetContainer.start()),
CompletableFuture.runAsync(esSourceContainer::start),
CompletableFuture.runAsync(osTargetContainer::start),
CompletableFuture.runAsync(() -> proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT))
).join();

@@ -180,36 +174,7 @@ private static int runProcessAgainstToxicTarget(
}

int timeoutSeconds = 90;
ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, failHow);

var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
if (!finished) {
log.atError().setMessage("Process timed out, attempting to kill it...").log();
process.destroy(); // Try to be nice about things first...
if (!process.waitFor(10, TimeUnit.SECONDS)) {
log.atError().setMessage("Process still running, attempting to force kill it...").log();
process.destroyForcibly();
}
Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
}

return process.exitValue();
}


@NotNull
private static ProcessBuilder setupProcess(
Path tempDirSnapshot,
Path tempDirLucene,
String targetAddress,
FailHow failHow
) {
String classpath = System.getProperty("java.class.path");
String javaHome = System.getProperty("java.home");
String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

String[] args = {
String[] processArgs = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
@@ -228,51 +193,21 @@ private static ProcessBuilder setupProcess(
"ES_7_10",
"--initial-lease-duration",
failHow == FailHow.NEVER ? "PT10M" : "PT1S" };
ProcessBuilder processBuilder = setupProcess(processArgs);

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}")
.addArgument(() -> Arrays.toString(args))
.log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable,
"-cp",
classpath,
"org.opensearch.migrations.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));
processBuilder.redirectErrorStream(true);
processBuilder.redirectOutput();
return processBuilder;
}

@NotNull
private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
var process = processBuilder.start();

log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
var readerThread = new Thread(() -> {
String line;
while (true) {
try {
if ((line = reader.readLine()) == null) break;
} catch (IOException e) {
log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log();
return;
}
String finalLine = line;
log.atInfo()
.setMessage("from sub-process [{}]: {}")
.addArgument(() -> process.toHandle().pid())
.addArgument(finalLine)
.log();
var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
if (!finished) {
log.atError().setMessage("Process timed out, attempting to kill it...").log();
process.destroy(); // Try to be nice about things first...
if (!process.waitFor(10, TimeUnit.SECONDS)) {
log.atError().setMessage("Process still running, attempting to force kill it...").log();
process.destroyForcibly();
}
});
Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
}

// Kill the process and fail if we have to wait too long
readerThread.start();
return process;
return process.exitValue();
}

}
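The setupProcess plumbing and runAndMonitorProcess body removed in this diff appear to have been consolidated into the shared SourceTestBase parent class (not shown here); the call sites now use a setupProcess(String[] processArgs) overload. A minimal sketch of what those shared helpers would look like, reconstructed from the deleted lines and assuming the original imports (java.io.*, java.util.Arrays) and an @Slf4j log field; the SourceTestBase location and exact signatures are assumptions:

static ProcessBuilder setupProcess(String[] args) {
    // Launch RfsMigrateDocuments in a separate JVM so the test can observe exit
    // codes and forcibly kill the process on timeout.
    String classpath = System.getProperty("java.class.path");
    String javaExecutable = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
    var processBuilder = new ProcessBuilder(
        javaExecutable, "-cp", classpath, "org.opensearch.migrations.RfsMigrateDocuments");
    processBuilder.command().addAll(Arrays.asList(args));
    processBuilder.redirectErrorStream(true);
    return processBuilder;
}

static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
    var process = processBuilder.start();
    log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log();
    // Pump the child's combined stdout/stderr into the test log on a background thread.
    var reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
    new Thread(() -> {
        String line;
        while (true) {
            try {
                if ((line = reader.readLine()) == null) break;
            } catch (IOException e) {
                log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log();
                return;
            }
            String finalLine = line;
            log.atInfo().setMessage("from sub-process [{}]: {}")
                .addArgument(() -> process.toHandle().pid())
                .addArgument(finalLine)
                .log();
        }
    }).start();
    return process;
}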
