From 74ce992c33f9fee31e0d647b9060f886641ceefa Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Sat, 3 Aug 2024 11:53:20 +0530 Subject: [PATCH] Add wait for target allocation id to appear Signed-off-by: Gaurav Bafna --- .../LocalStorePeerRecoverySourceHandler.java | 19 +- .../recovery/RecoverySourceHandler.java | 23 +++ .../RemoteStorePeerRecoverySourceHandler.java | 48 +++++- ...alStorePeerRecoverySourceHandlerTests.java | 106 +++++++++++- ...teStorePeerRecoverySourceHandlerTests.java | 163 ++++++++++++++++++ 5 files changed, 338 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java index ac6b2e6b77d18..03e9c904d0a4a 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -12,15 +12,12 @@ import org.opensearch.action.StepListener; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.SetOnce; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.RecoveryEngineException; -import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeaseNotFoundException; import org.opensearch.index.seqno.RetentionLeases; @@ -58,21 +55,7 @@ public LocalStorePeerRecoverySourceHandler( @Override protected void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException { final SetOnce retentionLeaseRef = new SetOnce<>(); - - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying recovery of {} as it is not listed as assigned to target node {}", - request.shardId(), - request.targetNode() - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); - }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); + waitForAssignment(retentionLeaseRef); final Closeable retentionLock = shard.acquireHistoryRetentionLock(); resources.add(retentionLock); final long startingSeqNo; diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 8966516d3ef7f..5d866b863e4cd 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -44,7 +44,10 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.SetOnce; import org.opensearch.common.StopWatch; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; @@ -59,6 +62,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.engine.RecoveryEngineException; +import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeaseNotFoundException; import org.opensearch.index.seqno.RetentionLeases; @@ -191,6 +195,25 @@ public void recoverToTarget(ActionListener listener) { protected abstract void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException; + protected void waitForAssignment(SetOnce retentionLeaseRef) { + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); + if (targetShardRouting == null) { + logger.debug( + "delaying recovery of {} as it is not listed as assigned to target node {}", + request.shardId(), + request.targetNode() + ); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; + retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); + }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); + } + + + protected void finalizeStepAndCompleteFuture( long startingSeqNo, StepListener sendSnapshotStep, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java index 66c7a3b48f28f..c3b58b6707a4c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java @@ -10,11 +10,17 @@ import org.apache.lucene.index.IndexCommit; import org.opensearch.action.StepListener; +import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.SetOnce; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.RecoveryEngineException; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.RunUnderPrimaryPermit; @@ -22,6 +28,8 @@ import org.opensearch.transport.Transports; import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** @@ -48,7 +56,8 @@ protected void innerRecoveryToTarget(ActionListener listener, // A replica of an index with remote translog does not require the translogs locally and keeps receiving the // updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed // and there is no translog replay done. - + final SetOnce retentionLeaseRef = new SetOnce<>(); + waitForAssignment(retentionLeaseRef); final StepListener sendFileStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); final StepListener sendSnapshotStep = new StepListener<>(); @@ -102,4 +111,41 @@ protected void innerRecoveryToTarget(ActionListener listener, finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure); } + + protected void waitForAssignment(SetOnce retentionLeaseRef) { + BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff( + TimeValue.timeValueMillis(100), + 3 + ); + AtomicReference targetShardRouting = new AtomicReference<>(); + Iterator backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator(); + while (backoffDelayIterator.hasNext() ) { + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId())); + if (targetShardRouting.get() == null) { + logger.info( + "delaying recovery of {} as it is not listed as assigned to target node {}", + request.shardId(), + request.targetNode() + ); + Thread.sleep(backoffDelayIterator.next().millis()); + } + if (targetShardRouting.get() != null) { + assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was " + targetShardRouting; + retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get()))); + } + + }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); + + if (targetShardRouting.get() != null) { + return; + } + } + if (targetShardRouting.get() != null) { + return; + } + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + } diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 4685a7196b85a..860a280eeab3a 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -45,6 +45,8 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.store.BaseDirectoryWrapper; +import org.junit.After; +import org.junit.Before; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; @@ -52,6 +54,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.common.Numbers; import org.opensearch.common.Randomness; import org.opensearch.common.SetOnce; @@ -89,6 +92,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardRelocatedException; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; @@ -102,8 +106,6 @@ import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; import java.io.IOException; import java.io.OutputStream; @@ -141,6 +143,8 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -746,6 +750,104 @@ void phase2( assertFalse(phase2Called.get()); } + public void testThrowExceptionOnNoTargetInRouting() throws IOException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + final StartRecoveryRequest request = getStartRecoveryRequest(); + final IndexShard shard = mock(IndexShard.class); + when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.isRelocatedPrimary()).thenReturn(false); + final ReplicationGroup replicationGroup = mock(ReplicationGroup.class); + final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); + when(routingTable.getByAllocationId(anyString())).thenReturn(null); + when(shard.getReplicationGroup()).thenReturn(replicationGroup); + when(replicationGroup.getRoutingTable()).thenReturn(routingTable); + when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class)); + doAnswer(invocation -> { + ((ActionListener) invocation.getArguments()[0]).onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any()); + + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random())) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ); + if (randomBoolean()) { + indexMetadata.state(IndexMetadata.State.CLOSE); + } + when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY)); + + final AtomicBoolean phase1Called = new AtomicBoolean(); + final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); + final AtomicBoolean phase2Called = new AtomicBoolean(); + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( + shard, + mock(RecoveryTargetHandler.class), + threadPool, + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + between(1, 8), + between(1, 8) + ) { + + @Override + void phase1( + IndexCommit snapshot, + long startingSeqNo, + IntSupplier translogOps, + ActionListener listener, + boolean skipCreateRetentionLeaseStep + ) { + phase1Called.set(true); + super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep); + } + + @Override + void prepareTargetForTranslog(int totalTranslogOps, ActionListener listener) { + prepareTargetForTranslogCalled.set(true); + super.prepareTargetForTranslog(totalTranslogOps, listener); + } + + @Override + void phase2( + long startingSeqNo, + long endingSeqNo, + Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, + RetentionLeases retentionLeases, + long mappingVersion, + ActionListener listener + ) throws IOException { + phase2Called.set(true); + super.phase2( + startingSeqNo, + endingSeqNo, + snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersion, + listener + ); + } + + }; + PlainActionFuture future = new PlainActionFuture<>(); + expectThrows(DelayRecoveryException.class, () -> { + handler.recoverToTarget(future); + future.actionGet(); + }); + verify(routingTable, times(1)).getByAllocationId(null); + assertFalse(phase1Called.get()); + assertFalse(prepareTargetForTranslogCalled.get()); + assertFalse(phase2Called.get()); + } + public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { final CancellableThreads cancellableThreads = new CancellableThreads(); final IndexShard shard = mock(IndexShard.class); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 131514eb019b3..cb63ca78b4667 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -8,20 +8,64 @@ package org.opensearch.indices.recovery; +import org.apache.lucene.index.IndexCommit; +import org.opensearch.Version; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.common.UUIDs; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.engine.SegmentsStats; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.VersionUtils; +import java.io.IOException; import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase { + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build() + ); + private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1); + + private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private static final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") @@ -73,4 +117,123 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry()))); } } + + public StartRecoveryRequest getStartRecoveryRequest() throws IOException { + Store.MetadataSnapshot metadataSnapshot = randomBoolean() + ? Store.MetadataSnapshot.EMPTY + : new Store.MetadataSnapshot( + Collections.emptyMap(), + Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), + randomIntBetween(0, 100) + ); + return new StartRecoveryRequest( + shardId, + null, + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + metadataSnapshot, + randomBoolean(), + randomNonNegativeLong(), + randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong() + ); + } + + + public void testThrowExceptionOnNoTargetInRouting() throws IOException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + final StartRecoveryRequest request = getStartRecoveryRequest(); + final IndexShard shard = mock(IndexShard.class); + when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.isRelocatedPrimary()).thenReturn(false); + final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); + final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); + when(routingTable.getByAllocationId(anyString())).thenReturn(null); + when(shard.getReplicationGroup()).thenReturn(replicationGroup); + when(replicationGroup.getRoutingTable()).thenReturn(routingTable); + when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class)); + doAnswer(invocation -> { + ((ActionListener) invocation.getArguments()[0]).onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any()); + + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random())) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ); + if (randomBoolean()) { + indexMetadata.state(IndexMetadata.State.CLOSE); + } + when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY)); + + final AtomicBoolean phase1Called = new AtomicBoolean(); + final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); + final AtomicBoolean phase2Called = new AtomicBoolean(); + final RecoverySourceHandler handler = new RemoteStorePeerRecoverySourceHandler( + shard, + mock(RecoveryTargetHandler.class), + threadPool, + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + between(1, 8), + between(1, 8) + ) { + + @Override + void phase1( + IndexCommit snapshot, + long startingSeqNo, + IntSupplier translogOps, + ActionListener listener, + boolean skipCreateRetentionLeaseStep + ) { + phase1Called.set(true); + super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep); + } + + @Override + void prepareTargetForTranslog(int totalTranslogOps, ActionListener listener) { + prepareTargetForTranslogCalled.set(true); + super.prepareTargetForTranslog(totalTranslogOps, listener); + } + + @Override + void phase2( + long startingSeqNo, + long endingSeqNo, + Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, + RetentionLeases retentionLeases, + long mappingVersion, + ActionListener listener + ) throws IOException { + phase2Called.set(true); + super.phase2( + startingSeqNo, + endingSeqNo, + snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersion, + listener + ); + } + + }; + PlainActionFuture future = new PlainActionFuture<>(); + expectThrows(DelayRecoveryException.class, () -> { + handler.recoverToTarget(future); + future.actionGet(); + }); + verify(routingTable, times(3)).getByAllocationId(null); + assertFalse(phase1Called.get()); + assertFalse(prepareTargetForTranslogCalled.get()); + assertFalse(phase2Called.get()); + } }