diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 5d866b863e4cd..75842212ab318 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -41,6 +41,7 @@ import org.apache.lucene.util.ArrayUtil; import org.opensearch.action.ActionRunnable; import org.opensearch.action.StepListener; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; @@ -83,12 +84,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.stream.StreamSupport; @@ -195,24 +198,46 @@ public void recoverToTarget(ActionListener listener) { protected abstract void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException; - protected void waitForAssignment(SetOnce retentionLeaseRef) { - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying recovery of {} as it is not listed as assigned to target node {}", - request.shardId(), - request.targetNode() - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); - }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - } + protected void waitForAssignment(SetOnce retentionLeaseRef) { + BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3); + AtomicReference targetShardRouting = new AtomicReference<>(); + Iterator backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator(); + while (backoffDelayIterator.hasNext()) { + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId())); + if (targetShardRouting.get() == null) { + logger.info( + "delaying recovery of {} as it is not listed as assigned to target node {}", + request.shardId(), + request.targetNode() + ); + Thread.sleep(backoffDelayIterator.next().millis()); + } + if (targetShardRouting.get() != null) { + assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was " + + targetShardRouting; + retentionLeaseRef.set( + shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get())) + ); + } + }, + shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", + shard, + cancellableThreads, + logger + ); + if (targetShardRouting.get() != null) { + return; + } + } + if (targetShardRouting.get() != null) { + return; + } + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } protected void finalizeStepAndCompleteFuture( long startingSeqNo, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java index c3b58b6707a4c..bb5fdb3232ddf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java @@ -10,16 +10,12 @@ import org.apache.lucene.index.IndexCommit; import org.opensearch.action.StepListener; -import org.opensearch.action.bulk.BackoffPolicy; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.SetOnce; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.RecoveryEngineException; -import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; @@ -28,8 +24,6 @@ import org.opensearch.transport.Transports; import java.io.IOException; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** @@ -112,40 +106,4 @@ protected void innerRecoveryToTarget(ActionListener listener, finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure); } - protected void waitForAssignment(SetOnce retentionLeaseRef) { - BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff( - TimeValue.timeValueMillis(100), - 3 - ); - AtomicReference targetShardRouting = new AtomicReference<>(); - Iterator backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator(); - while (backoffDelayIterator.hasNext() ) { - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId())); - if (targetShardRouting.get() == null) { - logger.info( - "delaying recovery of {} as it is not listed as assigned to target node {}", - request.shardId(), - request.targetNode() - ); - Thread.sleep(backoffDelayIterator.next().millis()); - } - if (targetShardRouting.get() != null) { - assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get()))); - } - - }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); - - if (targetShardRouting.get() != null) { - return; - } - } - if (targetShardRouting.get() != null) { - return; - } - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - } diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 860a280eeab3a..bc3ec1f216949 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -45,8 +45,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.store.BaseDirectoryWrapper; -import org.junit.After; -import org.junit.Before; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; @@ -92,7 +90,6 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardRelocatedException; import org.opensearch.index.shard.IndexShardState; -import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; @@ -106,6 +103,8 @@ import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.io.OutputStream; @@ -757,7 +756,7 @@ public void testThrowExceptionOnNoTargetInRouting() throws IOException { when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.isRelocatedPrimary()).thenReturn(false); - final ReplicationGroup replicationGroup = mock(ReplicationGroup.class); + final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); when(routingTable.getByAllocationId(anyString())).thenReturn(null); when(shard.getReplicationGroup()).thenReturn(replicationGroup); @@ -842,7 +841,7 @@ void phase2( handler.recoverToTarget(future); future.actionGet(); }); - verify(routingTable, times(1)).getByAllocationId(null); + verify(routingTable, times(3)).getByAllocationId(null); assertFalse(phase1Called.get()); assertFalse(prepareTargetForTranslogCalled.get()); assertFalse(phase2Called.get()); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index cb63ca78b4667..e4de0edad5419 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -118,27 +118,6 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { } } - public StartRecoveryRequest getStartRecoveryRequest() throws IOException { - Store.MetadataSnapshot metadataSnapshot = randomBoolean() - ? Store.MetadataSnapshot.EMPTY - : new Store.MetadataSnapshot( - Collections.emptyMap(), - Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), - randomIntBetween(0, 100) - ); - return new StartRecoveryRequest( - shardId, - null, - new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), - new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), - metadataSnapshot, - randomBoolean(), - randomNonNegativeLong(), - randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong() - ); - } - - public void testThrowExceptionOnNoTargetInRouting() throws IOException { final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); final StartRecoveryRequest request = getStartRecoveryRequest(); @@ -236,4 +215,24 @@ void phase2( assertFalse(prepareTargetForTranslogCalled.get()); assertFalse(phase2Called.get()); } + + public StartRecoveryRequest getStartRecoveryRequest() throws IOException { + Store.MetadataSnapshot metadataSnapshot = randomBoolean() + ? Store.MetadataSnapshot.EMPTY + : new Store.MetadataSnapshot( + Collections.emptyMap(), + Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), + randomIntBetween(0, 100) + ); + return new StartRecoveryRequest( + shardId, + null, + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + metadataSnapshot, + randomBoolean(), + randomNonNegativeLong(), + randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong() + ); + } }