diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 77556f8391473..8773b37aa7d4c 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -458,8 +458,8 @@ private void uploadNewSegments( } } - private boolean isLowPriorityUpload() { - return isLocalOrSnapshotRecovery(); + boolean isLowPriorityUpload() { + return isLocalOrSnapshotRecoveryOrSeeding(); } /** @@ -549,7 +549,7 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException { * @return true iff the shard is a started with primary mode true or it is local or snapshot recovery. */ private boolean isReadyForUpload() { - boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery() || indexShard.shouldSeedRemoteStore(); + boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecoveryOrSeeding(); if (isReady == false) { StringBuilder sb = new StringBuilder("Skipped syncing segments with"); @@ -571,14 +571,15 @@ private boolean isReadyForUpload() { return isReady; } - private boolean isLocalOrSnapshotRecovery() { + boolean isLocalOrSnapshotRecoveryOrSeeding() { // In this case when the primary mode is false, we need to upload segments to Remote Store - // This is required in case of snapshots/shrink/ split/clone where we need to durable persist + // This is required in case of remote migration seeding/snapshots/shrink/split/clone where we need to durably persist // all segments to remote before completing the recovery to ensure durability. 
return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary()) && indexShard.recoveryState() != null && (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS - || indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT); + || indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT + || indexShard.shouldSeedRemoteStore()); } /** diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 67787e8583930..94269de9349fe 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -16,6 +16,7 @@ import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; @@ -37,6 +38,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.indices.DefaultRemoteStoreSettings; import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.ClusterServiceUtils; @@ -126,6 +128,52 @@ public void tearDown() throws Exception { super.tearDown(); } + public void testIsLowPriorityUpload() throws IOException { + setup(true, 3); + + // Mocking the IndexShard methods and dependent classes. 
+ IndexShard shard = mock(IndexShard.class); + Store store = mock(Store.class); + ShardId shardId = new ShardId("index1", "_na_", 1); + ShardRouting shardRouting = mock(ShardRouting.class); + shard.shardRouting = shardRouting; + when(shard.shouldSeedRemoteStore()).thenReturn(true); + when(shard.state()).thenReturn(IndexShardState.RECOVERING); + when(shardRouting.primary()).thenReturn(true); + when(shard.shardId()).thenReturn(shardId); + when(shard.store()).thenReturn(store); + when(shard.routingEntry()).thenReturn(shardRouting); + when(shard.getThreadPool()).thenReturn(mock(ThreadPool.class)); + RecoveryState recoveryState = mock(RecoveryState.class); + when(recoveryState.getRecoverySource()).thenReturn(RecoverySource.PeerRecoverySource.INSTANCE); + when(shard.recoveryState()).thenReturn(recoveryState); + + // Mock the Store, Directory and RemoteSegmentStoreDirectory classes + Store remoteStore = mock(Store.class); + when(shard.remoteStore()).thenReturn(remoteStore); + RemoteDirectory remoteMetadataDirectory = mock(RemoteDirectory.class); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + mock(RemoteDirectory.class), + remoteMetadataDirectory, + mock(RemoteStoreLockManager.class), + mock(ThreadPool.class), + shardId + ); + FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( + new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) + ); + when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + + RemoteStoreRefreshListener remoteStoreRefreshListener = new RemoteStoreRefreshListener( + shard, + SegmentReplicationCheckpointPublisher.EMPTY, + mock(RemoteSegmentTransferTracker.class), + DefaultRemoteStoreSettings.INSTANCE + ); + assertTrue(remoteStoreRefreshListener.isLocalOrSnapshotRecoveryOrSeeding()); + assertTrue(remoteStoreRefreshListener.isLowPriorityUpload()); + } + public void testRemoteDirectoryInitThrowsException() throws 
IOException { // Methods used in the constructor of RemoteSegmentTrackerListener have been mocked to reproduce specific exceptions // to test the failure modes possible during construction of RemoteSegmentTrackerListener object.