From 96c0766d214b8a6acefacacefd5e3efd04b20ebe Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Thu, 24 Oct 2024 09:26:16 +0530 Subject: [PATCH] rare reroute with npe --- .../coordination/RareClusterStateIT.java | 155 ++++++++++++++++++ .../action/shard/ShardStateAction.java | 10 ++ .../cluster/service/MasterService.java | 2 +- .../gateway/ReplicaShardAllocator.java | 4 +- 4 files changed, 168 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java index b3cb15d028090..c6d2ca3561a10 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java @@ -32,36 +32,49 @@ package org.opensearch.cluster.coordination; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; import org.opensearch.OpenSearchParseException; import org.opensearch.Version; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingChangesObserver; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.index.Index; +import org.opensearch.core.transport.TransportResponse; import org.opensearch.discovery.Discovery; import org.opensearch.index.IndexService; import org.opensearch.index.mapper.DocumentMapper; import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.IndicesService; +import org.opensearch.plugins.Plugin; +import org.opensearch.snapshots.SnapshotsInfoService; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.BlockClusterStateProcessing; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.opensearch.transport.TransportSettings; import java.util.List; @@ -71,6 +84,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.action.DocWriteResponse.Result.CREATED; +import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -409,4 +423,145 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED)); } + + public void testDisassociateNodesWhileShardInit() throws InterruptedException { + final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s") + .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true).build()); + internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()); + String node2 = internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()); + + final ClusterService clusterService = internalCluster().clusterService(clusterManagerName); + blockShardStartedResponse(clusterManagerName, clusterService); + + final String index = "index"; + + //create index with 3 primary and 1 replica each + prepareCreate(index).setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + //.put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries") + ).get(); + ensureGreen(index); + + // close to have some unassigned started shards shards.. + client().admin().indices().prepareClose(index).get(); + + //block so that replicas are always in init and not started + blockReplicaStart.set(true); + final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName); + clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterState.Builder builder = ClusterState.builder(currentState); + // open index + final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index)) + .state(IndexMetadata.State.OPEN) + .build(); + + builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true)); + builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index)); + ClusterState updatedState = builder.build(); + RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + routingTable.addAsRecovery(updatedState.metadata().index(index)); + updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); + return allocationService.reroute(updatedState, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(e.getMessage(), e); + } + }); + + ensureYellow(index); + waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3); + + logger.info("Initializing shards"); + logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)); + + //trigger 2nd reroute after shard in initialized + clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return allocationService.reroute(currentState, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + ensureYellow(index); + waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3); + clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + //remove the primary node of replica shard which is in init + ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0); + ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId()); + + ClusterState.Builder builder = ClusterState.builder(currentState); + builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId())); + currentState = builder.build(); + logger.info("removed the node {}", primaryShard.currentNodeId()); + logger.info("shard {}", next); + return allocationService.disassociateDeadNodes(currentState, true, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + //sleep for reroute to get triggeredn + Thread.sleep(60 * 1000); + + waitUntil(() -> clusterService.state().nodes().getSize() == 3); + logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)); + blockReplicaStart.set(false); + + clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterState.Builder builder = ClusterState.builder(currentState); + final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index)) + .state(IndexMetadata.State.OPEN) + .build(); + builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true)); + builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index)); + ClusterState updatedState = builder.build(); + RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + routingTable.addAsRecovery(updatedState.metadata().index(index)); + updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); + + return allocationService.reroute(updatedState, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + + ensureGreen(index); + } + + AtomicBoolean blockReplicaStart = new AtomicBoolean(false); + private void blockShardStartedResponse(String master, ClusterService service) { + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); + primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> { + + if (blockReplicaStart.get()) { + ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request; + final ShardRouting matched = service.state().getRoutingTable().getByAllocationId(req.getShardId(), req.getAllocationId()); + + if (matched != null && matched.primary() == false) { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } else { + handler.messageReceived(request, channel, task); + } + } else { + handler.messageReceived(request, channel, task); + } + }); + } + + @Override + protected Collection> nodePlugins() { + return List.of(MockTransportService.TestPlugin.class); + } } diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java index cb5749a91d448..5ac280e612590 100644 --- a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java @@ -843,6 +843,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { * @opensearch.internal */ public static class StartedShardEntry extends TransportRequest { + final ShardId shardId; final String allocationId; final long primaryTerm; @@ -883,6 +884,15 @@ public String toString() { message ); } + + public ShardId getShardId() { + return shardId; + } + + public String getAllocationId() { + return allocationId; + } + } /** diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 713de8cdd0fda..31bf2affe4a84 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -917,7 +917,7 @@ private ClusterTasksResult executeTasks(TaskInputs taskInputs, ClusterSt throw new AssertionError("update task submitted to ClusterManagerService cannot remove cluster-manager"); } } catch (Exception e) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}", previousClusterState.version(), diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index c30ee8479ac97..9f4137b4d8172 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -101,7 +101,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch( RoutingNodes routingNodes = allocation.routingNodes(); ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); if (primaryShard == null) { - logger.trace("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard); + logger.info("{}: no active primary shard found or allocated, letting actual allocation figure it out", shard); return null; } assert primaryShard.currentNodeId() != null; @@ -111,7 +111,7 @@ protected Runnable cancelExistingRecoveryForBetterMatch( if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard - logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); + logger.info("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard); return null; }