Skip to content

Commit

Permalink
Fix cluster state publication
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed May 17, 2024
1 parent 3186372 commit 8912b0c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.version(),
clusterState.term()
);
logger.info("Setting last accepted state : term - {}, version - {}", clusterState.term(), clusterState.version());
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
assert getLastAcceptedState() == clusterState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,13 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
}

if (applyFullState == true) {
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId());
logger.debug("Downloaded full cluster state version [{}]", clusterState.version());
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get());
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId());
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
Expand Down Expand Up @@ -393,8 +393,7 @@ public void onFailure(Exception e) {
}
if (sendRemoteState && destination.isRemoteStoreNode()) {
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
}
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, responseActionListener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

package org.opensearch.gateway.remote;

import java.util.Locale;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -76,6 +78,28 @@ CheckedRunnable<IOException> getAsyncMetadataWriteAction(
);
}

public ToXContent readMetadata(ChecksumBlobStoreFormat componentMetadataBlobStore, String clusterName, String clusterUUID, String fileName) {
final BlobContainer remoteStateAttributeContainer = clusterStateAttributeContainer(clusterName, clusterUUID);
try {
// Fetch custom metadata
if (fileName != null) {
String[] splitPath = fileName.split("/");
return componentMetadataBlobStore.read(
remoteStateAttributeContainer,
splitPath[splitPath.length - 1],
blobStoreRepository.getNamedXContentRegistry()
);
} else {
return TemplatesMetadata.EMPTY_METADATA;
}
} catch (IOException e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Error while downloading Templates Metadata - %s", fileName),
e
);
}
}

private BlobContainer clusterStateAttributeContainer(String clusterName, String clusterUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/
return blobStoreRepository.blobStore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.CheckedRunnable;
Expand Down Expand Up @@ -775,10 +776,10 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID
);
}

return getClusterStateForManifest(clusterName, clusterMetadataManifest.get());
return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId);
}

public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest) {
public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId) {
// todo make this async
// Fetch Global Metadata
Metadata globalMetadata = remoteGlobalMetadataManager.getGlobalMetadata(clusterName, manifest.getClusterUUID(), manifest);
Expand All @@ -789,17 +790,20 @@ public ClusterState getClusterStateForManifest(String clusterName, ClusterMetada
manifest.getClusterUUID(),
manifest
);
DiscoveryNodes discoveryNodes = (DiscoveryNodes) remoteClusterStateAttributesManager.readMetadata(DISCOVERY_NODES_FORMAT, clusterName, manifest.getClusterUUID(), manifest.getDiscoveryNodesMetadata().getUploadedFilename());

Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); });

return ClusterState.builder(ClusterState.EMPTY_STATE)
.version(manifest.getStateVersion())
.stateUUID(manifest.getStateUUID())
.nodes(DiscoveryNodes.builder(discoveryNodes).localNodeId(localNodeId))
.metadata(Metadata.builder(globalMetadata).indices(indexMetadataMap).build())
.build();
}

public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState) {
public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId) {
assert manifest.getDiffManifest() != null;
ClusterStateDiffManifest diff = manifest.getDiffManifest();
ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState);
Expand Down Expand Up @@ -827,11 +831,18 @@ public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadata
TemplatesMetadata templatesMetadata = remoteGlobalMetadataManager.getTemplatesMetadata(clusterName, manifest.getClusterUUID(), manifest.getTemplatesMetadata().getUploadedFilename());
metadataBuilder.templates(templatesMetadata);
}
for (String customType : diff.getCustomMetadataUpdated().keySet()) {
Metadata.Custom custom = remoteGlobalMetadataManager.getCustomsMetadata(clusterName, manifest.getClusterUUID(), manifest.getCustomMetadataMap().get(customType).getUploadedFilename(), customType);
metadataBuilder.putCustom(customType, custom);
if (diff.isDiscoveryNodesUpdated()) {
DiscoveryNodes discoveryNodes = (DiscoveryNodes) remoteClusterStateAttributesManager.readMetadata(DISCOVERY_NODES_FORMAT, clusterName, manifest.getClusterUUID(), manifest.getDiscoveryNodesMetadata().getUploadedFilename());
clusterStateBuilder.nodes(DiscoveryNodes.builder(discoveryNodes).localNodeId(localNodeId));
}
return clusterStateBuilder.metadata(metadataBuilder).build();
if (diff.getCustomMetadataUpdated() != null) {
for (String customType : diff.getCustomMetadataUpdated().keySet()) {
Metadata.Custom custom = remoteGlobalMetadataManager.getCustomsMetadata(clusterName, manifest.getClusterUUID(),
manifest.getCustomMetadataMap().get(customType).getUploadedFilename(), customType);
metadataBuilder.putCustom(customType, custom);
}
}
return clusterStateBuilder.stateUUID(manifest.getStateUUID()).version(manifest.getStateVersion()).metadata(metadataBuilder).build();
}

/**
Expand Down

0 comments on commit 8912b0c

Please sign in to comment.