Skip to content

Commit

Permalink
Add tests and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Aug 28, 2024
1 parent cc0871e commit d6bca95
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteDownloadStats.REMOTE_DOWNLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,6 +255,7 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_DOWNLOAD)) continue;
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,34 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ public interface PersistedState extends Closeable {
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getUploadStats();
PersistedStateStats getStats();

/**
* Marks the last accepted cluster state as committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,10 +896,9 @@ public DiscoveryStats stats() {
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null) {
if (persistedStateRegistry.getPersistedState(stateType).getUploadStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getUploadStats());
}
if (persistedStateRegistry.getPersistedState(stateType) != null
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException {
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
Expand All @@ -260,7 +260,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}

if (applyFullState == true) {
remoteClusterStateService.fullDownloadState();
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
Expand All @@ -278,9 +277,9 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
remoteClusterStateService.fullDownloadState();
return response;
} else {
remoteClusterStateService.diffDownloadState();
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
Expand All @@ -298,12 +297,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
remoteClusterStateService.diffDownloadState();
return response;
}
} catch (Exception e) {
remoteClusterStateService.readMetadataFailed();
if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e);
throw new RuntimeException("Runtime exception in reading remote cluster state", e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
// Note: These stats are not published yet, will come in future
return null;
}
Expand Down Expand Up @@ -740,7 +740,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}

@Override
public PersistedStateStats getUploadStats() {
public PersistedStateStats getStats() {
return remoteClusterStateService.getUploadStats();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

import java.util.concurrent.atomic.AtomicLong;

/**
* Download stats for remote publication
*
* @opensearch.internal
*/
public class RemoteDownloadStats extends PersistedStateStats {
static final String REMOTE_DOWNLOAD = "remote_download";
static final String DIFF_DOWNLOAD = "diff_download";
Expand All @@ -33,4 +38,11 @@ public void fullDownloadState() {
fullDownloadCount.incrementAndGet();
}

public long getDiffDownloadCount() {
return diffDownloadCount.get();
}

public long getFullDownloadCount() {
return fullDownloadCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@

import java.util.concurrent.atomic.AtomicLong;

/**
* Upload stats for remote state
*
* @opensearch.internal
*/
public class RemoteUploadStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
Expand Down
Loading

0 comments on commit d6bca95

Please sign in to comment.