diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
index 0561a7cedd44f..eacc504428ef1 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
@@ -11,6 +11,7 @@
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.util.Version;
+import org.opensearch.action.StepListener;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.settings.Settings;
@@ -366,6 +367,98 @@ public void testPrimaryRestart() throws Exception {
         }
     }
 
+    /**
+     * This test validates that unreferenced on-disk files are ignored while requesting files from the replication
+     * source, to prevent FileAlreadyExistsException. It does so by copying files in the first round of segment
+     * replication without committing locally, so that in the next round of segment replication those files are not
+     * considered for download again.
+     */
+    public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
+        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
+            shards.startAll();
+            IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+
+            shards.indexDocs(10);
+            primary.refresh("Test");
+
+            final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
+            final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
+            when(sourceFactory.get(any())).thenReturn(
+                getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
+            );
+            CountDownLatch latch = new CountDownLatch(1);
+
+            logger.info("--> Starting first round of replication");
+            // Start the first round of segment replication. It should fail with the simulated error, leaving the
+            // replica with files in its local store that are not referenced by the active reader.
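+            // Note: the simulated exception is only thrown after getSegmentFiles has copied the files to disk,
+            // which is what leaves the unreferenced files behind for the second round to reuse.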
+            final SegmentReplicationTarget target = targetService.startReplication(
+                replica,
+                primary.getLatestReplicationCheckpoint(),
+                new SegmentReplicationTargetService.SegmentReplicationListener() {
+                    @Override
+                    public void onReplicationDone(SegmentReplicationState state) {
+                        latch.countDown();
+                        Assert.fail("Replication should fail with simulated error");
+                    }
+
+                    @Override
+                    public void onReplicationFailure(
+                        SegmentReplicationState state,
+                        ReplicationFailedException e,
+                        boolean sendShardFailure
+                    ) {
+                        latch.countDown();
+                        assertFalse(sendShardFailure);
+                        logger.error("Replication error", e);
+                    }
+                }
+            );
+            latch.await();
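+            // Capture the files the failed round left on disk, excluding per-shard metadata and segments_N files;
+            // they should be present in the store but absent from the active reader.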
+            Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
+            onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
+            List<String> activeFiles = replica.getSegmentMetadataMap()
+                .values()
+                .stream()
+                .map(metadata -> metadata.name())
+                .collect(Collectors.toList());
+            assertTrue("Files should not be committed", activeFiles.isEmpty());
+            assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
+            assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);
+
+            // Start the next round of segment replication; this time no exception is thrown, so the round
+            // results in a commit on the replica.
+            when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
+            CountDownLatch waitForSecondRound = new CountDownLatch(1);
+            logger.info("--> Starting second round of replication");
+            final SegmentReplicationTarget newTarget = targetService.startReplication(
+                replica,
+                primary.getLatestReplicationCheckpoint(),
+                new SegmentReplicationTargetService.SegmentReplicationListener() {
+                    @Override
+                    public void onReplicationDone(SegmentReplicationState state) {
+                        waitForSecondRound.countDown();
+                    }
+
+                    @Override
+                    public void onReplicationFailure(
+                        SegmentReplicationState state,
+                        ReplicationFailedException e,
+                        boolean sendShardFailure
+                    ) {
+                        waitForSecondRound.countDown();
+                        logger.error("Replication error", e);
+                        Assert.fail("Replication should not fail");
+                    }
+                }
+            );
+            waitForSecondRound.await();
+            assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE);
+            activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
+            assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));
+            shards.removeReplica(replica);
+            closeShards(replica);
+        }
+    }
+
     /**
      * This test validates that local non-readable (corrupt, partially) on disk are deleted vs failing the
      * replication event. This test mimics local files (not referenced by reader) by throwing exception post file copy and
@@ -469,8 +562,19 @@ public void getSegmentFiles(
             BiConsumer<String, Long> fileProgressTracker,
             ActionListener<GetSegmentFilesResponse> listener
         ) {
-            super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
-            postGetFilesRunnable.run();
+            StepListener<GetSegmentFilesResponse> waitForCopyFilesListener = new StepListener<>();
+            super.getSegmentFiles(
+                replicationId,
+                checkpoint,
+                filesToFetch,
+                indexShard,
+                (fileName, bytesRecovered) -> {},
+                waitForCopyFilesListener
+            );
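+            // Chain the post-copy hook so it runs only after the copy completes, then forward the result; the old
+            // code ran the hook synchronously, before the asynchronous copy had finished.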
+            waitForCopyFilesListener.whenComplete(response -> {
+                postGetFilesRunnable.run();
+                listener.onResponse(response);
+            }, listener::onFailure);
         }
 
         @Override
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index 56fc90d130da3..8b4b3aff701b4 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -29,16 +29,21 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.IndexSettings;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.replication.TestReplicationSource;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
+import org.opensearch.index.store.StoreTests;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationFailedException;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.DummyShardLock;
+import org.opensearch.test.IndexSettingsModule;
 import org.junit.Assert;
 
 import java.io.FileNotFoundException;
@@ -76,6 +81,11 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
 
     private static final Map<String, StoreFileMetadata> SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF);
 
+    private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
+        "index",
+        Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
+    );
+
     private SegmentInfos testSegmentInfos;
 
     @Override
@@ -431,10 +441,7 @@ public void test_MissingFiles_NotCausingFailure() throws IOException {
         // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
         // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
         // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
-        // Delete on-disk files so that they are not considered for file diff
-        deleteContent(spyIndexShard.store().directory());
-        spyIndexShard.store().close();
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
 
         SegmentReplicationSource segrepSource = new TestReplicationSource() {
             @Override
@@ -479,72 +486,14 @@ public void onFailure(Exception e) {
         });
     }
 
-    /**
-     * This tests ensures that on-disk files on replica are taken into consideration while evaluating the files diff
-     * from primary. The test mocks the files referred by active reader to a smaller subset so that logic to filter
-     * out on-disk files be exercised.
-     * @throws IOException if an indexing operation fails or segment replication fails
-     */
-    public void test_OnDiskFiles_ReusedForReplication() throws IOException {
-        int docCount = 1 + random().nextInt(10);
-        // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
-        // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard
-        // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element).
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
-
-        SegmentReplicationSource segrepSource = new TestReplicationSource() {
-            @Override
-            public void getCheckpointMetadata(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                ActionListener<CheckpointInfoResponse> listener
-            ) {
-                listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
-            }
-
-            @Override
-            public void getSegmentFiles(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                List<StoreFileMetadata> filesToFetch,
-                IndexShard indexShard,
-                BiConsumer<String, Long> fileProgressTracker,
-                ActionListener<GetSegmentFilesResponse> listener
-            ) {
-                // No files should be requested from replication source
-                assertEquals(0, filesToFetch.size());
-                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
-            }
-        };
-
-        segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, mock(
-            SegmentReplicationTargetService.SegmentReplicationListener.class
-        ));
-        // Mask the files returned by active reader. This is needed so that logic to filter out on disk is exercised
-        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
-        segrepTarget.startReplication(new ActionListener<Void>() {
-            @Override
-            public void onResponse(Void replicationResponse) {
-                segrepTarget.markAsDone();
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                Assert.fail();
-            }
-        });
-    }
-
     /**
      * Generates a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
      * operation. A list of snapshots is returned so that identical files have same checksum.
      * @param docCount the number of documents to index in the first snapshot
-     * @param shard The IndexShard object to use for writing
      * @return a list of Store.MetadataSnapshot with two elements where second snapshot has extra files due to delete
      * @throws IOException if one of the indexing operations fails
      */
-    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount, IndexShard shard) throws IOException {
+    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
        List<Document> docList = new ArrayList<>();
         for (int i = 0; i < docCount; i++) {
             Document document = new Document();
@@ -558,7 +507,8 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
         IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
         iwc.setMergePolicy(NoMergePolicy.INSTANCE);
         iwc.setUseCompoundFile(true);
-        Store store = shard.store();
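+        // Use a throwaway Store over a fresh directory rather than the shard's own store, so generating the
+        // snapshots no longer needs to touch (and later clean up) spyIndexShard's directory.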
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
         IndexWriter writer = new IndexWriter(store.directory(), iwc);
         for (Document d : docList) {
             writer.addDocument(d);
@@ -569,6 +519,7 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
         writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
         writer.commit();
         Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
+        deleteContent(store.directory());
         writer.close();
         store.close();
         return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
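Reviewer note: the getSegmentFiles rework above is the usual chain-then-complete pattern with StepListener. A minimal, self-contained sketch of the pattern, assuming the OpenSearch StepListener/ActionListener APIs; the class and the fetchAsync method are hypothetical and not part of this change:

import org.opensearch.action.StepListener;
import org.opensearch.core.action.ActionListener;

public class StepListenerChainSketch {
    // Hypothetical async producer, standing in for super.getSegmentFiles(...).
    static void fetchAsync(ActionListener<String> listener) {
        listener.onResponse("files-copied");
    }

    public static void main(String[] args) {
        ActionListener<String> outer = ActionListener.wrap(
            r -> System.out.println("completed: " + r),
            e -> System.err.println("failed: " + e)
        );
        StepListener<String> step = new StepListener<>();
        fetchAsync(step);
        // The hook runs only after the async step succeeds; a failure skips the hook
        // and goes straight to the outer listener, mirroring the diff above.
        step.whenComplete(r -> {
            System.out.println("post-copy hook");
            outer.onResponse(r);
        }, outer::onFailure);
    }
}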