From 583671991be03d5ac724cd73bb2002b91ad86572 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Tue, 3 Dec 2024 13:27:47 +0530 Subject: [PATCH] Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state Signed-off-by: Pranshu Shukla --- .../discovery/DiscoveryDisruptionIT.java | 93 +++++++++++++++++++ .../remotestore/RemoteStoreNodeService.java | 15 +-- .../repositories/RepositoriesService.java | 6 ++ .../opensearch/test/InternalTestCluster.java | 27 +++++- .../test/OpenSearchIntegTestCase.java | 38 ++++++++ .../test/transport/MockTransportService.java | 30 ++++++ 6 files changed, 199 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java index 70124c8c46700..4938617a312fe 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java @@ -32,13 +32,19 @@ package org.opensearch.discovery; +import org.junit.Assert; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.coordination.JoinHelper; import org.opensearch.cluster.coordination.PublicationTransportHandler; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; import org.opensearch.test.disruption.ServiceDisruptionScheme; @@ -50,6 +56,11 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import java.util.Random; +import java.util.List; +import java.util.Arrays; + import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; @@ -250,4 +261,86 @@ public void testNodeNotReachableFromClusterManager() throws Exception { ensureStableCluster(3); } + /** + * Test Repositories Configured Node Join Commit failures. + */ + public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception { + final String remoteStateRepoName = "remote-state-repo"; + final String remoteRoutingTableRepoName = "routing-table-repo"; + + + Settings remotePublicationSettings = buildRemotePublicationNodeAttributes( + remoteStateRepoName, + ReloadableFsRepository.TYPE, + remoteRoutingTableRepoName, + ReloadableFsRepository.TYPE + ); + internalCluster().startClusterManagerOnlyNodes(3); + internalCluster().startDataOnlyNodes(3); + + String clusterManagerNode = internalCluster().getClusterManagerName(); + List nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames()).filter(node -> !node.equals(clusterManagerNode)).collect(Collectors.toList()); + + ensureStableCluster(6); + + MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + clusterManagerNode + ); + logger.info("Blocking Cluster Manager Commit Request on all nodes"); + nonClusterManagerNodes.forEach( + node -> { + TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node); + clusterManagerTransportService.addOpenSearchFailureException( + targetTransportService, + new FailedToCommitClusterStateException("Blocking Commit"), + PublicationTransportHandler.COMMIT_STATE_ACTION_NAME + ); + } + ); + + logger.info("Starting Node with remote publication settings"); + internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE); + + logger.info("Stopping current Cluster Manager"); + internalCluster().stopCurrentClusterManagerNode(); + ensureStableCluster(6); + + Random random = new Random(); + String randomNode = nonClusterManagerNodes.get(random.nextInt(nonClusterManagerNodes.size())); + + RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode).state().metadata().custom(RepositoriesMetadata.TYPE); + + Boolean isRemoteStateRepoConfigured = Boolean.FALSE; + Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE; + + for (RepositoryMetadata repo : repositoriesMetadata.repositories()) { + if (repo.name().equals(remoteStateRepoName)) { + isRemoteStateRepoConfigured = Boolean.TRUE; + } else if (repo.name().equals(remoteRoutingTableRepoName)) { + isRemoteRoutingTableRepoConfigured = Boolean.TRUE; + } + } + + Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured); + Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured); + + isRemoteStateRepoConfigured = Boolean.FALSE; + isRemoteRoutingTableRepoConfigured = Boolean.FALSE; + + RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode); + + if (repositoriesService.isRepositoryPresent(remoteStateRepoName)) { + isRemoteStateRepoConfigured = Boolean.TRUE; + } + if (repositoriesService.isRepositoryPresent(remoteRoutingTableRepoName)) { + isRemoteRoutingTableRepoConfigured = Boolean.TRUE; + } + + Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured); + Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured); + + logger.info("Stopping current Cluster Manager"); + } + } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index c1c041ce01198..52d43a4ec70b1 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -23,12 +23,7 @@ import org.opensearch.repositories.RepositoryException; import org.opensearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; +import java.util.*; import java.util.function.Supplier; import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; @@ -183,6 +178,14 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode boolean repositoryAlreadyPresent = false; for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) { if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) { + // This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded but + // the commit operation failed, the cluster-state may have the repository metadata which is not applied into the + // repository service. This may lead to assertion failures down the line. + if (!repositoriesService.get().isRepositoryPresent(newRepositoryMetadata.name())) { + logger.warn("remote repository [{}] in cluster-state but repository-service but not present " + + "in repository-service, skipping checks", newRepositoryMetadata.name()); + break; + } try { // This will help in handling two scenarios - // 1. When a fresh cluster is formed and a node tries to join the cluster, the repository diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 9aec81536dbd0..13fb6ed8455d7 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -578,6 +578,10 @@ public Repository repository(String repositoryName) { throw new RepositoryMissingException(repositoryName); } + public Boolean isRepositoryPresent(final String repositoryName) { + return Objects.nonNull(repositories.get(repositoryName)); + } + public List repositoriesStats() { List activeRepoStats = getRepositoryStatsForActiveRepositories(); return activeRepoStats; @@ -904,6 +908,8 @@ public void ensureValidSystemRepositoryUpdate(RepositoryMetadata newRepositoryMe Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings(); Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings(); + assert Objects.nonNull(repository) : String.format("repository [%s] not present in RepositoryService", currentRepositoryMetadata.name()); + List restrictedSettings = repository.getRestrictedSystemRepositorySettings() .stream() .map(setting -> setting.getKey()) diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index fa5fb736f518f..168a51adf11f6 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2322,10 +2322,25 @@ public List startNodes(int numOfNodes, Settings settings) { return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0])); } + + /** + * Starts multiple nodes with the given settings and returns their names + */ + public List startNodes(int numOfNodes, Settings settings, Boolean ignoreNodeJoin) { + return startNodes(ignoreNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0])); + } + /** * Starts multiple nodes with the given settings and returns their names */ public synchronized List startNodes(Settings... extraSettings) { + return startNodes(false, extraSettings); + } + + /** + * Starts multiple nodes with the given settings and returns their names + */ + public synchronized List startNodes(Boolean ignoreNodeJoin, Settings... extraSettings) { final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count()); final int defaultMinClusterManagerNodes; if (autoManageClusterManagerNodes) { @@ -2339,9 +2354,9 @@ public synchronized List startNodes(Settings... extraSettings) { && prevClusterManagerCount == 0 && newClusterManagerCount > 0 && Arrays.stream(extraSettings) - .allMatch(s -> DiscoveryNode.isClusterManagerNode(s) == false || ZEN2_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(s))) - ? RandomNumbers.randomIntBetween(random, 0, newClusterManagerCount - 1) - : -1; + .allMatch(s -> DiscoveryNode.isClusterManagerNode(s) == false || ZEN2_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(s))) + ? RandomNumbers.randomIntBetween(random, 0, newClusterManagerCount - 1) + : -1; final int numOfNodes = extraSettings.length; final int firstNodeId = nextNodeId.getAndIncrement(); @@ -2377,7 +2392,7 @@ public synchronized List startNodes(Settings... extraSettings) { nodes.add(nodeAndClient); } startAndPublishNodesAndClients(nodes); - if (autoManageClusterManagerNodes) { + if (autoManageClusterManagerNodes && !ignoreNodeJoin) { validateClusterFormed(); } return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList()); @@ -2422,6 +2437,10 @@ public List startDataOnlyNodes(int numNodes, Settings settings) { return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build()); } + public List startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) { + return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build(), ignoreNodeJoin); + } + public List startSearchOnlyNodes(int numNodes) { return startSearchOnlyNodes(numNodes, Settings.EMPTY); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 1ee856d3092f0..c76da83df6cc9 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -181,6 +181,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; +import reactor.util.annotation.NonNull; import java.io.IOException; import java.lang.Runtime.Version; @@ -2915,6 +2916,43 @@ protected static Settings buildRemoteStoreNodeAttributes( return settings.build(); } + protected Settings buildRemotePublicationNodeAttributes( + @NonNull String remoteStateRepoName, + @NonNull String remoteStateRepoType, + @NonNull String routingTableRepoName, + @NonNull String routingTableRepoType + ) { + String remoteStateRepositoryTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + remoteStateRepoName + ); + String routingTableRepositoryTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + routingTableRepoName + ); + String remoteStateRepositorySettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + remoteStateRepoName + ); + String routingTableRepositorySettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + routingTableRepoName + ); + + return Settings.builder() + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, remoteStateRepoName) + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName) + .put(remoteStateRepositoryTypeAttributeKey, remoteStateRepoType) + .put(routingTableRepositoryTypeAttributeKey, routingTableRepoType) + .put(remoteStateRepositorySettingsAttributeKeyPrefix+"location", randomRepoPath().toAbsolutePath()) + .put(routingTableRepositorySettingsAttributeKeyPrefix+"location", randomRepoPath().toAbsolutePath()) + .build(); + } + public static String resolvePath(IndexId indexId, String shardId) { PathType pathType = PathType.fromCode(indexId.getShardPathType()); RemoteStorePathStrategy.SnapshotShardPathInput shardPathInput = new RemoteStorePathStrategy.SnapshotShardPathInput.Builder() diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 6bf5381b62cc9..89e90bb044cc4 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.node.DiscoveryNode; @@ -376,6 +377,35 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final }); } + /** + * Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions + */ + public void addOpenSearchFailureException(TransportService transportService, final OpenSearchException exception, final String... blockedActions) { + addOpenSearchFailureException(transportService, exception, new HashSet<>(Arrays.asList(blockedActions))); + } + + /** + * Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions + */ + public void addOpenSearchFailureException(TransportService transportService, OpenSearchException exception, final Set blockedActions) { + for (TransportAddress transportAddress : extractTransportAddresses(transportService)) { + addOpenSearchFailureException(transportAddress, exception, blockedActions); + } + } + + /** + * Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions + */ + public void addOpenSearchFailureException(TransportAddress transportAddress, OpenSearchException exception, final Set blockedActions) { + transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> { + if (blockedActions.contains(action)) { + logger.info("--> preventing {} request", action); + throw exception; + } + connection.sendRequest(requestId, action, request, options); + }); + } + /** * Adds a rule that will cause ignores each send request, simulating an unresponsive node * and failing to connect once the rule was added.