From 148eca559eed927a6fabd3833ccad553f84b6cd6 Mon Sep 17 00:00:00 2001
From: Sooraj Sinha
Date: Wed, 5 Jun 2024 08:22:35 +0530
Subject: [PATCH] Update remote publication to skip download on master node

---
Reviewer notes (text between the "---" marker and the first "diff --git"
line is ignored by `git am`):

* What the patch does, as shown by the hunks below:
  - handleIncomingRemotePublishRequest now returns early through the new
    acceptStateOnLocalNode(request) when transportService.getLocalNode()
    equals request.getSourceNode(), so the manifest lookup and state
    download are not executed for that request.
  - acceptStateOnLocalNode reads currentPublishRequestToSelf, throws
    IllegalStateException when it is null or when its accepted state's term
    or version differs from the remote request, otherwise applies it through
    handlePublishRequest and stores the accepted state in
    lastSeenClusterState.
  - The two post-download logger.debug calls are replaced by pre-download
    logger.info calls that log term, version and state UUID(s) instead of
    the whole cluster state.
  - The diff path passes the local `lastSeen` instead of a second
    lastSeenClusterState.get() call.
  - RemotePublishRequest gains a toString(), used by the new exception
    message.

* NOTE(review): this file had lost its line breaks. Line boundaries were
  rebuilt so that every hunk matches the counts in its "@@" header;
  indentation and blank-line placement assume the usual 4-space Java layout
  and should be confirmed against the target tree (or apply with
  `git apply --ignore-whitespace`).

* NOTE(review): `Optional<ClusterMetadataManifest>` on the first hunk's
  context line was restored on the assumption that the type argument was
  stripped together with the author address on the "From:" header - TODO
  confirm. The author address itself could not be recovered.

 .../PublicationTransportHandler.java          | 23 ++++++++++++++++---
 .../coordination/RemotePublishRequest.java    |  6 +++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java
index d553324bbf49d..bcad827517033 100644
--- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java
+++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java
@@ -229,6 +229,9 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
     }
 
     private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
+        if (transportService.getLocalNode().equals(request.getSourceNode())) {
+            return acceptStateOnLocalNode(request);
+        }
         final Optional<ClusterMetadataManifest> manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version);
         if (manifestOptional.isPresent() == false) {
             throw new IllegalStateException(
@@ -250,15 +253,17 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
         }
 
         if (applyFullState == true) {
+            logger.info("Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), manifest.getStateVersion(),
+                manifest.getStateUUID());
             ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true);
-            logger.debug("Downloaded full cluster state [{}]", clusterState);
             fullClusterStateReceivedCount.incrementAndGet();
             final PublishWithJoinResponse response = acceptState(clusterState);
             lastSeenClusterState.set(clusterState);
             return response;
         } else {
-            ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId());
-            logger.debug("Downloaded full cluster state from diff [{}]", clusterState);
+            logger.info("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", manifest.getClusterTerm(),
+                manifest.getStateVersion(), manifest.getDiffManifest().getFromStateUUID(), manifest.getStateUUID());
+            ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen, transportService.getLocalNode().getId());
             compatibleClusterStateDiffReceivedCount.incrementAndGet();
             final PublishWithJoinResponse response = acceptState(clusterState);
             lastSeenClusterState.compareAndSet(lastSeen, clusterState);
@@ -279,6 +284,18 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
         return handlePublishRequest.apply(new PublishRequest(incomingState));
     }
 
+    private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
+        // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
+        final PublishRequest publishRequest = currentPublishRequestToSelf.get();
+        if (publishRequest == null || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term
+            || publishRequest.getAcceptedState().version() != remotePublishRequest.version) {
+            throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
+        }
+        PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest);
+        lastSeenClusterState.set(publishRequest.getAcceptedState());
+        return publishWithJoinResponse;
+    }
+
     public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
         final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemoteStateEnabled);
 
diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java
index b1981c0b4b1a0..e59f7cedd98e5 100644
--- a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java
+++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java
@@ -38,6 +38,12 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeString(clusterUUID);
     }
 
+    @Override
+    public String toString() {
+        return "RemotePublishRequest{" + "term=" + term + ", version=" + version + ", clusterName=" + clusterName + ", clusterUUID=" + clusterUUID
+            + ", sourceNode=" + sourceNode + '}';
+    }
+
     public String getClusterName() {
         return clusterName;
     }