Skip to content

Commit

Permalink
Handle routingTable repo check in JoinTaskExecutor
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <[email protected]>
  • Loading branch information
Himshikha Gupta committed May 24, 2024
1 parent 89c33bc commit df64105
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,23 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
assert existingNodes.isEmpty() == false;

CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
List<String> reposToSkip = new ArrayList<>();
// Skip checking for remote routing table repo if feature is not enabled.
if(!RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(metadata.settings())) {
String joiningNodeRepoName = joiningNode.getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
String existingNodeRepoName = existingNodes.get(0).getAttributes().get(RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY);
if(joiningNodeRepoName != null){
reposToSkip.add(joiningNodeRepoName);
}
if(existingNodeRepoName != null) {
reposToSkip.add(existingNodeRepoName);
}
}
if (STRICT.equals(remoteStoreCompatibilityMode)) {

DiscoveryNode existingNode = existingNodes.get(0);
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, reposToSkip);
} else {
if (existingNode.isRemoteStoreNode()) {
throw new IllegalStateException(
Expand All @@ -538,18 +550,18 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
}
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, reposToSkip));
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode, List<String> reposToSkip) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
if (existingRemoteStoreNodeAttribute.equalsWithRepoSkip(joiningRemoteStoreNodeAttribute, reposToSkip) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

Expand Down Expand Up @@ -164,6 +165,30 @@ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetadata other) {
return true;
}

/**
* Checks if this instance and the give instance share the same repositories, with option to skip checking for a list of repos.
* This will support
* @param other other repositories metadata
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip.
*/
public boolean equalsIgnoreGenerationsWithRepoSkip(@Nullable RepositoriesMetadata other, List<String> reposToSkip) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream().filter(repo-> !reposToSkip.contains(repo.name())).collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream().filter(repo-> !reposToSkip.contains(repo.name())).collect(Collectors.toList());
if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
for (int i = 0; i < currentRepositories.size(); i++) {
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
return repositories.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,15 @@ public class RemoteRoutingTableService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;

public RemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings
Settings settings
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.clusterSettings = clusterSettings;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public RemoteClusterStateService(
this.indexMetadataUploadListeners = indexMetadataUploadListeners;

if (isRemoteRoutingTableEnabled(settings)) {
this.remoteRoutingTableService = new RemoteRoutingTableService(repositoriesService, settings, clusterSettings);
this.remoteRoutingTableService = new RemoteRoutingTableService(repositoriesService, settings);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na

// Repository metadata built here will always be for a system repository.
settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);
settings.put("repositories.fs.location", "./fsdata");

return new RepositoryMetadata(name, type, settings.build(), cryptoMetadata);
}
Expand Down Expand Up @@ -197,7 +198,7 @@ public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
&& isRemoteClusterStateAttributePresent(settings);
}

public static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
return settings.getByPrefix(Node.NODE_ATTRIBUTES.getKey() + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)
.isEmpty() == false;
}
Expand Down Expand Up @@ -252,6 +253,21 @@ public int hashCode() {
return hashCode;
}

/**
* Checks if 2 instances are equal, with option to skip check for a list of repos.
*
* @param o other instance
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances are equal, not including the repositories in both instances if they are part of reposToSkip.
*/
public boolean equalsWithRepoSkip(Object o, List<String> reposToSkip) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

RemoteStoreNodeAttribute that = (RemoteStoreNodeAttribute) o;
return this.getRepositoriesMetadata().equalsIgnoreGenerationsWithRepoSkip(that.getRepositoriesMetadata(), reposToSkip);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -72,6 +75,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
Expand Down Expand Up @@ -944,6 +948,99 @@ public void testNodeJoinInMixedMode() {
JoinTaskExecutor.ensureNodesCompatibility(joiningNode2, currentNodes, metadata);
}

public void testRemoteRoutingTableDisabledNodeJoin() {

final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();

DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testRemoteRoutingTableDisabledNodeJoinRepoPresentInJoiningNode() {
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO),
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.build();

Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
DiscoveryNode joiningNode = newDiscoveryNode(attr);
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testRemoteRoutingTableEnabledNodeJoinRepoPresentInExistingNode() {
Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
attr,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
final Settings settings = Settings.builder()
.put(RemoteRoutingTableService.REMOTE_ROUTING_TABLE_ENABLED_SETTING.getKey(), "true")
.put(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO)
.put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true")
.build();
final Settings nodeSettings = Settings.builder().put(FeatureFlags.REMOTE_ROUTING_TABLE_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
Metadata metadata = Metadata.builder().persistentSettings(settings).build();
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.metadata(metadata)
.build();

DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO));
assertThrows(
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
}

public void testRemoteRoutingTableEnabledNodeJoinRepoPresentInBothNode() {
Map<String, String> attr = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO);
attr.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
final DiscoveryNode existingNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
attr,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
final Settings settings = Settings.builder()
.put(RemoteRoutingTableService.REMOTE_ROUTING_TABLE_ENABLED_SETTING.getKey(), "true")
.put(Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO)
.put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true")
.build();
final Settings nodeSettings = Settings.builder().put(FeatureFlags.REMOTE_ROUTING_TABLE_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
Metadata metadata = Metadata.builder().persistentSettings(settings).build();
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build())
.metadata(metadata)
.build();

DiscoveryNode joiningNode = newDiscoveryNode(attr);
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
throws Exception {

Expand Down Expand Up @@ -985,6 +1082,7 @@ private DiscoveryNode newDiscoveryNode(Map<String, String> attributes) {
private static final String TRANSLOG_REPO = "translog-repo";
private static final String CLUSTER_STATE_REPO = "cluster-state-repo";
private static final String COMMON_REPO = "remote-repo";
private static final String ROUTING_TABLE_REPO = "routing-table-repo";

private Map<String, String> remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) {
return remoteStoreNodeAttributes(segmentRepoName, translogRepoName, CLUSTER_STATE_REPO);
Expand Down Expand Up @@ -1049,6 +1147,28 @@ private Map<String, String> remoteStateNodeAttributes(String clusterStateRepo) {
};
}

private Map<String, String> remoteRoutingTableAttributes(String repoName) {
String routingTableRepositoryTypeAttributeKey = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
repoName
);
String routingTableRepositorySettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
repoName
);

return new HashMap<>() {
{
put(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, repoName);
putIfAbsent(routingTableRepositoryTypeAttributeKey, "s3");
putIfAbsent(routingTableRepositorySettingsAttributeKeyPrefix + "bucket", "state_bucket");
putIfAbsent(routingTableRepositorySettingsAttributeKeyPrefix + "base_path", "/state/path");
}
};
}

private void validateAttributes(Map<String, String> remoteStoreNodeAttributes, ClusterState currentState, DiscoveryNode existingNode) {
DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes);
Exception e = assertThrows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void setup() {
Settings nodeSettings = Settings.builder().put(REMOTE_ROUTING_TABLE_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

remoteRoutingTableService = new RemoteRoutingTableService(repositoriesServiceSupplier, settings, clusterSettings);
remoteRoutingTableService = new RemoteRoutingTableService(repositoriesServiceSupplier, settings);
}

@After
Expand All @@ -69,8 +69,7 @@ public void testFailInitializationWhenRemoteRoutingDisabled() {
AssertionError.class,
() -> new RemoteRoutingTableService(
repositoriesServiceSupplier,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
settings
)
);
}
Expand Down
Loading

0 comments on commit df64105

Please sign in to comment.