From d4f0c2cfdb43c8ad9a643f8b5c0bd901be18125d Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 6 Sep 2024 14:38:21 +0530 Subject: [PATCH] [Remote State] Upload incremental cluster state on master re-election (#15145) * Upload incremental cluster state on master re-election Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../remote/RemoteStatePublicationIT.java | 60 ++- .../remotestore/BaseRemoteStoreRestoreIT.java | 25 -- .../RemoteStoreBaseIntegTestCase.java | 17 +- .../coordination/CoordinationState.java | 72 +++- .../PublicationTransportHandler.java | 17 +- .../RemoteStatePublishRequest.java | 51 +++ .../opensearch/gateway/GatewayMetaState.java | 54 ++- .../remote/RemoteClusterStateService.java | 44 +- .../coordination/CoordinationStateTests.java | 386 ++++++++++++++++-- .../GatewayMetaStatePersistedStateTests.java | 123 +++++- .../RemoteClusterStateServiceTests.java | 8 +- 12 files changed, 732 insertions(+), 126 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 26358d269a2ea..deb9b7d0bcad7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471)) - Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478)) - MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637)) +- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145)) - Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index f43d16f7aef43..faab3645ae894 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -20,6 +20,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.DiscoveryStats; +import org.opensearch.gateway.GatewayMetaState; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; @@ -43,6 +44,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,7 +76,7 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase { private static final String REMOTE_STATE_PREFIX = "!"; private static final String REMOTE_ROUTING_PREFIX = "_"; private boolean isRemoteStateEnabled = true; - private String isRemotePublicationEnabled = "true"; + private boolean isRemotePublicationEnabled = true; private boolean hasRemoteStateCharPrefix; private boolean hasRemoteRoutingCharPrefix; @@ -82,7 +84,7 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase { public void setup() { asyncUploadMockFsRepo = false; isRemoteStateEnabled = true; - isRemotePublicationEnabled = "true"; + isRemotePublicationEnabled = true; hasRemoteStateCharPrefix = randomBoolean(); hasRemoteRoutingCharPrefix = randomBoolean(); } @@ -112,6 +114,7 @@ protected Settings nodeSettings(int nodeOrdinal) { RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE ) + .put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled) .put( RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(), hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : "" @@ -341,6 +344,59 @@ public void doAfterNodes(int n, Client client) { }); } + public void testMasterReElectionUsesIncrementalUpload() throws IOException { + prepareCluster(3, 2, INDEX_NAME, 1, 1); + PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class); + GatewayMetaState.RemotePersistedState remotePersistedState = (GatewayMetaState.RemotePersistedState) persistedStateRegistry + .getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); + ClusterMetadataManifest manifest = remotePersistedState.getLastAcceptedManifest(); + // force elected master to step down + internalCluster().stopCurrentClusterManagerNode(); + ensureStableCluster(4); + + persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class); + CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); + ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest(); + + // coordination metadata is updated, it will be unequal + assertNotEquals(manifest.getCoordinationMetadata(), manifestAfterElection.getCoordinationMetadata()); + // all other attributes are not uploaded again and will be pointing to same files in manifest after new master is elected + assertEquals(manifest.getClusterUUID(), manifestAfterElection.getClusterUUID()); + assertEquals(manifest.getIndices(), manifestAfterElection.getIndices()); + assertEquals(manifest.getSettingsMetadata(), manifestAfterElection.getSettingsMetadata()); + assertEquals(manifest.getTemplatesMetadata(), manifestAfterElection.getTemplatesMetadata()); + assertEquals(manifest.getCustomMetadataMap(), manifestAfterElection.getCustomMetadataMap()); + assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion()); + assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting()); + } + + public void testVotingConfigAreCommitted() throws ExecutionException, InterruptedException { + prepareCluster(3, 2, INDEX_NAME, 1, 2); + ensureStableCluster(5); + ensureGreen(INDEX_NAME); + // add two new nodes to the cluster, to update the voting config + internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY); + ensureStableCluster(7); + + internalCluster().getInstances(PersistedStateRegistry.class).forEach(persistedStateRegistry -> { + CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.LOCAL + ); + CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); + if (remoteState != null) { + assertEquals( + localState.getLastAcceptedState().getLastCommittedConfiguration(), + remoteState.getLastAcceptedState().getLastCommittedConfiguration() + ); + assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size()); + } + }); + } + private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) { // assert cluster state stats for data node DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java index 280fd13f0fdcf..e8df2c8686610 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map indexStats, String indexName protected void verifyRestoredData(Map indexStats, String indexName) throws Exception { verifyRestoredData(indexStats, indexName, true); } - - public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY); - } - - public void prepareCluster( - int numClusterManagerNodes, - int numDataOnlyNodes, - String indices, - int replicaCount, - int shardCount, - Settings settings - ) { - prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } - } - - public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings); - internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings); - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index ba06bb463e5a8..bcb0d54c0a25c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) { } protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); - internalCluster().startDataOnlyNodes(numDataOnlyNodes); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } + prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY); } protected void prepareCluster( @@ -368,11 +362,16 @@ protected void prepareCluster( int shardCount, Settings settings ) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings); - internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings); + prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings); for (String index : indices.split(",")) { createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); ensureGreen(index); } } + + protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings); + internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings); + } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9c883175e3ee0..9cffc7051d756 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.Closeable; import java.io.IOException; @@ -104,6 +105,7 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); + // ToDo: revisit this check while making the setting dynamic this.isRemotePublicationEnabled = isRemoteStateEnabled && REMOTE_PUBLICATION_SETTING.get(settings) && localNode.isRemoteStatePublicationEnabled(); @@ -459,6 +461,9 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.term() ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); + if (shouldUpdateRemotePersistedState(publishRequest)) { + updateRemotePersistedStateOnPublishRequest(publishRequest); + } assert getLastAcceptedState() == clusterState; return new PublishResponse(clusterState.term(), clusterState.version()); @@ -571,6 +576,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) { ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted(); + if (shouldCommitRemotePersistedState()) { + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); + } assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); } @@ -616,6 +624,33 @@ public void close() throws IOException { IOUtils.close(persistedStateRegistry); } + private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) { + return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null + && publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false; + } + + private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) { + if (publishRequest instanceof RemoteStatePublishRequest) { + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState()); + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) + .setLastAcceptedManifest(((RemoteStatePublishRequest) publishRequest).getAcceptedManifest()); + } else { + // We will end up here if PublishRequest was sent not using Remote Store even with remote persisted state on this node + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null); + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null); + } + } + + private boolean shouldCommitRemotePersistedState() { + return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null + && persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) + .getLastAcceptedState() + .getNodes() + .isLocalNodeElectedClusterManager() == false + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; + } + /** * Pluggable persistence layer for {@link CoordinationState}. * @@ -653,6 +688,22 @@ public interface PersistedState extends Closeable { */ PersistedStateStats getStats(); + /** + * Returns the last accepted {@link ClusterMetadataManifest}. + * + * @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest + * has been accepted yet. + */ + default ClusterMetadataManifest getLastAcceptedManifest() { + // return null by default, this method needs to be overridden wherever required + return null; + } + + /** + * Sets the last accepted {@link ClusterMetadataManifest}. + */ + default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {} + /** * Marks the last accepted cluster state as committed. * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, @@ -661,14 +712,7 @@ public interface PersistedState extends Closeable { */ default void markLastAcceptedStateAsCommitted() { final ClusterState lastAcceptedState = getLastAcceptedState(); - Metadata.Builder metadataBuilder = null; - if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) { - final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata()) - .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) - .build(); - metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); - metadataBuilder.coordinationMetadata(coordinationMetadata); - } + Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState); // if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet, // the cluster uuid might not been known yet. assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false @@ -693,6 +737,18 @@ default void markLastAcceptedStateAsCommitted() { } } + default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) { + Metadata.Builder metadataBuilder = null; + if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) { + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata()) + .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) + .build(); + metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); + metadataBuilder.coordinationMetadata(coordinationMetadata); + } + return metadataBuilder; + } + default void close() throws IOException {} } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index ca36011b3a0e9..cdf331b7bb577 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -199,7 +199,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); - final PublishWithJoinResponse response = acceptState(incomingState); + final PublishWithJoinResponse response = acceptState(incomingState, null); lastSeenClusterState.set(incomingState); return response; } else { @@ -230,7 +230,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque incomingState.stateUUID(), request.bytes().length() ); - final PublishWithJoinResponse response = acceptState(incomingState); + final PublishWithJoinResponse response = acceptState(incomingState, null); lastSeenClusterState.compareAndSet(lastSeen, incomingState); return response; } @@ -281,7 +281,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest true ); fullClusterStateReceivedCount.incrementAndGet(); - final PublishWithJoinResponse response = acceptState(clusterState); + final PublishWithJoinResponse response = acceptState(clusterState, manifest); lastSeenClusterState.set(clusterState); return response; } else { @@ -300,7 +300,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest transportService.getLocalNode().getId() ); compatibleClusterStateDiffReceivedCount.incrementAndGet(); - final PublishWithJoinResponse response = acceptState(clusterState); + final PublishWithJoinResponse response = acceptState(clusterState, manifest); lastSeenClusterState.compareAndSet(lastSeen, clusterState); return response; } @@ -314,7 +314,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest } } - private PublishWithJoinResponse acceptState(ClusterState incomingState) { + private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) { // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation) if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) { final PublishRequest publishRequest = currentPublishRequestToSelf.get(); @@ -324,6 +324,9 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(publishRequest); } } + if (manifest != null) { + return handlePublishRequest.apply(new RemoteStatePublishRequest(incomingState, manifest)); + } return handlePublishRequest.apply(new PublishRequest(incomingState)); } @@ -539,7 +542,7 @@ public String executor() { } public void sendClusterState(DiscoveryNode destination, ActionListener listener) { - logger.info("sending cluster state over transport to node: {}", destination.getName()); + logger.debug("sending cluster state over transport to node: {}", destination.getName()); if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); sendFullClusterState(destination, listener); @@ -639,7 +642,7 @@ public class RemotePublicationContext extends PublicationContext { @Override public void sendClusterState(final DiscoveryNode destination, final ActionListener listener) { try { - logger.info("sending remote cluster state to node: {}", destination.getName()); + logger.debug("sending remote cluster state to node: {}", destination.getName()); final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)) .getLastUploadedManifestFile(); final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java new file mode 100644 index 0000000000000..5667e6d67d062 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; + +import java.util.Objects; + +/** + * PublishRequest created by downloading the accepted {@link ClusterState} from Remote Store, using the published {@link ClusterMetadataManifest} + * + * @opensearch.internal + */ +public class RemoteStatePublishRequest extends PublishRequest { + private final ClusterMetadataManifest manifest; + + public RemoteStatePublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) { + super(acceptedState); + this.manifest = acceptedManifest; + } + + public ClusterMetadataManifest getAcceptedManifest() { + return manifest; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + RemoteStatePublishRequest that = (RemoteStatePublishRequest) o; + return Objects.equals(manifest, that.manifest); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), manifest); + } + + @Override + public String toString() { + return "RemoteStatePublishRequest{" + super.toString() + "manifest=" + manifest + "} "; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index bd56c9e1757c6..b3836edcd7d6c 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -697,8 +697,18 @@ public String getLastUploadedManifestFile() { return lastUploadedManifestFile; } + @Override + public ClusterMetadataManifest getLastAcceptedManifest() { + return lastAcceptedManifest; + } + @Override public void setLastAcceptedState(ClusterState clusterState) { + // for non leader node, update the lastAcceptedClusterState + if (clusterState == null || clusterState.getNodes().isLocalNodeElectedClusterManager() == false) { + lastAcceptedState = clusterState; + return; + } try { final RemoteClusterStateManifestInfo manifestDetails; // Decide the codec version @@ -735,7 +745,7 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == } assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true : "Manifest and ClusterState are not in sync"; - lastAcceptedManifest = manifestDetails.getClusterMetadataManifest(); + setLastAcceptedManifest(manifestDetails.getClusterMetadataManifest()); lastAcceptedState = clusterState; lastUploadedManifestFile = manifestDetails.getManifestFileName(); } catch (Exception e) { @@ -744,6 +754,11 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest( } } + @Override + public void setLastAcceptedManifest(ClusterMetadataManifest manifest) { + this.lastAcceptedManifest = manifest; + } + @Override public PersistedStateStats getStats() { return remoteClusterStateService.getUploadStats(); @@ -767,7 +782,7 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState, int codec assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion; if (lastAcceptedState == null || lastAcceptedManifest == null - || lastAcceptedState.term() != clusterState.term() + || (remoteClusterStateService.isRemotePublicationEnabled() == false && lastAcceptedState.term() != clusterState.term()) || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT || lastAcceptedManifest.getCodecVersion() != codecVersion) { return true; @@ -781,19 +796,32 @@ public void markLastAcceptedStateAsCommitted() { assert lastAcceptedState != null : "Last accepted state is not present"; assert lastAcceptedManifest != null : "Last accepted manifest is not present"; ClusterState clusterState = lastAcceptedState; - if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false - && lastAcceptedState.metadata().clusterUUIDCommitted() == false) { + boolean shouldCommitVotingConfig = shouldCommitVotingConfig(); + boolean isClusterUUIDUnknown = lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID); + boolean isClusterUUIDCommitted = lastAcceptedState.metadata().clusterUUIDCommitted(); + if (shouldCommitVotingConfig || (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false)) { Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); - metadataBuilder.clusterUUIDCommitted(true); + if (shouldCommitVotingConfig) { + metadataBuilder = commitVotingConfiguration(lastAcceptedState); + } + if (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false) { + metadataBuilder.clusterUUIDCommitted(true); + } clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); } - final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( - clusterState, - lastAcceptedManifest - ); - lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest(); + if (clusterState.getNodes().isLocalNodeElectedClusterManager()) { + final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( + clusterState, + lastAcceptedManifest, + shouldCommitVotingConfig + ); + assert committedManifestDetails != null; + setLastAcceptedManifest(committedManifestDetails.getClusterMetadataManifest()); + lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); + } else { + setLastAcceptedManifest(ClusterMetadataManifest.builder(lastAcceptedManifest).committed(true).build()); + } lastAcceptedState = clusterState; - lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); } catch (Exception e) { handleExceptionOnWrite(e); } @@ -804,6 +832,10 @@ public void close() throws IOException { remoteClusterStateService.close(); } + private boolean shouldCommitVotingConfig() { + return !lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()); + } + private void handleExceptionOnWrite(Exception e) { throw ExceptionsHelper.convertToRuntime(e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 3425550a9f548..e504c5abb46d3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -363,12 +363,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat * * @return {@link RemoteClusterStateManifestInfo} object containing uploaded manifest detail */ - @Nullable public RemoteClusterStateManifestInfo writeIncrementalMetadata( ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataManifest previousManifest ) throws IOException { + if (previousClusterState == null) { + throw new IllegalArgumentException("previousClusterState cannot be null"); + } + if (clusterState == null) { + throw new IllegalArgumentException("clusterState cannot be null"); + } + if (previousManifest == null) { + throw new IllegalArgumentException("previousManifest cannot be null"); + } logger.trace("WRITING INCREMENTAL STATE"); final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); @@ -376,7 +384,6 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( logger.error("Local node is not elected cluster manager. Exiting"); return null; } - assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); @@ -949,18 +956,41 @@ public RemoteClusterStateCleanupManager getCleanupManager() { } @Nullable - public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) - throws IOException { + public RemoteClusterStateManifestInfo markLastStateAsCommitted( + ClusterState clusterState, + ClusterMetadataManifest previousManifest, + boolean commitVotingConfig + ) throws IOException { assert clusterState != null : "Last accepted cluster state is not set"; if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); return null; } assert previousManifest != null : "Last cluster metadata manifest is not set"; + UploadedMetadataAttribute uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata(); + if (commitVotingConfig) { + // update the coordination metadata if voting config is committed + uploadedCoordinationMetadata = writeMetadataInParallel( + clusterState, + emptyList(), + emptyMap(), + emptyMap(), + true, + false, + false, + false, + false, + false, + emptyMap(), + false, + emptyList(), + null + ).uploadedCoordinationMetadata; + } UploadedMetadataResults uploadedMetadataResults = new UploadedMetadataResults( previousManifest.getIndices(), previousManifest.getCustomMetadataMap(), - previousManifest.getCoordinationMetadata(), + uploadedCoordinationMetadata, previousManifest.getSettingsMetadata(), previousManifest.getTemplatesMetadata(), previousManifest.getTransientSettingsMetadata(), @@ -1762,6 +1792,10 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } + public boolean isRemotePublicationEnabled() { + return this.isPublicationEnabled; + } + public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { this.remoteStateReadTimeout = remoteStateReadTimeout; } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index d003b54adcccc..32cb95e0c04f6 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -72,20 +73,28 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; public class CoordinationStateTests extends OpenSearchTestCase { private DiscoveryNode node1; private DiscoveryNode node2; private DiscoveryNode node3; + private DiscoveryNode nodeWithPub; private ClusterState initialStateNode1; + private ClusterState initialStateNode2; private PersistedState ps1; private PersistedStateRegistry psr1; @@ -99,16 +108,18 @@ public void setupNodes() { node1 = createNode("node1"); node2 = createNode("node2"); node3 = createNode("node3"); + nodeWithPub = createNode( + "nodeWithPub", + Map.of( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "", + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "" + ) + ); initialStateNode1 = clusterState(0L, 0L, node1, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); - ClusterState initialStateNode2 = clusterState( - 0L, - 0L, - node2, - VotingConfiguration.EMPTY_CONFIG, - VotingConfiguration.EMPTY_CONFIG, - 42L - ); + initialStateNode2 = clusterState(0L, 0L, node2, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); ClusterState initialStateNode3 = clusterState( 0L, 0L, @@ -128,6 +139,10 @@ public void setupNodes() { } public static DiscoveryNode createNode(String id) { + return createNode(id, Collections.emptyMap()); + } + + public static DiscoveryNode createNode(String id, Map attributes) { final TransportAddress address = buildNewFakeTransportAddress(); return new DiscoveryNode( "", @@ -136,7 +151,7 @@ public static DiscoveryNode createNode(String id) { address.address().getHostString(), address.getAddress(), address, - Collections.emptyMap(), + attributes, DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT ); @@ -913,7 +928,7 @@ public void testSafety() { public void testHandlePrePublishAndCommitWhenRemoteStateDisabled() { final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); - final PersistedStateRegistry persistedStateRegistrySpy = Mockito.spy(persistedStateRegistry); + final PersistedStateRegistry persistedStateRegistrySpy = spy(persistedStateRegistry); final CoordinationState coordinationState = createCoordinationState(persistedStateRegistrySpy, node1, Settings.EMPTY); final VotingConfiguration initialConfig = VotingConfiguration.of(node1); final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); @@ -932,7 +947,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateDisabled() { public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final VotingConfiguration initialConfig = VotingConfiguration.of(node1); - final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); + final ClusterState clusterState = clusterStateWithClusterManager(0L, 0L, node1, node1, initialConfig, initialConfig, 42L); final String previousClusterUUID = "prev-cluster-uuid"; final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() .clusterTerm(0L) @@ -948,8 +963,9 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep .previousClusterUUID(randomAlphaOfLength(10)) .clusterUUIDCommitted(true) .build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)) - .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); + when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)).thenReturn( + new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") + ); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); @@ -958,40 +974,21 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep new RemotePersistedState(remoteClusterStateService, previousClusterUUID) ); - String randomRepoName = "randomRepoName"; - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - randomRepoName - ); - String stateRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - randomRepoName - ); - - Settings settings = Settings.builder() - .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName) - .put(stateRepoTypeAttributeKey, FsRepository.TYPE) - .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .build(); - - final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings); + final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, remoteStateSettings()); coordinationState.handlePrePublish(clusterState); Mockito.verify(remoteClusterStateService, Mockito.times(1)) .writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); - Mockito.when(remoteClusterStateService.markLastStateAsCommitted(any(), any())) - .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); + when(remoteClusterStateService.markLastStateAsCommitted(any(), any(), eq(false))).thenReturn( + new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") + ); coordinationState.handlePreCommit(); ClusterState committedClusterState = ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).clusterUUIDCommitted(true).build()) .build(); - // Mockito.verify(remoteClusterStateService, Mockito.times(1)).markLastStateAsCommitted(committedClusterState, manifest); ArgumentCaptor clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class); - verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(clusterStateCaptor.capture(), any()); + verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(clusterStateCaptor.capture(), any(), eq(false)); assertThat(clusterStateCaptor.getValue().metadata().indices(), equalTo(committedClusterState.metadata().indices())); assertThat(clusterStateCaptor.getValue().metadata().clusterUUID(), equalTo(committedClusterState.metadata().clusterUUID())); assertThat(clusterStateCaptor.getValue().stateUUID(), equalTo(committedClusterState.stateUUID())); @@ -1006,6 +1003,271 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep ); } + public void testHandlePublishRequestOnFollowerWhenRemotePublicationEnabled() { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + // cluster manager is node1 and node2 is a follower node + VotingConfiguration initialConfig = VotingConfiguration.of(node1); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + coordinationState.handleStartJoin(startJoinRequest); + + ClusterState state2 = setValue( + ClusterState.builder(state1) + .metadata( + Metadata.builder(state1.metadata()) + .coordinationMetadata(CoordinationMetadata.builder(state1.coordinationMetadata()).term(newTerm).build()) + .build() + ) + .version(randomLongBetween(1, 10)) + .build(), + 43L + ); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(state2.version()) + .clusterUUID(state2.metadata().clusterUUID()) + .nodeId(node1.getId()) + .stateUUID(randomAlphaOfLength(10)) + .opensearchVersion(Version.CURRENT) + .committed(false) + .codecVersion(1) + .globalMetadataFileName(randomAlphaOfLength(10)) + .indices(Collections.emptyList()) + .previousClusterUUID(randomAlphaOfLength(10)) + .clusterUUIDCommitted(true) + .build(); + + PublishResponse publishResponse = coordinationState.handlePublishRequest(new RemoteStatePublishRequest(state2, manifest)); + assertEquals(state2.term(), publishResponse.getTerm()); + assertEquals(state2.version(), publishResponse.getVersion()); + verifyNoInteractions(remoteClusterStateService); + assertEquals(state2, persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertEquals(manifest, persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + } + + public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabled() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + VotingConfiguration initialConfig = VotingConfiguration.of(node1, nodeWithPub); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + Join v1 = cs1.handleStartJoin(startJoinRequest); + Join v2 = coordinationState.handleStartJoin(startJoinRequest); + assertTrue(coordinationState.handleJoin(v1)); + assertTrue(coordinationState.handleJoin(v2)); + assertTrue(coordinationState.electionWon()); + VotingConfiguration newConfig = VotingConfiguration.of(node1, nodeWithPub, node3); + ClusterState state2 = clusterState( + startJoinRequest.getTerm(), + 2L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + newConfig, + 7L + ); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(state2.version()) + .clusterUUID(state2.metadata().clusterUUID()) + .nodeId(node1.getId()) + .stateUUID(randomAlphaOfLength(10)) + .opensearchVersion(Version.CURRENT) + .committed(false) + .codecVersion(1) + .globalMetadataFileName(randomAlphaOfLength(10)) + .indices(Collections.emptyList()) + .previousClusterUUID(randomAlphaOfLength(10)) + .clusterUUIDCommitted(true) + .build(); + + PublishRequest publishRequest = coordinationState.handleClientValue(state2); + coordinationState.handlePublishRequest(new RemoteStatePublishRequest(publishRequest.getAcceptedState(), manifest)); + ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version()); + coordinationState.handleCommit(applyCommitRequest); + verifyNoInteractions(remoteClusterStateService); + assertTrue( + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState().metadata().clusterUUIDCommitted() + ); + assertTrue(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest().isCommitted()); + assertEquals(coordinationState.getLastCommittedConfiguration(), newConfig); + } + + public void testRemotePersistedStateResetsForPublicationEnabledAfterLocalPublication() { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + // cluster manager is node1 and nodeWithPub is a follower node + VotingConfiguration initialConfig = VotingConfiguration.of(node1); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + coordinationState.handleStartJoin(startJoinRequest); + + ClusterState state2 = setValue( + ClusterState.builder(state1) + .metadata( + Metadata.builder(state1.metadata()) + .coordinationMetadata(CoordinationMetadata.builder(state1.coordinationMetadata()).term(newTerm).build()) + .build() + ) + .version(randomLongBetween(1, 10)) + .build(), + 43L + ); + + PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishRequest(state2)); + assertEquals(state2.term(), publishResponse.getTerm()); + assertEquals(state2.version(), publishResponse.getVersion()); + verifyNoInteractions(remoteClusterStateService); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + } + + public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRemotePersistedState() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + VotingConfiguration initialConfig = VotingConfiguration.of(node1, nodeWithPub); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + Join v1 = cs1.handleStartJoin(startJoinRequest); + Join v2 = coordinationState.handleStartJoin(startJoinRequest); + assertTrue(coordinationState.handleJoin(v1)); + assertTrue(coordinationState.handleJoin(v2)); + assertTrue(coordinationState.electionWon()); + VotingConfiguration newConfig = VotingConfiguration.of(node1, nodeWithPub, node3); + ClusterState state2 = clusterState( + startJoinRequest.getTerm(), + 2L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + newConfig, + 7L + ); + + PublishRequest publishRequest = coordinationState.handleClientValue(state2); + coordinationState.handlePublishRequest(new PublishRequest(publishRequest.getAcceptedState())); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version()); + PersistedState spyRPS = spy(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)); + coordinationState.handleCommit(applyCommitRequest); + verifyNoInteractions(spyRPS); + verifyNoInteractions(remoteClusterStateService); + } + public void testIsRemotePublicationEnabled_WithInconsistentSettings() { // create settings with remote state disabled but publication enabled Settings settings = Settings.builder() @@ -1042,6 +1304,30 @@ public static ClusterState clusterState( ); } + public static ClusterState clusterStateWithClusterManager( + long term, + long version, + DiscoveryNode localNode, + DiscoveryNode clusterManagerNode, + VotingConfiguration lastCommittedConfig, + VotingConfiguration lastAcceptedConfig, + long value + ) { + return clusterState( + term, + version, + DiscoveryNodes.builder() + .add(localNode) + .add(clusterManagerNode) + .localNodeId(localNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + .build(), + lastCommittedConfig, + lastAcceptedConfig, + value + ); + } + public static ClusterState clusterState( long term, long version, @@ -1090,4 +1376,30 @@ private static PersistedStateRegistry createPersistedStateRegistry(ClusterState persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, clusterState)); return persistedStateRegistry; } + + private static Settings remoteStateSettings() { + String randomRepoName = "randomRepoName"; + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + randomRepoName + ); + String stateRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + randomRepoName + ); + + Settings settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName) + .put(stateRepoTypeAttributeKey, FsRepository.TYPE) + .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + return settings; + } + + private static Settings remotePublicationSettings() { + return Settings.builder().put(remoteStateSettings()).put(REMOTE_PUBLICATION_SETTING_KEY, true).build(); + } } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 9972bbfff5d66..5ea5241762753 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -115,6 +115,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -210,11 +211,15 @@ public void testSetCurrentTerm() throws IOException { } private ClusterState createClusterState(long version, Metadata metadata) { - return ClusterState.builder(clusterName) - .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build()) - .version(version) - .metadata(metadata) - .build(); + return createClusterState(version, metadata, false); + } + + private ClusterState createClusterState(long version, Metadata metadata, boolean isClusterManagerNode) { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()); + if (isClusterManagerNode) { + nodesBuilder.clusterManagerNodeId(localNode.getId()); + } + return ClusterState.builder(clusterName).nodes(nodesBuilder.build()).version(version).metadata(metadata).build(); } private ClusterState createClusterStateWithNodes(long version, Metadata metadata) { @@ -225,7 +230,12 @@ private ClusterState createClusterStateWithNodes(long version, Metadata metadata Sets.newHashSet(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), Version.V_2_13_0 ); - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).add(oldNode).build(); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(localNode) + .localNodeId(localNode.getId()) + .clusterManagerNodeId(localNode.getId()) + .add(oldNode) + .build(); return ClusterState.builder(clusterName).nodes(discoveryNodes).version(version).metadata(metadata).build(); } @@ -762,7 +772,8 @@ public void testRemotePersistedState() throws IOException { final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); remotePersistedState.setLastAcceptedState(clusterState); @@ -773,7 +784,8 @@ public void testRemotePersistedState() throws IOException { final ClusterState secondClusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); remotePersistedState.setLastAcceptedState(secondClusterState); @@ -783,11 +795,11 @@ public void testRemotePersistedState() throws IOException { assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); - when(remoteClusterStateService.markLastStateAsCommitted(Mockito.any(), Mockito.any())).thenReturn( + when(remoteClusterStateService.markLastStateAsCommitted(Mockito.any(), Mockito.any(), eq(false))).thenReturn( new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") ); remotePersistedState.markLastAcceptedStateAsCommitted(); - Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any()); + Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any(), eq(false)); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -825,7 +837,8 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx ClusterState clusterState2 = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build(), + true ); final ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder() .clusterTerm(1L) @@ -840,7 +853,8 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx ClusterState clusterState3 = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build(), + true ); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(new RemoteClusterStateManifestInfo(manifest2, "path/to/manifest3")); @@ -849,6 +863,73 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx } + public void testRemotePersistentState_FollowerNode() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(5L) + .committed(false) + .build(); + final String previousClusterUUID = "prev-cluster-uuid"; + RemotePersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); + + assertNull(remotePersistedState.getLastAcceptedState()); + assertNull(remotePersistedState.getLastAcceptedManifest()); + assertEquals(0, remotePersistedState.getCurrentTerm()); + + final long clusterTerm = randomNonNegativeLong(); + final ClusterState clusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder() + .coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()) + .clusterUUIDCommitted(true) + .build(), + false + ); + + remotePersistedState.setLastAcceptedState(clusterState); + remotePersistedState.setLastAcceptedManifest(manifest); + Mockito.verify(remoteClusterStateService, never()) + .writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + + assertEquals(clusterState, remotePersistedState.getLastAcceptedState()); + assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); + assertEquals(manifest, remotePersistedState.getLastAcceptedManifest()); + + final ClusterState secondClusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder() + .coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()) + .clusterUUIDCommitted(false) + .build(), + false + ); + + remotePersistedState.setLastAcceptedState(secondClusterState); + Mockito.verify(remoteClusterStateService, never()) + .writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + + assertEquals(secondClusterState, remotePersistedState.getLastAcceptedState()); + assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); + assertFalse(remotePersistedState.getLastAcceptedManifest().isCommitted()); + + remotePersistedState.markLastAcceptedStateAsCommitted(); + Mockito.verify(remoteClusterStateService, never()).markLastStateAsCommitted(Mockito.any(), Mockito.any(), eq(false)); + + assertEquals(secondClusterState, remotePersistedState.getLastAcceptedState()); + assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); + assertFalse(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted()); + assertTrue(remotePersistedState.getLastAcceptedManifest().isCommitted()); + + final ClusterState thirdClusterState = ClusterState.builder(secondClusterState) + .metadata(Metadata.builder(secondClusterState.getMetadata()).clusterUUID(randomAlphaOfLength(10)).build()) + .build(); + remotePersistedState.setLastAcceptedState(thirdClusterState); + remotePersistedState.markLastAcceptedStateAsCommitted(); + assertTrue(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted()); + assertTrue(remotePersistedState.getLastAcceptedManifest().isCommitted()); + } + public void testRemotePersistedStateNotCommitted() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final String previousClusterUUID = "prev-cluster-uuid"; @@ -875,7 +956,8 @@ public void testRemotePersistedStateNotCommitted() throws IOException { final long clusterTerm = randomNonNegativeLong(); ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); clusterState = ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.getMetadata()).clusterUUID(randomAlphaOfLength(10)).clusterUUIDCommitted(false).build()) @@ -901,7 +983,8 @@ public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOExcept final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); @@ -924,7 +1007,8 @@ public void testRemotePersistedStateFailureStats() throws IOException { final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); @@ -1025,7 +1109,8 @@ public void testGatewayForRemoteStateForNodeReplacement() throws IOException { false ) .clusterUUID(randomAlphaOfLength(10)) - .build() + .build(), + false ); when(remoteClusterStateService.getLastKnownUUIDFromRemote(clusterName.value())).thenReturn( previousState.metadata().clusterUUID() @@ -1071,7 +1156,8 @@ public void testGatewayForRemoteStateForNodeReboot() throws IOException { .coordinationMetadata(CoordinationMetadata.builder().term(randomLong()).build()) .put(indexMetadata, false) .clusterUUID(randomAlphaOfLength(10)) - .build() + .build(), + false ); gateway = newGatewayForRemoteState( remoteClusterStateService, @@ -1117,7 +1203,8 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I .put(indexMetadata, false) .clusterUUID(ClusterState.UNKNOWN_UUID) .persistentSettings(Settings.builder().put(Metadata.SETTING_READ_ONLY_SETTING.getKey(), true).build()) - .build() + .build(), + false ) ).nodes(DiscoveryNodes.EMPTY_NODES).build(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 608cc2e12b055..e875b1c5dc64e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -608,20 +608,20 @@ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOExc final RemoteClusterStateManifestInfo manifestDetails = remoteClusterStateService.writeIncrementalMetadata( clusterState, clusterState, - null + ClusterMetadataManifest.builder().build() ); Assert.assertThat(manifestDetails, nullValue()); assertEquals(0, remoteClusterStateService.getUploadStats().getSuccessCount()); } - public void testFailWriteIncrementalMetadataWhenTermChanged() { + public void testFailWriteIncrementalMetadataWhenManifestNull() { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(2L).build(); final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) .build(); assertThrows( - AssertionError.class, + IllegalArgumentException.class, () -> remoteClusterStateService.writeIncrementalMetadata(previousClusterState, clusterState, null) ); } @@ -2520,7 +2520,7 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { List indices = List.of(uploadedIndexMetadata); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); - final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest) + final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest, false) .getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()