Skip to content

Commit

Permalink
Update remote publication to skip download on master node
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed Jun 5, 2024
1 parent 0333312 commit 148eca5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptStateOnLocalNode(request);
}
final Optional<ClusterMetadataManifest> manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version);
if (manifestOptional.isPresent() == false) {
throw new IllegalStateException(
Expand All @@ -250,15 +253,17 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
}

if (applyFullState == true) {
logger.info("Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), manifest.getStateVersion(),
manifest.getStateUUID());
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true);
logger.debug("Downloaded full cluster state [{}]", clusterState);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId());
logger.debug("Downloaded full cluster state from diff [{}]", clusterState);
logger.info("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", manifest.getClusterTerm(),
manifest.getStateVersion(), manifest.getDiffManifest().getFromStateUUID(), manifest.getStateUUID());
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen, transportService.getLocalNode().getId());
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
Expand All @@ -279,6 +284,18 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
if (publishRequest == null || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term
|| publishRequest.getAcceptedState().version() != remotePublishRequest.version) {
throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
}
PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest);
lastSeenClusterState.set(publishRequest.getAcceptedState());
return publishWithJoinResponse;
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemoteStateEnabled) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemoteStateEnabled);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(clusterUUID);
}

@Override
public String toString() {
return "RemotePublishRequest{" + "term=" + term + ", version=" + version + ", clusterName=" + clusterName + ", clusterUUID=" + clusterUUID
+ ", sourceNode=" + sourceNode + '}';
}

public String getClusterName() {
return clusterName;
}
Expand Down

0 comments on commit 148eca5

Please sign in to comment.