Skip to content

Commit

Permalink
using the routing allocation to cancel existing recoveries
Browse files Browse the repository at this point in the history
Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
  • Loading branch information
rajiv-kv committed Oct 28, 2024
1 parent 72559bf commit bc1edaf
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
Expand All @@ -48,29 +49,39 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.index.Index;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexService;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.BlockClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -409,4 +420,172 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
}

public void testDisassociateNodesWhileShardInit() throws InterruptedException {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(
Settings.builder()
.put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build());
String node2 = internalCluster().startDataOnlyNode(
Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()
);

final ClusterService clusterService = internalCluster().clusterService(clusterManagerName);
blockShardStartedResponse(clusterManagerName, clusterService);

final String index = "index";

// create index with 3 primary and 1 replica each
prepareCreate(index).setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
// .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries")
).get();
ensureGreen(index);

// close to have some unassigned started shards shards..
client().admin().indices().prepareClose(index).get();

// block so that replicas are always in init and not started
blockReplicaStart.set(true);
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName);
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterState.Builder builder = ClusterState.builder(currentState);
// open index
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
.state(IndexMetadata.State.OPEN)
.build();

builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metadata().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
ClusterState state = allocationService.reroute(updatedState, "reroute");
return state;
}

@Override
public void onFailure(String source, Exception e) {
logger.error(e.getMessage(), e);
}
});

ensureYellow(index);
assertTrue(waitUntil(() -> {
ClusterState state = clusterService.state();
return state.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3;

}));

logger.info("Initializing shards");
logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));

// trigger 2nd reroute after shard in initialized
clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return allocationService.reroute(currentState, "reroute");
}

@Override
public void onFailure(String source, Exception e) {}
});

ensureYellow(index);
assertTrue(waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3));
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// remove the primary node of replica shard which is in init
ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0);
ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId());

ClusterState.Builder builder = ClusterState.builder(currentState);
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId()));
currentState = builder.build();
logger.info("removed the node {}", primaryShard.currentNodeId());
logger.info("shard {}", next);
ClusterState state = allocationService.disassociateDeadNodes(currentState, true, "reroute");
return state;
}

@Override
public void onFailure(String source, Exception e) {}
});
assertTrue(waitUntil(() -> {
ClusterState state = clusterService.state();
logger.info("current state {} ", state);
return clusterService.state().nodes().getSize() == 3;

}));

logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING));
blockReplicaStart.set(false);

clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterState.Builder builder = ClusterState.builder(currentState);
final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index))
.state(IndexMetadata.State.OPEN)
.build();
builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true));
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
ClusterState updatedState = builder.build();
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metadata().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();

return allocationService.reroute(updatedState, "reroute");
}

@Override
public void onFailure(String source, Exception e) {}
});

ensureGreen(index);
}

AtomicBoolean blockReplicaStart = new AtomicBoolean(false);

private void blockShardStartedResponse(String master, ClusterService service) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> {

if (blockReplicaStart.get()) {
ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request;
String stringRep = req.toString();
logger.info("ShardStateAction.StartedShardEntry {}", stringRep);

String incomingRequest = req.toString();
Optional<ShardRouting> matchReplica = service.state()
.routingTable()
.allShardsSatisfyingPredicate(r -> !r.primary())
.getShardRoutings()
.stream()
.filter(r -> r.allocationId() != null)
.filter(r -> incomingRequest.contains(r.allocationId().getId()))
.findAny();

if (matchReplica.isPresent()) {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} else {
handler.messageReceived(request, channel, task);
}
} else {
handler.messageReceived(request, channel, task);
}
});
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,13 @@ private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integ
for (Map.Entry<ShardId, V> shardData : shardDataFromNode.entrySet()) {
if (shardData.getValue() != null) {
ShardId shardId = shardData.getKey();
if (emptyShardResponsePredicate.test(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
if (shardIdKey.get(shardId) != null) {// the response might be for shard which is no longer present in cache
if (emptyShardResponsePredicate.test(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
Expand Down Expand Up @@ -51,13 +52,25 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator {
*/
public void processExistingRecoveries(RoutingAllocation allocation, List<List<ShardRouting>> shardBatches) {
List<Runnable> shardCancellationActions = new ArrayList<>();
Map<ShardId, List<ShardRouting>> initReplicasFromRouting = new HashMap<>();
allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).stream().filter(r -> !r.primary()).forEach(r -> {
initReplicasFromRouting.putIfAbsent(r.shardId(), new ArrayList<>());
initReplicasFromRouting.get(r.shardId()).add(r);
});

// iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch
for (List<ShardRouting> shardBatch : shardBatches) {
List<ShardRouting> eligibleShards = new ArrayList<>();
List<ShardRouting> ineligibleShards = new ArrayList<>();
// iterate over shards to check for match for each of those
for (ShardRouting shard : shardBatch) {
if (shard != null && !shard.primary()) {
// check if the shard is in Initializing state in RoutingTable
// as the batch is not refreshed yet
if (!initReplicasFromRouting.containsKey(shard.shardId())) {
logger.trace("skipping the shardRouting {} as the state is updated in routing table", shard);
continue;
}
// need to iterate over all the nodes to find matching shard
if (shouldSkipFetchForRecovery(shard)) {
// shard should just be skipped for fetchData, no need to remove from batch
Expand All @@ -72,11 +85,19 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<List<Sh
continue; // still fetching
}
for (ShardRouting shard : eligibleShards) {
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState);

Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(shard, allocation, nodeShardStores);
if (cancellationAction != null) {
shardCancellationActions.add(cancellationAction);
for (ShardRouting initShardsFromAllocation : initReplicasFromRouting.get(shard.shardId())) {
Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(
initShardsFromAllocation,
shardState
);
Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(
initShardsFromAllocation,
allocation,
nodeShardStores
);
if (cancellationAction != null) {
shardCancellationActions.add(cancellationAction);
}
}
}
}
Expand Down

0 comments on commit bc1edaf

Please sign in to comment.