diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 57a561bc8f2a3..4d85a3c491af8 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -12,6 +12,9 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.action.StepListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.SnapshotsInProgress; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.Settings; @@ -20,6 +23,7 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; @@ -32,6 +36,11 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.Snapshot; +import org.opensearch.snapshots.SnapshotId; +import org.opensearch.snapshots.SnapshotShardsService; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.junit.annotations.TestLogging; import org.hamcrest.MatcherAssert; @@ -41,6 +50,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -55,6 +65,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -541,6 +553,81 @@ public void onReplicationFailure( } } + public void testShallowCopySnapshotForClosedIndexSuccessful() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings)) { + final IndexShard primaryShard = shards.getPrimary(); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + shards.flush(); + shards.assertAllEqual(10); + + RepositoriesService repositoriesService = createRepositoriesService(); + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository("random"); + + doAnswer(invocation -> { + IndexShardSnapshotStatus snapshotStatus = invocation.getArgument(5); + long commitGeneration = invocation.getArgument(7); + long startTime = invocation.getArgument(8); + final Map indexFilesToFileLengthMap = invocation.getArgument(9); + ActionListener listener = invocation.getArgument(10); + if (indexFilesToFileLengthMap != null) { + List fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet()); + long indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum(); + int indexTotalNumberOfFiles = fileNames.size(); + snapshotStatus.moveToStarted(startTime, 0, indexTotalNumberOfFiles, 0, indexTotalFileSize); + // Not performing actual snapshot, just modifying the state + snapshotStatus.moveToFinalize(commitGeneration); + snapshotStatus.moveToDone(System.currentTimeMillis(), snapshotStatus.generation()); + listener.onResponse(snapshotStatus.generation()); + return null; + } + listener.onResponse(snapshotStatus.generation()); + return null; + }).when(repository) + .snapshotRemoteStoreIndexShard(any(), any(), any(), any(), any(), any(), anyLong(), anyLong(), anyLong(), any(), any()); + + final SnapshotShardsService shardsService = getSnapshotShardsService( + primaryShard, + shards.getIndexMetadata(), + true, + repositoriesService + ); + final Snapshot snapshot1 = new Snapshot( + randomAlphaOfLength(10), + new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)) + ); + + // Initialize the shallow copy snapshot + final ClusterState initState = addSnapshotIndex( + clusterService.state(), + snapshot1, + primaryShard, + SnapshotsInProgress.State.INIT, + true + ); + shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state())); + + // start the snapshot + shardsService.clusterChanged( + new ClusterChangedEvent( + "test", + addSnapshotIndex(clusterService.state(), snapshot1, primaryShard, SnapshotsInProgress.State.STARTED, true), + initState + ) + ); + + // Check the snapshot got completed successfully + assertBusy(() -> { + final IndexShardSnapshotStatus.Copy copy = shardsService.currentSnapshotShards(snapshot1) + .get(primaryShard.shardId) + .asCopy(); + final IndexShardSnapshotStatus.Stage stage = copy.getStage(); + assertEquals(IndexShardSnapshotStatus.Stage.DONE, stage); + }); + } + } + private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) { return new RemoteStoreReplicationSource(shard) { @Override diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index d2f85e9cfafaf..f4f94baabd7b0 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -68,6 +68,7 @@ import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfoTests; @@ -892,10 +893,21 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { replicateSegments(primaryShard, shards.getReplicas()); shards.assertAllEqual(10); - final SnapshotShardsService shardsService = getSnapshotShardsService(replicaShard, shards.getIndexMetadata()); + final SnapshotShardsService shardsService = getSnapshotShardsService( + replicaShard, + shards.getIndexMetadata(), + false, + createRepositoriesService() + ); final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); - final ClusterState initState = addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.INIT); + final ClusterState initState = addSnapshotIndex( + clusterService.state(), + snapshot, + replicaShard, + SnapshotsInProgress.State.INIT, + false + ); shardsService.clusterChanged(new ClusterChangedEvent("test", initState, clusterService.state())); CountDownLatch latch = new CountDownLatch(1); @@ -907,7 +919,7 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { shardsService.clusterChanged( new ClusterChangedEvent( "test", - addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED), + addSnapshotIndex(clusterService.state(), snapshot, replicaShard, SnapshotsInProgress.State.STARTED, false), initState ) ); @@ -956,22 +968,30 @@ public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() th } } - private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard, IndexMetadata indexMetadata) { + protected SnapshotShardsService getSnapshotShardsService( + IndexShard indexShard, + IndexMetadata indexMetadata, + boolean closedIdx, + RepositoriesService repositoriesService + ) { final TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(threadPool); final IndicesService indicesService = mock(IndicesService.class); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(any())).thenReturn(indexService); - when(indexService.getShardOrNull(anyInt())).thenReturn(replicaShard); - when(indexService.getMetadata()).thenReturn(indexMetadata); - return new SnapshotShardsService(settings, clusterService, createRepositoriesService(), transportService, indicesService); + when(indexService.getShardOrNull(anyInt())).thenReturn(indexShard); + when(indexService.getMetadata()).thenReturn( + new IndexMetadata.Builder(indexMetadata).state(closedIdx ? IndexMetadata.State.CLOSE : IndexMetadata.State.OPEN).build() + ); + return new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService); } - private ClusterState addSnapshotIndex( + protected ClusterState addSnapshotIndex( ClusterState state, Snapshot snapshot, IndexShard shard, - SnapshotsInProgress.State snapshotState + SnapshotsInProgress.State snapshotState, + boolean shallowCopySnapshot ) { final Map shardsBuilder = new HashMap<>(); ShardRouting shardRouting = shard.shardRouting; @@ -992,7 +1012,7 @@ private ClusterState addSnapshotIndex( null, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()), - false + shallowCopySnapshot ); return ClusterState.builder(state) .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry)))