Skip to content

Commit

Permalink
Add tests and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Sep 2, 2024
1 parent b45ad8c commit e6bf8db
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,10 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT).get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ public interface PersistedState extends Closeable {
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getUploadStats();
PersistedStateStats getStats();

/**
* Marks the last accepted cluster state as committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,14 +896,14 @@ public DiscoveryStats stats() {
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null) {
if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats());
}
if (persistedStateRegistry.getPersistedState(stateType) != null
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getDownloadStats());
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getDownloadStats() {
return remoteClusterStateService.getDownloadStats();
public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
Expand Down Expand Up @@ -235,7 +239,8 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException {
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
boolean applyFullState = false;
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
Expand All @@ -248,7 +253,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
Expand All @@ -262,7 +266,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}

if (applyFullState == true) {
remoteClusterStateService.fullDownloadState();
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
Expand All @@ -282,7 +285,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
lastSeenClusterState.set(clusterState);
return response;
} else {
remoteClusterStateService.diffDownloadState();
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
Expand All @@ -303,9 +305,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
return response;
}
} catch (Exception e) {
remoteClusterStateService.readMetadataFailed();
if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e);
throw new RuntimeException("Runtime exception in reading remote cluster state", e);
if (applyFullState) {
remoteClusterStateService.fullDownloadFailed();
} else {
remoteClusterStateService.diffDownloadFailed();
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
// Note: These stats are not published yet, will come in future
return null;
}
Expand Down Expand Up @@ -745,7 +745,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return remoteClusterStateService.getUploadStats();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,8 +1364,8 @@ public ClusterState getClusterStateForManifest(
clusterState = ClusterState.builder(state).metadata(mb).build();
}
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDownloadSucceeded();
remoteStateStats.stateDownloadTook(durationMillis);
remoteStateStats.stateFullDownloadSucceeded();
remoteStateStats.stateFullDownloadTook(durationMillis);

return clusterState;
}
Expand Down Expand Up @@ -1456,8 +1456,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
.build();

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDownloadSucceeded();
remoteStateStats.stateDownloadTook(durationMillis);
remoteStateStats.stateDiffDownloadSucceeded();
remoteStateStats.stateDiffDownloadTook(durationMillis);

return clusterState;
}
Expand Down Expand Up @@ -1658,27 +1658,27 @@ public void writeMetadataFailed() {
remoteStateStats.stateUploadFailed();
}

public void readMetadataFailed() {
remoteStateStats.stateDownloadFailed();
public RemotePersistenceStats getRemoteStateStats() {
return remoteStateStats;
}

public void fullDownloadState() {
remoteStateStats.fullDownloadState();
public PersistedStateStats getUploadStats() {
return remoteStateStats.getUploadStats();
}

public void diffDownloadState() {
remoteStateStats.diffDownloadState();
public PersistedStateStats getFullDownloadStats() {
return remoteStateStats.getRemoteFullDownloadStats();
}

public RemotePersistenceStats getRemoteStateStats() {
return remoteStateStats;
public PersistedStateStats getDiffDownloadStats() {
return remoteStateStats.getRemoteDiffDownloadStats();
}

public PersistedStateStats getUploadStats() {
return remoteStateStats.getUploadStats();
public void fullDownloadFailed() {
remoteStateStats.stateFullDownloadFailed();
}

public PersistedStateStats getDownloadStats() {
return remoteStateStats.getDownloadStats();
public void diffDownloadFailed() {
remoteStateStats.stateDiffDownloadFailed();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
public class RemotePersistenceStats {

RemoteUploadStats remoteUploadStats;
RemoteDownloadStats remoteDownloadStats;
PersistedStateStats remoteDiffDownloadStats;
PersistedStateStats remoteFullDownloadStats;

final String FULL_DOWNLOAD_STATS = "remote_full_download";
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";

public RemotePersistenceStats() {
remoteUploadStats = new RemoteUploadStats();
remoteDownloadStats = new RemoteDownloadStats();
remoteDiffDownloadStats = new PersistedStateStats(DIFF_DOWNLOAD_STATS);
remoteFullDownloadStats = new PersistedStateStats(FULL_DOWNLOAD_STATS);
}

public void cleanUpAttemptFailed() {
Expand Down Expand Up @@ -61,32 +66,40 @@ public void stateUploadFailed() {
remoteUploadStats.stateFailed();
}

public void stateDownloadSucceeded() {
remoteDownloadStats.stateSucceeded();
public void stateFullDownloadSucceeded() {
remoteFullDownloadStats.stateSucceeded();
}

public void stateDownloadTook(long durationMillis) {
remoteDownloadStats.stateTook(durationMillis);
public void stateDiffDownloadSucceeded() {
remoteDiffDownloadStats.stateSucceeded();
}

public void stateDownloadFailed() {
remoteDownloadStats.stateFailed();
public void stateFullDownloadTook(long durationMillis) {
remoteFullDownloadStats.stateTook(durationMillis);
}

public PersistedStateStats getUploadStats() {
return remoteUploadStats;
public void stateDiffDownloadTook(long durationMillis) {
remoteDiffDownloadStats.stateTook(durationMillis);
}

public void stateFullDownloadFailed() {
remoteFullDownloadStats.stateFailed();
}

public PersistedStateStats getDownloadStats() {
return remoteDownloadStats;
public void stateDiffDownloadFailed() {
remoteDiffDownloadStats.stateFailed();
}

public PersistedStateStats getUploadStats() {
return remoteUploadStats;
}

public void diffDownloadState() {
remoteDownloadStats.diffDownloadState();
public PersistedStateStats getRemoteDiffDownloadStats() {
return remoteDiffDownloadStats;
}

public void fullDownloadState() {
remoteDownloadStats.fullDownloadState();
public PersistedStateStats getRemoteFullDownloadStats() {
return remoteFullDownloadStats;
}

}
Loading

0 comments on commit e6bf8db

Please sign in to comment.