diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/RemoteStoreRepositoryRegistrationHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/RemoteStoreRepositoryRegistrationHelper.java new file mode 100644 index 0000000000000..a9a41be3674d7 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/RemoteStoreRepositoryRegistrationHelper.java @@ -0,0 +1,229 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.repository; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.Settings; +import org.opensearch.repositories.RepositoriesService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * RemoteStore Repository Registration helper + */ +public class RemoteStoreRepositoryRegistrationHelper { + + public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "cluster.remote_store.segment"; + public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "cluster.remote_store.translog"; + public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type"; + public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.settings"; + + private static void validateAttributeNonNull(DiscoveryNode joiningNode, String attributeKey) { + String attributeValue = joiningNode.getAttributes().get(attributeKey); + if (attributeValue == null || attributeValue.isEmpty()) { + throw new IllegalStateException("joining node [" + joiningNode + "] doesn't have the node attribute [" + attributeKey + "]"); + } + } + + /** + * A node will be declared as remote store node if it has any of the remote store node attributes. + * The method validates that the joining node has any of the remote store node attributes or not. + * @param joiningNode + * @return boolean value on the basis of remote store node attributes. + */ + public static boolean isRemoteStoreNode(DiscoveryNode joiningNode) { + Map joiningNodeAttributes = joiningNode.getAttributes(); + String segmentRepositoryName = joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + String segmentRepositoryTypeAttributeKey = String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, segmentRepositoryName); + String segmentRepositorySettingsAttributeKey = String.format( + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, + segmentRepositoryName + ); + String translogRepositoryName = joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + String translogRepositoryTypeAttributeKey = String.format( + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + translogRepositoryName + ); + String translogRepositorySettingsAttributeKey = String.format( + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, + translogRepositoryName + ); + + boolean remoteStoreNode = joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY) != null + || joiningNodeAttributes.get(segmentRepositoryTypeAttributeKey) != null + || joiningNodeAttributes.get(segmentRepositorySettingsAttributeKey) != null + || joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY) != null + || joiningNodeAttributes.get(translogRepositoryTypeAttributeKey) != null + || joiningNodeAttributes.get(translogRepositorySettingsAttributeKey) != null; + + if (remoteStoreNode) { + validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + validateAttributeNonNull(joiningNode, segmentRepositoryTypeAttributeKey); + validateAttributeNonNull(joiningNode, segmentRepositorySettingsAttributeKey); + validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + validateAttributeNonNull(joiningNode, translogRepositoryTypeAttributeKey); + validateAttributeNonNull(joiningNode, translogRepositorySettingsAttributeKey); + } + + return remoteStoreNode; + } + + private static void compareAttribute(DiscoveryNode joiningNode, DiscoveryNode existingNode, String attributeKey) { + String joiningNodeAttribute = joiningNode.getAttributes().get(attributeKey); + String existingNodeAttribute = existingNode.getAttributes().get(attributeKey); + + if (existingNodeAttribute.equals(joiningNodeAttribute) == false) { + throw new IllegalStateException( + "joining node [" + + joiningNode + + "] has node attribute [" + + attributeKey + + "] value [" + + joiningNodeAttribute + + "] which is different than existing node [" + + existingNode + + "] value [" + + existingNodeAttribute + + "]" + ); + } + } + + // TODO: See a better way to compare the remote store node attributes. + public static void compareNodeAttributes(DiscoveryNode joiningNode, DiscoveryNode existingNode) { + String segmentRepositoryName = existingNode.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + String translogRepositoryName = existingNode.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + + compareAttribute(joiningNode, existingNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + compareAttribute( + joiningNode, + existingNode, + String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, segmentRepositoryName) + ); + compareAttribute( + joiningNode, + existingNode, + String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, segmentRepositoryName) + ); + compareAttribute(joiningNode, existingNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + compareAttribute( + joiningNode, + existingNode, + String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, translogRepositoryName) + ); + compareAttribute( + joiningNode, + existingNode, + String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, translogRepositoryName) + ); + } + + private static Settings buildSettings(String stringSettings) { + Settings.Builder settings = Settings.builder(); + + String[] stringKeyValue = stringSettings.split(","); + for (int i = 0; i < stringKeyValue.length; i++) { + String[] keyValue = stringKeyValue[i].split(":"); + settings.put(keyValue[0].trim(), keyValue[1].trim()); + } + + return settings.build(); + } + + // TODO: Add logic to mark these repository as System Repository once thats merged. + // Visible For testing + public static RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { + String type = node.getAttributes().get(String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)); + String settings = node.getAttributes().get(String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, name)); + + validateAttributeNonNull(node, String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)); + validateAttributeNonNull(node, String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, name)); + + return new RepositoryMetadata(name, type, buildSettings(settings)); + + } + + /** + * Validated or adds the remote store repository to cluster state if it doesn't exist. + * @param joiningNode + * @param currentState + * @return updated cluster state + */ + public static ClusterState validateOrAddRemoteStoreRepository(DiscoveryNode joiningNode, ClusterState currentState) { + List existingNodes = new ArrayList<>(currentState.getNodes().getNodes().values()); + + ClusterState newState = ClusterState.builder(currentState).build(); + + // TODO: Mutating cluster state like this can be dangerous, this will need refactoring. + if (existingNodes.size() == 0) { + validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + + newState = updateClusterStateWithRepositoryMetadata( + currentState, + buildRepositoryMetadata(joiningNode, joiningNode.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY)) + ); + newState = updateClusterStateWithRepositoryMetadata( + newState, + buildRepositoryMetadata(joiningNode, joiningNode.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY)) + ); + return newState; + } else { + compareNodeAttributes(joiningNode, existingNodes.get(0)); + } + + return newState; + } + + private static ClusterState updateClusterStateWithRepositoryMetadata( + ClusterState currentState, + RepositoryMetadata newRepositoryMetadata + ) { + RepositoriesService.validate(newRepositoryMetadata.name()); + + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE); + if (repositories == null) { + repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata)); + } else { + List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); + + for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { + if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) { + if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) { + // Previous version is the same as this one no update is needed. + return new ClusterState.Builder(currentState).build(); + } else { + throw new IllegalStateException( + "new repository metadata [" + + newRepositoryMetadata + + "] supplied by joining node is different from existing repository metadata [" + + repositoryMetadata + + "]" + ); + } + } else { + repositoriesMetadata.add(repositoryMetadata); + } + } + repositoriesMetadata.add(newRepositoryMetadata); + repositories = new RepositoriesMetadata(repositoriesMetadata); + } + mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/package-info.java new file mode 100644 index 0000000000000..a94e80592ef40 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/repository/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Restore remote store transport handler. */ +package org.opensearch.action.admin.cluster.remotestore.repository; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 0274073ddfdc7..9af5230b95b86 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -605,6 +605,8 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback // we are checking source node commission status here to reject any join request coming from a decommissioned node // even before executing the join task to fail fast JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata()); + + JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joinRequest.getSourceNode(), stateForJoinValidation); } sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback); } else { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 564819a70111d..1f18f8962b2dd 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -59,6 +59,9 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.compareNodeAttributes; +import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.isRemoteStoreNode; +import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateOrAddRemoteStoreRepository; import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -140,6 +143,11 @@ public ClusterTasksResult execute(ClusterState currentState, List jo ClusterState.Builder newState; if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) { + DiscoveryNode joiningNode = joiningNodes.get(0).node(); + if (isRemoteStoreNode(joiningNode)) { + // TODO: Mutating cluster state like this can be dangerous, this will need refactoring. + currentState = validateOrAddRemoteStoreRepository(joiningNode, currentState); + } return results.successes(joiningNodes).build(currentState); } else if (currentNodes.getClusterManagerNode() == null && joiningNodes.stream().anyMatch(Task::isBecomeClusterManagerTask)) { assert joiningNodes.stream().anyMatch(Task::isFinishElectionTask) : "becoming a cluster-manager but election is not finished " @@ -160,6 +168,7 @@ public ClusterTasksResult execute(ClusterState currentState, List jo } DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes()); + ClusterState intermediateState; assert nodesBuilder.isLocalNodeElectedClusterManager(); @@ -176,6 +185,12 @@ public ClusterTasksResult execute(ClusterState currentState, List jo logger.debug("received a join request for an existing node [{}]", joinTask.node()); } else { final DiscoveryNode node = joinTask.node(); + if (isRemoteStoreNode(node)) { + // TODO: Mutating cluster state like this can be dangerous or anti pattern, this will need + // refactoring. + intermediateState = validateOrAddRemoteStoreRepository(node, newState.build()); + newState = ClusterState.builder(intermediateState); + } try { if (enforceMajorVersion) { ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion); @@ -187,6 +202,8 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // we have added the same check in handleJoinRequest method and adding it here as this method // would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness ensureNodeCommissioned(node, currentState.metadata()); + + ensureRemoteStoreNodesCompatibility(node, currentState); nodesBuilder.add(node); nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); @@ -422,6 +439,45 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) } } + /** + * The method ensures two conditions - + * 1. The joining node is remote store if it is joining a remote store cluster. + * 2. The joining node is non-remote store if it is joining a non-remote store cluster. + * A remote store node is the one which holds the all the remote store attributes and a remote store cluster is + * the one which has only homogeneous remote store nodes with same node attributes + * + * @param joiningNode + * @param currentState + */ + public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, ClusterState currentState) { + List existingNodes = new ArrayList<>(currentState.getNodes().getNodes().values()); + + /** + * If there are no node in the cluster state we will No op the compatibility check as at this point we + * cannot determine if this is a remote store cluster or non-remote store cluster. + */ + if (existingNodes.size() == 0) { + return; + } + + /** + * TODO: The below check is valid till we support migration, once we start supporting migration a remote + * store node will be able to join a non remote store cluster and vice versa. #7986 + */ + if (isRemoteStoreNode(joiningNode)) { + if (isRemoteStoreNode(existingNodes.get(0))) { + DiscoveryNode existingNode = existingNodes.get(0); + compareNodeAttributes(joiningNode, existingNode); + } else { + throw new IllegalStateException("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"); + } + } else { + if (isRemoteStoreNode(existingNodes.get(0))) { + throw new IllegalStateException("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"); + } + } + } + public static Collection> addBuiltInJoinValidators( Collection> onJoinValidators ) { @@ -430,6 +486,7 @@ public static Collection> addBuiltInJoin ensureNodesCompatibility(node.getVersion(), state.getNodes()); ensureIndexCompatibility(node.getVersion(), state.getMetadata()); ensureNodeCommissioned(node, state.getMetadata()); + ensureRemoteStoreNodesCompatibility(node, state); }); validators.addAll(onJoinValidators); return Collections.unmodifiableCollection(validators); diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index f18dc63013abf..29778562b2efa 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -598,7 +598,7 @@ private Repository createRepository(RepositoryMetadata repositoryMetadata, Map JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joiningNode, currentState) + ); + assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster")); + } + + public void testJoinClusterWithRemoteStoreNodeJoiningRemoteStoreCluster() { + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joiningNode, currentState); + } + + public void testPreventJoinClusterWithRemoteStoreNodeWithDifferentAttributesJoiningRemoteStoreCluster() { + Map existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + existingNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + for (Map.Entry nodeAttribute : existingNodeAttributes.entrySet()) { + if (nodeAttribute.getKey() != REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY + && nodeAttribute.getKey() != REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY) { + remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue() + "-new"); + validateAttributes(remoteStoreNodeAttributes, nodeAttribute, currentState, existingNode); + remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue()); + } + } + } + + public void testPreventJoinClusterWithRemoteStoreNodeWithDifferentNameAttributesJoiningRemoteStoreCluster() { + Map existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + existingNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + for (Map.Entry nodeAttribute : existingNodeAttributes.entrySet()) { + if (REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY.equals(nodeAttribute.getKey())) { + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO + "new", TRANSLOG_REPO); + validateAttributes(remoteStoreNodeAttributes, nodeAttribute, currentState, existingNode); + } else if (REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY.equals(nodeAttribute.getKey())) { + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO + "new"); + validateAttributes(remoteStoreNodeAttributes, nodeAttribute, currentState, existingNode); + } + } + } + + public void testPreventJoinClusterWithNonRemoteStoreNodeJoiningRemoteStoreCluster() { + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(Collections.emptyMap()); + Exception e = assertThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joiningNode, currentState) + ); + assertTrue(e.getMessage().equals("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster")); + } + + public void testPreventJoinClusterWithRemoteStoreNodeWithPartialAttributesJoiningRemoteStoreCluster() { + Map existingNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + final DiscoveryNode existingNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + existingNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .build(); + + for (Map.Entry nodeAttribute : existingNodeAttributes.entrySet()) { + remoteStoreNodeAttributes.put(nodeAttribute.getKey(), null); + DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes); + Exception e = assertThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joiningNode, currentState) + ); + assertTrue( + e.getMessage().equals("joining node [" + joiningNode + "] doesn't have the node attribute [" + nodeAttribute.getKey() + "]") + ); + remoteStoreNodeAttributes.put(nodeAttribute.getKey(), nodeAttribute.getValue()); + } + } + + public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception { + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build(); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(clusterManagerNode, "_FINISH_ELECTION_")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validateRepositoryMetadata(result.resultingState, clusterManagerNode, 2); + } + + public void testUpdatesClusterStateWithMultiNodeCluster() throws Exception { + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO); + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(clusterManagerNode, SEGMENT_REPO); + final RepositoryMetadata translogRepositoryMetadata = buildRepositoryMetadata(clusterManagerNode, TRANSLOG_REPO); + List repositoriesMetadata = new ArrayList<>() { + { + add(segmentRepositoryMetadata); + add(translogRepositoryMetadata); + } + }; + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata))) + .build(); + + final DiscoveryNode joiningNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(joiningNode, "test")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validateRepositoryMetadata(result.resultingState, clusterManagerNode, 2); + } + + public void testUpdatesClusterStateWithSingleNodeClusterAndSameRepository() throws Exception { + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(COMMON_REPO, COMMON_REPO); + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build(); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(clusterManagerNode, "_FINISH_ELECTION_")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validateRepositoryMetadata(result.resultingState, clusterManagerNode, 1); + } + + public void testUpdatesClusterStateWithMultiNodeClusterAndSameRepository() throws Exception { + Map remoteStoreNodeAttributes = remoteStoreNodeAttributes(COMMON_REPO, COMMON_REPO); + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(clusterManagerNode, COMMON_REPO); + List repositoriesMetadata = new ArrayList<>() { + { + add(segmentRepositoryMetadata); + } + }; + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata))) + .build(); + + final DiscoveryNode joiningNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(joiningNode, "test")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validateRepositoryMetadata(result.resultingState, clusterManagerNode, 1); + } + + private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories) + throws Exception { + + final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE); + assertTrue(repositoriesMetadata.repositories().size() == expectedRepositories); + if (repositoriesMetadata.repositories().size() == 2) { + final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(existingNode, SEGMENT_REPO); + final RepositoryMetadata translogRepositoryMetadata = buildRepositoryMetadata(existingNode, TRANSLOG_REPO); + for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { + if (repositoryMetadata.name().equals(segmentRepositoryMetadata.name())) { + assertTrue(segmentRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(segmentRepositoryMetadata.name())) { + assertTrue(translogRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } + } + } else if (repositoriesMetadata.repositories().size() == 1) { + final RepositoryMetadata repositoryMetadata = buildRepositoryMetadata(existingNode, COMMON_REPO); + assertTrue(repositoryMetadata.equalsIgnoreGenerations(repositoriesMetadata.repositories().get(0))); + } else { + throw new Exception("Stack overflow example: checkedExceptionThrower"); + } + } + private DiscoveryNode newDiscoveryNode(Map attributes) { return new DiscoveryNode( randomAlphaOfLength(10), @@ -337,4 +680,56 @@ private DiscoveryNode newDiscoveryNode(Map attributes) { Version.CURRENT ); } + + private static final String SEGMENT_REPO = "segment-repo"; + private static final String TRANSLOG_REPO = "translog-repo"; + private static final String COMMON_REPO = "remote-repo"; + + private Map remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) { + return new HashMap<>() { + { + put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName); + put(String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, segmentRepoName), "s3"); + put( + String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, segmentRepoName), + "bucket:segment_bucket,base_path:/segment/path" + ); + put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName); + putIfAbsent(String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, translogRepoName), "s3"); + putIfAbsent( + String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, translogRepoName), + "bucket:translog_bucket,base_path:/translog/path" + ); + } + }; + } + + private void validateAttributes( + Map remoteStoreNodeAttributes, + Map.Entry existingNodeAttribute, + ClusterState currentState, + DiscoveryNode existingNode + ) { + DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes); + Exception e = assertThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joiningNode, currentState) + ); + assertTrue( + e.getMessage() + .equals( + "joining node [" + + joiningNode + + "] has node attribute [" + + existingNodeAttribute.getKey() + + "] value [" + + remoteStoreNodeAttributes.get(existingNodeAttribute.getKey()) + + "] which is different than existing node [" + + existingNode + + "] value [" + + existingNodeAttribute.getValue() + + "]" + ) + ); + } }