From 2a5b124ee8ef4376d62c484b6cd3ea1d98ca75d1 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Tue, 19 Sep 2023 09:11:20 +0530 Subject: [PATCH] Fix cluster chaining during bootstrap (#10020) * Fix clusterUUID chaining logic Signed-off-by: Sooraj Sinha --- .../opensearch/gateway/GatewayMetaState.java | 43 ++++++- .../remote/ClusterMetadataManifest.java | 37 +++++- .../remote/RemoteClusterStateService.java | 100 ++++++++++++--- .../coordination/CoordinationStateTests.java | 36 +++++- .../GatewayMetaStatePersistedStateTests.java | 21 +++- .../remote/ClusterMetadataManifestTests.java | 21 +++- .../RemoteClusterStateServiceTests.java | 117 +++++++++++++++--- 7 files changed, 315 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index e42ac8daa3b1c..6b26af148b2ea 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -78,6 +78,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -171,11 +172,12 @@ public void start( // If the cluster UUID loaded from local is unknown (_na_) then fetch the best state from remote // If there is no valid state on remote, continue with initial empty state // If there is a valid state, then restore index metadata using this state + String lastKnownClusterUUID = ClusterState.UNKNOWN_UUID; if (ClusterState.UNKNOWN_UUID.equals(clusterState.metadata().clusterUUID())) { - String lastKnownClusterUUID = remoteClusterStateService.getLastKnownUUIDFromRemote( + lastKnownClusterUUID = remoteClusterStateService.getLastKnownUUIDFromRemote( clusterState.getClusterName().value() ); - if 
(!ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID)) { + if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) { // Load state from remote final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore( clusterState, @@ -186,7 +188,7 @@ public void start( clusterState = remoteRestoreResult.getClusterState(); } } - remotePersistedState = new RemotePersistedState(remoteClusterStateService); + remotePersistedState = new RemotePersistedState(remoteClusterStateService, lastKnownClusterUUID); } persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState); } else { @@ -647,9 +649,11 @@ public static class RemotePersistedState implements PersistedState { private ClusterState lastAcceptedState; private ClusterMetadataManifest lastAcceptedManifest; private final RemoteClusterStateService remoteClusterStateService; + private String previousClusterUUID; - public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) { + public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService, final String previousClusterUUID) { this.remoteClusterStateService = remoteClusterStateService; + this.previousClusterUUID = previousClusterUUID; } @Override @@ -674,7 +678,26 @@ public void setLastAcceptedState(ClusterState clusterState) { try { final ClusterMetadataManifest manifest; if (shouldWriteFullClusterState(clusterState)) { - manifest = remoteClusterStateService.writeFullMetadata(clusterState); + if (clusterState.metadata().clusterUUIDCommitted() == true) { + final Optional latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ); + if (latestManifest.isPresent()) { + // The previous UUID should not change for the current UUID. So fetching the latest manifest + // from remote store and getting the previous UUID. 
+ previousClusterUUID = latestManifest.get().getPreviousClusterUUID(); + } else { + // When the user starts the cluster with remote state disabled but later enables the remote state, + // there will not be any manifest for the current cluster UUID. + logger.error( + "Latest manifest is not present in remote store for cluster UUID: {}", + clusterState.metadata().clusterUUID() + ); + previousClusterUUID = ClusterState.UNKNOWN_UUID; + } + } + manifest = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID); } else { assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true : "Previous manifest and previous ClusterState are not in sync"; @@ -723,11 +746,19 @@ public void markLastAcceptedStateAsCommitted() { try { assert lastAcceptedState != null : "Last accepted state is not present"; assert lastAcceptedManifest != null : "Last accepted manifest is not present"; + ClusterState clusterState = lastAcceptedState; + if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false + && lastAcceptedState.metadata().clusterUUIDCommitted() == false) { + Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); + metadataBuilder.clusterUUIDCommitted(true); + clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); + } final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted( - lastAcceptedState, + clusterState, lastAcceptedManifest ); lastAcceptedManifest = committedManifest; + lastAcceptedState = clusterState; } catch (Exception e) { handleExceptionOnWrite(e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 040c0663efbd9..40b16f3d6323b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ 
b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -42,6 +42,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField COMMITTED_FIELD = new ParseField("committed"); private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid"); + private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed"); private static long term(Object[] fields) { return (long) fields[0]; @@ -79,6 +80,10 @@ private static String previousClusterUUID(Object[] fields) { return (String) fields[8]; } + private static boolean clusterUUIDCommitted(Object[] fields) { + return (boolean) fields[9]; + } + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> new ClusterMetadataManifest( @@ -90,7 +95,8 @@ private static String previousClusterUUID(Object[] fields) { nodeId(fields), committed(fields), indices(fields), - previousClusterUUID(fields) + previousClusterUUID(fields), + clusterUUIDCommitted(fields) ) ); @@ -108,6 +114,7 @@ private static String previousClusterUUID(Object[] fields) { INDICES_FIELD ); PARSER.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED); } private final List indices; @@ -119,6 +126,7 @@ private static String previousClusterUUID(Object[] fields) { private final String nodeId; private final boolean committed; private final String previousClusterUUID; + private final boolean clusterUUIDCommitted; public List getIndices() { return indices; @@ -156,6 +164,10 @@ public String getPreviousClusterUUID() { return previousClusterUUID; } + public boolean isClusterUUIDCommitted() { + return clusterUUIDCommitted; + } + public ClusterMetadataManifest( long clusterTerm, 
long version, @@ -165,7 +177,8 @@ public ClusterMetadataManifest( String nodeId, boolean committed, List indices, - String previousClusterUUID + String previousClusterUUID, + boolean clusterUUIDCommitted ) { this.clusterTerm = clusterTerm; this.stateVersion = version; @@ -176,6 +189,7 @@ public ClusterMetadataManifest( this.committed = committed; this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; + this.clusterUUIDCommitted = clusterUUIDCommitted; } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -188,6 +202,7 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.committed = in.readBoolean(); this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); this.previousClusterUUID = in.readString(); + this.clusterUUIDCommitted = in.readBoolean(); } public static Builder builder() { @@ -215,6 +230,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endArray(); builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID()); + builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted()); return builder; } @@ -229,6 +245,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(committed); out.writeCollection(indices); out.writeString(previousClusterUUID); + out.writeBoolean(clusterUUIDCommitted); } @Override @@ -248,7 +265,8 @@ public boolean equals(Object o) { && Objects.equals(opensearchVersion, that.opensearchVersion) && Objects.equals(nodeId, that.nodeId) && Objects.equals(committed, that.committed) - && Objects.equals(previousClusterUUID, that.previousClusterUUID); + && Objects.equals(previousClusterUUID, that.previousClusterUUID) + && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted); } @Override @@ -262,7 +280,8 @@ public int hashCode() { opensearchVersion, nodeId, committed, - previousClusterUUID + previousClusterUUID, + 
clusterUUIDCommitted ); } @@ -291,6 +310,7 @@ public static class Builder { private String nodeId; private String previousClusterUUID; private boolean committed; + private boolean clusterUUIDCommitted; public Builder indices(List indices) { this.indices = indices; @@ -341,6 +361,11 @@ public Builder previousClusterUUID(String previousClusterUUID) { return this; } + public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) { + this.clusterUUIDCommitted = clusterUUIDCommitted; + return this; + } + public Builder() { indices = new ArrayList<>(); } @@ -355,6 +380,7 @@ public Builder(ClusterMetadataManifest manifest) { this.committed = manifest.committed; this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; + this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; } public ClusterMetadataManifest build() { @@ -367,7 +393,8 @@ public ClusterMetadataManifest build() { nodeId, committed, indices, - previousClusterUUID + previousClusterUUID, + clusterUUIDCommitted ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index cf750bb11f3f8..dddc5376803a5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -152,20 +152,13 @@ private BlobStoreTransferService getBlobStoreTransferService() { * @return A manifest object which contains the details of uploaded entity metadata. 
*/ @Nullable - public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) throws IOException { + public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); return null; } - // should fetch the previous cluster UUID before writing full cluster state. - // Whenever a new election happens, a new leader will be elected and it might have stale previous UUID - final String previousClusterUUID = fetchPreviousClusterUUID( - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ); - // any validations before/after upload ? final List allUploadedIndexMetadata = writeIndexMetadataParallel( clusterState, @@ -436,7 +429,8 @@ private ClusterMetadataManifest uploadManifest( nodeId, committed, uploadedIndexMetadata, - previousClusterUUID + previousClusterUUID, + clusterState.metadata().clusterUUIDCommitted() ); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); return manifest; @@ -582,7 +576,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) { try { Set clusterUUIDs = getAllClusterUUIDs(clusterName); Map latestManifests = getLatestManifestForAllClusterUUIDs(clusterName, clusterUUIDs); - List validChain = createClusterChain(latestManifests); + List validChain = createClusterChain(latestManifests, clusterName); if (validChain.isEmpty()) { return ClusterState.UNKNOWN_UUID; } @@ -623,7 +617,7 @@ private Map getLatestManifestForAllClusterUUIDs * @param manifestsByClusterUUID Map of latest ClusterMetadataManifest for every cluster UUID * @return List of cluster UUIDs. 
The first element is the most recent cluster UUID in the chain */ - private List createClusterChain(final Map manifestsByClusterUUID) { + private List createClusterChain(final Map manifestsByClusterUUID, final String clusterName) { final Map clusterUUIDGraph = manifestsByClusterUUID.values() .stream() .collect(Collectors.toMap(ClusterMetadataManifest::getClusterUUID, ClusterMetadataManifest::getPreviousClusterUUID)); @@ -637,18 +631,29 @@ private List createClusterChain(final Map 1) { - throw new IllegalStateException( - String.format( - Locale.ROOT, - "The system has ended into multiple valid cluster states in the remote store. " - + "Please check their latest manifest to decide which one you want to keep. Valid Cluster UUIDs: - %s", - validClusterUUIDs - ) + // If the valid cluster UUIDs are more than 1, it means there was some race condition where + // more than 2 cluster manager nodes tried to become active cluster manager and published + // 2 cluster UUIDs which followed the same previous UUID. + final Map manifestsByClusterUUIDTrimmed = trimClusterUUIDs( + manifestsByClusterUUID, + validClusterUUIDs, + clusterName ); + if (manifestsByClusterUUID.size() == manifestsByClusterUUIDTrimmed.size()) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "The system has ended into multiple valid cluster states in the remote store. " + + "Please check their latest manifest to decide which one you want to keep.
Valid Cluster UUIDs: - %s", + validClusterUUIDs + ) + ); + } + return createClusterChain(manifestsByClusterUUIDTrimmed, clusterName); } final List validChain = new ArrayList<>(); String currentUUID = validClusterUUIDs.get(0); - while (!ClusterState.UNKNOWN_UUID.equals(currentUUID)) { + while (currentUUID != null && !ClusterState.UNKNOWN_UUID.equals(currentUUID)) { validChain.add(currentUUID); // Getting the previous cluster UUID of a cluster UUID from the clusterUUID Graph currentUUID = clusterUUIDGraph.get(currentUUID); @@ -656,8 +661,61 @@ private List createClusterChain(final Map trimClusterUUIDs( + final Map latestManifestsByClusterUUID, + final List validClusterUUIDs, + final String clusterName + ) { + final Map trimmedUUIDs = new HashMap<>(latestManifestsByClusterUUID); + for (String clusterUUID : validClusterUUIDs) { + ClusterMetadataManifest currentManifest = trimmedUUIDs.get(clusterUUID); + // Here we compare the manifest of current UUID to that of previous UUID + // In case currentUUID's latest manifest is same as previous UUIDs latest manifest, + // that means it was restored from previousUUID and no IndexMetadata update was performed on it. 
+ if (ClusterState.UNKNOWN_UUID.equals(currentManifest.getPreviousClusterUUID())) { + if (currentManifest.getIndices().isEmpty()) { + trimmedUUIDs.remove(clusterUUID); + } + } else { + ClusterMetadataManifest previousManifest = trimmedUUIDs.get(currentManifest.getPreviousClusterUUID()); + if (isMetadataEqual(currentManifest, previousManifest, clusterName)) { + trimmedUUIDs.remove(clusterUUID); + } + } + } + return trimmedUUIDs; + } + + private boolean isMetadataEqual(ClusterMetadataManifest first, ClusterMetadataManifest second, String clusterName) { + // todo clusterName can be set as final in the constructor + if (first.getIndices().size() != second.getIndices().size()) { + return false; + } + final Map secondIndices = second.getIndices() + .stream() + .collect(Collectors.toMap(md -> md.getIndexName(), Function.identity())); + for (UploadedIndexMetadata uploadedIndexMetadata : first.getIndices()) { + final IndexMetadata firstIndexMetadata = getIndexMetadata(clusterName, first.getClusterUUID(), uploadedIndexMetadata); + final UploadedIndexMetadata secondUploadedIndexMetadata = secondIndices.get(uploadedIndexMetadata.getIndexName()); + if (secondUploadedIndexMetadata == null) { + return false; + } + final IndexMetadata secondIndexMetadata = getIndexMetadata(clusterName, second.getClusterUUID(), secondUploadedIndexMetadata); + if (firstIndexMetadata.equals(secondIndexMetadata) == false) { + return false; + } + } + return true; + } + private boolean isInvalidClusterUUID(ClusterMetadataManifest manifest) { - return !manifest.isCommitted() && manifest.getIndices().isEmpty(); + return !manifest.isClusterUUIDCommitted(); } /** @@ -729,6 +787,7 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { /** * Purges all remote cluster state against provided cluster UUIDs + * * @param clusterName name of the cluster * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged */ @@ -760,6 +819,7 @@ public void onFailure(Exception e) { 
/** * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests + * * @param clusterName name of the cluster * @param clusterUUID uuid of cluster state to refer to in remote * @param manifestsToRetain no of latest manifest files to keep in remote diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index d1c2dda615992..f37823d2c0c7d 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static java.util.Collections.emptyMap; @@ -70,6 +71,9 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class CoordinationStateTests extends OpenSearchTestCase { @@ -925,6 +929,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final VotingConfiguration initialConfig = VotingConfiguration.of(node1); final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); + final String previousClusterUUID = "prev-cluster-uuid"; final ClusterMetadataManifest manifest = new ClusterMetadataManifest( 0L, 0L, @@ -934,13 +939,17 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep randomAlphaOfLength(10), false, Collections.emptyList(), - randomAlphaOfLength(10) + 
randomAlphaOfLength(10), + true ); - Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState)).thenReturn(manifest); + Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(manifest); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); - persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, new RemotePersistedState(remoteClusterStateService)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, previousClusterUUID) + ); String randomRepoName = "randomRepoName"; String stateRepoTypeAttributeKey = String.format( @@ -963,11 +972,28 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings); coordinationState.handlePrePublish(clusterState); - Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState); + Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID); assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); coordinationState.handlePreCommit(); - Mockito.verify(remoteClusterStateService, Mockito.times(1)).markLastStateAsCommitted(clusterState, manifest); + ClusterState committedClusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder(clusterState.metadata()).clusterUUIDCommitted(true).build()) + .build(); + // Mockito.verify(remoteClusterStateService, Mockito.times(1)).markLastStateAsCommitted(committedClusterState, manifest); + ArgumentCaptor clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class); + verify(remoteClusterStateService, 
times(1)).markLastStateAsCommitted(clusterStateCaptor.capture(), any()); + assertThat(clusterStateCaptor.getValue().metadata().indices(), equalTo(committedClusterState.metadata().indices())); + assertThat(clusterStateCaptor.getValue().metadata().clusterUUID(), equalTo(committedClusterState.metadata().clusterUUID())); + assertThat(clusterStateCaptor.getValue().stateUUID(), equalTo(committedClusterState.stateUUID())); + assertThat( + clusterStateCaptor.getValue().coordinationMetadata().term(), + equalTo(committedClusterState.coordinationMetadata().term()) + ); + assertThat(clusterStateCaptor.getValue().version(), equalTo(committedClusterState.version())); + assertThat( + clusterStateCaptor.getValue().metadata().clusterUUIDCommitted(), + equalTo(committedClusterState.metadata().clusterUUIDCommitted()) + ); } public static CoordinationState createCoordinationState( diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 486717faaf864..c7ed1cb732154 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -716,10 +716,11 @@ Directory createDirectory(Path path) { public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any())).thenReturn(manifest); + final String previousClusterUUID = "prev-cluster-uuid"; + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(manifest); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), 
Mockito.any())).thenReturn(manifest); - CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService); + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), nullValue()); assertThat(remotePersistedState.getCurrentTerm(), equalTo(0L)); @@ -731,7 +732,7 @@ public void testRemotePersistedState() throws IOException { ); remotePersistedState.setLastAcceptedState(clusterState); - Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState); + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(clusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -742,7 +743,7 @@ public void testRemotePersistedState() throws IOException { ); remotePersistedState.setLastAcceptedState(secondClusterState); - Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(secondClusterState); + Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(secondClusterState, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -752,14 +753,22 @@ public void testRemotePersistedState() throws IOException { assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + assertThat(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted(), equalTo(false)); + final ClusterState thirdClusterState = ClusterState.builder(secondClusterState) + .metadata(Metadata.builder(secondClusterState.getMetadata()).clusterUUID(randomAlphaOfLength(10)).build()) + .build(); + 
remotePersistedState.setLastAcceptedState(thirdClusterState); + remotePersistedState.markLastAcceptedStateAsCommitted(); + assertThat(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted(), equalTo(true)); } public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); - Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any()); + final String previousClusterUUID = "prev-cluster-uuid"; + Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); - CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService); + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 9f8dde5ba9d45..66426c2a880a3 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -37,7 +37,8 @@ public void testClusterMetadataManifestXContent() throws IOException { "test-node-id", false, Collections.singletonList(uploadedIndexMetadata), - "prev-cluster-uuid" + "prev-cluster-uuid", + true ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -60,7 +61,8 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "B10RX1f5RJenMQvYccCgSQ", true, randomUploadedIndexMetadataList(), - "yfObdx8KSMKKrXf8UyHhM" + "yfObdx8KSMKKrXf8UyHhM", + true ); { // Mutate 
Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( @@ -183,6 +185,21 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { ); } + { // Mutate cluster uuid committed + EqualsHashCodeTestUtils.checkEqualsAndHashCode( + initialManifest, + orig -> OpenSearchTestCase.copyWriteable( + orig, + new NamedWriteableRegistry(Collections.emptyList()), + ClusterMetadataManifest::new + ), + manifest -> { + ClusterMetadataManifest.Builder builder = ClusterMetadataManifest.builder(manifest); + builder.clusterUUIDCommitted(false); + return builder.build(); + } + ); + } } private List randomUploadedIndexMetadataList() { diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 9f5067420aab1..65166386733c6 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -135,7 +135,7 @@ public void teardown() throws Exception { public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); Assert.assertThat(manifest, nullValue()); } @@ -169,7 +169,7 @@ public void testWriteFullMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + final ClusterMetadataManifest manifest = 
remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -190,6 +190,7 @@ public void testWriteFullMetadataSuccess() throws IOException { assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); } public void testWriteFullMetadataInParallelSuccess() throws IOException { @@ -205,7 +206,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -216,6 +217,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { .stateVersion(1L) .stateUUID("state-uuid") .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -226,6 +228,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + 
assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 1); assertEquals(writeContextArgumentCaptor.getAllValues().size(), 1); @@ -266,7 +269,7 @@ public void testWriteFullMetadataInParallelFailure() throws IOException { remoteClusterStateService.start(); assertThrows( RemoteClusterStateService.IndexMetadataTransferException.class, - () -> remoteClusterStateService.writeFullMetadata(clusterState) + () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); } @@ -571,12 +574,43 @@ public void testGetValidPreviousClusterUUID() throws IOException { public void testGetValidPreviousClusterUUIDForInvalidChain() throws IOException { Map clusterUUIDsPointers = Map.of( + "cluster-uuid2", "cluster-uuid1", - ClusterState.UNKNOWN_UUID, + "cluster-uuid3", + "cluster-uuid2", + "cluster-uuid5", + "cluster-uuid4" + ); + mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers); + + remoteClusterStateService.start(); + assertThrows(IllegalStateException.class, () -> remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")); + } + + public void testGetValidPreviousClusterUUIDWithMultipleChains() throws IOException { + Map clusterUUIDsPointers = Map.of( "cluster-uuid2", + "cluster-uuid1", + "cluster-uuid1", ClusterState.UNKNOWN_UUID, "cluster-uuid3", - "cluster-uuid2" + "cluster-uuid1" + ); + mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers); + + remoteClusterStateService.start(); + String previousClusterUUID = remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster"); + assertThat(previousClusterUUID, equalTo("cluster-uuid3")); + } + + public void testGetValidPreviousClusterUUIDWithInvalidMultipleChains() throws IOException { + Map clusterUUIDsPointers = Map.of( + "cluster-uuid1", + ClusterState.UNKNOWN_UUID, + "cluster-uuid2", + "cluster-uuid1", + "cluster-uuid3", + ClusterState.UNKNOWN_UUID ); 
mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers); @@ -598,42 +632,92 @@ private void mockObjectsForGettingPreviousClusterUUID(Map cluste when(blobContainer3.path()).thenReturn(blobPath); mockBlobContainerForClusterUUIDs(uuidBlobContainer, clusterUUIDsPointers.keySet()); + List uploadedIndexMetadataList1 = List.of( + new UploadedIndexMetadata("index1", "index-uuid1", "key1"), + new UploadedIndexMetadata("index2", "index-uuid2", "key2") + ); final ClusterMetadataManifest clusterManifest1 = generateClusterMetadataManifest( "cluster-uuid1", clusterUUIDsPointers.get("cluster-uuid1"), - randomAlphaOfLength(10) + randomAlphaOfLength(10), + uploadedIndexMetadataList1 ); - mockBlobContainer(blobContainer1, clusterManifest1, Map.of()); + Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build(); + IndexMetadata indexMetadata1 = IndexMetadata.builder("index1") + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + IndexMetadata indexMetadata2 = IndexMetadata.builder("index2") + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + Map indexMetadataMap1 = Map.of("index-uuid1", indexMetadata1, "index-uuid2", indexMetadata2); + mockBlobContainer(blobContainer1, clusterManifest1, indexMetadataMap1); + List uploadedIndexMetadataList2 = List.of( + new UploadedIndexMetadata("index1", "index-uuid1", "key1"), + new UploadedIndexMetadata("index2", "index-uuid2", "key2") + ); final ClusterMetadataManifest clusterManifest2 = generateClusterMetadataManifest( "cluster-uuid2", clusterUUIDsPointers.get("cluster-uuid2"), - randomAlphaOfLength(10) + randomAlphaOfLength(10), + uploadedIndexMetadataList2 ); - mockBlobContainer(blobContainer2, clusterManifest2, Map.of()); + IndexMetadata indexMetadata3 = IndexMetadata.builder("index1") + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + IndexMetadata indexMetadata4 = 
IndexMetadata.builder("index2") + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + Map indexMetadataMap2 = Map.of("index-uuid1", indexMetadata3, "index-uuid2", indexMetadata4); + mockBlobContainer(blobContainer2, clusterManifest2, indexMetadataMap2); + List uploadedIndexMetadataList3 = List.of(new UploadedIndexMetadata("index1", "index-uuid1", "key1")); final ClusterMetadataManifest clusterManifest3 = generateClusterMetadataManifest( "cluster-uuid3", clusterUUIDsPointers.get("cluster-uuid3"), - randomAlphaOfLength(10) + randomAlphaOfLength(10), + uploadedIndexMetadataList3 ); - mockBlobContainer(blobContainer3, clusterManifest3, Map.of()); + IndexMetadata indexMetadata5 = IndexMetadata.builder("index1") + .settings(indexSettings) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + Map indexMetadataMap3 = Map.of("index-uuid1", indexMetadata5); + mockBlobContainer(blobContainer3, clusterManifest3, indexMetadataMap3); when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn( uuidBlobContainer, blobContainer1, blobContainer1, + blobContainer3, + blobContainer3, blobContainer2, blobContainer2, - blobContainer3, - blobContainer3 + blobContainer1, + blobContainer2, + blobContainer1, + blobContainer2 ); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); } - private ClusterMetadataManifest generateClusterMetadataManifest(String clusterUUID, String previousClusterUUID, String stateUUID) { + private ClusterMetadataManifest generateClusterMetadataManifest( + String clusterUUID, + String previousClusterUUID, + String stateUUID, + List uploadedIndexMetadata + ) { return ClusterMetadataManifest.builder() - .indices(List.of()) + .indices(uploadedIndexMetadata) .clusterTerm(1L) .stateVersion(1L) .stateUUID(stateUUID) @@ -642,6 +726,7 @@ private ClusterMetadataManifest generateClusterMetadataManifest(String clusterUU .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) 
.previousClusterUUID(previousClusterUUID) .committed(true) + .clusterUUIDCommitted(true) .build(); }