Revert "Revert "Adding subshard work items on lease expiry" (opensear…
Browse files Browse the repository at this point in the history
…ch-project#1183)"

This reverts commit b055711
Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Dec 9, 2024
1 parent 8c95468 commit a7c48e2
Showing 28 changed files with 608 additions and 310 deletions.
@@ -53,7 +53,8 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workl
log.atTrace().setMessage("Created doc for index {}: {}")
.addArgument(indexName)
.addArgument(doc::toString).log();
return new BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), indexName, null, doc.toString());
var docId = docIdCounter.incrementAndGet();
return new BulkDocSection(indexName + "_" + docId, indexName, null, doc.toString(), null);
})
.collect(Collectors.toList());

@@ -0,0 +1,115 @@
package org.opensearch.migrations;

import java.time.Duration;
import java.time.Instant;

import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


class RfsMigrateDocumentsTest {


private static class TestClass extends RfsMigrateDocuments {
public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
Instant leaseExpirationTime) {
return RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
}
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

// Lease at 40 minutes, shard prep 59 seconds, successor lease should be decreased since shard prep is < 2.5%
// and exponent is > 0
var existingLeaseExponent = 2;
var shardPrepTime = Duration.ofSeconds(59);
Duration initialLeaseDuration = Duration.ofMinutes(10);
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent - 1, successorNextAcquisitionLeaseExponent, "Should decrease successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThresholdWith0Exponent() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(1);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(0, successorNextAcquisitionLeaseExponent, "Should return 0 for successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(59);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is less than 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_EqualToUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(60);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent when shard prep time is equal to 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_ExceedsUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(61);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent + 1, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is greater than 10% of lease duration");
}
}
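
Read together, the five tests above pin down the rescaling rule: shard preparation time (lease acquisition to document migration start) is compared against fixed fractions of the lease that just expired. The following is a minimal sketch reconstructed purely from the assertions; the 2.5% and 10% thresholds come from the test comments and messages, while the log2 recovery of the existing exponent and all names here are assumptions, not code from RfsMigrateDocuments.

    // Sketch inferred from the tests above; not the actual implementation.
    static int successorLeaseExponent(Instant acquiredAt, Instant migrationStartedAt,
                                      Duration initialLeaseDuration, Instant leaseExpiration) {
        Duration shardPrep = Duration.between(acquiredAt, migrationStartedAt);
        Duration totalLease = Duration.between(acquiredAt, leaseExpiration);
        // Recover the exponent the expiring lease was granted with:
        // every test builds leaseExpiration as initialLeaseDuration * 2^exponent.
        int existingExponent = (int) Math.round(
            Math.log((double) totalLease.toMillis() / initialLeaseDuration.toMillis()) / Math.log(2));
        double prepFraction = (double) shardPrep.toMillis() / totalLease.toMillis();
        if (prepFraction < 0.025 && existingExponent > 0) {
            return existingExponent - 1; // prep was trivial: shrink the successor lease
        }
        if (prepFraction > 0.10) {
            return existingExponent + 1; // prep dominated the lease: grow it
        }
        return existingExponent;         // in between (inclusive at 10%): keep it
    }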
@@ -1,6 +1,7 @@
package org.opensearch.migrations.bulkload;

import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
@@ -126,18 +127,21 @@ private void migrationDocumentsWithClusters(
final var clockJitter = new Random(1);

// ExpectedMigrationWorkTerminationException is thrown on completion.
var expectedTerminationException = Assertions.assertThrows(
ExpectedMigrationWorkTerminationException.class,
() -> migrateDocumentsSequentially(
sourceRepo,
snapshotName,
List.of(),
targetCluster.getUrl(),
runCounter,
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
false
var expectedTerminationException = Assertions.assertTimeout(
Duration.ofSeconds(30),
() -> Assertions.assertThrows(
ExpectedMigrationWorkTerminationException.class,
() -> migrateDocumentsSequentially(
sourceRepo,
snapshotName,
List.of(),
targetCluster.getUrl(),
runCounter,
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
false
)
)
);

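The change above wraps the existing assertThrows in JUnit 5's assertTimeout, so the test now also fails if the sequential migration runs longer than 30 seconds. Because assertTimeout returns its supplier's value, the thrown ExpectedMigrationWorkTerminationException is still captured. A minimal standalone illustration of the nesting (note that assertTimeout measures the duration after the call completes; assertTimeoutPreemptively is the variant that aborts a genuinely hung call):

    // Hypothetical example of the same pattern, not code from this repository.
    var thrown = Assertions.assertTimeout(
        Duration.ofSeconds(30),
        () -> Assertions.assertThrows(
            IllegalStateException.class,
            () -> { throw new IllegalStateException("boom"); }));
    Assertions.assertEquals("boom", thrown.getMessage());
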
@@ -71,9 +71,9 @@ protected DirectoryReader getReader() {
}

@Override
protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean isLive) {
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase) {
ingestedDocuments.incrementAndGet();
return super.getDocument(reader, docId, isLive);
return super.getDocument(reader, luceneDocId, isLive, segmentDocBase);
}
};

@@ -92,7 +92,7 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
return null;
}).subscribeOn(blockingScheduler)
.then(Mono.just(response))
.doOnTerminate(blockingScheduler::dispose);
.doFinally(s -> blockingScheduler.dispose());
});

// Create DocumentReindexer
@@ -107,7 +107,7 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i

// Start reindexing in a separate thread
Thread reindexThread = new Thread(() -> {
reindexer.reindex("test-index", reader.readDocuments(), mockContext).block();
reindexer.reindex("test-index", reader.readDocuments(), mockContext).then().block();
});
reindexThread.start();

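The doOnTerminate-to-doFinally swap above is subtle but deliberate: doOnTerminate only fires on completion or error, so a cancelled subscription would leak the single-threaded scheduler. doFinally receives a SignalType and runs on complete, error, and cancel alike. A minimal sketch of the difference, assuming plain Reactor with illustrative names:

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    Scheduler blockingScheduler = Schedulers.newSingle("reader");
    Flux.range(1, 100)
        .subscribeOn(blockingScheduler)
        // doOnTerminate would be skipped here, because take(1) cancels upstream.
        .doFinally(signal -> blockingScheduler.dispose()) // ON_COMPLETE, ON_ERROR, or CANCEL
        .take(1)
        .blockLast();
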
@@ -1,9 +1,6 @@
package org.opensearch.migrations.bulkload;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
@@ -244,35 +241,4 @@ private static ProcessBuilder setupProcess(
processBuilder.redirectOutput();
return processBuilder;
}

@NotNull
private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
var process = processBuilder.start();

log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
var readerThread = new Thread(() -> {
String line;
while (true) {
try {
if ((line = reader.readLine()) == null) break;
} catch (IOException e) {
log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log();
return;
}
String finalLine = line;
log.atInfo()
.setMessage("from sub-process [{}]: {}")
.addArgument(() -> process.toHandle().pid())
.addArgument(finalLine)
.log();
}
});

// Kill the process and fail if we have to wait too long
readerThread.start();
return process;
}

}
@@ -1,7 +1,9 @@
package org.opensearch.migrations.bulkload;


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
@@ -12,6 +14,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;

@@ -32,7 +35,9 @@
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.transform.TransformationLoader;
@@ -43,6 +48,7 @@
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import reactor.core.publisher.Flux;

@@ -60,6 +66,36 @@ protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVers
new String[] { "/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/" } };
}

@NotNull
protected static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
var process = processBuilder.start();

log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
var readerThread = new Thread(() -> {
String line;
while (true) {
try {
if ((line = reader.readLine()) == null) break;
} catch (IOException e) {
log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log();
return;
}
String finalLine = line;
log.atInfo()
.setMessage("from sub-process [{}]: {}")
.addArgument(() -> process.toHandle().pid())
.addArgument(finalLine)
.log();
}
});

// Kill the process and fail if we have to wait too long
readerThread.start();
return process;
}

@AllArgsConstructor
public static class ExpectedMigrationWorkTerminationException extends RuntimeException {
public final RfsMigrateDocuments.NoWorkLeftException exception;
@@ -141,8 +177,8 @@ public FilteredLuceneDocumentsReader(Path luceneFilesBasePath, boolean softDelet
}

@Override
public Flux<RfsLuceneDocument> readDocuments(int startSegmentIndex, int startDoc) {
return super.readDocuments(startSegmentIndex, startDoc).map(docTransformer::apply);
public Flux<RfsLuceneDocument> readDocuments(int startDoc) {
return super.readDocuments(startDoc).map(docTransformer);
}
}

@@ -191,6 +227,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(

var defaultDocTransformer = new TransformationLoader().getTransformerFactoryLoader(RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG);

AtomicReference<WorkItemCursor> progressCursor = new AtomicReference<>();
try (var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
@@ -207,6 +244,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
.compressionEnabled(compressionEnabled)
.build()
.toConnectionContext()), 1000, Long.MAX_VALUE, 1, defaultDocTransformer),
progressCursor,
new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
@@ -224,7 +262,9 @@
sourceResourceProvider.getShardMetadata(),
unpackerFactory,
MAX_SHARD_SIZE_BYTES,
context);
context,
new AtomicReference<>(),
new WorkItemTimeProvider());
}
} finally {
deleteTree(tempDir);
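The thread running through these hunks is the commit's headline feature: an AtomicReference<WorkItemCursor> (progressCursor) is handed to the document migration machinery, readDocuments/getDocument now work from a single segment-base-relative doc id (segmentDocBase) instead of a (segment, doc) pair, and reindex apparently emits progress rather than completing silently, hence the .then().block() in the test. When a lease expires mid-shard, the cursor marks where to cut the successor, subshard work item. A hedged sketch of the consumer side, with names drawn from this diff but the wiring assumed:

    // Assumed wiring; only progressCursor, WorkItemCursor, reindex, and
    // readDocuments appear in this diff. The doOnNext hookup is illustrative.
    AtomicReference<WorkItemCursor> progressCursor = new AtomicReference<>();

    reindexer.reindex("test-index", reader.readDocuments(), mockContext)
        .doOnNext(progressCursor::set) // remember the furthest point reindexed
        .then()                        // reduce the progress stream to completion
        .block();

    // On lease expiry, progressCursor.get() tells the work coordinator where
    // the successor work item should resume instead of restarting the shard.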