Skip to content

Commit

Permalink
Merge branch 'main' into issue13171-main
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn committed May 2, 2024
2 parents d53a378 + ef841dd commit 28492b5
Show file tree
Hide file tree
Showing 7 changed files with 753 additions and 220 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/gradle-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ on:
- 'dependabot/**'
pull_request_target:
types: [opened, synchronize, reopened]
paths-ignore:
- 'release-notes/**'
- '.github/**'
- '**.md'

permissions:
contents: read # to fetch code (actions/checkout)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public void testRemotePrimaryRelocation() throws Exception {
int finalCurrentDoc1 = currentDoc;
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5);

// Change direction to remote store
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("--> relocating from {} to {} ", docRepNodes, remoteNode);
client().admin()
.cluster()
Expand Down Expand Up @@ -179,6 +183,10 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), "10s"))
.get();

// Change direction to remote store
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,29 @@

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.MoveDecision;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;

import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -47,7 +56,7 @@ protected void setClusterMode(String mode) {
}

// set the migration direction for cluster [remote_store, docrep, none]
public void setDirection(String direction) {
protected void setDirection(String direction) {
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), direction));
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
Expand Down Expand Up @@ -79,7 +88,7 @@ protected String allNodesExcept(String except) {
return exclude.toString();
}

// create a new test index
// create a new test index with an unallocated primary and no replicas
protected void prepareIndexWithoutReplica(Optional<String> name) {
String indexName = name.orElse(TEST_INDEX);
internalCluster().client()
Expand All @@ -96,6 +105,33 @@ protected void prepareIndexWithoutReplica(Optional<String> name) {
.actionGet();
}

// Creates a test index with one shard and one replica, with allocation filters that
// include only primaryShardNode and exclude the names returned by allNodesExcept(...)
// for that node. After creation it waits for yellow health, then asserts that the
// primary is active on primaryShardNode and that the replica is unassigned.
//
// primaryShardNode: node whose name is used for the include/exclude allocation filters
// name:             optional index name; TEST_INDEX is used when empty
//
// NOTE(review): assertAllocation/assertNonAllocation are called without an index name
// and use TEST_INDEX in the parts visible here, so the post-creation checks are only
// meaningful when the created index is TEST_INDEX - confirm before passing another name.
public void prepareIndexWithAllocatedPrimary(DiscoveryNode primaryShardNode, Optional<String> name) {
    String indexName = name.orElse(TEST_INDEX);
    internalCluster().client()
        .admin()
        .indices()
        .prepareCreate(indexName)
        .setSettings(
            Settings.builder()
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 1)
                .put("index.routing.allocation.include._name", primaryShardNode.getName())
                .put("index.routing.allocation.exclude._name", allNodesExcept(primaryShardNode.getName()))
        )
        .setWaitForActiveShards(ActiveShardCount.ONE)
        .execute()
        .actionGet();

    // wait on the index that was actually created rather than always on TEST_INDEX
    ensureYellowAndNoInitializingShards(indexName);

    logger.info(" --> verify allocation of primary shard");
    assertAllocation(true, primaryShardNode);

    logger.info(" --> verify non-allocation of replica shard");
    assertNonAllocation(false);
}

protected ShardRouting getShardRouting(boolean isPrimary) {
IndexShardRoutingTable table = internalCluster().client()
.admin()
Expand All @@ -110,6 +146,130 @@ protected ShardRouting getShardRouting(boolean isPrimary) {
return (isPrimary ? table.primaryShard() : table.replicaShards().get(0));
}

// Runs an allocation-explain request for shard 0 of TEST_INDEX and returns the decision
// labelled NAME among the can-allocate decisions reported for targetNode.
//
// targetNode:          node whose per-node result is inspected
// isPrimary:           whether the primary or a replica copy is explained
// includeYesDecisions: forwarded to the explain request
// isRelocation:        true reads node decisions from the move decision,
//                      false reads them from the allocate-unassigned decision
//
// Fails the test (assertNotNull) when no decision with label NAME is found for targetNode.
protected Decision getDecisionForTargetNode(
    DiscoveryNode targetNode,
    boolean isPrimary,
    boolean includeYesDecisions,
    boolean isRelocation
) {
    final ClusterAllocationExplanation allocationExplanation = internalCluster().client()
        .admin()
        .cluster()
        .prepareAllocationExplain()
        .setIndex(TEST_INDEX)
        .setShard(0)
        .setPrimary(isPrimary)
        .setIncludeYesDecisions(includeYesDecisions)
        .get()
        .getExplanation();

    // pick the per-node results that match the kind of decision being asked about
    final List<NodeAllocationResult> perNodeResults;
    if (isRelocation == false) {
        AllocateUnassignedDecision unassignedDecision = allocationExplanation.getShardAllocationDecision().getAllocateDecision();
        perNodeResults = unassignedDecision.getNodeDecisions();
    } else {
        MoveDecision relocationDecision = allocationExplanation.getShardAllocationDecision().getMoveDecision();
        perNodeResults = relocationDecision.getNodeDecisions();
    }

    Decision matchedDecision = null;
    for (NodeAllocationResult perNodeResult : perNodeResults) {
        if (perNodeResult.getNode().equals(targetNode) == false) {
            continue;
        }
        // first decision carrying the NAME label wins for this node
        for (Decision candidate : perNodeResult.getCanAllocateDecision().getDecisions()) {
            if (candidate.label().equals(NAME)) {
                matchedDecision = candidate;
                break;
            }
        }
    }

    assertNotNull(matchedDecision);
    return matchedDecision;
}

// Updates TEST_INDEX so that the name-based include filter is blank and the name-based
// exclude filter is set to allNodesExcept(null), then asserts the update was acknowledged.
// NOTE(review): allNodesExcept(null) presumably yields every node name - confirm against its definition.
protected void excludeAllNodes() {
    final Settings.Builder routingFilters = Settings.builder()
        .put("index.routing.allocation.include._name", "")
        .put("index.routing.allocation.exclude._name", allNodesExcept(null));

    assertAcked(
        internalCluster().client().admin().indices().prepareUpdateSettings(TEST_INDEX).setSettings(routingFilters).execute().actionGet()
    );
}

// Updates TEST_INDEX so that the name-based exclude filter is blank and the name-based
// include filter is set to allNodesExcept(null), then asserts the update was acknowledged.
// NOTE(review): allNodesExcept(null) presumably yields every node name - confirm against its definition.
protected void includeAllNodes() {
    final Settings.Builder routingFilters = Settings.builder()
        .put("index.routing.allocation.exclude._name", "")
        .put("index.routing.allocation.include._name", allNodesExcept(null));

    assertAcked(
        internalCluster().client().admin().indices().prepareUpdateSettings(TEST_INDEX).setSettings(routingFilters).execute().actionGet()
    );
}

// Rewrites the name-based include/exclude allocation filters of TEST_INDEX.
//
// targetNodeName: when non-null, only this node is included and allNodesExcept(targetNodeName)
//                 is excluded; when null, the current cluster-manager node is excluded and
//                 allNodesExcept(<cluster-manager name>) is included.
protected void attemptAllocation(@Nullable String targetNodeName) {
    final String includedNames;
    final String excludedNames;
    if (targetNodeName == null) {
        // to allocate freely among all nodes other than cluster-manager node
        final String clusterManagerNodeName = internalCluster().client()
            .admin()
            .cluster()
            .prepareState()
            .execute()
            .actionGet()
            .getState()
            .getNodes()
            .getClusterManagerNode()
            .getName();
        includedNames = allNodesExcept(clusterManagerNodeName);
        excludedNames = clusterManagerNodeName;
    } else {
        includedNames = targetNodeName;
        excludedNames = allNodesExcept(targetNodeName);
    }

    final Settings.Builder settingsBuilder = Settings.builder()
        .put("index.routing.allocation.include._name", includedNames)
        .put("index.routing.allocation.exclude._name", excludedNames);
    internalCluster().client().admin().indices().prepareUpdateSettings(TEST_INDEX).setSettings(settingsBuilder).execute().actionGet();
}

// verify that the requested shard copy of TEST_INDEX is unassigned:
// waits for red health (primary) or yellow health with no initializing shards (replica),
// then checks that the copy is inactive, has no current node and is in UNASSIGNED state
protected void assertNonAllocation(boolean isPrimary) {
    if (isPrimary == false) {
        ensureYellowAndNoInitializingShards(TEST_INDEX);
    } else {
        ensureRed(TEST_INDEX);
    }

    final ShardRouting routing = getShardRouting(isPrimary);
    assertFalse(routing.active());
    assertNull(routing.currentNodeId());
    assertEquals(ShardRoutingState.UNASSIGNED, routing.state());
}

// verify that the requested shard copy is active and assigned to a node;
// when targetNode is non-null, additionally verify that it is the node holding the copy
protected void assertAllocation(boolean isPrimary, @Nullable DiscoveryNode targetNode) {
    final ShardRouting routing = getShardRouting(isPrimary);
    assertTrue(routing.active());

    final String hostingNodeId = routing.currentNodeId();
    assertNotNull(hostingNodeId);
    if (targetNode == null) {
        return;
    }
    assertEquals(targetNode.getId(), hostingNodeId);
}

// create a snapshot
public static SnapshotInfo createSnapshot(String snapshotRepoName, String snapshotName, String... indices) {
SnapshotInfo snapshotInfo = internalCluster().client()
Expand Down Expand Up @@ -194,4 +354,5 @@ public static void assertRemoteStoreBackedIndex(String indexName) {
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1664,7 +1664,7 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu
* @param clusterSettings cluster setting
*/
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
if (isRemoteDataAttributePresent(settings) == false
if ((isRemoteDataAttributePresent(settings) == false && isMigratingToRemoteStore(clusterSettings) == false)
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,32 +95,38 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
);
}

if (migrationDirection.equals(Direction.REMOTE_STORE) == false) {
// docrep migration direction is currently not supported
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
boolean remoteSettingsBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());

if (migrationDirection.equals(Direction.NONE)) {
// remote backed indices on docrep nodes and non remote backed indices on remote nodes are not allowed
boolean isNoDecision = remoteSettingsBackedIndex ^ targetNode.isRemoteStoreNode();
String reason = String.format(Locale.ROOT, " for %sremote store backed index", remoteSettingsBackedIndex ? "" : "non ");
return allocation.decision(
Decision.YES,
isNoDecision ? Decision.NO : Decision.YES,
NAME,
getDecisionDetails(true, shardRouting, targetNode, " for non remote_store direction")
getDecisionDetails(!isNoDecision, shardRouting, targetNode, reason)
);
}

// check for remote store backed indices
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
boolean remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());
if (remoteStoreBackedIndex && targetNode.isRemoteStoreNode() == false) {
// allocations and relocations must be to a remote node
String reason = String.format(
Locale.ROOT,
" because a remote store backed index's shard copy can only be %s to a remote node",
((shardRouting.assignedToNode() == false) ? "allocated" : "relocated")
);
return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, shardRouting, targetNode, reason));
}
} else if (migrationDirection.equals(Direction.DOCREP)) {
// docrep migration direction is currently not supported
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, shardRouting, targetNode, " for DOCREP direction"));
} else {
// check for remote store backed indices
if (remoteSettingsBackedIndex && targetNode.isRemoteStoreNode() == false) {
// allocations and relocations must be to a remote node
String reason = String.format(
Locale.ROOT,
" because a remote store backed index's shard copy can only be %s to a remote node",
((shardRouting.assignedToNode() == false) ? "allocated" : "relocated")
);
return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, shardRouting, targetNode, reason));
}

if (shardRouting.primary()) {
return primaryShardDecision(shardRouting, targetNode, allocation);
if (shardRouting.primary()) {
return primaryShardDecision(shardRouting, targetNode, allocation);
}
return replicaShardDecision(shardRouting, targetNode, allocation);
}
return replicaShardDecision(shardRouting, targetNode, allocation);
}

// handle scenarios for allocation of a new shard's primary copy
Expand Down
Loading

0 comments on commit 28492b5

Please sign in to comment.