From 0ce22b1362372819fc3e5239f333efe22b1c658b Mon Sep 17 00:00:00 2001
From: Suraj Singh
Date: Fri, 5 Jan 2024 17:15:30 -0800
Subject: [PATCH 1/2] Use SegmentReplicationTarget test to validate non-active
 on-disk files are reused for replication

Signed-off-by: Suraj Singh
---
 .../index/shard/RemoteIndexShardTests.java         | 91 -------------------
 .../SegmentReplicationTargetTests.java             | 79 +++++++++++++---
 2 files changed, 64 insertions(+), 106 deletions(-)

diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
index dd92bfb47afdb..0561a7cedd44f 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
@@ -366,97 +366,6 @@ public void testPrimaryRestart() throws Exception {
         }
     }

-    /**
-     * This test validates that unreferenced on-disk files are ignored while requesting files from the replication source to
-     * prevent FileAlreadyExistsException. It does so by only copying files in the first round of segment replication without
-     * committing locally, so that in the next round of segment replication those files are not considered for download again.
-     */
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10885")
-    public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
-        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
-            shards.startAll();
-            IndexShard primary = shards.getPrimary();
-            final IndexShard replica = shards.getReplicas().get(0);
-
-            shards.indexDocs(10);
-            primary.refresh("Test");
-
-            final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
-            final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
-            when(sourceFactory.get(any())).thenReturn(
-                getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
-            );
-            CountDownLatch latch = new CountDownLatch(1);
-
-            // Start the first round of segment replication. This should fail with the simulated error, leaving the replica
-            // with files in its local store but not in its active reader.
-            final SegmentReplicationTarget target = targetService.startReplication(
-                replica,
-                primary.getLatestReplicationCheckpoint(),
-                new SegmentReplicationTargetService.SegmentReplicationListener() {
-                    @Override
-                    public void onReplicationDone(SegmentReplicationState state) {
-                        latch.countDown();
-                        Assert.fail("Replication should fail with simulated error");
-                    }
-
-                    @Override
-                    public void onReplicationFailure(
-                        SegmentReplicationState state,
-                        ReplicationFailedException e,
-                        boolean sendShardFailure
-                    ) {
-                        latch.countDown();
-                        assertFalse(sendShardFailure);
-                        logger.error("Replication error", e);
-                    }
-                }
-            );
-            latch.await();
-            Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
-            onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
-            List<String> activeFiles = replica.getSegmentMetadataMap()
-                .values()
-                .stream()
-                .map(metadata -> metadata.name())
-                .collect(Collectors.toList());
-            assertTrue("Files should not be committed", activeFiles.isEmpty());
-            assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
-            assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);
-
-            // Start the next round of segment replication without throwing an exception, resulting in a commit on the replica
-            when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
-            CountDownLatch waitForSecondRound = new CountDownLatch(1);
-            final SegmentReplicationTarget newTarget = targetService.startReplication(
-                replica,
-                primary.getLatestReplicationCheckpoint(),
-                new SegmentReplicationTargetService.SegmentReplicationListener() {
-                    @Override
-                    public void onReplicationDone(SegmentReplicationState state) {
-                        waitForSecondRound.countDown();
-                    }
-
-                    @Override
-                    public void onReplicationFailure(
-                        SegmentReplicationState state,
-                        ReplicationFailedException e,
-                        boolean sendShardFailure
-                    ) {
-                        waitForSecondRound.countDown();
-                        logger.error("Replication error", e);
-                        Assert.fail("Replication should not fail");
-                    }
-                }
-            );
-            waitForSecondRound.await();
-            assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE);
-            activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
-            assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));
-            shards.removeReplica(replica);
-            closeShards(replica);
-        }
-    }
-
     /**
      * This test validates that local non-readable (corrupt or partially written) on-disk files are deleted rather than
      * failing the replication event.
      * This test mimics local files (not referenced by the reader) by throwing an exception post file copy and
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index 8b4b3aff701b4..56fc90d130da3 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -29,21 +29,16 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
-import org.opensearch.core.index.shard.ShardId;
-import org.opensearch.index.IndexSettings;
 import org.opensearch.index.engine.NRTReplicationEngineFactory;
 import org.opensearch.index.replication.TestReplicationSource;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.IndexShardTestCase;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
-import org.opensearch.index.store.StoreTests;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationFailedException;
 import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.indices.replication.common.ReplicationType;
-import org.opensearch.test.DummyShardLock;
-import org.opensearch.test.IndexSettingsModule;
 import org.junit.Assert;

 import java.io.FileNotFoundException;
@@ -81,11 +76,6 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {

     private static final Map<String, StoreFileMetadata> SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF);

-    private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
-        "index",
-        Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
-    );
-
     private SegmentInfos testSegmentInfos;

     @Override
@@ -441,7 +431,10 @@ public void test_MissingFiles_NotCausingFailure() throws IOException {
         // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
         // generated due to delete operations. These two snapshots can then be used in the test to mock the primary shard
         // snapshot (2nd element, which contains delete operations) and the replica's existing snapshot (1st element).
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
+        // Delete on-disk files so that they are not considered for the file diff
+        deleteContent(spyIndexShard.store().directory());
+        spyIndexShard.store().close();

         SegmentReplicationSource segrepSource = new TestReplicationSource() {
             @Override
             public void getCheckpointMetadata(
                 long replicationId,
                 ReplicationCheckpoint checkpoint,
                 ActionListener<CheckpointInfoResponse> listener
             ) {
                 listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
             }
@@ -486,14 +479,72 @@ public void onFailure(Exception e) {
         });
     }

+    /**
+     * This test ensures that on-disk files on the replica are taken into consideration while evaluating the file diff
+     * from the primary. The test mocks the files referred to by the active reader down to a smaller subset so that the
+     * logic to filter out on-disk files is exercised.
+     * @throws IOException if an indexing operation fails or segment replication fails
+     */
+    public void test_OnDiskFiles_ReusedForReplication() throws IOException {
+        int docCount = 1 + random().nextInt(10);
+        // Generate a list of MetadataSnapshot containing two elements.
+        // The second snapshot contains extra files generated due to delete operations. These two snapshots can then be
+        // used in the test to mock the primary shard snapshot (2nd element, which contains delete operations) and the
+        // replica's existing snapshot (1st element).
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
+
+        SegmentReplicationSource segrepSource = new TestReplicationSource() {
+            @Override
+            public void getCheckpointMetadata(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                ActionListener<CheckpointInfoResponse> listener
+            ) {
+                listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
+            }
+
+            @Override
+            public void getSegmentFiles(
+                long replicationId,
+                ReplicationCheckpoint checkpoint,
+                List<StoreFileMetadata> filesToFetch,
+                IndexShard indexShard,
+                BiConsumer<String, Long> fileProgressTracker,
+                ActionListener<GetSegmentFilesResponse> listener
+            ) {
+                // No files should be requested from the replication source
+                assertEquals(0, filesToFetch.size());
+                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            }
+        };
+
+        segrepTarget = new SegmentReplicationTarget(
+            spyIndexShard,
+            repCheckpoint,
+            segrepSource,
+            mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
+        );
+        // Mask the files returned by the active reader so that the logic to filter out on-disk files is exercised
+        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
+        segrepTarget.startReplication(new ActionListener<Void>() {
+            @Override
+            public void onResponse(Void replicationResponse) {
+                segrepTarget.markAsDone();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                Assert.fail();
+            }
+        });
+    }
+

     /**
      * Generates a list of Store.MetadataSnapshot with two elements where the second snapshot has extra files due to a delete
      * operation. A list of snapshots is returned so that identical files have the same checksum.
      * @param docCount the number of documents to index in the first snapshot
+     * @param shard The IndexShard object to use for writing
      * @return a list of Store.MetadataSnapshot with two elements where the second snapshot has extra files due to a delete
      * @throws IOException if one of the indexing operations fails
      */
-    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
+    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount, IndexShard shard) throws IOException {
         List<Document> docList = new ArrayList<>();
         for (int i = 0; i < docCount; i++) {
             Document document = new Document();
@@ -507,8 +558,7 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount)
         IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
         iwc.setMergePolicy(NoMergePolicy.INSTANCE);
         iwc.setUseCompoundFile(true);
-        final ShardId shardId = new ShardId("index", "_na_", 1);
-        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
+        Store store = shard.store();
         IndexWriter writer = new IndexWriter(store.directory(), iwc);
         for (Document d : docList) {
             writer.addDocument(d);
@@ -519,7 +569,6 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount)
         writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
         writer.commit();
         Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
-        deleteContent(store.directory());
         writer.close();
         store.close();
         return Arrays.asList(storeMetadata, storeMetadataWithDeletes);

From 95a866ad8f9ef4115425246d467f31dbccaf8af5 Mon Sep 17 00:00:00 2001
From: Suraj Singh
Date: Mon, 8 Jan 2024 10:20:50 -0800
Subject: [PATCH 2/2] Fix original shard level unit test

Signed-off-by: Suraj Singh
---
 .../index/shard/RemoteIndexShardTests.java         | 108 +++++++++++++++++-
 .../SegmentReplicationTargetTests.java             |  79 +++----------
 2 files changed, 121 insertions(+), 66 deletions(-)

diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
index 0561a7cedd44f..eacc504428ef1 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java
@@ -11,6 +11,7 @@
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.util.Version;
+import org.opensearch.action.StepListener;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.settings.Settings;
@@ -366,6 +367,98 @@ public void testPrimaryRestart() throws Exception {
         }
     }

+    /**
+     * This test validates that unreferenced on-disk files are ignored while requesting files from the replication source to
+     * prevent FileAlreadyExistsException.
+     * It does so by only copying files in the first round of segment replication without committing locally, so that in
+     * the next round of segment replication those files are not considered for download again.
+     */
+    public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
+        try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
+            shards.startAll();
+            IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+
+            shards.indexDocs(10);
+            primary.refresh("Test");
+
+            final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
+            final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
+            when(sourceFactory.get(any())).thenReturn(
+                getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
+            );
+            CountDownLatch latch = new CountDownLatch(1);
+
+            logger.info("--> Starting first round of replication");
+            // Start the first round of segment replication. This should fail with the simulated error, leaving the replica
+            // with files in its local store but not in its active reader.
+            final SegmentReplicationTarget target = targetService.startReplication(
+                replica,
+                primary.getLatestReplicationCheckpoint(),
+                new SegmentReplicationTargetService.SegmentReplicationListener() {
+                    @Override
+                    public void onReplicationDone(SegmentReplicationState state) {
+                        latch.countDown();
+                        Assert.fail("Replication should fail with simulated error");
+                    }
+
+                    @Override
+                    public void onReplicationFailure(
+                        SegmentReplicationState state,
+                        ReplicationFailedException e,
+                        boolean sendShardFailure
+                    ) {
+                        latch.countDown();
+                        assertFalse(sendShardFailure);
+                        logger.error("Replication error", e);
+                    }
+                }
+            );
+            latch.await();
+            Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
+            onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
+            List<String> activeFiles = replica.getSegmentMetadataMap()
+                .values()
+                .stream()
+                .map(metadata -> metadata.name())
+                .collect(Collectors.toList());
+            assertTrue("Files should not be committed", activeFiles.isEmpty());
+            assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
+            assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);
+
+            // Start the next round of segment replication without throwing an exception, resulting in a commit on the replica
+            when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
+            CountDownLatch waitForSecondRound = new CountDownLatch(1);
+            logger.info("--> Starting second round of replication");
+            final SegmentReplicationTarget newTarget = targetService.startReplication(
+                replica,
+                primary.getLatestReplicationCheckpoint(),
+                new SegmentReplicationTargetService.SegmentReplicationListener() {
+                    @Override
+                    public void onReplicationDone(SegmentReplicationState state) {
+                        waitForSecondRound.countDown();
+                    }
+
+                    @Override
+                    public void onReplicationFailure(
+                        SegmentReplicationState state,
+                        ReplicationFailedException e,
+                        boolean sendShardFailure
+                    ) {
+                        waitForSecondRound.countDown();
+                        logger.error("Replication error", e);
+                        Assert.fail("Replication should not fail");
+                    }
+                }
+            );
+            waitForSecondRound.await();
+            assertEquals(newTarget.state().getStage(), SegmentReplicationState.Stage.DONE);
+            activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles)); + shards.removeReplica(replica); + closeShards(replica); + } + } + /** * This test validates that local non-readable (corrupt, partially) on disk are deleted vs failing the * replication event. This test mimics local files (not referenced by reader) by throwing exception post file copy and @@ -469,8 +562,19 @@ public void getSegmentFiles( BiConsumer fileProgressTracker, ActionListener listener ) { - super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener); - postGetFilesRunnable.run(); + StepListener waitForCopyFilesListener = new StepListener(); + super.getSegmentFiles( + replicationId, + checkpoint, + filesToFetch, + indexShard, + (fileName, bytesRecovered) -> {}, + waitForCopyFilesListener + ); + waitForCopyFilesListener.whenComplete(response -> { + postGetFilesRunnable.run(); + listener.onResponse(response); + }, listener::onFailure); } @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 56fc90d130da3..8b4b3aff701b4 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -29,16 +29,21 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.store.StoreTests; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.DummyShardLock; +import org.opensearch.test.IndexSettingsModule; import org.junit.Assert; import java.io.FileNotFoundException; @@ -76,6 +81,11 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase { private static final Map SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF); + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build() + ); + private SegmentInfos testSegmentInfos; @Override @@ -431,10 +441,7 @@ public void test_MissingFiles_NotCausingFailure() throws IOException { // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files // generated due to delete operations. These two snapshots can then be used in test to mock the primary shard // snapshot (2nd element which contains delete operations) and replica's existing snapshot (1st element). 
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
-        // Delete on-disk files so that they are not considered for the file diff
-        deleteContent(spyIndexShard.store().directory());
-        spyIndexShard.store().close();
+        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount);

         SegmentReplicationSource segrepSource = new TestReplicationSource() {
             @Override
             public void getCheckpointMetadata(
                 long replicationId,
                 ReplicationCheckpoint checkpoint,
                 ActionListener<CheckpointInfoResponse> listener
             ) {
                 listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
             }
@@ -479,72 +486,14 @@ public void onFailure(Exception e) {
         });
     }

-    /**
-     * This test ensures that on-disk files on the replica are taken into consideration while evaluating the file diff
-     * from the primary. The test mocks the files referred to by the active reader down to a smaller subset so that the
-     * logic to filter out on-disk files is exercised.
-     * @throws IOException if an indexing operation fails or segment replication fails
-     */
-    public void test_OnDiskFiles_ReusedForReplication() throws IOException {
-        int docCount = 1 + random().nextInt(10);
-        // Generate a list of MetadataSnapshot containing two elements. The second snapshot contains extra files
-        // generated due to delete operations. These two snapshots can then be used in the test to mock the primary shard
-        // snapshot (2nd element, which contains delete operations) and the replica's existing snapshot (1st element).
-        List<Store.MetadataSnapshot> storeMetadataSnapshots = generateStoreMetadataSnapshot(docCount, spyIndexShard);
-
-        SegmentReplicationSource segrepSource = new TestReplicationSource() {
-            @Override
-            public void getCheckpointMetadata(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                ActionListener<CheckpointInfoResponse> listener
-            ) {
-                listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy()));
-            }
-
-            @Override
-            public void getSegmentFiles(
-                long replicationId,
-                ReplicationCheckpoint checkpoint,
-                List<StoreFileMetadata> filesToFetch,
-                IndexShard indexShard,
-                BiConsumer<String, Long> fileProgressTracker,
-                ActionListener<GetSegmentFilesResponse> listener
-            ) {
-                // No files should be requested from the replication source
-                assertEquals(0, filesToFetch.size());
-                listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
-            }
-        };
-
-        segrepTarget = new SegmentReplicationTarget(
-            spyIndexShard,
-            repCheckpoint,
-            segrepSource,
-            mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
-        );
-        // Mask the files returned by the active reader so that the logic to filter out on-disk files is exercised
-        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
-        segrepTarget.startReplication(new ActionListener<Void>() {
-            @Override
-            public void onResponse(Void replicationResponse) {
-                segrepTarget.markAsDone();
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                Assert.fail();
-            }
-        });
-    }
-

     /**
      * Generates a list of Store.MetadataSnapshot with two elements where the second snapshot has extra files due to a delete
      * operation. A list of snapshots is returned so that identical files have the same checksum.
      * @param docCount the number of documents to index in the first snapshot
-     * @param shard The IndexShard object to use for writing
      * @return a list of Store.MetadataSnapshot with two elements where the second snapshot has extra files due to a delete
      * @throws IOException if one of the indexing operations fails
      */
-    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount, IndexShard shard) throws IOException {
+    private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount) throws IOException {
         List<Document> docList = new ArrayList<>();
         for (int i = 0; i < docCount; i++) {
             Document document = new Document();
@@ -558,7 +507,8 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
         IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random)).setCodec(TestUtil.getDefaultCodec());
         iwc.setMergePolicy(NoMergePolicy.INSTANCE);
         iwc.setUseCompoundFile(true);
-        Store store = shard.store();
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
         IndexWriter writer = new IndexWriter(store.directory(), iwc);
         for (Document d : docList) {
             writer.addDocument(d);
@@ -569,6 +519,7 @@ private List<Store.MetadataSnapshot> generateStoreMetadataSnapshot(int docCount,
         writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(docCount))));
         writer.commit();
         Store.MetadataSnapshot storeMetadataWithDeletes = store.getMetadata();
+        deleteContent(store.directory());
         writer.close();
         store.close();
         return Arrays.asList(storeMetadata, storeMetadataWithDeletes);
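Note on the mechanism under test: both tests in this patch exercise the replica-side file-diff computation during segment replication. When the target evaluates the primary's checkpoint metadata, a segment file that already exists on the replica's disk with a matching checksum is reused rather than re-downloaded, even if the active reader does not yet reference it; re-requesting such a file would trip the FileAlreadyExistsException described in the removed test's javadoc, while a same-named file with a different checksum is the corrupt/partial case that must be deleted and re-fetched. The following is a minimal, self-contained sketch of that filtering logic under stated assumptions; the class and member names (FileDiffSketch, FileMetadata, computeFilesToFetch) are illustrative and are not the actual SegmentReplicationTarget API.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: names and signatures are assumptions, not OpenSearch code.
final class FileDiffSketch {

    // Minimal stand-in for StoreFileMetadata: name plus checksum identifies a segment file.
    record FileMetadata(String name, String checksum) {}

    /**
     * Returns the subset of the primary's files the replica must fetch.
     * A file is skipped when an identical copy (same name and checksum) is already
     * on the replica's disk, whether or not the active reader refers to it -- the
     * reuse behavior testSegRepSucceedsOnPreviousCopiedFiles validates.
     */
    static List<FileMetadata> computeFilesToFetch(
        Map<String, FileMetadata> primaryMetadata,    // from the primary's checkpoint info
        Map<String, FileMetadata> localOnDiskMetadata // from the replica's store, not just its reader
    ) {
        List<FileMetadata> filesToFetch = new ArrayList<>();
        for (FileMetadata primaryFile : primaryMetadata.values()) {
            FileMetadata local = localOnDiskMetadata.get(primaryFile.name());
            if (local == null) {
                // Missing locally: must be downloaded.
                filesToFetch.add(primaryFile);
            } else if (!local.checksum().equals(primaryFile.checksum())) {
                // Same name, different content: a non-reusable (corrupt or partially
                // written) copy that has to be deleted locally and re-fetched.
                filesToFetch.add(primaryFile);
            }
            // Otherwise an identical file is already on disk: reuse it, fetch nothing.
        }
        return filesToFetch;
    }
}

Under these assumptions, the scenario in test_OnDiskFiles_ReusedForReplication corresponds to calling computeFilesToFetch with localOnDiskMetadata covering every file in the primary's snapshot: the returned fetch list is empty, which matches the test's assertEquals(0, filesToFetch.size()) inside getSegmentFiles.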