Skip to content

Commit

Permalink
Fix original shard level unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Jan 8, 2024
1 parent 0ce22b1 commit 95a866a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -366,6 +367,98 @@ public void testPrimaryRestart() throws Exception {
}
}

/**
* This test validates that unreferenced on disk file are ignored while requesting files from replication source to
* prevent FileAlreadyExistsException. It does so by only copying files in first round of segment replication without
* committing locally so that in next round of segment replication those files are not considered for download again
*/
public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

shards.indexDocs(10);
primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch latch = new CountDownLatch(1);

logger.info("--> Starting first round of replication");
// Start first round of segment replication. This should fail with simulated error but with replica having
// files in its local store but not in active reader.
final SegmentReplicationTarget target = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
latch.countDown();
Assert.fail("Replication should fail with simulated error");
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
latch.countDown();
assertFalse(sendShardFailure);
logger.error("Replication error", e);
}
}
);
latch.await();
Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
List<String> activeFiles = replica.getSegmentMetadataMap()
.values()
.stream()
.map(metadata -> metadata.name())
.collect(Collectors.toList());
assertTrue("Files should not be committed", activeFiles.isEmpty());
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

// Start next round of segment replication and not throwing exception resulting in commit on replica
when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
CountDownLatch waitForSecondRound = new CountDownLatch(1);
logger.info("--> Starting second round of replication");
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
waitForSecondRound.countDown();
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
waitForSecondRound.countDown();
logger.error("Replication error", e);
Assert.fail("Replication should not fail");
}
}
);
waitForSecondRound.await();
assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE);
activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));
shards.removeReplica(replica);
closeShards(replica);
}
}

/**
* This test validates that locally non-readable (corrupt or partially written) on-disk files are deleted instead of failing the
* replication event. This test mimics local files (not referenced by the reader) by throwing an exception post file copy and
Expand Down Expand Up @@ -469,8 +562,19 @@ public void getSegmentFiles(
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
postGetFilesRunnable.run();
StepListener<GetSegmentFilesResponse> waitForCopyFilesListener = new StepListener();
super.getSegmentFiles(
replicationId,
checkpoint,
filesToFetch,
indexShard,
(fileName, bytesRecovered) -> {},
waitForCopyFilesListener
);
waitForCopyFilesListener.whenComplete(response -> {
postGetFilesRunnable.run();
listener.onResponse(response);
}, listener::onFailure);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreTests;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;
import org.junit.Assert;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -76,6 +81,11 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {

private static final Map<String, StoreFileMetadata> SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF);

private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
);

private SegmentInfos testSegmentInfos;

@Override
Expand Down Expand Up @@ -431,10 +441,7 @@ public void test_MissingFiles_NotCausingFailure() throws IOException {
// Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
// generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
// snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
// Delete on-disk files so that they are not considered for file diff
deleteContent(spyIndexShard.store().directory());
spyIndexShard.store().close();
List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);

SegmentReplicationSource segrepSource = new TestReplicationSource() {
@Override
Expand Down Expand Up @@ -479,72 +486,14 @@ public void onFailure(Exception e) {
});
}

    /**
     * This test ensures that on-disk files on the replica are taken into consideration while evaluating the
     * files diff from the primary. The test mocks the files referred to by the active reader down to a smaller
     * subset so that the logic to filter out on-disk files is exercised.
     * @throws IOException if an indexing operation fails or segment replication fails
     */
    public void test_OnDiskFiles_ReusedForReplication() throws IOException {
        int docCount = 1 + random().nextInt(10);
        // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
        // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
        // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);

        SegmentReplicationSource segrepSource = new TestReplicationSource() {
            @Override
            public void getCheckpointMetadata(
                long replicationId,
                ReplicationCheckpoint checkpoint,
                ActionListener<CheckpointInfoResponse> listener
            ) {
                // Answer with the larger snapshot (includes delete-operation files) to stand in for the
                // primary's checkpoint metadata.
                listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
            }

            @Override
            public void getSegmentFiles(
                long replicationId,
                ReplicationCheckpoint checkpoint,
                List<StoreFileMetadata> filesToFetch,
                IndexShard indexShard,
                BiConsumer<String, Long> fileProgressTracker,
                ActionListener<GetSegmentFilesResponse> listener
            ) {
                // No files should be requested from replication source
                assertEquals(0, filesToFetch.size());
                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
            }
        };

        segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, mock(
            SegmentReplicationTargetService.SegmentReplicationListener.class
        ));
        // Mask the files returned by active reader. This is needed so that logic to filter out on disk is exercised.
        // NOTE: the stubbing must happen after target construction and before startReplication for the mock to
        // take effect during the diff computation.
        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
        segrepTarget.startReplication(new ActionListener<Void>() {
            @Override
            public void onResponse(Void replicationResponse) {
                segrepTarget.markAsDone();
            }

            @Override
            public void onFailure(Exception e) {
                // Replication is expected to succeed by reusing on-disk files; any failure fails the test.
                Assert.fail();
            }
        });
    }


/**
* Generates a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
* operation. A list of snapshots is returned so that identical files have same checksum.
* @param docCount the number of documents to index in the first snapshot
* @param shard The IndexShard object to use for writing
* @return a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
* @throws IOException if one of the indexing operations fails
*/
private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount, IndexShard shard) throws IOException {
private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
List<Document> docList = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
Document document = new Document();
Expand All @@ -558,7 +507,8 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(true);
Store store = shard.store();
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
for (Document d : docList) {
writer.addDocument(d);
Expand All @@ -569,6 +519,7 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
writer.commit();
Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
deleteContent(store.directory());
writer.close();
store.close();
return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
Expand Down

0 comments on commit 95a866a

Please sign in to comment.