diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index cfbb4d34c3a38..bd5b694f4fe41 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -62,9 +62,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Locale; import java.util.Map; import java.util.Set; +import static org.opensearch.cluster.action.shard.ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; @@ -199,6 +201,32 @@ public class BalancedShardsAllocator implements ShardsAllocator { Setting.Property.Dynamic ); + /** + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher + * to allocate shards. + */ + public static final Setting FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( + "cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", + Priority.NORMAL.toString(), + BalancedShardsAllocator::parseReroutePriority, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private static Priority parseReroutePriority(String priorityString) { + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); + switch (priority) { + case NORMAL: + case HIGH: + case URGENT: + return priority; + } + throw new IllegalArgumentException( + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" + ); + } + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -213,6 +241,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile boolean ignoreThrottleInRestore; private volatile TimeValue allocatorTimeout; + private volatile Priority followUpRerouteTaskPriority; private long startTime; private RerouteService rerouteService; @@ -233,6 +262,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings)); + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -244,6 +274,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); } @Override @@ -342,6 +373,10 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } + private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; + } + protected boolean allocatorTimedOut() { if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { if (logger.isTraceEnabled()) { @@ -438,10 +473,13 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { private void scheduleRerouteIfAllocatorTimedOut() { if (allocatorTimedOut()) { - assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out"; + if (rerouteService == null) { + logger.info("RerouteService not set to schedule reroute after allocator time out"); + return; + } rerouteService.reroute( "reroute after balanced shards allocator timed out", - Priority.HIGH, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after balanced shards allocator timed out completed"), e -> logger.debug("reroute after balanced shards allocator timed out failed", e) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index c1f4e52706465..61806bcd74135 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.THRESHOLD_SETTING, BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING, + BalancedShardsAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING, BalancedShardsAllocator.PRIMARY_CONSTRAINT_THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, @@ -354,6 +355,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE, ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, + ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING, PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 9c38ea1df8a41..82229f244239f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -53,6 +53,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -82,6 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private TimeValue primaryShardsBatchGatewayAllocatorTimeout; private TimeValue replicaShardsBatchGatewayAllocatorTimeout; + private volatile Priority followUpRerouteTaskPriority; public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); private final ClusterManagerMetrics clusterManagerMetrics; @@ -145,6 +147,32 @@ public void validate(TimeValue timeValue) { Setting.Property.Dynamic ); + /** + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher + * to allocate existing shards. + */ + public static final Setting FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( + "cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority", + Priority.NORMAL.toString(), + ShardsBatchGatewayAllocator::parseReroutePriority, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private static Priority parseReroutePriority(String priorityString) { + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); + switch (priority) { + case NORMAL: + case HIGH: + case URGENT: + return priority; + } + throw new IllegalArgumentException( + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" + ); + } + private final RerouteService rerouteService; private final PrimaryShardBatchAllocator primaryShardBatchAllocator; private final ReplicaShardBatchAllocator replicaShardBatchAllocator; @@ -179,6 +207,8 @@ public ShardsBatchGatewayAllocator( this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setReplicaBatchAllocatorTimeout); this.clusterManagerMetrics = clusterManagerMetrics; + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); } @Override @@ -308,8 +338,8 @@ public void onComplete() { logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [P] timed out", + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -343,8 +373,8 @@ public void onComplete() { logger.trace("scheduling reroute after existing shards allocator timed out for replica shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [R] timed out", + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -920,4 +950,8 @@ protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatew protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } + + protected void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index 45a0bd7b18afd..8899b4ee8f68d 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -46,6 +46,7 @@ import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING; public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { @@ -108,7 +109,7 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled( listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -143,6 +144,49 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() { System.nanoTime() ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.NORMAL, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(0, initializingShards.size()); + assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); + assertTrue(rerouteScheduled.get()); + } + + public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduledWithHighPriority() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", "high"); + // passing 0 for timed out latch such that all shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + setupStateAndService(metadata, routingTable); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { if (randomBoolean()) { listener.onFailure(new OpenSearchException("simulated")); @@ -193,7 +237,7 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -237,7 +281,7 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -284,7 +328,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -326,7 +370,7 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -371,7 +415,7 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -416,7 +460,7 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -462,7 +506,7 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -522,7 +566,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -561,6 +605,32 @@ public void testAllocatorTimeout() { assertEquals(-1, ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); } + public void testFollowupPriorityValues() { + String settingKey = "cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority"; + Settings build = Settings.builder().put(settingKey, "normal").build(); + assertEquals(Priority.NORMAL, FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "high").build(); + assertEquals(Priority.HIGH, FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "urgent").build(); + assertEquals(Priority.URGENT, FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + Settings wrongPriority = Settings.builder().put(settingKey, "immediate").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority) + ); + assertEquals("priority [IMMEDIATE] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]", iae.getMessage()); + + Settings wrongPriority2 = Settings.builder().put(settingKey, "random").build(); + IllegalArgumentException iae2 = expectThrows( + IllegalArgumentException.class, + () -> FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority2) + ); + assertEquals("No enum constant org.opensearch.common.Priority.RANDOM", iae2.getMessage()); + } + private RoutingTable buildRoutingTable(Metadata metadata) { RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); for (Map.Entry entry : metadata.getIndices().entrySet()) { diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index ebc2e59fa5a30..7a3b5f576449c 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.cluster.ClusterInfo; @@ -53,6 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; @@ -437,10 +437,51 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); + rerouteLatch.countDown(); + }; + CountDownLatch timedOutShardsLatch = new CountDownLatch(20); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 10); + assertEquals(1, rerouteLatch.getCount()); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 0); + assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners + final boolean terminated = terminate(threadPool); + assert terminated; + clusterService.close(); + } + + public void testCollectTimedOutShardsAndScheduleRerouteWithHighPriority_Success() throws InterruptedException { + createIndexAndUpdateClusterState(2, 5, 2); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); + final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } assertEquals(Priority.HIGH, priority); rerouteLatch.countDown(); }; @@ -448,11 +489,13 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.HIGH); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners @@ -466,22 +509,29 @@ public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onFailure(new OpenSearchException("simulated")); + listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); rerouteLatch.countDown(); }; CountDownLatch timedOutShardsLatch = new CountDownLatch(20); testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners @@ -490,6 +540,35 @@ public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws Interru clusterService.close(); } + public void testFollowupPriorityValues() { + String settingKey = "cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority"; + Settings build = Settings.builder().put(settingKey, "normal").build(); + assertEquals(Priority.NORMAL, ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "high").build(); + assertEquals(Priority.HIGH, ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "urgent").build(); + assertEquals(Priority.URGENT, ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + Settings wrongPriority = Settings.builder().put(settingKey, "immediate").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority) + ); + assertEquals( + "priority [IMMEDIATE] not supported for [" + ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]", + iae.getMessage() + ); + + Settings wrongPriority2 = Settings.builder().put(settingKey, "random").build(); + IllegalArgumentException iae2 = expectThrows( + IllegalArgumentException.class, + () -> ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority2) + ); + assertEquals("No enum constant org.opensearch.common.Priority.RANDOM", iae2.getMessage()); + } + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { if (count == 0) return; Metadata.Builder metadata = Metadata.builder();